AI Gateway Design: Building a Unified Enterprise AI Access Layer
10 Systems, 10 Bills, and Nobody Knows Where the Money Went
Zhao Lei is an architect at a mid-sized internet company, five years into the job, and this year he took charge of the infrastructure for its AI applications.
At the end of 2024 the company began pushing AI adoption at scale. Six months later, Zhao Lei received a bill that made him wince: last month's AI API spend came to ¥470,000.
Worse than the amount was this: he had no idea where the ¥470,000 had gone.
A pass through the company's codebases turned up the following problems:
Problem 1: API keys everywhere
The company had 10 business systems consuming AI capabilities. The customer-service bot had one OpenAI key, product recommendation had another, content moderation yet another... Each team requested and kept its own keys. Some sat in config files, some had been committed to Git (!), and some were in use by nobody-knew-whom.
Problem 2: Untraceable costs
The ¥470,000 bill was the sum of several provider invoices, and each invoice showed only total token counts — no breakdown of which business line or which user spent what. When a product manager asked "what's the ROI of our AI features since launch," there was no way to answer.
Problem 3: Duplicated plumbing
Rate limiting, retries, timeouts, logging... every system implemented its own, with wildly uneven code quality. One system got its rate limiter wrong and retried furiously at peak hours, doubling the bill.
Problem 4: Painful provider switching
When one provider raised prices, they wanted to switch to another. That meant modifying 10 systems, testing in batches, and nail-biting releases. In the end the change was so large they gave up.
Zhao Lei concluded: they needed an AI gateway.
The Core Value of an AI Gateway
An AI gateway is the unified access layer between every business system and the AI providers.
A good AI gateway provides:
| Capability | Without a gateway | With a gateway |
|---|---|---|
| API key management | Scattered across systems | Centralized, least privilege |
| Cost tracking | No per-business breakdown | Real-time stats per business line/user |
| Rate limiting | Implemented ad hoc per system | Unified policy, multi-dimensional |
| Provider switching | Change every system | One gateway config change |
| Request logging | Scattered across systems | Unified audit log |
| Semantic caching | None | Similar requests hit the cache |
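The cost-tracking row comes down to simple arithmetic once the gateway keeps per-business token counters. A minimal sketch of the attribution math — class name, prices, and counter values here are hypothetical, not figures from this article:

```java
import java.util.Map;

public class CostAttribution {
    // Hypothetical price table: US cents per 1M tokens (illustrative values only)
    static final Map<String, Double> CENTS_PER_1M_TOKENS =
        Map.of("gpt-4o", 250.0, "gpt-4o-mini", 15.0);

    // Cost in cents for one business line's usage of one model
    static double costCents(String model, long tokens) {
        return tokens * CENTS_PER_1M_TOKENS.get(model) / 1_000_000.0;
    }

    public static void main(String[] args) {
        // With per-business counters, "where did the money go" becomes a lookup:
        System.out.println(costCents("gpt-4o", 40_000_000L));        // 10000.0 cents = $100
        System.out.println(costCents("gpt-4o-mini", 900_000_000L));  // 13500.0 cents = $135
    }
}
```

This is exactly what the provider invoice cannot give you: the invoice has the total, the gateway has the dimensions.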
Technology and Architecture Choices
Extend Spring Cloud Gateway into an AI-aware gateway:
Project structure
ai-gateway/
├── src/main/java/com/company/gateway/
│   ├── config/
│   │   ├── GatewayConfig.java           # Route configuration
│   │   ├── SecurityConfig.java          # Authentication configuration
│   │   └── RateLimitConfig.java         # Rate-limit configuration
│   ├── filter/
│   │   ├── AuthenticationFilter.java    # Authentication filter
│   │   ├── RateLimitFilter.java         # Rate-limit filter
│   │   ├── RequestTransformFilter.java  # Request transformation
│   │   ├── SemanticCacheFilter.java     # Semantic cache
│   │   ├── CostTrackingFilter.java      # Cost tracking
│   │   └── MetricsFilter.java           # Monitoring metrics
│   ├── router/
│   │   └── AiProviderRouter.java        # Provider routing logic
│   ├── service/
│   │   ├── ApiKeyService.java           # API key management
│   │   ├── CostService.java             # Cost statistics
│   │   └── SemanticCacheService.java    # Semantic cache
│   └── model/
│       ├── BusinessLine.java            # Business-line definition
│       └── ProviderConfig.java          # Provider configuration

Maven dependencies
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Token counting -->
<dependency>
<groupId>com.knuddels</groupId>
<artifactId>jtokkit</artifactId>
<version>1.1.0</version>
</dependency>
<!-- Vector similarity (for the semantic cache) -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-redis-store-spring-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
</dependencies>

Unified Authentication: Managing API Keys for Internal Systems
// src/main/java/com/company/gateway/filter/AuthenticationFilter.java
package com.company.gateway.filter;
import com.company.gateway.service.ApiKeyService;
import com.company.gateway.model.BusinessLine;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.Optional;

@Component
@Slf4j
public class AuthenticationFilter extends AbstractGatewayFilterFactory<AuthenticationFilter.Config> {
    private final ApiKeyService apiKeyService;

    // A single constructor: AbstractGatewayFilterFactory needs Config.class,
    // and Spring injects ApiKeyService here (a separate no-arg constructor
    // would leave the final field null)
    public AuthenticationFilter(ApiKeyService apiKeyService) {
        super(Config.class);
        this.apiKeyService = apiKeyService;
    }

    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            // 1. Extract the internal API key (business systems hold gateway keys, never provider keys)
            String internalKey = extractApiKey(exchange);
            if (internalKey == null) {
                return unauthorized(exchange, "missing Authorization header");
            }
            // 2. Validate the key and resolve the business line. An unknown key yields
            //    an empty Mono, so materialize the result as an Optional — a null
            //    check inside flatMap would never run
            return apiKeyService.validateAndGetBusinessLine(internalKey)
                .map(Optional::of)
                .defaultIfEmpty(Optional.empty())
                .flatMap(maybeBusinessLine -> {
                    if (maybeBusinessLine.isEmpty()) {
                        return unauthorized(exchange, "invalid API key");
                    }
                    BusinessLine businessLine = maybeBusinessLine.get();
                    // 3. Inject business-line info into the request context for downstream filters
                    ServerWebExchange mutatedExchange = exchange.mutate()
                        .request(r -> r.headers(headers -> {
                            headers.set("X-Business-Line", businessLine.getId());
                            headers.set("X-Business-Name", businessLine.getName());
                            // Drop the internal key; a later filter injects the real provider key
                            headers.remove("Authorization");
                        }))
                        .build();
                    log.debug("authenticated: businessLine={}", businessLine.getId());
                    return chain.filter(mutatedExchange);
                });
        };
    }

    private String extractApiKey(ServerWebExchange exchange) {
        String authHeader = exchange.getRequest().getHeaders().getFirst("Authorization");
        if (authHeader != null && authHeader.startsWith("Bearer ")) {
            return authHeader.substring(7);
        }
        return null;
    }

    private Mono<Void> unauthorized(ServerWebExchange exchange, String reason) {
        log.warn("authentication failed: {}, ip={}", reason,
            exchange.getRequest().getRemoteAddress());
        exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
        return exchange.getResponse().setComplete();
    }

    public static class Config {}
}

// src/main/java/com/company/gateway/service/ApiKeyService.java
package com.company.gateway.service;
import com.company.gateway.model.BusinessLine;
import lombok.RequiredArgsConstructor;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
public class ApiKeyService {
private final ReactiveRedisTemplate<String, BusinessLine> redisTemplate;
private static final String KEY_PREFIX = "gateway:apikey:";
    /**
     * Validate an internal API key and return its business line.
     * Backed by Redis so we don't hit the database on every request
     */
public Mono<BusinessLine> validateAndGetBusinessLine(String apiKey) {
return redisTemplate.opsForValue()
.get(KEY_PREFIX + apiKey);
}
    /**
     * Register an API key for a new business line.
     * The gateway issues each business system its own key and never exposes real provider keys
     */
public Mono<String> registerBusinessLine(BusinessLine businessLine) {
String newKey = generateSecureKey();
businessLine.setApiKey(newKey);
return redisTemplate.opsForValue()
.set(KEY_PREFIX + newKey, businessLine)
.thenReturn(newKey);
}
    /**
     * Revoke an API key (when a business line is retired or the key leaks)
     */
public Mono<Boolean> revokeKey(String apiKey) {
return redisTemplate.delete(KEY_PREFIX + apiKey)
.map(count -> count > 0);
}
private String generateSecureKey() {
byte[] bytes = new byte[32];
new java.security.SecureRandom().nextBytes(bytes);
return "ak-" + java.util.Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
}
}

AI Routing: Route to Different Providers by Business Type
// src/main/java/com/company/gateway/router/AiProviderRouter.java
package com.company.gateway.router;
import com.company.gateway.model.ProviderConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@Slf4j
public class AiProviderRouter {
    // Routing rules: business line -> prioritized provider list
private static final Map<String, List<ProviderConfig>> ROUTING_RULES = Map.of(
"customer-service", List.of(
ProviderConfig.of("qwen-max", "https://dashscope.aliyuncs.com/compatible-mode/v1", 70),
ProviderConfig.of("gpt-4o", "https://api.openai.com/v1", 30)
),
"content-review", List.of(
ProviderConfig.of("gpt-4o-mini", "https://api.openai.com/v1", 100)
),
"code-generation", List.of(
ProviderConfig.of("claude-3-5-sonnet", "https://api.anthropic.com/v1", 60),
ProviderConfig.of("gpt-4o", "https://api.openai.com/v1", 40)
),
"default", List.of(
ProviderConfig.of("gpt-4o", "https://api.openai.com/v1", 100)
)
);
    /**
     * Select a provider for a business line.
     * Uses weighted random selection
     */
    public ProviderConfig route(String businessLineId, String requestedModel) {
        // If the request pins a specific model, route straight to its provider
        if (requestedModel != null && !requestedModel.isBlank()) {
            return routeByModel(requestedModel);
        }
        // Otherwise route by business line
        List<ProviderConfig> providers = ROUTING_RULES.getOrDefault(
            businessLineId,
            ROUTING_RULES.get("default")
        );
        ProviderConfig selected = weightedRandom(providers);
        log.debug("AI routing: businessLine={}, provider={}, model={}",
            businessLineId, selected.getProviderName(), selected.getDefaultModel());
        return selected;
    }
    /**
     * Route directly to a provider by model name
     */
private ProviderConfig routeByModel(String model) {
if (model.startsWith("gpt-") || model.startsWith("o1") || model.startsWith("o3")) {
return ProviderConfig.of(model, "https://api.openai.com/v1", 100);
} else if (model.startsWith("claude-")) {
return ProviderConfig.of(model, "https://api.anthropic.com/v1", 100);
} else if (model.startsWith("qwen-")) {
return ProviderConfig.of(model, "https://dashscope.aliyuncs.com/compatible-mode/v1", 100);
} else if (model.startsWith("gemini-")) {
return ProviderConfig.of(model, "https://generativelanguage.googleapis.com/v1beta/openai", 100);
}
        // Fallback
return ProviderConfig.of(model, "https://api.openai.com/v1", 100);
}
    /**
     * Weighted random selection among providers
     */
private ProviderConfig weightedRandom(List<ProviderConfig> providers) {
if (providers.size() == 1) return providers.get(0);
int totalWeight = providers.stream().mapToInt(ProviderConfig::getWeight).sum();
int random = (int) (Math.random() * totalWeight);
int cumulative = 0;
for (ProviderConfig provider : providers) {
cumulative += provider.getWeight();
if (random < cumulative) {
return provider;
}
}
return providers.get(0);
}
}

Rate Limiting: Multi-Dimensional Limits
// src/main/java/com/company/gateway/filter/RateLimitFilter.java
package com.company.gateway.filter;
import lombok.RequiredArgsConstructor;
import lombok.extern.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
@Component
@Slf4j
public class RateLimitFilter extends AbstractGatewayFilterFactory<RateLimitFilter.Config> {
    private final ReactiveRedisTemplate<String, String> redisTemplate;

    // Explicit constructor: AbstractGatewayFilterFactory needs Config.class, which
    // Lombok's @RequiredArgsConstructor would not pass up to the superclass
    public RateLimitFilter(ReactiveRedisTemplate<String, String> redisTemplate) {
        super(Config.class);
        this.redisTemplate = redisTemplate;
    }
    // Lua script: token bucket (atomic, so no races under concurrency)
private static final String TOKEN_BUCKET_SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local refill_period = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local current_tokens = tonumber(redis.call('GET', key))
if current_tokens == nil then
current_tokens = capacity
end
            -- tokens to refill since the last refill timestamp
local last_refill = tonumber(redis.call('GET', key .. ':last_refill'))
local now = tonumber(ARGV[5])
if last_refill ~= nil then
local elapsed = now - last_refill
local tokens_to_add = math.floor(elapsed / refill_period * refill_rate)
current_tokens = math.min(capacity, current_tokens + tokens_to_add)
end
redis.call('SET', key .. ':last_refill', now, 'EX', 3600)
if current_tokens >= requested then
redis.call('SET', key, current_tokens - requested, 'EX', 3600)
                return 1 -- allow
else
redis.call('SET', key, current_tokens, 'EX', 3600)
                return 0 -- reject
end
""";
    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            String businessLine = exchange.getRequest().getHeaders().getFirst("X-Business-Line");
            String userId = exchange.getRequest().getHeaders().getFirst("X-User-Id");
            // Three tiers of limits: global -> business line -> user.
            // Nested flatMaps keep the types consistent: each tier either
            // short-circuits with a 429 (Mono<Void>) or runs the next boolean check
            return checkRateLimit("global", 10000, 1000, 1)                   // global: capacity 10000, +1000/s
                .flatMap(globalOk -> {
                    if (!globalOk) return tooManyRequests(exchange, "global limit");
                    return checkRateLimit("biz:" + businessLine, 1000, 100, 1)    // per business line: capacity 1000, +100/s
                        .flatMap(bizOk -> {
                            if (!bizOk) return tooManyRequests(exchange, "business-line limit: " + businessLine);
                            Mono<Boolean> userCheck = (userId == null)
                                ? Mono.just(true)
                                : checkRateLimit("user:" + userId, 60, 10, 1);    // per user: capacity 60, +10/s
                            return userCheck.flatMap(userOk -> {
                                if (!userOk) return tooManyRequests(exchange, "user limit");
                                return chain.filter(exchange);
                            });
                        });
                });
        };
    }
    /**
     * Token-bucket rate-limit check
     *
     * @param key        rate-limit dimension key
     * @param capacity   bucket capacity
     * @param refillRate tokens added per refill period
     * @param requested  tokens consumed by this request
     */
private Mono<Boolean> checkRateLimit(String key, int capacity, int refillRate, int requested) {
String redisKey = "ratelimit:" + key;
long now = System.currentTimeMillis();
return redisTemplate.execute(
RedisScript.of(TOKEN_BUCKET_SCRIPT, Long.class),
List.of(redisKey),
String.valueOf(capacity),
String.valueOf(refillRate),
            String.valueOf(1000), // 1-second refill period
String.valueOf(requested),
String.valueOf(now)
).map(result -> result != null && result == 1L).next();
}
private Mono<Void> tooManyRequests(ServerWebExchange exchange, String reason) {
        log.warn("rate limit triggered: {}, path={}", reason, exchange.getRequest().getPath());
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
exchange.getResponse().getHeaders().add("Retry-After", "1");
return exchange.getResponse().setComplete();
}
public static class Config {}
}

Request Transformation: Normalizing Requests Across Providers
// src/main/java/com/company/gateway/filter/RequestTransformFilter.java
package com.company.gateway.filter;
import com.company.gateway.model.ProviderConfig;
import com.company.gateway.router.AiProviderRouter;
import com.company.gateway.service.ApiKeyVaultService;  // vault holding the real provider keys
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.URI;
import java.nio.charset.StandardCharsets;

@Component
@Slf4j
public class RequestTransformFilter extends AbstractGatewayFilterFactory<RequestTransformFilter.Config> {
    private final AiProviderRouter router;
    private final ApiKeyVaultService apiKeyVault;
    private final ObjectMapper objectMapper;

    // Explicit constructor so AbstractGatewayFilterFactory receives Config.class
    public RequestTransformFilter(AiProviderRouter router, ApiKeyVaultService apiKeyVault,
                                  ObjectMapper objectMapper) {
        super(Config.class);
        this.router = router;
        this.apiKeyVault = apiKeyVault;
        this.objectMapper = objectMapper;
    }
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
            String businessLine = exchange.getRequest().getHeaders().getFirst("X-Business-Line");
            return exchange.getRequest().getBody()
                .collectList()
                .flatMap(dataBuffers -> {
                    // Read the request body
                    byte[] bodyBytes = readBytes(dataBuffers);
                    String bodyStr = new String(bodyBytes, StandardCharsets.UTF_8);
                    try {
                        JsonNode requestBody = objectMapper.readTree(bodyStr);
                        String requestedModel = requestBody.path("model").asText(null);
                        // Route to the target provider
                        ProviderConfig provider = router.route(businessLine, requestedModel);
                        // Transform the request (unified format -> provider-specific format)
                        String transformedBody = transformRequest(requestBody, provider);
                        // Inject the provider's real API key
                        String providerKey = apiKeyVault.getKey(provider.getProviderName());
                        // Stash routing decisions in exchange attributes for downstream
                        // filters (ServerWebExchange.mutate() has no attribute() method;
                        // the attribute map is shared with the mutated exchange)
                        exchange.getAttributes().put("selectedProvider", provider);
                        exchange.getAttributes().put("businessLine", businessLine);
                        ServerWebExchange modifiedExchange = exchange.mutate()
                            .request(mutateRequest(exchange.getRequest(), provider, transformedBody, providerKey))
                            .build();
                        // Rewrite the downstream target URL
                        exchange.getAttributes().put(
                            ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR,
                            URI.create(provider.getBaseUrl() + exchange.getRequest().getPath())
                        );
                        return chain.filter(modifiedExchange);
                    } catch (Exception e) {
                        log.error("request transformation failed", e);
                        return Mono.error(e);
                    }
                });
};
}
    /**
     * Convert the unified format into the provider-specific one.
     * Mainstream providers largely accept the OpenAI-compatible format, so little
     * conversion is needed, but a few provider-specific parameters must be handled
     */
    private String transformRequest(JsonNode request, ProviderConfig provider) throws Exception {
        ObjectNode transformed = request.deepCopy();
        // Make sure the provider's correct model name is set
        if (!transformed.has("model") || transformed.get("model").asText().isBlank()) {
            transformed.put("model", provider.getDefaultModel());
        }
        // Anthropic Claude is special: system messages must be lifted out of the array
        if (provider.getProviderName().startsWith("claude")) {
            transformed = transformForClaude(transformed);
        }
        // Strip custom business-layer fields (providers reject unknown parameters)
        transformed.remove("businessMetadata");
        transformed.remove("cacheStrategy");
        return objectMapper.writeValueAsString(transformed);
    }

    /**
     * Claude request conversion:
     * OpenAI-format system messages live inside the messages array;
     * Claude expects system as a standalone top-level field
     */
    private ObjectNode transformForClaude(ObjectNode request) {
        JsonNode messages = request.get("messages");
        if (messages == null || !messages.isArray()) return request;
        StringBuilder system = new StringBuilder();
        var remaining = objectMapper.createArrayNode();
        for (JsonNode message : messages) {
            if ("system".equals(message.path("role").asText())) {
                if (system.length() > 0) system.append("\n");
                system.append(message.path("content").asText());
            } else {
                remaining.add(message);
            }
        }
        if (system.length() > 0) {
            request.put("system", system.toString());
            request.set("messages", remaining);
        }
        return request;
    }
    private ServerHttpRequest mutateRequest(ServerHttpRequest original, ProviderConfig provider,
                                            String newBody, String apiKey) {
        byte[] newBodyBytes = newBody.getBytes(StandardCharsets.UTF_8);
        return new ServerHttpRequestDecorator(original) {
            @Override
            public Flux<DataBuffer> getBody() {
                // Use the shared buffer factory; the outer exchange is not in scope here
                DataBuffer buffer = org.springframework.core.io.buffer.DefaultDataBufferFactory
                    .sharedInstance.wrap(newBodyBytes);
                return Flux.just(buffer);
            }
            @Override
            public org.springframework.http.HttpHeaders getHeaders() {
                var headers = new org.springframework.http.HttpHeaders();
                headers.putAll(super.getHeaders());
                headers.setContentLength(newBodyBytes.length);
                headers.setContentType(MediaType.APPLICATION_JSON);
                headers.setBearerAuth(apiKey);
                // Request ID for tracing
                headers.set("X-Request-Id", java.util.UUID.randomUUID().toString());
                return headers;
            }
        };
    }
private byte[] readBytes(java.util.List<DataBuffer> dataBuffers) {
int totalSize = dataBuffers.stream().mapToInt(DataBuffer::readableByteCount).sum();
byte[] result = new byte[totalSize];
int offset = 0;
for (DataBuffer buffer : dataBuffers) {
int size = buffer.readableByteCount();
buffer.read(result, offset, size);
offset += size;
}
return result;
}
public static class Config {}
}

Cost Tracking: Accounting for Every AI Call at the Gateway
// src/main/java/com/company/gateway/filter/CostTrackingFilter.java
package com.company.gateway.filter;
import com.company.gateway.model.CostRecord;  // assumed: a @Builder record holding one call's usage and cost
import com.company.gateway.model.ProviderConfig;
import com.company.gateway.service.CostService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;

@Component
@Slf4j
public class CostTrackingFilter extends AbstractGatewayFilterFactory<CostTrackingFilter.Config> {
    private final CostService costService;
    private final ObjectMapper objectMapper;

    // Explicit constructor so AbstractGatewayFilterFactory receives Config.class
    public CostTrackingFilter(CostService costService, ObjectMapper objectMapper) {
        super(Config.class);
        this.costService = costService;
        this.objectMapper = objectMapper;
    }
    // Pricing table (US cents per 1,000,000 tokens)
    private static final java.util.Map<String, ModelPricing> PRICING = java.util.Map.of(
        "gpt-4o",            new ModelPricing(250, 1000),  // $2.50 in / $10 out per 1M tokens
        "gpt-4o-mini",       new ModelPricing(15, 60),     // $0.15 in / $0.60 out per 1M tokens
        "claude-3-5-sonnet", new ModelPricing(300, 1500),  // $3 in / $15 out per 1M tokens
        "qwen-max",          new ModelPricing(40, 120),    // ≈$0.40 in / $1.20 out per 1M tokens
        "gemini-1.5-pro",    new ModelPricing(125, 500)    // $1.25 in / $5 out per 1M tokens
    );
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
long startTime = System.currentTimeMillis();
String businessLine = (String) exchange.getAttributes().get("businessLine");
ProviderConfig provider = (ProviderConfig) exchange.getAttributes().get("selectedProvider");
            // Decorate the response so usage can be parsed when it comes back
            // (this buffers the full body, so it must be skipped for stream=true)
ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return Flux.from(body)
.collectList()
.flatMap(buffers -> {
byte[] responseBytes = readBytes(buffers);
String responseStr = new String(responseBytes, StandardCharsets.UTF_8);
                            // Parse usage asynchronously; don't block the response
Mono.fromRunnable(() -> {
try {
parseAndRecordCost(
responseStr, businessLine, provider,
System.currentTimeMillis() - startTime
);
} catch (Exception e) {
                                    log.error("failed to record cost", e);
}
}).subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()).subscribe();
                            // Re-emit the buffered response
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(responseBytes);
return super.writeWith(Flux.just(buffer));
});
}
};
return chain.filter(exchange.mutate().response(responseDecorator).build());
};
}
private void parseAndRecordCost(String responseBody, String businessLine,
ProviderConfig provider, long latencyMs) {
try {
JsonNode response = objectMapper.readTree(responseBody);
if (!response.has("usage")) return;
JsonNode usage = response.get("usage");
int promptTokens = usage.path("prompt_tokens").asInt(0);
int completionTokens = usage.path("completion_tokens").asInt(0);
String model = response.path("model").asText(provider.getDefaultModel());
            // Compute the cost
            ModelPricing pricing = PRICING.getOrDefault(model,
                PRICING.get("gpt-4o"));  // fall back to gpt-4o pricing
double costCents = (promptTokens * pricing.inputCentsPerMillionTokens() / 1_000_000.0)
+ (completionTokens * pricing.outputCentsPerMillionTokens() / 1_000_000.0);
CostRecord record = CostRecord.builder()
.businessLine(businessLine)
.provider(provider.getProviderName())
.model(model)
.promptTokens(promptTokens)
.completionTokens(completionTokens)
.totalTokens(promptTokens + completionTokens)
.costUsdCents(costCents)
.latencyMs(latencyMs)
.timestamp(java.time.Instant.now())
.build();
costService.record(record);
            log.debug("cost recorded: businessLine={}, model={}, tokens={}+{}={}, cost={} cents",
                businessLine, model, promptTokens, completionTokens,
                promptTokens + completionTokens, String.format("%.4f", costCents));
} catch (Exception e) {
            log.error("failed to parse usage from response", e);
}
}
private byte[] readBytes(java.util.List<DataBuffer> buffers) {
int total = buffers.stream().mapToInt(DataBuffer::readableByteCount).sum();
byte[] result = new byte[total];
int offset = 0;
for (DataBuffer buf : buffers) {
int size = buf.readableByteCount();
buf.read(result, offset, size);
offset += size;
}
return result;
}
record ModelPricing(double inputCentsPerMillionTokens, double outputCentsPerMillionTokens) {}
public static class Config {}
}

Cost Aggregation Service
// src/main/java/com/company/gateway/service/CostService.java
package com.company.gateway.service;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Map;
@Service
@RequiredArgsConstructor
public class CostService {
private final ReactiveRedisTemplate<String, String> redisTemplate;
private final CostRepository costRepository;
    /**
     * Record a cost entry (write to Redis first, flush to the database periodically)
     */
public void record(CostRecord record) {
String today = LocalDate.now().format(DateTimeFormatter.ISO_DATE);
        // Accumulate token counts and cost per business line
String costKey = String.format("cost:%s:%s:%s",
today, record.businessLine(), record.model());
redisTemplate.opsForHash()
.increment(costKey, "promptTokens", record.promptTokens())
.subscribe();
redisTemplate.opsForHash()
.increment(costKey, "completionTokens", record.completionTokens())
.subscribe();
redisTemplate.opsForHash()
.increment(costKey, "requests", 1)
.subscribe();
        // Hash INCRBY is integer-only, so store the cost as micro-cents
long costMicro = (long) (record.costUsdCents() * 10000);
redisTemplate.opsForHash()
.increment(costKey, "costMicro", costMicro)
.subscribe();
        // 2-day TTL so Redis doesn't grow without bound
redisTemplate.expire(costKey, java.time.Duration.ofDays(2)).subscribe();
}
    /**
     * Summarize today's spend per business line
     */
public Mono<Map<String, BusinessLineCostSummary>> getDailyCostByBusinessLine(String date) {
return costRepository.findByDate(date)
.collectMultimap(
CostRecord::businessLine,
record -> record
)
.map(groups -> {
Map<String, BusinessLineCostSummary> result = new java.util.HashMap<>();
groups.forEach((bizLine, records) -> {
long totalTokens = records.stream()
.mapToLong(r -> r.promptTokens() + r.completionTokens()).sum();
double totalCost = records.stream()
.mapToDouble(CostRecord::costUsdCents).sum();
long requests = records.size();
result.put(bizLine, new BusinessLineCostSummary(bizLine, totalTokens, totalCost, requests));
});
return result;
});
}
    /**
     * Flush the Redis aggregates to the database every hour (persistence)
     */
    @Scheduled(fixedDelay = 3600000)
    public void flushToDatabase() {
        // write the hourly aggregates from Redis into the database
        // ...
    }
}
public record BusinessLineCostSummary(
String businessLine,
long totalTokens,
double totalCostUsdCents,
long totalRequests
) {}
}

Response Caching: Semantic Caching at the Gateway
// src/main/java/com/company/gateway/filter/SemanticCacheFilter.java
package com.company.gateway.filter;
import com.company.gateway.service.SemanticCacheService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
@Component
@Slf4j
public class SemanticCacheFilter extends AbstractGatewayFilterFactory<SemanticCacheFilter.Config> {
    private final SemanticCacheService semanticCacheService;
    private final ObjectMapper objectMapper;

    // Explicit constructor so AbstractGatewayFilterFactory receives Config.class
    public SemanticCacheFilter(SemanticCacheService semanticCacheService, ObjectMapper objectMapper) {
        super(Config.class);
        this.semanticCacheService = semanticCacheService;
        this.objectMapper = objectMapper;
    }

    // Similarity threshold: at or above this, treat it as the "same" question and serve the cache
    private static final double SIMILARITY_THRESHOLD = 0.95;
    // Creative requests (temperature above this ceiling) are never cached
    private static final double MAX_CACHEABLE_TEMPERATURE = 0.1;
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
return exchange.getRequest().getBody()
.collectList()
.flatMap(buffers -> {
byte[] bodyBytes = readBytes(buffers);
String bodyStr = new String(bodyBytes, StandardCharsets.UTF_8);
try {
JsonNode request = objectMapper.readTree(bodyStr);
                        // Decide whether the request is cacheable
                        if (!isCacheable(request)) {
                            return chain.filter(exchange);
                        }
                        // Extract the user message (input to the similarity search)
                        String userMessage = extractUserMessage(request);
                        if (userMessage == null) {
                            return chain.filter(exchange);
                        }
                        // Look up the semantic cache. findSimilar returns an empty Mono
                        // on a miss, and the handlers below return Mono<Void> (which
                        // always completes empty), so materialize the lookup result as
                        // an Optional rather than relying on null or switchIfEmpty
                        return semanticCacheService.findSimilar(userMessage, SIMILARITY_THRESHOLD)
                            .map(java.util.Optional::of)
                            .defaultIfEmpty(java.util.Optional.empty())
                            .flatMap(maybeCached -> {
                                if (maybeCached.isPresent()) {
                                    log.debug("semantic cache hit: similarity={}",
                                        String.format("%.3f", maybeCached.get().similarity()));
                                    return writeResponse(exchange, maybeCached.get().content());
                                }
                                // Miss: forward the request and cache the result
                                return chainAndCache(exchange, chain, userMessage, bodyBytes);
                            });
} catch (Exception e) {
return chain.filter(exchange);
}
});
};
}
    private boolean isCacheable(JsonNode request) {
        // Never cache streaming (stream=true) requests
        if (request.path("stream").asBoolean(false)) return false;
        // Never cache creative generation (temperature above the cacheable ceiling)
        double temperature = request.path("temperature").asDouble(1.0);
        if (temperature > MAX_CACHEABLE_TEMPERATURE) return false;
        return true;
    }
}
private String extractUserMessage(JsonNode request) {
JsonNode messages = request.get("messages");
if (messages == null || !messages.isArray()) return null;
        // Take the most recent user message
for (int i = messages.size() - 1; i >= 0; i--) {
JsonNode message = messages.get(i);
if ("user".equals(message.path("role").asText())) {
return message.path("content").asText();
}
}
return null;
}
    private Mono<Void> chainAndCache(ServerWebExchange exchange,
            org.springframework.cloud.gateway.filter.GatewayFilterChain chain,
            String userMessage, byte[] requestBytes) {
        // Intercept the response and write it into the cache.
        // Simplified here: a full implementation must re-wrap requestBytes into the
        // request (the body above was already consumed) and decorate the response
        // with a ServerHttpResponseDecorator to capture the completion
        return chain.filter(exchange);
    }
private Mono<Void> writeResponse(ServerWebExchange exchange, String content) {
exchange.getResponse().setStatusCode(HttpStatus.OK);
exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);
exchange.getResponse().getHeaders().add("X-Cache", "HIT");
DataBuffer buffer = exchange.getResponse().bufferFactory()
.wrap(content.getBytes(StandardCharsets.UTF_8));
return exchange.getResponse().writeWith(Mono.just(buffer));
}
private byte[] readBytes(java.util.List<DataBuffer> buffers) {
int total = buffers.stream().mapToInt(DataBuffer::readableByteCount).sum();
byte[] result = new byte[total];
int offset = 0;
for (DataBuffer buf : buffers) {
int size = buf.readableByteCount();
buf.read(result, offset, size);
offset += size;
}
return result;
}
public static class Config {}
}

Monitoring Dashboard Configuration
# prometheus.yml (append the following)
scrape_configs:
  - job_name: 'ai-gateway'
    static_configs:
      - targets: ['ai-gateway:8080']
    metrics_path: '/actuator/prometheus'

Key monitoring metrics:
// src/main/java/com/company/gateway/metrics/AiGatewayMetrics.java
package com.company.gateway.metrics;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.time.Duration;
@Component
@RequiredArgsConstructor
public class AiGatewayMetrics {
private final MeterRegistry registry;
    /**
     * Record AI request latency
     */
public void recordLatency(String businessLine, String provider, String model,
long latencyMs, boolean isSuccess) {
Timer.builder("ai.request.duration")
.tag("businessLine", businessLine)
.tag("provider", provider)
.tag("model", model)
.tag("success", String.valueOf(isSuccess))
.publishPercentiles(0.5, 0.95, 0.99)
.register(registry)
.record(Duration.ofMillis(latencyMs));
}
    /**
     * Record token usage
     */
public void recordTokenUsage(String businessLine, String model,
int promptTokens, int completionTokens) {
registry.counter("ai.tokens.prompt",
"businessLine", businessLine, "model", model
).increment(promptTokens);
registry.counter("ai.tokens.completion",
"businessLine", businessLine, "model", model
).increment(completionTokens);
}
    /**
     * Record cache hits and misses
     */
public void recordCacheResult(String businessLine, boolean hit) {
registry.counter("ai.cache.result",
"businessLine", businessLine,
"result", hit ? "hit" : "miss"
).increment();
}
    /**
     * Record rate-limit triggers
     */
public void recordRateLimit(String businessLine, String limitType) {
registry.counter("ai.ratelimit.triggered",
"businessLine", businessLine,
"type", limitType
).increment();
}
}

Key panels on the Grafana dashboard:
- Real-time QPS per business line
- P95/P99 latency
- Error rate per provider
- Cumulative spend today (grouped by business line)
- Token throughput (tokens/minute)
- Semantic cache hit rate
- Rate-limit trigger count
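The cache hit-rate panel is derived from the `ai.cache.result` counters recorded above. A small sketch of the arithmetic, with hypothetical counter values and a rough per-request cost assumption:

```java
public class CacheStats {
    // Hit rate derived from the hit/miss counters
    static double hitRate(long hits, long misses) {
        long total = hits + misses;
        return total == 0 ? 0.0 : (double) hits / total;
    }

    // Rough saving estimate: every cache hit avoids one provider call
    static double centsSaved(long hits, double avgCostCentsPerRequest) {
        return hits * avgCostCentsPerRequest;
    }

    public static void main(String[] args) {
        System.out.println(hitRate(310, 690));     // 0.31
        System.out.println(centsSaved(310, 2.5));  // 775.0
    }
}
```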
FAQ
Q1: How does an AI gateway fundamentally differ from an ordinary API gateway?
An ordinary API gateway handles routing, authentication, and rate limiting at the HTTP level. An AI gateway additionally has to understand LLM-specific concepts: token-based billing, semantic caching (reusing answers to similar requests), pass-through of streaming output (SSE), and provider-aware routing. Ordinary gateways have none of these capabilities.
Q2: Won't semantic caching make AI answers inaccurate?
Cache only requests with very high similarity (threshold 0.95 or above) and temperature=0. Creative generation (temperature>0.1) bypasses the cache entirely. A cache hit presupposes that the request is essentially the same question, and returning the same answer to the same question is reasonable.
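The 0.95 threshold is a cosine-similarity cutoff over the embeddings of the two questions. A self-contained sketch of the comparison, using toy 2-dimensional vectors (real embeddings have hundreds of dimensions):

```java
public class SemanticSimilarity {
    // Cosine similarity between two embedding vectors
    static double cosine(double[] a, double[] b) {
        double dot = 0, na = 0, nb = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            na  += a[i] * a[i];
            nb  += b[i] * b[i];
        }
        return dot / (Math.sqrt(na) * Math.sqrt(nb));
    }

    // Serve from cache only when the similarity clears the threshold
    static boolean cacheHit(double[] query, double[] cached, double threshold) {
        return cosine(query, cached) >= threshold;
    }

    public static void main(String[] args) {
        double[] q = {0.6, 0.8};
        System.out.println(cacheHit(q, new double[]{0.6, 0.8}, 0.95));  // identical  -> true
        System.out.println(cacheHit(q, new double[]{0.8, 0.6}, 0.95));  // cos = 0.96 -> true
        System.out.println(cacheHit(q, new double[]{1.0, 0.0}, 0.95));  // cos = 0.6  -> false
    }
}
```

The high cutoff is the safety valve: at 0.95, paraphrases of the same question match, while merely related questions do not.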
Q3: The providers still issue separate invoices. Can the gateway's cost statistics really replace them?
The gateway's statistics give the business view (which business line spent what); the provider invoices are the actual billing record (the basis for settlement). Reconcile the two against each other. If the deviation persistently exceeds 5%, the gateway's token-counting logic has a bug.
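The 5% reconciliation rule is a one-line check; a sketch with made-up figures:

```java
public class BillReconciliation {
    // Relative deviation between the gateway's estimate and the provider invoice
    static double deviation(double gatewayEstimateCents, double invoiceCents) {
        return Math.abs(gatewayEstimateCents - invoiceCents) / invoiceCents;
    }

    public static void main(String[] args) {
        double dev = deviation(470_000, 500_000);  // gateway says 470k, invoice says 500k
        // 6% deviation exceeds the 5% tolerance -> investigate the token counting
        System.out.printf("deviation=%.1f%%, investigate=%b%n", dev * 100, dev > 0.05);
    }
}
```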
Q4: What if the gateway itself goes down? Every AI call becomes unavailable.
The gateway must be deployed for high availability: multiple instances, load balancing, health checks. For the extreme case (the whole gateway cluster failing), each business system can embed an Emergency Bypass: when the gateway is detected as unavailable, call the provider API directly with a standby key. Such a bypass needs tightly controlled permissions and auditing.
Q5: How is streaming output (SSE) passed through at the gateway?
SSE output must not be fully buffered before forwarding (that would destroy its real-time nature). Spring Cloud Gateway proxies streams natively; the CostTrackingFilter only needs to parse the final usage information once the stream's [DONE] event arrives. Semantic caching is disabled for stream=true requests.
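A sketch of pulling that final usage out of an OpenAI-compatible SSE body. It assumes the provider emits a usage-bearing chunk before `[DONE]` (e.g. when the client requests it via `stream_options`); the class and helper names here are hypothetical:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SseUsageParser {
    // Scan the SSE lines and return the JSON payload of the last chunk carrying "usage"
    static String usageChunk(String sseBody) {
        String usage = null;
        for (String line : sseBody.split("\n")) {
            if (!line.startsWith("data:")) continue;
            String payload = line.substring(5).trim();
            if (!payload.equals("[DONE]") && payload.contains("\"usage\"")) {
                usage = payload;
            }
        }
        return usage;
    }

    // Pull an integer field such as prompt_tokens out of the usage chunk
    static int intField(String json, String field) {
        Matcher m = Pattern.compile("\"" + field + "\"\\s*:\\s*(\\d+)").matcher(json);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    public static void main(String[] args) {
        String stream = """
            data: {"choices":[{"delta":{"content":"Hi"}}]}
            data: {"choices":[],"usage":{"prompt_tokens":12,"completion_tokens":34}}
            data: [DONE]
            """;
        String usage = usageChunk(stream);
        System.out.println(intField(usage, "prompt_tokens") + "/" + intField(usage, "completion_tokens"));
    }
}
```

Because only the tail of the stream is inspected, each event can still be forwarded to the client the moment it arrives.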
Summary
The value of an AI gateway grows with the scale of a company's AI usage:
| Scale | Pain without a gateway | What the gateway solves |
|---|---|---|
| 1-3 systems | Tolerable | Limited benefit |
| 5-10 systems | API key management turns chaotic | Unified authentication pays off |
| 10+ systems | Costs spiral, impossible to optimize | Cost tracking and semantic caching pay off massively |
| 20+ systems | Any provider change is a disaster | Unified routing becomes decisive |
In the first month after Zhao Lei's team introduced the AI gateway: the semantic cache hit 31% of requests, cutting roughly ¥140k of token spend; cost tracking surfaced a bug in one business line that was burning money on useless retries, worth ¥80k a month once fixed; and three provider switches each amounted to a gateway config change, with zero changes to business systems.
That ¥470k bill was down to ¥280k three months later.
