AI 网关的实现——统一管理公司所有大模型调用
AI 网关的实现——统一管理公司所有大模型调用
公司发展到一定规模,你就会发现一个让人头疼的问题:大模型调用入口失控了。
A 团队在用 OpenAI,自己申请了 API Key,硬编码在代码里;B 团队在用文心一言,搞了个自己的封装;C 团队在测试 Claude,用的是个人账号的 Key。不同团队各自为政,公司层面完全不知道总共花了多少钱,调用了哪些模型,有没有安全合规风险。
某天要做 AI 成本审计,或者某个 API Key 泄露需要紧急替换,你会发现你根本不知道去哪里找、改哪里。
这就是为什么需要 AI 网关——一个统一的大模型调用代理层。
注意我说的不是普通的 API 网关。API 网关(比如 Kong、Nginx)处理的是业务 HTTP 请求的路由和鉴权。AI 网关专门处理大模型调用,需要额外关注 Token 流式响应、成本计量、Prompt 审计、多模型路由等 AI 特有的需求。这是两件不同的事。
AI 网关 vs 普通 API 网关
先把差异说清楚,不然上来就用 Kong 或者 Nginx 做 AI 网关,你会发现很多场景处理不了。
| 维度 | 普通 API 网关 | AI 网关 |
|---|---|---|
| 响应处理 | 转发 HTTP 响应 | 需要处理 SSE 流式响应,理解 Token 边界 |
| 限流单位 | QPS/并发数 | QPS + Token 每分钟消耗量(双重维度) |
| 日志内容 | URL、状态码、耗时 | URL + Prompt 摘要 + Token 数 + 成本 |
| 路由策略 | 基于路径/Header 路由 | 基于功能类型、成本预算、模型能力路由 |
| 成本控制 | 不涉及 | 核心功能,按用户/租户/功能限额 |
| 安全审计 | 传输加密、认证 | Prompt 内容合规检查、敏感信息检测 |
| 缓存策略 | URL 级别缓存 | 语义相似请求缓存(完全不同的问题) |
看到这个表你就明白,AI 网关不是在 API 网关上套一层,而是需要理解 AI 调用语义的专用组件。
核心功能设计
我们的 AI 网关基于 Spring Cloud Gateway 实现,主要处理以下功能:
- 统一认证:所有下游服务不再各自持有模型 API Key,统一由网关管理
- 多模型路由:根据请求类型和配置,把请求分发给合适的模型
- 限流和配额:Token 级别的速率限制和用户/租户配额
- 成本记账:实时记录每次调用的 Token 消耗和成本
- Prompt 日志:对调用内容做审计日志(脱敏后)
- 流式响应代理:正确处理 SSE 流式响应,不把流截断
架构设计
核心代码实现
网关配置
spring:
cloud:
gateway:
routes:
# OpenAI 代理路由
- id: openai-proxy
uri: https://api.openai.com
predicates:
- Path=/ai-gateway/openai/**
filters:
- StripPrefix=2
- name: AiAuthentication
- name: TokenRateLimit
args:
replenishRate: 100000 # 每秒补充 Token 桶
burstCapacity: 200000
- name: AiCostAccounting
- name: PromptAudit
- RewritePath=/(?<segment>.*), /${segment}
- AddRequestHeader=Authorization, Bearer ${ai.gateway.openai.api-key}
# Claude 代理路由
- id: claude-proxy
uri: https://api.anthropic.com
predicates:
- Path=/ai-gateway/claude/**
filters:
- StripPrefix=2
- name: AiAuthentication
- name: TokenRateLimit
- name: AiCostAccounting
- name: PromptAudit
- AddRequestHeader=x-api-key, ${ai.gateway.claude.api-key}
- AddRequestHeader=anthropic-version, 2023-06-01认证过滤器
@Component
@Slf4j
public class AiAuthenticationFilter implements GatewayFilter, Ordered {
@Autowired
private AppKeyService appKeyService;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 从 Header 获取应用凭证
String appKey = request.getHeaders().getFirst("X-App-Key");
String appSecret = request.getHeaders().getFirst("X-App-Secret");
if (appKey == null || appSecret == null) {
return unauthorized(exchange, "Missing X-App-Key or X-App-Secret header");
}
// 验证 AppKey/AppSecret
return appKeyService.validate(appKey, appSecret)
.flatMap(appInfo -> {
if (appInfo == null) {
return unauthorized(exchange, "Invalid credentials");
}
// 检查应用是否有权限访问目标模型
String targetModel = extractTargetModel(request.getPath().value());
if (!appInfo.isAllowed(targetModel)) {
return forbidden(exchange, "App not allowed to access model: " + targetModel);
}
// 将应用信息写入 exchange 属性,供后续过滤器使用
exchange.getAttributes().put("appInfo", appInfo);
exchange.getAttributes().put("tenantId", appInfo.getTenantId());
exchange.getAttributes().put("appId", appInfo.getAppId());
// 移除下游不应该看到的认证 Header
ServerHttpRequest modifiedRequest = request.mutate()
.headers(headers -> {
headers.remove("X-App-Key");
headers.remove("X-App-Secret");
})
.build();
return chain.filter(exchange.mutate().request(modifiedRequest).build());
});
}
private Mono<Void> unauthorized(ServerWebExchange exchange, String message) {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().writeWith(
Mono.just(exchange.getResponse().bufferFactory()
.wrap(("{\"error\":\"" + message + "\"}").getBytes())));
}
private Mono<Void> forbidden(ServerWebExchange exchange, String message) {
exchange.getResponse().setStatusCode(HttpStatus.FORBIDDEN);
return exchange.getResponse().writeWith(
Mono.just(exchange.getResponse().bufferFactory()
.wrap(("{\"error\":\"" + message + "\"}").getBytes())));
}
@Override
public int getOrder() {
return -100; // 最先执行
}
}Token 双维度限流
@Component
public class TokenRateLimitFilter implements GatewayFilter, Ordered {
@Autowired
private ReactiveRedisTemplate<String, Long> redisTemplate;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String appId = (String) exchange.getAttributes().get("appId");
String tenantId = (String) exchange.getAttributes().get("tenantId");
// 1. 检查 QPS 限制(使用 Redis 滑动窗口)
String qpsKey = "rate:qps:" + appId + ":" + (System.currentTimeMillis() / 1000);
return redisTemplate.opsForValue()
.increment(qpsKey)
.flatMap(count -> {
if (count == 1) {
// 新的一秒,设置过期
redisTemplate.expire(qpsKey, Duration.ofSeconds(2)).subscribe();
}
AppRateLimitConfig config = getConfig(appId);
if (count > config.getQpsLimit()) {
return tooManyRequests(exchange, "QPS limit exceeded");
}
// 2. 检查分钟级 Token 限制
String tokenMinuteKey = "rate:token:min:" + appId + ":" + (System.currentTimeMillis() / 60000);
return redisTemplate.opsForValue()
.get(tokenMinuteKey)
.defaultIfEmpty(0L)
.flatMap(tokenCount -> {
if (tokenCount > config.getTokensPerMinuteLimit()) {
return tooManyRequests(exchange, "Token per minute limit exceeded");
}
// 通过限流,记录本次请求的 Token 消耗(在响应拦截时更新)
exchange.getAttributes().put("tokenMinuteKey", tokenMinuteKey);
return chain.filter(exchange);
});
});
}
private Mono<Void> tooManyRequests(ServerWebExchange exchange, String message) {
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
exchange.getResponse().getHeaders().add("Retry-After", "1");
return exchange.getResponse().writeWith(
Mono.just(exchange.getResponse().bufferFactory()
.wrap(("{\"error\":\"" + message + "\"}").getBytes())));
}
@Override
public int getOrder() {
return -80;
}
}流式响应的成本计账
这是最复杂的部分。流式响应(SSE)不是一次性返回所有内容,是边生成边传输的。你不能等到响应结束才计账,因为:
- 流式响应可能持续几十秒
- 客户端可能中途断开连接
我们的方案是:拦截流式响应,在每个 SSE 事件中累加 Token 数,流结束(或连接断开)时写入账本。
@Component
@Slf4j
public class AiCostAccountingFilter implements GatewayFilter, Ordered {
@Autowired
private CostAccountingService costAccountingService;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 包装响应,拦截响应体
ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {
private final AtomicInteger outputTokenCount = new AtomicInteger(0);
private final AtomicInteger inputTokenCount = new AtomicInteger(0);
private volatile boolean accountingDone = false;
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
String contentType = getHeaders().getFirst(HttpHeaders.CONTENT_TYPE);
if (contentType != null && contentType.contains("text/event-stream")) {
// 流式响应处理
Flux<DataBuffer> modifiedBody = Flux.from(body)
.doOnNext(dataBuffer -> {
// 解析 SSE 事件,提取 Token 数
String data = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer()).toString();
updateTokenCount(data);
})
.doOnComplete(() -> {
if (!accountingDone) {
accountingDone = true;
doAccounting(exchange, inputTokenCount.get(), outputTokenCount.get());
}
})
.doOnError(e -> {
if (!accountingDone) {
accountingDone = true;
doAccounting(exchange, inputTokenCount.get(), outputTokenCount.get());
}
});
return super.writeWith(modifiedBody);
} else {
// 非流式响应
return super.writeWith(body);
}
}
private void updateTokenCount(String sseData) {
// 解析 OpenAI 格式的 SSE 数据
// data: {"id":"...","choices":[{"delta":{"content":"..."},"finish_reason":null}],"usage":null}
try {
if (sseData.startsWith("data: ") && !sseData.startsWith("data: [DONE]")) {
String json = sseData.substring(6).trim();
// 简单解析,生产环境建议用 Jackson
if (json.contains("\"usage\"") && !json.contains("\"usage\":null")) {
// 最后一个包含 usage 信息的 chunk
// 提取 completion_tokens 和 prompt_tokens
// 这里简化处理,实际要做 JSON 解析
outputTokenCount.addAndGet(estimateTokensFromChunk(json));
}
}
} catch (Exception e) {
log.debug("Error parsing SSE chunk: {}", e.getMessage());
}
}
private int estimateTokensFromChunk(String json) {
// 简化估算:按字符数估算 token(实际应该用 tiktoken 或 API 返回的 usage)
return 1; // 每个 delta chunk 约 1 个 token
}
};
return chain.filter(exchange.mutate().response(responseDecorator).build());
}
private void doAccounting(ServerWebExchange exchange, int inputTokens, int outputTokens) {
String appId = (String) exchange.getAttributes().get("appId");
String tenantId = (String) exchange.getAttributes().get("tenantId");
String targetModel = (String) exchange.getAttributes().get("targetModel");
costAccountingService.record(CostRecord.builder()
.appId(appId)
.tenantId(tenantId)
.model(targetModel)
.inputTokens(inputTokens)
.outputTokens(outputTokens)
.timestamp(System.currentTimeMillis())
.build());
}
@Override
public int getOrder() {
return -50;
}
}多模型路由决策器
@Service
public class ModelRoutingDecider {
@Autowired
private ModelHealthChecker healthChecker;
@Autowired
private BudgetService budgetService;
/**
* 根据请求特征决定路由到哪个模型
*/
public ModelRoute decide(RoutingRequest request) {
ModelPreference preference = request.getPreference();
// 策略1:强制指定模型
if (preference.getForceModel() != null) {
return ModelRoute.of(preference.getForceModel());
}
// 策略2:按能力要求选择
ModelCapability required = preference.getRequiredCapability();
List<ModelConfig> candidates = getCapableModels(required);
if (candidates.isEmpty()) {
throw new NoSuitableModelException("No model supports capability: " + required);
}
// 策略3:排除健康状态异常的模型
candidates = candidates.stream()
.filter(m -> healthChecker.isHealthy(m.getModelId()))
.collect(Collectors.toList());
// 策略4:如果有成本限制,优先选便宜的
if (preference.isCostOptimized()) {
candidates.sort(Comparator.comparingDouble(ModelConfig::getCostPerKToken));
}
// 策略5:检查租户预算,如果快超了就切到便宜模型
String tenantId = request.getTenantId();
if (budgetService.isNearLimit(tenantId)) {
candidates = filterToCheapModels(candidates);
}
return ModelRoute.of(candidates.get(0).getModelId());
}
}一些工程细节
API Key 的安全管理
所有模型的 API Key 存在 Vault 或者云厂商的 KMS 里,网关启动时注入,不落磁盘,不写日志。定期轮换 Key 只需要更新 Vault,业务代码完全无感知。
Prompt 日志脱敏
网关层的审计日志要做脱敏,用户发来的 Prompt 里可能包含手机号、身份证号等隐私信息,不能原文落库。我们在日志过滤器里做了正则脱敏,关键信息用 *** 替换后再落库。
流式响应不能缓存
普通 API 网关可以做响应缓存,AI 网关的流式响应不能缓存整个 body(太大,时效性也不对)。但可以做语义缓存:对于非流式的简单查询,如果有已有相似问题的缓存结果,可以直接返回。这需要用向量相似度判断,是个独立的功能模块。
连接保持
AI 调用耗时长,网关层的 HTTP 连接超时要设置合理,不能用默认的 30 秒。我们对不同模型设置了不同的超时:GPT-4o 普通调用 60 秒,流式调用 300 秒;Claude 类似。
为什么值得投入
搭这套 AI 网关,前期需要 2-3 周的开发工作量,维护也需要持续投入。有些团队觉得"我们就几个服务,用不着这么复杂"。
我觉得这要看公司 AI 化的深度。如果只有 1-2 个 AI 功能,确实可以简单做。但只要 AI 调用超过 10 个不同的服务,没有统一管控,迟早会出问题:
- 某个 Key 被滥用,整个账期的成本飙升,根本查不到源头
- 某个服务有 Bug 死循环调用 AI,把 Rate Limit 打爆,影响所有服务
- 要做安全审计,发现没有统一的 Prompt 日志
- 要换个成本更低的模型,发现要改十几个服务
AI 网关把这些问题一次性解决,后续任何 AI 相关的横切关注点,都只需要在网关层加一个过滤器就行。
