Part 1977: Going Domestic with Your AI Gateway — Replacing the OpenAI Client with a Self-Built Model Scheduling Layer
Early last year our team did one thing: we replaced every OpenAI SDK call in our system with calls to a self-built AI gateway. The whole migration took two weeks, and once it was done, model switching, A/B testing, and cost monitoring all became dramatically easier. Today I'll walk through the design systematically.
Why Build Your Own AI Gateway
Using the OpenAI SDK, or each vendor's own SDK, directly has several problems:
Problem 1: multi-model management is chaotic. Once you use models from more than three vendors, each SDK carries its own configuration, and the code fills up with if (useGPT4) else if (useQwen) branches. A maintenance nightmare.
Problem 2: switching models requires large changes. When product asks you to move a feature from GPT-4 to Tongyi Qianwen, you have to touch code in several places, update configuration, and run a full round of testing before release.
Problem 3: no unified monitoring. Each SDK reports metrics in its own format, so answering "what is each model's call volume, latency, and cost" means stitching together several data sources.
Problem 4: rate limiting and retries are fragmented. Each SDK handles them on its own, so you cannot enforce a global rate-limiting policy or fail over intelligently between models.
A self-built AI gateway solves exactly these problems: it exposes a unified OpenAI-compatible interface to callers, while flexibly scheduling across vendor models internally.
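To make the "unified facade, flexible internals" idea concrete before diving into the implementation, here is a minimal, self-contained sketch of the alias-resolution step (class and method names are illustrative, not the gateway's actual code):

```java
import java.util.Map;

// Minimal sketch: resolve the model name a caller sends (OpenAI naming)
// to the backend model the gateway actually invokes. Unknown names pass
// through unchanged, so native model IDs keep working.
class ModelAliasDemo {
    static final Map<String, String> ALIASES = Map.of(
            "gpt-4", "qwen-max",
            "gpt-3.5-turbo", "qwen-plus"
    );

    static String resolve(String requested) {
        return ALIASES.getOrDefault(requested, requested);
    }

    public static void main(String[] args) {
        System.out.println(resolve("gpt-4"));         // qwen-max
        System.out.println(resolve("deepseek-chat")); // deepseek-chat (pass-through)
    }
}
```

Callers keep sending the model names they already know; only the gateway's mapping table decides what actually serves the request.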
Core Design of the Gateway
Callers only need to know the gateway's address and a single unified API Key; they are completely unaware of which vendor's model sits underneath.
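From the caller's side, every request is just a standard OpenAI-style HTTP call aimed at the gateway. A minimal sketch using the JDK's built-in HTTP client (the gateway URL and key are placeholders):

```java
import java.net.URI;
import java.net.http.HttpRequest;

class GatewayRequestDemo {
    // Build a standard OpenAI-style chat request aimed at the gateway.
    // The caller never learns which vendor model will serve it.
    static HttpRequest buildChatRequest(String baseUrl, String apiKey, String body) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/v1/chat/completions"))
                .header("Authorization", "Bearer " + apiKey)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        String body = "{\"model\":\"gpt-4\",\"messages\":[{\"role\":\"user\",\"content\":\"hello\"}]}";
        HttpRequest req = buildChatRequest("http://your-gateway-server:8080", "gw-your-team-key", body);
        System.out.println(req.uri());    // http://your-gateway-server:8080/v1/chat/completions
        System.out.println(req.method()); // POST
    }
}
```

Swap `baseUrl` and the request still works against the real OpenAI endpoint, which is exactly what makes migration painless.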
Core Code Implementation
Built on Spring Boot; the basic skeleton:
@SpringBootApplication
public class AIGatewayApplication {
    public static void main(String[] args) {
        SpringApplication.run(AIGatewayApplication.class, args);
    }
}

// Expose an OpenAI-compatible interface to callers
@RestController
@RequestMapping("/v1")
@Slf4j
public class OpenAICompatibleController {

    @Autowired
    private GatewayRoutingService routingService;

    @Autowired
    private AuthService authService;

    /**
     * Chat Completions endpoint
     */
    @PostMapping("/chat/completions")
    public ResponseEntity<?> chatCompletions(
            @RequestHeader("Authorization") String authHeader,
            @RequestBody ChatCompletionRequest request,
            HttpServletRequest httpRequest
    ) {
        // 1. Authenticate
        GatewayUser user = authService.authenticate(authHeader);
        // 2. Enrich the request (inject defaults, check quota, etc.)
        ChatCompletionRequest enriched = routingService.enrichRequest(request, user);
        // 3. Route to a concrete model
        if (Boolean.TRUE.equals(request.getStream())) {
            // Streaming response
            Flux<String> stream = routingService.streamChat(enriched, user);
            return ResponseEntity.ok()
                    .header("Content-Type", "text/event-stream")
                    .header("Cache-Control", "no-cache")
                    .body(stream);
        } else {
            // Synchronous response
            ChatCompletionResponse response = routingService.chat(enriched, user);
            return ResponseEntity.ok(response);
        }
    }

    /**
     * Embeddings endpoint
     */
    @PostMapping("/embeddings")
    public EmbeddingResponse embeddings(
            @RequestHeader("Authorization") String authHeader,
            @RequestBody EmbeddingRequest request
    ) {
        GatewayUser user = authService.authenticate(authHeader);
        return routingService.embed(request, user);
    }

    /**
     * Model list (lets callers discover the models available to them)
     */
    @GetMapping("/models")
    public ModelsResponse listModels(
            @RequestHeader("Authorization") String authHeader
    ) {
        GatewayUser user = authService.authenticate(authHeader);
        return routingService.listAvailableModels(user);
    }
}
Routing Engine: The Core Logic
The routing engine is the heart of the gateway; it decides which model serves each request:
@Service
@Slf4j
public class GatewayRoutingService {

    @Autowired
    private ModelRegistry modelRegistry;

    @Autowired
    private RoutingRuleEngine ruleEngine;

    @Autowired
    private ModelBackendPool backendPool;

    @Autowired
    private SemanticCacheService cacheService;

    public ChatCompletionResponse chat(ChatCompletionRequest request, GatewayUser user) {
        // Check the semantic cache first
        Optional<ChatCompletionResponse> cached = cacheService.get(request);
        if (cached.isPresent()) {
            log.info("Cache hit for user: {}", user.getId());
            return cached.get();
        }
        // Routing decision
        RoutingResult routing = ruleEngine.route(request, user);
        log.info("Request routed to: {} (reason: {})", routing.getModel(), routing.getReason());
        // Actual model call
        ModelBackend backend = backendPool.getBackend(routing.getModel());
        try {
            ChatCompletionResponse response = backend.chat(
                    adaptRequest(request, routing)
            );
            // Populate the cache
            if (routing.isCacheable()) {
                cacheService.put(request, response);
            }
            return response;
        } catch (ModelBackendException e) {
            // Trigger failover
            return handleFailover(request, user, routing, e);
        }
    }

    private ChatCompletionResponse handleFailover(
            ChatCompletionRequest request,
            GatewayUser user,
            RoutingResult originalRouting,
            Exception cause) {
        log.warn("Primary model {} failed, attempting failover", originalRouting.getModel(), cause);
        // Fetch the fallback models
        List<String> fallbackModels = ruleEngine.getFallbackModels(originalRouting.getModel());
        for (String fallbackModel : fallbackModels) {
            try {
                ModelBackend backend = backendPool.getBackend(fallbackModel);
                log.info("Failover to: {}", fallbackModel);
                return backend.chat(adaptRequest(request, RoutingResult.of(fallbackModel)));
            } catch (Exception e) {
                log.warn("Fallback model {} also failed", fallbackModel, e);
            }
        }
        throw new AllModelsUnavailableException("No available model is reachable");
    }
}
The routing rule engine (flexibly configurable):
@Component
public class RoutingRuleEngine {

    @Autowired
    private RoutingRulesConfig rulesConfig;

    public RoutingResult route(ChatCompletionRequest request, GatewayUser user) {
        String requestedModel = request.getModel();
        // 1. Resolve model aliases (e.g. map "gpt-4" to "qwen-max")
        String actualModel = rulesConfig.getModelMapping()
                .getOrDefault(requestedModel, requestedModel);
        // 2. User-tier routing (premium users get the stronger model)
        if (user.isPremium() && rulesConfig.hasPremiumOverride(actualModel)) {
            actualModel = rulesConfig.getPremiumModel(actualModel);
        }
        // 3. Feature routing (some features are pinned to a specific model)
        String feature = (String) request.getMetadata().getOrDefault("feature", "");
        if (!feature.isEmpty() && rulesConfig.hasFeatureBinding(feature)) {
            actualModel = rulesConfig.getFeatureModel(feature);
            return RoutingResult.of(actualModel, "feature-binding: " + feature);
        }
        // 4. A/B test routing
        Optional<ABTestConfig> abTest = rulesConfig.findActiveABTest(actualModel);
        if (abTest.isPresent()) {
            actualModel = abTest.get().selectModel(user.getId());
            return RoutingResult.of(actualModel, "ab-test: " + abTest.get().getName());
        }
        return RoutingResult.of(actualModel, "default-routing");
    }

    public List<String> getFallbackModels(String primaryModel) {
        return rulesConfig.getFallbackChain().getOrDefault(
                primaryModel,
                List.of("qwen-plus", "qwen-turbo") // default fallback chain
        );
    }
}
The YAML configuration for the routing rules:
ai-gateway:
  routing:
    # Model alias mappings
    model-mappings:
      "gpt-4": "qwen-max"
      "gpt-4-turbo": "qwen-max"
      "gpt-3.5-turbo": "qwen-plus"
      "claude-3-opus": "deepseek-reasoner"
      "text-embedding-ada-002": "text-embedding-v3"
    # Feature bindings
    feature-bindings:
      "code-review": "qwen-coder-plus"
      "translation": "qwen-max"
      "customer-service": "qwen-plus"
      "complex-analysis": "deepseek-reasoner"
    # Fallback chains
    fallback-chains:
      "qwen-max": ["qwen-plus", "qwen-turbo"]
      "deepseek-reasoner": ["qwen-max", "qwen-plus"]
      "qwen-coder-plus": ["qwen-plus"]
    # A/B test configuration
    ab-tests:
      - name: "qwen-vs-deepseek-coding"
        active: true
        target-model: "code-review"
        variants:
          - model: "qwen-coder-plus"
            weight: 70
          - model: "deepseek-chat"
            weight: 30
Model Backend Adapters
Every vendor's API differs in its details, so an adapter layer is needed:
public interface ModelBackend {
    ChatCompletionResponse chat(ChatCompletionRequest request);
    Flux<String> streamChat(ChatCompletionRequest request);
    String getName();
    boolean isHealthy();
}

// Alibaba Cloud DashScope adapter
@Component("dashscope")
public class DashScopeBackend implements ModelBackend {

    @Autowired
    private DashScopeChatModel dashScopeChatModel;

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public ChatCompletionResponse chat(ChatCompletionRequest request) {
        // Convert the common format into DashScope's format
        List<Message> messages = request.getMessages().stream()
                .map(this::convertMessage)
                .toList();
        Prompt prompt = new Prompt(messages,
                DashScopeChatOptions.builder()
                        .withModel(request.getModel())
                        .withTemperature(request.getTemperature() != null
                                ? request.getTemperature().floatValue() : 0.7f)
                        .withMaxToken(request.getMaxTokens())
                        .build()
        );
        ChatResponse response = dashScopeChatModel.call(prompt);
        // Convert back into the common (OpenAI) format
        return convertToOpenAIFormat(response, request.getModel());
    }

    @Override
    public Flux<String> streamChat(ChatCompletionRequest request) {
        // Stream the response, converting each chunk into an SSE-formatted string
        Prompt prompt = buildPrompt(request);
        return dashScopeChatModel.stream(prompt)
                .map(response -> {
                    String content = response.getResult().getOutput().getContent();
                    return buildSSEChunk(content, request.getModel());
                });
    }

    private String buildSSEChunk(String content, String model) {
        // Build an OpenAI-format SSE chunk
        // (writeValueAsString throws a checked JsonProcessingException, so wrap it)
        try {
            return "data: " + objectMapper.writeValueAsString(Map.of(
                    "id", "chatcmpl-" + UUID.randomUUID(),
                    "object", "chat.completion.chunk",
                    "model", model,
                    "choices", List.of(Map.of(
                            "delta", Map.of("content", content),
                            "index", 0
                    ))
            )) + "\n\n";
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize SSE chunk", e);
        }
    }

    @Override
    public boolean isHealthy() {
        try {
            // Send a lightweight ping
            chat(ChatCompletionRequest.simple("ping", "qwen-turbo"));
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}

// DeepSeek adapter
@Component("deepseek")
public class DeepSeekBackend implements ModelBackend {

    private final OpenAiChatModel deepSeekModel;

    @Autowired
    public DeepSeekBackend(
            @Value("${deepseek.api-key}") String apiKey,
            @Value("${deepseek.base-url:https://api.deepseek.com}") String baseUrl
    ) {
        OpenAiApi api = OpenAiApi.builder()
                .baseUrl(baseUrl)
                .apiKey(apiKey)
                .build();
        this.deepSeekModel = new OpenAiChatModel(api);
    }

    // Implementation mirrors DashScope, omitted...
}
Authentication and Multi-Tenancy
When multiple teams inside a company share the AI gateway, they need to be isolated:
@Service
public class AuthService {

    @Autowired
    private ApiKeyRepository apiKeyRepository;

    public GatewayUser authenticate(String authHeader) {
        if (authHeader == null || !authHeader.startsWith("Bearer ")) {
            throw new UnauthorizedException("Missing credentials");
        }
        String apiKey = authHeader.substring(7);
        ApiKeyEntity entity = apiKeyRepository.findByKey(apiKey)
                .orElseThrow(() -> new UnauthorizedException("Invalid API Key"));
        if (!entity.isActive()) {
            throw new UnauthorizedException("API Key has been disabled");
        }
        return GatewayUser.builder()
                .id(entity.getUserId())
                .team(entity.getTeam())
                .isPremium(entity.isPremium())
                .quotaPerDay(entity.getDailyQuota())
                .allowedModels(entity.getAllowedModels())
                .build();
    }
}

@Service
public class QuotaService {

    @Autowired
    private RedisTemplate<String, Long> redisTemplate;

    public void checkAndDecrementQuota(GatewayUser user, int estimatedTokens) {
        String key = "quota:" + user.getId() + ":" + LocalDate.now();
        Long used = redisTemplate.opsForValue().increment(key, estimatedTokens);
        redisTemplate.expire(key, Duration.ofDays(2)); // keep for 2 days for reconciliation
        if (used != null && used > user.getQuotaPerDay()) {
            throw new QuotaExceededException(
                    "Daily token quota exhausted. Used: " + used + ", limit: " + user.getQuotaPerDay()
            );
        }
    }
}
Monitoring Dashboard: Knowing What Every Model Is Doing
@Component
@Slf4j
public class GatewayMetricsCollector {

    @Autowired
    private MeterRegistry meterRegistry;

    public void recordRequest(
            String userId, String team,
            String requestedModel, String actualModel,
            long latencyMs, int inputTokens, int outputTokens,
            boolean isCache, boolean isSuccess
    ) {
        Tags tags = Tags.of(
                "user", userId,
                "team", team,
                "requested_model", requestedModel,
                "actual_model", actualModel,
                "cache_hit", String.valueOf(isCache),
                "success", String.valueOf(isSuccess)
        );
        // Latency distribution
        meterRegistry.timer("gateway.request.latency", tags)
                .record(latencyMs, TimeUnit.MILLISECONDS);
        // Token consumption
        meterRegistry.counter("gateway.tokens.input", tags)
                .increment(inputTokens);
        meterRegistry.counter("gateway.tokens.output", tags)
                .increment(outputTokens);
        // Request count
        meterRegistry.counter("gateway.requests.total", tags)
                .increment();
        // Estimated cost (simplified; production needs live per-model pricing).
        // Accumulated as a counter: a gauge re-registered on every call would not update.
        double cost = estimateCost(actualModel, inputTokens, outputTokens);
        meterRegistry.counter("gateway.estimated.cost",
                        Tags.of("team", team, "model", actualModel))
                .increment(cost);
    }

    private double estimateCost(String model, int inputTokens, int outputTokens) {
        // Simplified price table; in practice read this from a config center
        Map<String, double[]> prices = Map.of(
                "qwen-max", new double[]{0.04, 0.12},      // [input/1K, output/1K] in CNY
                "qwen-plus", new double[]{0.004, 0.012},
                "qwen-turbo", new double[]{0.0003, 0.0006},
                "deepseek-reasoner", new double[]{0.004, 0.016}
        );
        double[] price = prices.getOrDefault(model, new double[]{0.01, 0.03});
        return (inputTokens / 1000.0 * price[0]) + (outputTokens / 1000.0 * price[1]);
    }
}
Paired with a Grafana dashboard, you can see in real time:
- Call volume and cost breakdown per model
- Token consumption trends per team
- Cache hit rate (which translates directly into money saved)
- P50/P95/P99 latency
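The cost panel is driven by the per-1K-token arithmetic in estimateCost above. A standalone sketch of that calculation, with one worked example (prices are the simplified CNY figures from the table):

```java
import java.util.Map;

class CostEstimateDemo {
    // [price per 1K input tokens, price per 1K output tokens], in CNY
    static final Map<String, double[]> PRICES = Map.of(
            "qwen-max", new double[]{0.04, 0.12},
            "qwen-plus", new double[]{0.004, 0.012}
    );

    static double estimate(String model, int inputTokens, int outputTokens) {
        double[] p = PRICES.getOrDefault(model, new double[]{0.01, 0.03});
        return inputTokens / 1000.0 * p[0] + outputTokens / 1000.0 * p[1];
    }

    public static void main(String[] args) {
        // 2000 input + 500 output tokens on qwen-max:
        // 2.0 * 0.04 + 0.5 * 0.12 = 0.08 + 0.06 = 0.14 CNY
        System.out.printf("%.2f%n", estimate("qwen-max", 2000, 500)); // prints 0.14
    }
}
```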
A/B Testing: Validating New Models Safely
A/B testing is a natural fit for the gateway, with zero changes to business code:
@Component
public class ABTestService {

    @Autowired
    private ABTestRepository testRepository;

    /**
     * Pick a model variant from the A/B test config based on the user ID.
     * Deterministic hashing guarantees the same user always lands in the same bucket.
     */
    public String selectVariant(ABTestConfig config, String userId) {
        // floorMod keeps the bucket in [0, 100) even for negative hash codes
        // (Math.abs(Integer.MIN_VALUE) is itself negative)
        int hash = Math.floorMod((userId + config.getName()).hashCode(), 100);
        int cumulativeWeight = 0;
        for (ABTestConfig.Variant variant : config.getVariants()) {
            cumulativeWeight += variant.getWeight();
            if (hash < cumulativeWeight) {
                return variant.getModel();
            }
        }
        return config.getVariants().get(0).getModel(); // fall back to the first variant
    }

    /**
     * Record an A/B test result (for later analysis)
     */
    public void recordResult(String testName, String model, String userId,
                             long latencyMs, int tokens, boolean userSatisfied) {
        testRepository.save(ABTestResult.builder()
                .testName(testName)
                .model(model)
                .userId(userId)
                .latencyMs(latencyMs)
                .tokensUsed(tokens)
                .userSatisfied(userSatisfied)
                .timestamp(Instant.now())
                .build());
    }
}
Zero-Change Migration for Callers
This is the core value of a self-built gateway: callers do not have to change their code at all.
Before, calling OpenAI directly:
spring:
  ai:
    openai:
      api-key: sk-openai-xxxxx
      base-url: https://api.openai.com
Switch to the self-built gateway:
spring:
  ai:
    openai:
      api-key: gw-your-team-key  # key issued by the gateway
      base-url: http://your-gateway-server:8080
That single address change is the whole migration: business code stays completely untouched, yet the backend gains the freedom to schedule any model.
Highly Available Gateway Deployment
A production gateway cannot be a single point of failure:
# docker-compose.yml (simplified)
version: '3.8'
services:
  gateway-1:
    image: ai-gateway:latest
    ports:
      - "8081:8080"
    environment:
      - INSTANCE_ID=gateway-1
      - REDIS_URL=redis://redis:6379
  gateway-2:
    image: ai-gateway:latest
    ports:
      - "8082:8080"
    environment:
      - INSTANCE_ID=gateway-2
      - REDIS_URL=redis://redis:6379
  nginx:
    image: nginx:latest
    ports:
      - "8080:80"
    # load-balances across the two gateway instances
  redis:
    image: redis:7
    # shared cache and rate-limit counters
Summary
Building your own AI gateway is the inevitable path to using domestic AI models at scale. Once your system runs models from three or more vendors, or multiple teams share AI capabilities, the gateway's value shows itself in full.
The biggest win is organizational rather than technical: business teams stop caring which vendor's model they use, the platform team can switch models freely, and cost and monitoring are managed in one place.
