Enterprise AI Gateway: Infrastructure for Centrally Governing Every AI Call
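Opening story: ten teams each wired up OpenAI on their own, and the company's bill stayed a mess until an AI gateway arrived
In August 2025, Lin Feng, infrastructure director at a leading e-commerce company, received a bill that made him gasp: this month's OpenAI API charges came to 470,000 yuan.
The problem was not the amount. The problem was that nobody could explain where the 470,000 yuan had gone.
The company had 10 business teams. Each team had requested its own OpenAI API key and wrote code that called the API directly. The product-description team had a key, the customer-service chatbot team had a key, the content-moderation team had a key... 10 teams, 8 active keys.
Lin Feng spent a week taking inventory and found the following problems:
Cost attribution: how much of the 470,000 yuan did each team spend? There was no data. Finance wanted AI costs allocated to each business line, and that was simply impossible.
Key leakage risk: one key had ended up in the frontend JavaScript of an internal tool (!), and that key's call history contained large amounts of user data.
No auditing at all: every AI call bypassed the company's security audit system, violating internal compliance requirements.
No rate limiting: a production bug in one team caused an infinite loop of AI calls that burned through 80,000 yuan within an hour and only stopped when the OpenAI account balance ran out.
Duplicate calls: three teams were doing similar text-classification tasks, so the same text was sent three times, wasting roughly 30% of the cost.
Over two months, Lin Feng led his team in building an enterprise AI gateway from scratch. It brought monthly AI spend down to 260,000 yuan and delivered complete auditing, rate limiting, and cost allocation.
This article walks you through reproducing that AI gateway architecture.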
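I. Core Responsibilities of an AI Gateway
The gateway sits between internal callers and external model providers and centralizes the concerns each team previously handled (or skipped) on its own: unified authentication, intelligent routing, multi-dimensional rate limiting, request auditing, cost attribution, semantic caching, and monitoring. Sections III through IX cover them in turn.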
II. Project Setup
2.1 Maven Dependencies
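```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.5</version>
    </parent>
    <groupId>com.example</groupId> <!-- placeholder coordinates -->
    <artifactId>ai-gateway</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <properties>
        <!-- Spring Cloud release train for Spring Boot 3.3.x -->
        <spring-cloud.version>2023.0.3</spring-cloud.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <!-- Spring Cloud Gateway (reactive; version managed by the BOM above) -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>
        <!-- Spring AI (since 1.0.0 the OpenAI starter is named spring-ai-starter-model-openai) -->
        <dependency>
            <groupId>org.springframework.ai</groupId>
            <artifactId>spring-ai-starter-model-openai</artifactId>
            <version>1.0.0</version>
        </dependency>
        <!-- Redis (rate limiting + caching) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
        <!-- JWT authentication: jjwt-api needs jjwt-impl and jjwt-jackson at runtime -->
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-api</artifactId>
            <version>0.12.3</version>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-impl</artifactId>
            <version>0.12.3</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-jackson</artifactId>
            <version>0.12.3</version>
            <scope>runtime</scope>
        </dependency>
        <!-- Monitoring: Actuator exposes the Prometheus endpoint -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>
        <!-- Audit-log storage: the entities use JPA annotations, so Spring Data JPA is used (MySQL assumed) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
</project>
```

2.2 Core Configuration File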
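```yaml
# application.yml
server:
  port: 8080

spring:
  application:
    name: ai-gateway
  # Redis
  data:
    redis:
      host: ${REDIS_HOST:localhost}
      port: 6379
      password: ${REDIS_PASSWORD:}
  # Spring Cloud Gateway routes
  cloud:
    gateway:
      routes:
        # OpenAI route
        - id: openai-route
          uri: https://api.openai.com
          predicates:
            - Path=/v1/chat/completions,/v1/embeddings
          filters:
            # Each name must resolve to a bean named <Name>GatewayFilterFactory.
            # Authentication (III) and audit logging (VI) are GlobalFilters and
            # run on every route without being listed here.
            - name: ApiKeyRewrite   # swap the internal key for a provider key
            - name: RateLimit       # rate limiting
            - name: CostTracking    # cost tracking
            - StripPrefix=0
        # Anthropic route
        - id: anthropic-route
          uri: https://api.anthropic.com
          predicates:
            - Path=/v1/messages
          filters:
            - name: ApiKeyRewrite
            - name: RateLimit
            - name: CostTracking
            - StripPrefix=0

# Expose the Prometheus endpoint and publish histogram buckets for the P99 panel
management:
  endpoints:
    web:
      exposure:
        include: health,prometheus
  metrics:
    distribution:
      percentiles-histogram:
        gateway.request.duration: true

# Custom AI gateway settings
ai-gateway:
  # Internal keys (the keys handed out to teams)
  internal-keys:
    enabled: true
    key-prefix: "aigateway-"
  # Providers
  providers:
    openai:
      api-key: ${OPENAI_API_KEY}   # a comma-separated list enables key rotation
      base-url: https://api.openai.com/v1
      enabled: true
      priority: 1
    anthropic:
      api-key: ${ANTHROPIC_API_KEY}
      base-url: https://api.anthropic.com
      enabled: true
      priority: 2
    ollama:
      base-url: http://ollama:11434
      enabled: true
      priority: 3   # lowest priority (routing scores 1/priority); intended for low-sensitivity tasks
  # Pricing (USD per 1,000 tokens)
  cost:
    openai:
      gpt-4o:
        input: 0.005
        output: 0.015
      gpt-4o-mini:
        input: 0.00015
        output: 0.0006
    anthropic:
      claude-3-5-sonnet:
        input: 0.003
        output: 0.015
  # Rate limits
  rate-limit:
    default-rpm: 60        # default: 60 requests per minute
    default-tpm: 100000    # default: 100,000 tokens per minute
  # Semantic cache
  cache:
    enabled: true
    similarity-threshold: 0.92   # similarity threshold for a cache hit
    ttl-hours: 24
    max-response-tokens: 500     # only cache short responses
```

III. Unified Authentication: Internal Key Management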
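The article does not show how internal keys are issued. As a minimal sketch, assuming keys are random strings that start with the `key-prefix` configured above (`aigateway-`), the hypothetical `InternalKeyIssuer` below builds keys from 32 bytes of `SecureRandom` output and runs a cheap format check before any database or Redis lookup. A fixed prefix also makes a leaked gateway key easy to find with a text search or secret scanner, which matters given the key that ended up in frontend JavaScript.

```java
import java.security.SecureRandom;
import java.util.Base64;
import java.util.regex.Pattern;

// Hypothetical helper: issues internal keys carrying the configured
// ai-gateway.internal-keys.key-prefix and rejects malformed keys early.
final class InternalKeyIssuer {
    private static final SecureRandom RANDOM = new SecureRandom();
    private final String prefix;
    private final Pattern format;

    InternalKeyIssuer(String prefix) {
        this.prefix = prefix;
        // 32 random bytes encode to 43 URL-safe Base64 characters without padding
        this.format = Pattern.compile(Pattern.quote(prefix) + "[A-Za-z0-9_-]{43}");
    }

    String issue() {
        byte[] bytes = new byte[32];
        RANDOM.nextBytes(bytes);
        return prefix + Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }

    // Syntactic check only; a well-formed key must still be looked up
    boolean isWellFormed(String key) {
        return key != null && format.matcher(key).matches();
    }
}
```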
3.1 Internal API Key Entity
```java
import jakarta.persistence.*;
import lombok.*;
import org.hibernate.annotations.CreationTimestamp;
import org.hibernate.annotations.UpdateTimestamp;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;

// KeyStatus is an enum (ACTIVE, SUSPENDED, EXPIRED); StringListConverter is a custom
// AttributeConverter<List<String>, String> for the JSON columns. Neither is shown.
@Entity
@Table(name = "gateway_api_key")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GatewayApiKey {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // Internal key (issued to the teams)
    @Column(name = "internal_key", unique = true, nullable = false, length = 100)
    private String internalKey;

    // Team / application info
    @Column(name = "team_name", nullable = false)
    private String teamName;
    @Column(name = "app_name", nullable = false)
    private String appName;
    @Column(name = "contact_email")
    private String contactEmail;

    // Permissions
    @Column(name = "allowed_models", columnDefinition = "JSON")
    @Convert(converter = StringListConverter.class)
    private List<String> allowedModels;   // models this key may use
    @Column(name = "allowed_scenes", columnDefinition = "JSON")
    @Convert(converter = StringListConverter.class)
    private List<String> allowedScenes;   // permitted business scenarios

    // Rate limits (override the defaults)
    @Column(name = "rpm_limit")
    private Integer rpmLimit;             // max requests per minute
    @Column(name = "tpm_limit")
    private Integer tpmLimit;             // max tokens per minute
    @Column(name = "daily_cost_limit")
    private BigDecimal dailyCostLimit;    // daily spending cap (USD)

    // Monthly budget (USD)
    @Column(name = "monthly_budget")
    private BigDecimal monthlyBudget;

    // Key status
    @Enumerated(EnumType.STRING)
    @Column(name = "status", nullable = false)
    private KeyStatus status;             // ACTIVE / SUSPENDED / EXPIRED

    // Expiry
    @Column(name = "expires_at")
    private LocalDateTime expiresAt;

    // Usage statistics (@Builder.Default keeps the initial values when the builder is used)
    @Builder.Default
    @Column(name = "total_requests")
    private Long totalRequests = 0L;
    @Builder.Default
    @Column(name = "total_cost")
    private BigDecimal totalCost = BigDecimal.ZERO;

    @CreationTimestamp
    @Column(name = "created_at")
    private LocalDateTime createdAt;
    @UpdateTimestamp
    @Column(name = "updated_at")
    private LocalDateTime updatedAt;
}
```

3.2 Authentication Filter
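Before the reactive code, here is the filter's decision order reduced to a plain function. This is a hypothetical, framework-free sketch (`KeyCheck`, `KeyInfo`, and `Decision` are illustrative names, not gateway types): a missing header or unknown key yields 401, while a known key that is not `ACTIVE` or has passed its `expiresAt` yields 403.

```java
import java.time.LocalDateTime;

// Hypothetical model of the authentication checks, in the filter's order:
// missing header, unknown key, non-active status, expiry.
final class KeyCheck {
    enum Status { ACTIVE, SUSPENDED, EXPIRED }

    record KeyInfo(Status status, LocalDateTime expiresAt) {}

    // httpStatus is 0 when the request may continue to the next filter
    record Decision(boolean allowed, int httpStatus, String error) {
        static final Decision ALLOW = new Decision(true, 0, null);

        static Decision reject(int httpStatus, String error) {
            return new Decision(false, httpStatus, error);
        }
    }

    static Decision evaluate(String header, KeyInfo keyInfo, LocalDateTime now) {
        if (header == null || header.isBlank()) return Decision.reject(401, "Missing API key");
        if (keyInfo == null) return Decision.reject(401, "Invalid API key");   // lookup found nothing
        if (keyInfo.status() != Status.ACTIVE) return Decision.reject(403, "API key is " + keyInfo.status());
        if (keyInfo.expiresAt() != null && keyInfo.expiresAt().isBefore(now)) {
            return Decision.reject(403, "API key has expired");
        }
        return Decision.ALLOW;
    }
}
```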
```java
import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Optional;

// GatewayApiKeyService.validateKey(String) is assumed to return Mono<GatewayApiKey>
// that completes empty for an unknown key (a Mono can never emit null).
@Component
@Slf4j
@RequiredArgsConstructor
public class ApiKeyAuthenticationFilter implements GlobalFilter, Ordered {

    private static final String INTERNAL_KEY_HEADER = "X-AI-Gateway-Key";
    private static final String TEAM_ID_ATTR = "teamId";
    private static final String KEY_INFO_ATTR = "keyInfo";

    private final GatewayApiKeyService apiKeyService;
    private final MeterRegistry meterRegistry;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // Health checks do not require authentication
        if (exchange.getRequest().getPath().value().startsWith("/actuator")) {
            return chain.filter(exchange);
        }
        String internalKey = exchange.getRequest().getHeaders().getFirst(INTERNAL_KEY_HEADER);
        if (internalKey == null || internalKey.isBlank()) {
            log.warn("Request rejected: missing API key from {}",
                    exchange.getRequest().getRemoteAddress());
            return reject(exchange, HttpStatus.UNAUTHORIZED, "Missing API key");
        }

        return apiKeyService.validateKey(internalKey)
                // Turn "no such key" (an empty Mono) into an explicit value
                .map(Optional::of)
                .defaultIfEmpty(Optional.empty())
                // Handle lookup errors only; downstream errors must not turn into 401s
                .onErrorResume(e -> {
                    log.error("API key lookup failed", e);
                    return Mono.just(Optional.<GatewayApiKey>empty());
                })
                .flatMap(found -> {
                    if (found.isEmpty()) {
                        meterRegistry.counter("gateway.auth.failure", "reason", "invalid_key").increment();
                        return reject(exchange, HttpStatus.UNAUTHORIZED, "Invalid API key");
                    }
                    GatewayApiKey keyInfo = found.get();
                    if (keyInfo.getStatus() != KeyStatus.ACTIVE) {   // SUSPENDED or EXPIRED
                        return reject(exchange, HttpStatus.FORBIDDEN, "API key is " + keyInfo.getStatus());
                    }
                    if (keyInfo.getExpiresAt() != null
                            && keyInfo.getExpiresAt().isBefore(LocalDateTime.now())) {
                        return reject(exchange, HttpStatus.FORBIDDEN, "API key has expired");
                    }
                    // Expose the key info to downstream filters
                    exchange.getAttributes().put(TEAM_ID_ATTR, keyInfo.getTeamName());
                    exchange.getAttributes().put(KEY_INFO_ATTR, keyInfo);
                    // Count successful authentications per team
                    meterRegistry.counter("gateway.auth.success", "team", keyInfo.getTeamName()).increment();
                    return chain.filter(exchange);
                });
    }

    private Mono<Void> reject(ServerWebExchange exchange, HttpStatus status, String message) {
        exchange.getResponse().setStatusCode(status);
        exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);
        // Messages are fixed strings without quotes, so no JSON escaping is needed
        byte[] body = ("{\"error\":\"" + message + "\"}").getBytes(StandardCharsets.UTF_8);
        DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(body);
        return exchange.getResponse().writeWith(Mono.just(buffer));
    }

    @Override
    public int getOrder() {
        return -200;   // runs before the key-rewrite and rate-limit filters
    }
}
```

3.3 Mapping Internal Keys to Provider Keys
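Two details of the rewrite step are easy to get wrong, so here is a dependency-free sketch of both, using the hypothetical class `UpstreamAuth`: the provider-specific auth headers (OpenAI reads `Authorization: Bearer <key>`, Anthropic reads `x-api-key` plus `anthropic-version`), and round-robin rotation. An `AtomicInteger` counter eventually wraps to negative values, and Java's `%` on a negative number returns a negative result; `Math.floorMod` does not.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: upstream auth headers per provider plus an
// overflow-safe round-robin over several keys for one provider.
final class UpstreamAuth {
    private final List<String> keys;
    private final AtomicInteger counter;

    UpstreamAuth(List<String> keys, int start) {
        if (keys.isEmpty()) throw new IllegalArgumentException("no keys configured");
        this.keys = List.copyOf(keys);
        this.counter = new AtomicInteger(start);
    }

    // floorMod keeps the index in [0, size) even after the counter wraps past Integer.MAX_VALUE
    String nextKey() {
        return keys.get(Math.floorMod(counter.getAndIncrement(), keys.size()));
    }

    static Map<String, String> headersFor(String provider, String key) {
        return switch (provider) {
            case "openai" -> Map.of("Authorization", "Bearer " + key);
            case "anthropic" -> Map.of("x-api-key", key, "anthropic-version", "2023-06-01");
            default -> throw new IllegalArgumentException("unknown provider: " + provider);
        };
    }
}
```

With three keys, `Integer.MIN_VALUE % 3` is -2, which would make `List.get` throw `IndexOutOfBoundsException`; `Math.floorMod` maps the same counter value to 1.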
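```java
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.OrderedGatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;

// Referenced in application.yml as "- name: ApiKeyRewrite": Spring Cloud Gateway
// resolves that name to this bean by stripping the "GatewayFilterFactory" suffix.
@Component
public class ApiKeyRewriteGatewayFilterFactory
        extends AbstractGatewayFilterFactory<ApiKeyRewriteGatewayFilterFactory.Config> {

    private final ProviderKeyManager providerKeyManager;

    public ApiKeyRewriteGatewayFilterFactory(ProviderKeyManager providerKeyManager) {
        super(Config.class);
        this.providerKeyManager = providerKeyManager;
    }

    public static class Config {
        // no options
    }

    @Override
    public GatewayFilter apply(Config config) {
        // Order -100: after authentication (-200)
        return new OrderedGatewayFilter((exchange, chain) -> {
            String targetProvider = determineProvider(exchange.getRequest().getPath().value());
            // Provider key for this request (rotates across several keys)
            String externalKey = providerKeyManager.getNextKey(targetProvider);

            // Drop the internal key header and inject the provider's own credentials
            ServerHttpRequest modifiedRequest = exchange.getRequest().mutate()
                    .headers(headers -> {
                        headers.remove("X-AI-Gateway-Key");
                        // Each provider expects a different auth header
                        switch (targetProvider) {
                            case "openai" -> headers.set(HttpHeaders.AUTHORIZATION, "Bearer " + externalKey);
                            case "anthropic" -> {
                                headers.set("x-api-key", externalKey);
                                headers.set("anthropic-version", "2023-06-01");
                            }
                        }
                    })
                    .build();
            return chain.filter(exchange.mutate().request(modifiedRequest).build());
        }, -100);
    }

    private String determineProvider(String path) {
        if (path.contains("/v1/chat/completions") || path.contains("/v1/embeddings")) {
            return "openai";
        } else if (path.contains("/v1/messages")) {
            return "anthropic";
        }
        return "openai";   // default
    }
}
```

```java
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class ProviderKeyManager {

    // Several keys per provider, used round-robin so a single key's upstream rate limit is not the bottleneck
    private final Map<String, List<String>> providerKeys = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> keyIndexes = new ConcurrentHashMap<>();

    @Value("${ai-gateway.providers.openai.api-key}")
    private String openaiKeys;

    @Value("${ai-gateway.providers.anthropic.api-key}")
    private String anthropicKeys;

    @PostConstruct
    public void init() {
        // Each property may hold a comma-separated list of keys
        register("openai", openaiKeys);
        register("anthropic", anthropicKeys);
    }

    private void register(String provider, String csv) {
        List<String> keys = Arrays.stream(csv.split(","))
                .map(String::trim)
                .filter(k -> !k.isEmpty())
                .toList();
        providerKeys.put(provider, keys);
        keyIndexes.put(provider, new AtomicInteger());
    }

    public String getNextKey(String provider) {
        List<String> keys = providerKeys.getOrDefault(provider, List.of());
        if (keys.isEmpty()) {
            throw new IllegalStateException("No API keys configured for provider: " + provider);
        }
        // Round-robin; floorMod keeps the index non-negative after the counter overflows
        int index = Math.floorMod(keyIndexes.get(provider).getAndIncrement(), keys.size());
        return keys.get(index);
    }
}
```

IV. Intelligent Routing: Choosing a Provider Automatically by Cost and Availability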
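The service below ranks candidates with a weighted sum. To see how the terms interact, here is the same formula as a standalone sketch (`RoutingScore` and `Candidate` are hypothetical stand-ins for the service and `ProviderInfo`). The example costs assume 1,000 input and 500 output tokens at the prices in `application.yml`: $0.0125 for gpt-4o and $0.00045 for gpt-4o-mini.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Framework-free restatement of IntelligentRoutingService.calculateScore.
// Candidate is a hypothetical stand-in for ProviderInfo plus its cost estimate.
final class RoutingScore {
    record Candidate(String model, double successRate5min, double estimatedCostUsd,
                     double avgLatencyMs, int priority, boolean cheap) {}

    static double availabilityTerm(Candidate c) { return c.successRate5min() * 0.4; }
    static double costTerm(Candidate c) { return (1.0 / (c.estimatedCostUsd() + 0.001)) * 0.01 * 0.3; }
    static double latencyTerm(Candidate c) { return (1.0 / (c.avgLatencyMs() + 1)) * 1000 * 0.2; }
    static double priorityTerm(Candidate c) { return (1.0 / c.priority()) * 0.1; }

    static double score(Candidate c, boolean nearBudget) {
        if (nearBudget && !c.cheap()) return 0;   // budget rule from the service
        return availabilityTerm(c) + costTerm(c) + latencyTerm(c) + priorityTerm(c);
    }

    static Optional<Candidate> pick(List<Candidate> candidates, boolean nearBudget) {
        return candidates.stream().max(Comparator.comparingDouble(c -> score(c, nearBudget)));
    }
}
```

With these inputs the cost term for gpt-4o-mini is about 2.07 while its availability term is 0.396, so the nominal 40% availability weight does not hold: the terms live on different scales. Normalizing each term to [0, 1] across the candidates (for example, dividing by the best value among them) would make the weights mean what the comments say.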
```java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.Comparator;
import java.util.List;

// ProviderHealthChecker, CostOptimizer, ProviderInfo, AiGatewayRequest, RoutingDecision
// and ServiceUnavailableException are application types (not shown).
@Service
@Slf4j
@RequiredArgsConstructor
public class IntelligentRoutingService {

    private static final double DEFAULT_DAILY_BUDGET_USD = 100.0;   // used when the key sets no daily cap

    private final ProviderHealthChecker healthChecker;
    private final CostOptimizer costOptimizer;
    private final RedisTemplate<String, String> redisTemplate;

    /**
     * Select the best provider for a request.
     */
    public RoutingDecision selectProvider(AiGatewayRequest request) {
        // 1. Providers that are currently healthy
        List<ProviderInfo> availableProviders = healthChecker.getHealthyProviders();
        if (availableProviders.isEmpty()) {
            throw new ServiceUnavailableException("No available AI providers");
        }
        // 2. Apply the key's policy
        List<ProviderInfo> candidates = filterByPolicy(request, availableProviders);
        if (candidates.isEmpty()) {
            throw new ServiceUnavailableException("No provider satisfies the key's policy");
        }
        // 3. Pick the highest-scoring candidate
        ProviderInfo selected = selectOptimal(request, candidates);
        log.debug("Routing decision: request from team={}, selected provider={}, model={}",
                request.getTeamName(), selected.getProviderName(), selected.getModelName());
        return RoutingDecision.builder()
                .provider(selected)
                .reason(buildRoutingReason(request, selected))
                .build();
    }

    private List<ProviderInfo> filterByPolicy(AiGatewayRequest request, List<ProviderInfo> providers) {
        GatewayApiKey keyInfo = request.getKeyInfo();
        return providers.stream()
                // Filter 1: is this key allowed to use the model? (no list = everything allowed)
                .filter(p -> keyInfo.getAllowedModels() == null
                        || keyInfo.getAllowedModels().isEmpty()
                        || keyInfo.getAllowedModels().contains(p.getModelName()))
                // Filter 2: capability required by the request type
                .filter(p -> {
                    String contentType = request.getContentType();
                    if ("EMBEDDING".equals(contentType)) return p.supportsEmbedding();
                    if ("VISION".equals(contentType)) return p.supportsVision();
                    return true;
                })
                .toList();
    }

    private ProviderInfo selectOptimal(AiGatewayRequest request, List<ProviderInfo> candidates) {
        // Read the team's spend once per request, not once per candidate
        boolean nearBudget = isNearBudgetLimit(request);
        return candidates.stream()
                .max(Comparator.comparingDouble(p -> calculateScore(request, p, nearBudget)))
                .orElseThrow();   // candidates is non-empty (checked in selectProvider)
    }

    private double calculateScore(AiGatewayRequest request, ProviderInfo provider, boolean nearBudget) {
        // Special rule: above 80% of today's budget, expensive models score 0,
        // so they only win when no cheap model is available
        if (nearBudget && !provider.isCheapModel()) {
            return 0;
        }
        // The terms are not normalised to one scale, so the weights below are nominal
        // 1. Availability (weight 0.4)
        double availabilityScore = provider.getSuccessRate5min() * 0.4;
        // 2. Cost (weight 0.3; cheaper scores higher)
        double estimatedCost = costOptimizer.estimateCost(
                provider, request.getEstimatedInputTokens(), request.getEstimatedOutputTokens());
        double costScore = (1.0 / (estimatedCost + 0.001)) * 0.01 * 0.3;
        // 3. Latency (weight 0.2; faster scores higher)
        double latencyScore = (1.0 / (provider.getAvgLatencyMs() + 1)) * 1000 * 0.2;
        // 4. Configured priority (weight 0.1; priority 1 scores highest)
        double priorityScore = (1.0 / provider.getPriority()) * 0.1;
        return availabilityScore + costScore + latencyScore + priorityScore;
    }

    private boolean isNearBudgetLimit(AiGatewayRequest request) {
        // Same key and unit as CostTrackingService: integer micro-dollars under cost:daily:<team>:<date>
        String key = "cost:daily:" + request.getTeamName() + ":" + LocalDate.now();
        String microsStr = redisTemplate.opsForValue().get(key);
        if (microsStr == null) {
            return false;
        }
        double currentCostUsd = Long.parseLong(microsStr) / 1_000_000.0;
        BigDecimal dailyLimit = request.getKeyInfo().getDailyCostLimit();
        double dailyBudget = dailyLimit != null ? dailyLimit.doubleValue() : DEFAULT_DAILY_BUDGET_USD;
        return currentCostUsd / dailyBudget > 0.8;
    }
}
```

V. Multi-Dimensional Rate Limiting: Composite Limits in Redis
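Every Redis check below follows one pattern: `INCRBY` on a key that embeds the current minute, plus a short TTL. That is a fixed-window counter. The single-process sketch below (the hypothetical `FixedWindowLimiter`, with a `HashMap` standing in for Redis) shows the semantics. It never evicts old windows, which the real code leaves to Redis `EXPIRE`, and it only works inside one JVM, which is why the gateway keeps its counters in Redis.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the Redis pattern: one counter per key + minute bucket.
final class FixedWindowLimiter {
    private static final DateTimeFormatter MINUTE = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
    private final Map<String, Long> counters = new HashMap<>();

    // Adds `cost` (1 for RPM, estimated tokens for TPM) and reports whether the
    // window total is still within `limit`. The cost is counted even when rejected,
    // matching INCRBY-then-compare.
    synchronized boolean tryAcquire(String dimension, String id, long cost, long limit, LocalDateTime now) {
        String key = "rate:" + dimension + ":" + id + ":" + now.format(MINUTE);
        long total = counters.merge(key, cost, Long::sum);
        return total <= limit;
    }

    // Same heuristic as estimateTokens(): length / 3 plus a fixed 50-token overhead
    static int estimateTokens(String body) {
        return body == null ? 100 : body.length() / 3 + 50;
    }
}
```

A fixed window lets a client send up to twice the limit across a minute boundary (the end of one window plus the start of the next); a sliding-window or token-bucket limiter avoids that at the cost of more Redis work per request.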
```java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// RateLimitRequest (user id, team name, request body, GatewayApiKey) is not shown.
@Component
@Slf4j
@RequiredArgsConstructor
public class MultiDimensionRateLimiter {

    private static final DateTimeFormatter MINUTE_BUCKET = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
    private static final Duration WINDOW_TTL = Duration.ofMinutes(2);

    private final ReactiveRedisTemplate<String, String> redisTemplate;

    /**
     * Composite rate-limit check. The four checks run concurrently, so every
     * counter is incremented even when another dimension rejects the request;
     * the first failing dimension in the order user -> team -> tokens -> budget is reported.
     */
    public Mono<RateLimitResult> checkRateLimit(RateLimitRequest request) {
        return Mono.zip(
                checkUserLimit(request),
                checkTeamLimit(request),
                checkTeamTokenLimit(request),
                checkDailyBudgetLimit(request)
        ).map(tuple -> {
            // Reject if any dimension is over its limit
            if (!tuple.getT1().isAllowed()) return tuple.getT1();
            if (!tuple.getT2().isAllowed()) return tuple.getT2();
            if (!tuple.getT3().isAllowed()) return tuple.getT3();
            if (!tuple.getT4().isAllowed()) return tuple.getT4();
            return RateLimitResult.allowed();
        });
    }

    /**
     * User-level limit: keeps one user from taking too much of the team's quota.
     */
    private Mono<RateLimitResult> checkUserLimit(RateLimitRequest request) {
        String key = String.format("rate:user:%s:%s", request.getUserId(), currentMinuteBucket());
        Integer teamRpm = request.getKeyInfo().getRpmLimit();
        // A user may use 1/10 of the team's RPM (at least 1), or 10 RPM if the key sets no limit
        int limit = teamRpm != null ? Math.max(1, teamRpm / 10) : 10;
        return incrementInWindow(key, 1).map(count -> {
            if (count > limit) {
                log.warn("User rate limit exceeded: userId={}, count={}, limit={}",
                        request.getUserId(), count, limit);
                return RateLimitResult.rejected("User rate limit exceeded",
                        "X-RateLimit-User-Limit", String.valueOf(limit));
            }
            return RateLimitResult.allowed();
        });
    }

    /**
     * Team-level request limit (RPM).
     */
    private Mono<RateLimitResult> checkTeamLimit(RateLimitRequest request) {
        String key = String.format("rate:team:%s:%s", request.getTeamName(), currentMinuteBucket());
        int teamRpmLimit = request.getKeyInfo().getRpmLimit() != null
                ? request.getKeyInfo().getRpmLimit() : 60;
        return incrementInWindow(key, 1).map(count -> {
            if (count > teamRpmLimit) {
                log.warn("Team rate limit exceeded: team={}, count={}, limit={}",
                        request.getTeamName(), count, teamRpmLimit);
                return RateLimitResult.rejected("Team RPM limit exceeded",
                        "X-RateLimit-Team-Limit", String.valueOf(teamRpmLimit));
            }
            return RateLimitResult.allowed();
        });
    }

    /**
     * Team-level token limit (TPM): guards against bursts of long prompts.
     */
    private Mono<RateLimitResult> checkTeamTokenLimit(RateLimitRequest request) {
        String key = String.format("rate:tokens:%s:%s", request.getTeamName(), currentMinuteBucket());
        int tpmLimit = request.getKeyInfo().getTpmLimit() != null
                ? request.getKeyInfo().getTpmLimit() : 100_000;
        // The real token count is only known after the response; pre-check with an estimate
        int estimatedTokens = estimateTokens(request.getRequestBody());
        return incrementInWindow(key, estimatedTokens).map(count -> {
            if (count > tpmLimit) {
                return RateLimitResult.rejected("Team TPM limit exceeded",
                        "X-RateLimit-Token-Limit", String.valueOf(tpmLimit));
            }
            return RateLimitResult.allowed();
        });
    }

    /**
     * Daily budget limit. CostTrackingService stores spend as integer micro-dollars.
     */
    private Mono<RateLimitResult> checkDailyBudgetLimit(RateLimitRequest request) {
        BigDecimal limit = request.getKeyInfo().getDailyCostLimit();
        if (limit == null) {
            return Mono.just(RateLimitResult.allowed());
        }
        String key = "cost:daily:" + request.getTeamName() + ":" + LocalDate.now();
        return redisTemplate.opsForValue().get(key)
                .defaultIfEmpty("0")
                .map(microsStr -> {
                    BigDecimal currentCost = new BigDecimal(microsStr).movePointLeft(6);   // micro-dollars -> USD
                    if (currentCost.compareTo(limit) >= 0) {
                        log.warn("Daily budget limit reached: team={}, cost={}, limit={}",
                                request.getTeamName(), currentCost, limit);
                        return RateLimitResult.rejected(
                                "Daily budget limit reached: $" + currentCost.toPlainString() + " / $" + limit,
                                "X-Budget-Used", currentCost.toPlainString());
                    }
                    return RateLimitResult.allowed();
                });
    }

    /**
     * INCRBY on a per-minute key (a fixed-window counter). The write that creates
     * the key also sets a TTL, so old windows expire on their own.
     */
    private Mono<Long> incrementInWindow(String key, long delta) {
        return redisTemplate.opsForValue().increment(key, delta)
                .flatMap(count -> count == delta
                        ? redisTemplate.expire(key, WINDOW_TTL).thenReturn(count)
                        : Mono.just(count));
    }

    private int estimateTokens(String requestBody) {
        // Rough estimate: 1 token ≈ 4 characters of English or 2 of Chinese;
        // length / 3 + 50 sits between the two and adds headroom
        if (requestBody == null) return 100;
        return requestBody.length() / 3 + 50;
    }

    private String currentMinuteBucket() {
        return LocalDateTime.now().format(MINUTE_BUCKET);
    }
}
```

```java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RateLimitResult {
    private boolean allowed;
    private String rejectReason;
    private String limitHeaderName;    // response header that reports the limit that was hit
    private String limitHeaderValue;

    public static RateLimitResult allowed() {
        return RateLimitResult.builder().allowed(true).build();
    }

    public static RateLimitResult rejected(String reason, String headerName, String headerValue) {
        return RateLimitResult.builder()
                .allowed(false)
                .rejectReason(reason)
                .limitHeaderName(headerName)
                .limitHeaderValue(headerValue)
                .build();
    }
}
```

VI. Request Auditing: A Complete Record of Every AI Call
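The audit entity below stores `promptHash` and `promptLength` instead of the prompt text, but the filter does not show where the hash comes from. A minimal sketch, assuming the hash is a hex-encoded SHA-256 of the UTF-8 prompt (`PromptDigest` is a hypothetical helper; `HexFormat` needs Java 17):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Hypothetical helper for AiAuditLog.promptHash: a digest instead of the raw prompt.
final class PromptDigest {
    static String sha256Hex(String prompt) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(prompt.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(digest);   // 64 lowercase hex characters
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256
            throw new IllegalStateException(e);
        }
    }
}
```

Identical prompts produce identical hashes, so the audit table can still expose duplicate calls, such as three teams classifying the same text, without storing the content. Short or predictable prompts can be recovered from an unsalted hash by brute force; an HMAC keyed with a server-side secret avoids that.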
```java
import jakarta.persistence.*;
import lombok.*;
import org.hibernate.annotations.CreationTimestamp;

import java.math.BigDecimal;
import java.time.LocalDateTime;

@Entity
@Table(name = "ai_audit_log", indexes = {
        @Index(name = "idx_team_created", columnList = "team_name, created_at"),
        @Index(name = "idx_trace_id", columnList = "trace_id"),
        @Index(name = "idx_status", columnList = "status")
})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AiAuditLog {

    @Id
    private String id;   // UUID

    @Column(name = "trace_id", nullable = false)
    private String traceId;
    @Column(name = "team_name", nullable = false)
    private String teamName;
    @Column(name = "app_name")
    private String appName;
    @Column(name = "user_id")
    private String userId;

    // Request
    @Column(name = "provider", nullable = false)
    private String provider;
    @Column(name = "model_name")
    private String modelName;
    @Column(name = "request_path")
    private String requestPath;
    @Column(name = "request_method")
    private String requestMethod;

    // Content summary (the full prompt is not stored, to prevent data leaks)
    @Column(name = "prompt_hash")
    private String promptHash;       // SHA-256 of the request content
    @Column(name = "prompt_length")
    private Integer promptLength;

    // Token usage
    @Column(name = "input_tokens")
    private Integer inputTokens;
    @Column(name = "output_tokens")
    private Integer outputTokens;
    @Column(name = "total_tokens")
    private Integer totalTokens;

    // Cost
    @Column(name = "cost_usd", precision = 10, scale = 6)
    private BigDecimal costUsd;

    // Performance
    @Column(name = "latency_ms")
    private Long latencyMs;
    @Column(name = "ttfb_ms")       // time to first byte
    private Long ttfbMs;

    // Outcome
    @Column(name = "status")
    private String status;           // SUCCESS / FAILED / RATE_LIMITED / TIMEOUT
    @Column(name = "error_code")
    private String errorCode;
    @Column(name = "error_message")
    private String errorMessage;

    // Routing
    @Column(name = "selected_provider")
    private String selectedProvider;
    @Column(name = "routing_reason")
    private String routingReason;

    // Cache hit (@Builder.Default keeps the initial value when the builder is used)
    @Builder.Default
    @Column(name = "cache_hit")
    private Boolean cacheHit = false;

    @CreationTimestamp
    @Column(name = "created_at")
    private LocalDateTime createdAt;
}
```

```java
import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatusCode;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.math.BigDecimal;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
@RequiredArgsConstructor
public class AuditLogFilter implements GlobalFilter, Ordered {

    private final AiAuditLogRepository auditLogRepository;
    private final MeterRegistry meterRegistry;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String traceId = UUID.randomUUID().toString();
        long startTime = System.currentTimeMillis();
        exchange.getAttributes().put("traceId", traceId);
        exchange.getAttributes().put("startTime", startTime);
        // Return the trace id to the caller to make troubleshooting easier
        exchange.getResponse().getHeaders().add("X-Trace-Id", traceId);

        return chain.filter(exchange)
                .doFinally(signalType ->
                        // Write the audit record asynchronously so it does not add to response time
                        Mono.fromRunnable(() -> writeAuditLog(exchange, startTime, traceId))
                                .subscribeOn(Schedulers.boundedElastic())
                                .subscribe(null,
                                        e -> log.error("Failed to write audit log for traceId={}", traceId, e)));
    }

    private void writeAuditLog(ServerWebExchange exchange, long startTime, String traceId) {
        try {
            long latencyMs = System.currentTimeMillis() - startTime;
            // A null status means none was set explicitly, which is sent as 200
            HttpStatusCode statusCode = exchange.getResponse().getStatusCode();
            boolean success = statusCode == null || statusCode.is2xxSuccessful();

            GatewayApiKey keyInfo = exchange.getAttribute("keyInfo");
            String teamName = keyInfo != null ? keyInfo.getTeamName() : "unknown";

            AiAuditLog auditLog = AiAuditLog.builder()
                    .id(UUID.randomUUID().toString())
                    .traceId(traceId)
                    .teamName(teamName)
                    .appName(keyInfo != null ? keyInfo.getAppName() : "unknown")
                    // The attributes below are set by the routing and cost-tracking steps (not shown)
                    .provider((String) exchange.getAttribute("selectedProvider"))
                    .modelName((String) exchange.getAttribute("modelName"))
                    .requestPath(exchange.getRequest().getPath().value())
                    .requestMethod(exchange.getRequest().getMethod().name())
                    .inputTokens((Integer) exchange.getAttribute("inputTokens"))
                    .outputTokens((Integer) exchange.getAttribute("outputTokens"))
                    .costUsd((BigDecimal) exchange.getAttribute("requestCost"))
                    .latencyMs(latencyMs)
                    .status(success ? "SUCCESS" : "FAILED")
                    .cacheHit(Boolean.TRUE.equals(exchange.getAttribute("cacheHit")))
                    .build();
            auditLogRepository.save(auditLog);

            // Prometheus timer (exported as gateway_request_duration_seconds)
            meterRegistry.timer("gateway.request.duration",
                            "team", teamName,
                            "provider", auditLog.getProvider() != null ? auditLog.getProvider() : "unknown",
                            "status", auditLog.getStatus())
                    .record(latencyMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.error("Error writing audit log for traceId={}", traceId, e);
        }
    }

    @Override
    public int getOrder() {
        // Runs before authentication (-200), so rejected requests are audited too;
        // attributes such as keyInfo are already set when doFinally fires
        return -300;
    }
}
```

VII. Cost Attribution: Precise Cost Allocation
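The service below prices each call from a per-1,000-token table and accumulates spend in Redis as integer micro-dollars (USD × 1,000,000), because `INCRBY` only adds integers. Here is that arithmetic plus the alert thresholds from `checkBudgetAlerts()` as a standalone sketch (`CostMath` is hypothetical). For example, 1,200 input and 300 output tokens on gpt-4o cost 1.2 × 0.005 + 0.3 × 0.015 = $0.0105, stored as 10,500 micro-dollars.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical sketch of the cost arithmetic used by CostTrackingService.
final class CostMath {
    enum Alert { NONE, WARN_80, URGENT_90, SUSPEND }

    // Prices are USD per 1,000 tokens; movePointLeft(3) divides by 1,000 exactly
    static BigDecimal costUsd(int inputTokens, int outputTokens,
                              BigDecimal inputPer1k, BigDecimal outputPer1k) {
        BigDecimal in = inputPer1k.multiply(BigDecimal.valueOf(inputTokens)).movePointLeft(3);
        BigDecimal out = outputPer1k.multiply(BigDecimal.valueOf(outputTokens)).movePointLeft(3);
        return in.add(out).setScale(6, RoundingMode.HALF_UP);
    }

    // USD -> integer micro-dollars, the unit stored in the Redis counters
    static long toMicros(BigDecimal usd) {
        return usd.movePointRight(6).setScale(0, RoundingMode.HALF_UP).longValueExact();
    }

    static BigDecimal fromMicros(long micros) {
        return BigDecimal.valueOf(micros).movePointLeft(6);
    }

    // Thresholds: 80% warn, 90% urgent, 100% suspend
    static Alert alertFor(long spentMicros, BigDecimal monthlyBudgetUsd) {
        double ratio = fromMicros(spentMicros).doubleValue() / monthlyBudgetUsd.doubleValue();
        if (ratio >= 1.0) return Alert.SUSPEND;
        if (ratio >= 0.9) return Alert.URGENT_90;
        if (ratio >= 0.8) return Alert.WARN_80;
        return Alert.NONE;
    }
}
```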
```java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.YearMonth;
import java.util.List;
import java.util.Map;

// @Scheduled needs @EnableScheduling on a configuration class. The report types and
// the getTopModelsByUsage/getDailyTrend helpers are not shown.
@Service
@Slf4j
@RequiredArgsConstructor
public class CostTrackingService {

    private static final BigDecimal MICROS_PER_USD = BigDecimal.valueOf(1_000_000);

    // Prices in USD per 1,000 tokens: {input, output}
    private static final Map<String, double[]> PRICING = Map.of(
            "gpt-4o",            new double[]{0.005, 0.015},
            "gpt-4o-mini",       new double[]{0.00015, 0.0006},
            "claude-3-5-sonnet", new double[]{0.003, 0.015},
            "claude-3-haiku",    new double[]{0.00025, 0.00125}
    );
    // Unknown models fall back to gpt-4o prices
    private static final double[] DEFAULT_PRICING = {0.005, 0.015};

    private final RedisTemplate<String, String> redisTemplate;
    private final AiAuditLogRepository auditLogRepository;
    private final GatewayApiKeyRepository apiKeyRepository;

    /**
     * Calculate and record the cost of one request.
     */
    public BigDecimal calculateAndRecordCost(String teamName, String modelName,
                                             int inputTokens, int outputTokens) {
        double[] pricing = PRICING.getOrDefault(modelName, DEFAULT_PRICING);
        double inputCost = inputTokens / 1000.0 * pricing[0];
        double outputCost = outputTokens / 1000.0 * pricing[1];
        BigDecimal totalCost = BigDecimal.valueOf(inputCost + outputCost)
                .setScale(6, RoundingMode.HALF_UP);
        // Redis counters hold integer micro-dollars (USD x 1,000,000) so INCRBY stays exact
        long micros = totalCost.multiply(MICROS_PER_USD).longValue();

        // Running total of the team's spend today
        String dailyKey = "cost:daily:" + teamName + ":" + LocalDate.now();
        redisTemplate.opsForValue().increment(dailyKey, micros);
        redisTemplate.expire(dailyKey, Duration.ofDays(2));

        // Running total for the month
        String monthlyKey = "cost:monthly:" + teamName + ":" + YearMonth.now();
        redisTemplate.opsForValue().increment(monthlyKey, micros);
        redisTemplate.expire(monthlyKey, Duration.ofDays(35));

        return totalCost;
    }

    /**
     * Build the monthly cost report.
     */
    public CostReport generateMonthlyReport(YearMonth yearMonth) {
        List<TeamCostSummary> teamSummaries = auditLogRepository.getTeamCostSummary(
                yearMonth.atDay(1).atStartOfDay(),
                yearMonth.atEndOfMonth().atTime(LocalTime.MAX));   // include the month's last second
        BigDecimal totalCost = teamSummaries.stream()
                .map(TeamCostSummary::getTotalCostUsd)
                .reduce(BigDecimal.ZERO, BigDecimal::add);
        return CostReport.builder()
                .period(yearMonth.toString())
                .totalCostUsd(totalCost)
                .teamBreakdown(teamSummaries)
                .topModels(getTopModelsByUsage(yearMonth))
                .dailyTrend(getDailyTrend(yearMonth))
                .build();
    }

    /**
     * Budget alerts: warn as a team approaches its monthly budget.
     */
    @Scheduled(cron = "0 0 * * * *")   // at the top of every hour
    public void checkBudgetAlerts() {
        for (GatewayApiKey key : apiKeyRepository.findByStatus(KeyStatus.ACTIVE)) {
            if (key.getMonthlyBudget() == null) continue;
            String monthlyKey = "cost:monthly:" + key.getTeamName() + ":" + YearMonth.now();
            String microsStr = redisTemplate.opsForValue().get(monthlyKey);
            if (microsStr == null) continue;

            BigDecimal currentCost = new BigDecimal(microsStr)
                    .divide(MICROS_PER_USD, 6, RoundingMode.HALF_UP);
            double usageRatio = currentCost.doubleValue() / key.getMonthlyBudget().doubleValue();
            if (usageRatio >= 1.0) {
                // Over budget: suspend the key automatically
                suspendKey(key, "Monthly budget exhausted");
            } else if (usageRatio >= 0.9) {
                sendAlert(key, currentCost, "90% budget, urgent");
            } else if (usageRatio >= 0.8) {
                sendAlert(key, currentCost, "80% budget warning");
            }
        }
    }

    private void sendAlert(GatewayApiKey key, BigDecimal currentCost, String alertType) {
        log.warn("Budget alert [{}]: team={}, currentCost=${}, budget=${}",
                alertType, key.getTeamName(), currentCost, key.getMonthlyBudget());
        // TODO: send a DingTalk/Feishu alert
    }

    private void suspendKey(GatewayApiKey key, String reason) {
        key.setStatus(KeyStatus.SUSPENDED);
        apiKeyRepository.save(key);
        log.error("API key suspended: team={}, reason={}", key.getTeamName(), reason);
    }
}
```

VIII. Gateway-Level Semantic Cache
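Conceptually, the lookup below embeds the prompt, compares it with every cached embedding by cosine similarity, and returns the best entry only if it clears the threshold (0.92 in the config). The following sketch strips that down to plain Java so the matching rule can be tested in isolation; `SemanticLookup`, `Entry`, and `Hit` are hypothetical, and the embeddings are toy 3-dimensional vectors rather than real model output.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, framework-free version of findSimilarCache(): brute-force
// cosine similarity, best match returned only if it clears the threshold.
final class SemanticLookup {
    record Entry(float[] embedding, String response) {}
    record Hit(String response, double similarity) {}

    static double cosine(float[] a, float[] b) {
        if (a.length != b.length) return 0;
        double dot = 0, na = 0, nb = 0;
        for (int i = 0; i < a.length; i++) {
            dot += (double) a[i] * b[i];
            na += (double) a[i] * a[i];
            nb += (double) b[i] * b[i];
        }
        if (na == 0 || nb == 0) return 0;   // zero vectors would otherwise give NaN
        return dot / (Math.sqrt(na) * Math.sqrt(nb));
    }

    static Optional<Hit> find(float[] query, List<Entry> entries, double threshold) {
        Entry best = null;
        double bestSim = -1;
        for (Entry e : entries) {
            double sim = cosine(query, e.embedding());
            if (sim > bestSim) {
                bestSim = sim;
                best = e;
            }
        }
        return best != null && bestSim >= threshold
                ? Optional.of(new Hit(best.response(), bestSim))
                : Optional.empty();
    }
}
```

Each lookup is O(n) in the number of cached entries; as the service's own comment says, a vector database with an approximate-nearest-neighbor index is the production alternative.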
```java
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.embedding.EmbeddingModel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

// CachedResponse is an application DTO (not shown).
@Service
@Slf4j
@RequiredArgsConstructor
public class SemanticCacheService {

    private static final String CACHE_KEY_PREFIX = "ai:semantic:cache:";

    private final EmbeddingModel embeddingModel;
    private final RedisTemplate<String, String> redisTemplate;
    private final ObjectMapper objectMapper;

    @Value("${ai-gateway.cache.similarity-threshold:0.92}")
    private double similarityThreshold;

    @Value("${ai-gateway.cache.ttl-hours:24}")
    private int ttlHours;

    @Value("${ai-gateway.cache.max-response-tokens:500}")
    private int maxResponseTokens;

    /**
     * Look up the semantic cache: a similarity match, not an exact match.
     */
    public Optional<CachedResponse> findSimilarCache(String userPrompt, String model) {
        try {
            // 1. Embed the incoming prompt
            float[] queryEmbedding = embeddingModel.embed(userPrompt);

            // 2. Cache keys indexed for this model
            String indexKey = CACHE_KEY_PREFIX + "index:" + model;
            Set<String> cacheKeys = redisTemplate.opsForSet().members(indexKey);
            if (cacheKeys == null || cacheKeys.isEmpty()) {
                return Optional.empty();
            }

            // 3. Compare with every entry (in production, use a vector database with ANN search instead of Redis)
            String bestMatchKey = null;
            double bestSimilarity = 0;
            for (String cacheKey : cacheKeys) {
                String cachedEmbeddingStr = (String) redisTemplate.opsForHash().get(cacheKey, "embedding");
                if (cachedEmbeddingStr == null) {
                    // The entry has expired; drop its stale index reference
                    redisTemplate.opsForSet().remove(indexKey, cacheKey);
                    continue;
                }
                double similarity = cosineSimilarity(queryEmbedding, parseEmbedding(cachedEmbeddingStr));
                if (similarity > bestSimilarity) {
                    bestSimilarity = similarity;
                    bestMatchKey = cacheKey;
                }
            }

            // 4. Hit only when the best match clears the threshold
            if (bestMatchKey != null && bestSimilarity >= similarityThreshold) {
                String responseStr = (String) redisTemplate.opsForHash().get(bestMatchKey, "response");
                if (responseStr != null) {
                    // SLF4J has no format specifiers, so format the number explicitly
                    log.debug("Semantic cache hit: similarity={}, key={}",
                            String.format("%.3f", bestSimilarity), bestMatchKey);
                    CachedResponse response = objectMapper.readValue(responseStr, CachedResponse.class);
                    response.setCacheHit(true);
                    response.setSimilarity(bestSimilarity);
                    return Optional.of(response);
                }
            }
            return Optional.empty();
        } catch (Exception e) {
            log.warn("Semantic cache lookup failed", e);
            return Optional.empty();   // a cache failure must not break the main flow
        }
    }

    /**
     * Write an entry to the semantic cache.
     */
    public void putCache(String userPrompt, String model, String response,
                         int inputTokens, int outputTokens) {
        try {
            // Only cache short responses, to keep Redis memory bounded
            if (outputTokens > maxResponseTokens) {
                log.debug("Response too long to cache: {} tokens", outputTokens);
                return;
            }
            float[] embedding = embeddingModel.embed(userPrompt);
            String cacheKey = CACHE_KEY_PREFIX + UUID.randomUUID();

            Map<String, String> cacheEntry = Map.of(
                    "embedding", serializeEmbedding(embedding),
                    "response", response,
                    "prompt", userPrompt,
                    "model", model,
                    "inputTokens", String.valueOf(inputTokens),
                    "outputTokens", String.valueOf(outputTokens),
                    "cachedAt", LocalDateTime.now().toString());
            redisTemplate.opsForHash().putAll(cacheKey, cacheEntry);
            redisTemplate.expire(cacheKey, Duration.ofHours(ttlHours));

            // Register the entry in the per-model index
            String indexKey = CACHE_KEY_PREFIX + "index:" + model;
            redisTemplate.opsForSet().add(indexKey, cacheKey);
            redisTemplate.expire(indexKey, Duration.ofHours(ttlHours + 1));
        } catch (Exception e) {
            log.warn("Failed to put semantic cache", e);
        }
    }

    private double cosineSimilarity(float[] a, float[] b) {
        if (a.length != b.length) return 0;
        double dotProduct = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dotProduct += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        if (normA == 0 || normB == 0) return 0;   // avoid NaN for zero vectors
        return dotProduct / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    private String serializeEmbedding(float[] embedding) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < embedding.length; i++) {
            if (i > 0) sb.append(",");
            sb.append(embedding[i]);
        }
        return sb.append("]").toString();
    }

    private float[] parseEmbedding(String str) {
        String[] parts = str.substring(1, str.length() - 1).split(",");
        float[] result = new float[parts.length];
        for (int i = 0; i < parts.length; i++) {
            result[i] = Float.parseFloat(parts[i].trim());
        }
        return result;
    }
}
```

IX. Monitoring Dashboard: Key Grafana Metrics
9.1 Prometheus Metric Definitions
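```java
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class GatewayMetricsConfiguration {

    // Spring Boot binds every MeterBinder bean to the registry; the repository is
    // injected as a method parameter. The gauge runs a count query on each scrape.
    @Bean
    public MeterBinder activeKeysGauge(GatewayApiKeyRepository apiKeyRepository) {
        return registry -> Gauge.builder("gateway.active.keys",
                        apiKeyRepository, repo -> repo.countByStatus(KeyStatus.ACTIVE))
                .description("Number of active API keys")
                .register(registry);
    }
}

// Full list of metrics instrumented across the gateway (Prometheus names):
// gateway_request_total{team, provider, model, status}
// gateway_request_duration_seconds{team, provider, model}
// gateway_tokens_total{team, provider, model, type="input|output"}
// gateway_cost_usd_total{team, provider, model}
// gateway_cache_hit_total{model}
// gateway_rate_limit_total{team, dimension="user|team|token|budget"}
// gateway_provider_availability{provider}   0 or 1
// gateway_active_keys
```

9.2 Grafana Dashboard Configuration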
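The Java code registers dotted meter names (`gateway.auth.success`, `gateway.request.duration`), while the queries below use Prometheus names. Micrometer's Prometheus registry converts dots to underscores and adds a suffix by meter type. The sketch below is a simplified approximation of that convention (`PromNames` is hypothetical, not Micrometer API): counters get `_total`; timers get the base unit `_seconds` and are scraped as `_seconds_count`, `_seconds_sum`, `_seconds_max`, and, when percentile histograms are enabled, `_seconds_bucket`.

```java
// Simplified approximation of Micrometer's Prometheus naming for the meter
// types used in this gateway. Hypothetical helper, not part of Micrometer.
final class PromNames {
    enum Kind { COUNTER, TIMER, GAUGE }

    static String render(String dottedName, Kind kind) {
        String base = dottedName.replace('.', '_');
        return switch (kind) {
            case COUNTER -> base.endsWith("_total") ? base : base + "_total";
            case TIMER -> base + "_seconds";   // exported with _count/_sum/_max/_bucket suffixes
            case GAUGE -> base;
        };
    }
}
```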
```json
{
  "dashboard": {
    "title": "AI Gateway Overview",
    "panels": [
      {
        "title": "Requests per minute, by team",
        "type": "timeseries",
        "targets": [{
          "expr": "sum(rate(gateway_request_total[1m])) by (team) * 60"
        }]
      },
      {
        "title": "Cost per team, last 24h (USD)",
        "type": "bargauge",
        "targets": [{
          "expr": "sum(increase(gateway_cost_usd_total[1d])) by (team)"
        }]
      },
      {
        "title": "Cache hit rate (%)",
        "type": "stat",
        "targets": [{
          "expr": "sum(rate(gateway_cache_hit_total[5m])) / sum(rate(gateway_request_total[5m])) * 100"
        }]
      },
      {
        "title": "P99 latency (ms)",
        "type": "timeseries",
        "targets": [{
          "expr": "histogram_quantile(0.99, sum(rate(gateway_request_duration_seconds_bucket[5m])) by (le)) * 1000"
        }]
      },
      {
        "title": "Rate-limit rejections",
        "type": "timeseries",
        "targets": [{
          "expr": "sum(rate(gateway_rate_limit_total[5m])) by (team, dimension)"
        }]
      },
      {
        "title": "Provider availability",
        "type": "stat",
        "targets": [{
          "expr": "gateway_provider_availability"
        }]
      }
    ]
  }
}
```

X. Performance Data and Results
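10.1 Results for Lin Feng's Team
| Metric | Before | 6 months after |
|---|---|---|
| Average monthly AI cost | 470,000 yuan | 260,000 yuan (-44%) |
| Cost attribution accuracy | 0% | 100% |
| Key leak incidents per month | Unknown (no auditing) | 0 |
| Rate-limit protection coverage | 0% | 100% |
| Savings on duplicate calls | 0% | 23% (cache) |
| Audit record completeness | 0% | 100% |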
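10.2 Gateway Performance Benchmarks
Latency added by the gateway:
- Authentication: < 2 ms (Redis cache)
- Rate-limit check: < 5 ms (Redis atomic operations)
- Semantic cache lookup: < 30 ms (local vector computation)
- Audit log write: 0 ms on the request path (written asynchronously)
Gateway throughput (8 cores, 16 GB):
- Without cache hits: 5,000 req/s
- With cache hits: 20,000 req/s
Availability: 99.99% (active-active deployment)
FAQ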
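Q1: How is an AI gateway different from an ordinary API gateway?
A: An ordinary API gateway handles generic HTTP requests. An AI gateway also has to handle AI-specific concerns: token metering, semantic caching (similarity rather than exact matching), auditing of streamed responses, and routing across providers. You can build one by extending Spring Cloud Gateway, or use a dedicated AI gateway product such as LiteLLM.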
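Q2: What hit rate does a semantic cache usually achieve?
A: It depends on the workload. Customer-service scenarios, with many repeated questions, can reach 40-60%; creative generation, which differs every time, usually sees only 5-15%. Start tuning from a similarity threshold of 0.95 (stricter than the 0.92 default in the configuration) to avoid returning wrong cached answers.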
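Q3: How do you audit streamed output (Server-Sent Events)?
A: With streaming, the token count is only known once the stream ends. A filter can collect every chunk of the streamed response, compute tokens and cost when the stream completes, and then write the audit record. For response time, record when the first chunk arrived (TTFB) rather than when the whole response finished.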
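A minimal sketch of that approach for an OpenAI-style stream, assuming the client sends `stream_options: {"include_usage": true}` (without it, OpenAI does not put a `usage` object in the final chunk). `StreamAudit` is hypothetical, and it extracts the token counts with regular expressions only to stay dependency-free; real code would parse the JSON.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical accumulator for auditing an OpenAI-style SSE stream: TTFB from
// the first "data:" line, token usage from the chunk that carries "usage".
final class StreamAudit {
    private static final Pattern PROMPT = Pattern.compile("\"prompt_tokens\"\\s*:\\s*(\\d+)");
    private static final Pattern COMPLETION = Pattern.compile("\"completion_tokens\"\\s*:\\s*(\\d+)");

    private final long startMs;
    private long ttfbMs = -1;
    private int inputTokens = -1;
    private int outputTokens = -1;
    private boolean done;

    StreamAudit(long startMs) {
        this.startMs = startMs;
    }

    void onLine(String line, long nowMs) {
        if (!line.startsWith("data:")) return;        // skip blank separators and comments
        if (ttfbMs < 0) ttfbMs = nowMs - startMs;     // first data chunk marks TTFB
        String payload = line.substring(5).trim();
        if (payload.equals("[DONE]")) {
            done = true;
            return;
        }
        Matcher p = PROMPT.matcher(payload);
        Matcher c = COMPLETION.matcher(payload);
        if (p.find()) inputTokens = Integer.parseInt(p.group(1));
        if (c.find()) outputTokens = Integer.parseInt(c.group(1));
    }

    long ttfbMs() { return ttfbMs; }
    int inputTokens() { return inputTokens; }
    int outputTokens() { return outputTokens; }
    boolean done() { return done; }
}
```

Once `done()` is true, the token counts can go through the same cost calculation as non-streamed calls, and `ttfbMs()` fills the entity's `ttfbMs` field.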
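Q4: How do you make the AI gateway highly available?
A: Deploy at least two instances behind Nginx or another load balancer. Store rate-limit state in a Redis cluster so all instances share the same counters. Audit logs can be written to a message queue first and persisted to the database by a consumer, so a database outage does not affect the main request path.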
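The queue-first write path can be sketched in-process. The hypothetical `AuditBuffer` below stands in for the message queue: the request path calls a non-blocking `offer`, and a consumer drains batches for a single database insert. Unlike a real broker, it loses buffered records if the gateway restarts, and when full it drops records rather than slowing requests down, so the drop counter needs monitoring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-process stand-in for "write to a queue, let a consumer write to the DB".
final class AuditBuffer<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    AuditBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Request path: never blocks; counts a drop when the buffer is full
    boolean submit(T record) {
        boolean accepted = queue.offer(record);
        if (!accepted) dropped.incrementAndGet();
        return accepted;
    }

    // Consumer side: takes up to maxBatch records for one batched insert
    List<T> drainBatch(int maxBatch) {
        List<T> batch = new ArrayList<>(maxBatch);
        queue.drainTo(batch, maxBatch);
        return batch;
    }

    long droppedCount() {
        return dropped.get();
    }
}
```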
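Q5: Which open-source AI gateways are worth studying?
A: LiteLLM is one of the most mature open-source AI gateways; it is written in Python and supports 100+ providers. If your team works on a Java stack, building on Spring Cloud Gateway gives you more control.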
