第2112篇:AI网关与模型路由——企业级LLM请求调度的工程实践
2026/4/30 · 大约 11 分钟
第2112篇:AI网关与模型路由——企业级LLM请求调度的工程实践
适读人群:管理多个LLM服务的平台工程师 | 阅读时长:约20分钟 | 核心价值:掌握AI网关的核心功能设计,实现智能模型路由、成本控制和故障转移
当一个企业同时在用GPT-4o、Claude 3.5 Sonnet、本地Llama,同时还有几十个业务团队都在调LLM,就会产生一个新问题:谁来统一管理这些调用?
没有统一管理会出现什么情况:
- 成本爆炸,不知道哪个团队用了多少
- 某个模型挂了,依赖它的所有业务都挂
- 不同团队用同样的功能,但Prompt质量参差不齐
- 审计和合规没法做,不知道发了什么内容出去
AI网关就是解决这些问题的。这篇文章从工程师视角,把AI网关的核心模块拆开讲清楚。
AI网关的核心功能
/**
* AI网关需要解决的问题
*
* 1. 统一入口
* 所有团队的LLM请求都通过同一个网关
* 方便管控和审计
*
* 2. 智能路由
* 根据请求类型、成本、延迟需求选择最合适的模型
* 简单问题 → 便宜小模型
* 复杂问题 → 强力大模型
*
* 3. 成本管控
* 按团队/用户设置预算
* 超额限流或告警
*
* 4. 故障转移
* 主模型挂了自动切备用
* 多个服务商互为备用
*
* 5. 请求缓存
* 相同或相似的请求不重复调
*
* 6. 安全拦截
* 注入攻击检测
* 敏感信息过滤
*
* 7. 可观测性
* 每次调用的延迟、token、费用全部记录
 */
网关架构设计
/**
 * AI gateway request-processing pipeline.
 *
 * request in -> auth -> pre-processing -> route selection -> LLM call -> post-processing -> response
 *
 * Each stage is a pluggable middleware.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class AiGateway {

    private final List<GatewayMiddleware> middlewares;
    private final ModelRouter router;
    private final LlmClientFactory clientFactory;
    private final GatewayMetricsService metrics;

    /**
     * Processes a single LLM request end to end.
     *
     * Pipeline: pre-middlewares (auth, rate limiting, safety checks) ->
     * model routing -> LLM call with failover -> post-middlewares (output
     * filtering, cost accounting) -> metrics.
     *
     * @param request the incoming gateway request
     * @return a success, blocked, or error response; never throws to the caller
     */
    public GatewayResponse process(GatewayRequest request) {
        String requestId = UUID.randomUUID().toString();
        // Propagate the request id into every log line of this call.
        MDC.put("requestId", requestId);
        long startTime = System.currentTimeMillis();
        GatewayContext ctx = GatewayContext.builder()
                .requestId(requestId)
                .request(request)
                .startTimeMs(startTime)
                .build();
        try {
            // 1. Pre-middlewares: auth, rate limiting, safety checks, etc.
            for (GatewayMiddleware middleware : middlewares) {
                if (!middleware.supports(ctx)) continue;
                MiddlewareResult result = middleware.processRequest(ctx);
                if (result.isBlocked()) {
                    metrics.recordBlocked(request.getTeamId(), middleware.getName());
                    // BUGFIX: records expose component accessors (blockReason()),
                    // not JavaBean getters — getBlockReason() did not compile.
                    return GatewayResponse.blocked(result.blockReason());
                }
            }
            // 2. Route selection.
            ModelRoute route = router.route(ctx);
            ctx.setSelectedRoute(route);
            log.debug("路由选择: requestId={}, model={}, provider={}",
                    requestId, route.getModelId(), route.getProvider());
            // 3. LLM call (with failover across fallback routes).
            LlmResponse llmResponse = callWithFallback(ctx, route);
            ctx.setLlmResponse(llmResponse);
            // 4. Post-middlewares: output filtering, cost accounting, etc.
            for (GatewayMiddleware middleware : middlewares) {
                if (!middleware.supports(ctx)) continue;
                middleware.processResponse(ctx);
            }
            long duration = System.currentTimeMillis() - startTime;
            metrics.recordSuccess(request.getTeamId(), route.getModelId(),
                    llmResponse.getTokensUsed(), duration);
            return GatewayResponse.success(llmResponse.getContent(), ctx);
        } catch (Exception e) {
            long duration = System.currentTimeMillis() - startTime;
            metrics.recordError(request.getTeamId(), e.getClass().getSimpleName(), duration);
            log.error("网关处理失败: requestId={}", requestId, e);
            return GatewayResponse.error("服务暂时不可用");
        } finally {
            MDC.remove("requestId");
        }
    }

    /**
     * Calls the LLM with automatic failover: the primary route first, then
     * each fallback route in order.
     *
     * @throws RuntimeException when every candidate fails; the last failure
     *         is attached as the cause
     */
    private LlmResponse callWithFallback(GatewayContext ctx, ModelRoute primaryRoute) {
        List<ModelRoute> candidates = new ArrayList<>();
        candidates.add(primaryRoute);
        candidates.addAll(primaryRoute.getFallbackRoutes());
        Exception lastException = null;
        for (ModelRoute route : candidates) {
            try {
                LlmClient client = clientFactory.getClient(route.getProvider());
                return client.call(ctx.getRequest(), route);
            } catch (Exception e) {
                lastException = e;
                log.warn("模型调用失败,尝试下一个: model={}, error={}",
                        route.getModelId(), e.getMessage());
                // Keep a trace of every failed attempt for observability.
                ctx.addFallbackAttempt(route.getModelId(), e.getMessage());
            }
        }
        throw new RuntimeException("所有模型都调用失败", lastException);
    }

    // --- SPI and data types -------------------------------------------------

    /** Pluggable pipeline stage: pre-hook plus optional post-hook. */
    public interface GatewayMiddleware {
        String getName();
        boolean supports(GatewayContext ctx);
        MiddlewareResult processRequest(GatewayContext ctx);
        default void processResponse(GatewayContext ctx) {}
    }

    /** Outcome of a pre-middleware: either pass through or block with a reason. */
    public record MiddlewareResult(boolean isBlocked, String blockReason) {
        public static MiddlewareResult pass() { return new MiddlewareResult(false, null); }
        public static MiddlewareResult block(String reason) { return new MiddlewareResult(true, reason); }
    }
}
智能模型路由
/**
 * Model router.
 *
 * Picks the most suitable model for each request. Guiding rule: never pay
 * for a big model when a cheap one is good enough.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class ModelRouter {

    private final ModelRegistryService registry;

    /**
     * Routing rules, highest priority first:
     * 1. explicit preference — the request names a model, use it
     * 2. task-type routing   — code / analysis / chat each have a preferred model
     * 3. cost optimisation   — cheapest model that meets the quality bar
     * 4. default model       — last resort
     */
    public ModelRoute route(GatewayContext ctx) {
        GatewayRequest request = ctx.getRequest();

        // Rule 1: the caller explicitly named a model.
        String preferred = request.getPreferredModel();
        if (preferred != null) {
            ModelInfo explicit = registry.findModel(preferred);
            if (explicit != null && explicit.isAvailable()) {
                return toRoute(explicit, "explicit_preference");
            }
        }

        // Rule 2: route by task type.
        TaskType kind = classify(request);
        ModelInfo byTask = preferredModelFor(kind);
        if (byTask != null) {
            return toRoute(byTask, "task_type_routing:" + kind);
        }

        // Rule 3: cheapest model that can still do the job.
        ModelInfo cheapest = cheapestSufficientModel(request);
        if (cheapest != null) {
            return toRoute(cheapest, "cost_optimization");
        }

        // Rule 4: fall back to the registry default.
        return toRoute(registry.getDefaultModel(), "default");
    }

    /**
     * Heuristic task classification. A small classifier model would be more
     * accurate; keyword rules are good enough to start with.
     */
    private TaskType classify(GatewayRequest request) {
        String text = request.getUserMessage().toLowerCase();
        // Code-related work.
        if (matchesAny(text, "代码", "函数", "bug", "compile", "syntax", "program",
                "class", "method", "algorithm")) {
            return TaskType.CODE;
        }
        // Data analysis.
        if (matchesAny(text, "分析", "统计", "数据", "计算", "excel", "sql", "图表")) {
            return TaskType.ANALYSIS;
        }
        // Long document: very long input, or context documents attached.
        boolean hasDocs = request.getContextDocuments() != null
                && !request.getContextDocuments().isEmpty();
        if (request.getUserMessage().length() > 3000 || hasDocs) {
            return TaskType.LONG_DOCUMENT;
        }
        // Creative writing.
        if (matchesAny(text, "写作", "创意", "故事", "文章", "写一篇")) {
            return TaskType.CREATIVE;
        }
        return TaskType.GENERAL;
    }

    /** Preferred model for a task type, or null to defer to cost routing. */
    private ModelInfo preferredModelFor(TaskType kind) {
        String candidateId;
        switch (kind) {
            case CODE -> candidateId = "claude-3-5-sonnet";      // strongest at code
            case ANALYSIS -> candidateId = "gpt-4o";             // strong analyst
            case LONG_DOCUMENT -> candidateId = "claude-3-opus"; // biggest context window
            case CREATIVE -> candidateId = "gpt-4o";             // good creative writing
            default -> candidateId = null;                       // GENERAL: no preference
        }
        if (candidateId == null) {
            return null;
        }
        ModelInfo candidate = registry.findModel(candidateId);
        if (candidate == null || !candidate.isAvailable()) {
            return null;
        }
        return candidate;
    }

    /**
     * Cost-optimised routing: short prompts (&lt;200 chars) go to the small
     * tier, medium prompts (&lt;1000) to the medium tier, everything else to
     * the large tier. An explicit ECONOMY quality tier always forces small.
     */
    private ModelInfo cheapestSufficientModel(GatewayRequest request) {
        int length = request.getUserMessage().length();
        QualityTier tier = request.getQualityTier() == null
                ? QualityTier.STANDARD
                : request.getQualityTier();
        boolean wantsEconomy = tier == QualityTier.ECONOMY
                || (tier == QualityTier.STANDARD && length < 200);
        if (wantsEconomy) {
            // Small models: GPT-4o-mini, Claude Haiku.
            return registry.findCheapestAvailableModel(ModelTier.SMALL);
        }
        ModelTier target = length < 1000 ? ModelTier.MEDIUM : ModelTier.LARGE;
        return registry.findCheapestAvailableModel(target);
    }

    /** Wraps a model into a route, attaching its registered fallback chain. */
    private ModelRoute toRoute(ModelInfo model, String routingReason) {
        List<ModelRoute> fallbacks = registry.getFallbackModels(model.getModelId()).stream()
                .map(fb -> ModelRoute.builder()
                        .modelId(fb.getModelId())
                        .provider(fb.getProvider())
                        .build())
                .toList();
        return ModelRoute.builder()
                .modelId(model.getModelId())
                .provider(model.getProvider())
                .routingReason(routingReason)
                .fallbackRoutes(fallbacks)
                .build();
    }

    /** True when {@code text} contains at least one of the given keywords. */
    private boolean matchesAny(String text, String... keywords) {
        for (int i = 0; i < keywords.length; i++) {
            if (text.contains(keywords[i])) {
                return true;
            }
        }
        return false;
    }

    public enum TaskType { CODE, ANALYSIS, LONG_DOCUMENT, CREATIVE, GENERAL }
    public enum QualityTier { ECONOMY, STANDARD, PREMIUM }
    public enum ModelTier { SMALL, MEDIUM, LARGE }
}
成本控制Middleware
/**
 * Cost-control middleware.
 *
 * Enforces per-team budget quotas: blocks requests once the budget is
 * exhausted, downgrades to cheaper models when it is nearly exhausted, and
 * records actual spend after each call.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class CostControlMiddleware implements AiGateway.GatewayMiddleware {

    /** Price per 1000 tokens in USD, keyed by model id. */
    private static final Map<String, Double> PRICE_PER_K_TOKENS = Map.of(
            "gpt-4o", 0.005,
            "gpt-4o-mini", 0.00015,
            "claude-3-5-sonnet", 0.003,
            "claude-3-haiku", 0.00025
    );
    /** Fallback price for unknown (or not yet chosen) models. */
    private static final double DEFAULT_PRICE_PER_K = 0.003;

    private final BudgetService budgetService;
    private final TokenEstimator tokenEstimator;

    @Override
    public String getName() { return "cost_control"; }

    @Override
    public boolean supports(GatewayContext ctx) { return true; }

    @Override
    public AiGateway.MiddlewareResult processRequest(GatewayContext ctx) {
        String teamId = ctx.getRequest().getTeamId();
        // 1. Hard stop when the team budget is exhausted.
        BudgetStatus status = budgetService.getBudgetStatus(teamId);
        if (status.isBlocked()) {
            log.warn("团队预算超限,请求被阻断: teamId={}, spent={}, limit={}",
                    teamId, status.getSpentAmount(), status.getLimitAmount());
            return AiGateway.MiddlewareResult.block(
                    "团队本月预算已用完,请联系管理员提升配额");
        }
        // 2. Estimate this request's token usage and cost.
        int estimatedTokens = tokenEstimator.estimateTokens(
                ctx.getRequest().getUserMessage(),
                ctx.getRequest().getSystemPrompt()
        );
        double estimatedCost = calculateCost(estimatedTokens, ctx.getRequest().getPreferredModel());
        // 3. Budget too low for the estimate but not fully spent: force the
        //    economy tier instead of blocking outright.
        if (status.getAvailableAmount() < estimatedCost && status.getAvailableAmount() > 0.001) {
            log.info("预算不足,降级到经济模式: teamId={}, available={}, estimated={}",
                    teamId, status.getAvailableAmount(), estimatedCost);
            ctx.getRequest().setQualityTier(ModelRouter.QualityTier.ECONOMY);
            ctx.addNote("cost_downgraded", "true");
        }
        // 4. Remember the estimate; actual spend is recorded in processResponse.
        ctx.setEstimatedCost(estimatedCost);
        return AiGateway.MiddlewareResult.pass();
    }

    @Override
    public void processResponse(GatewayContext ctx) {
        if (ctx.getLlmResponse() == null) return;
        String teamId = ctx.getRequest().getTeamId();
        int actualTokens = ctx.getLlmResponse().getTokensUsed();
        String modelId = ctx.getSelectedRoute().getModelId();
        // Record the actual cost against the team's budget.
        double actualCost = calculateCost(actualTokens, modelId);
        budgetService.recordUsage(teamId, actualCost, actualTokens, modelId);
        // Alert once usage crosses 80% of the budget (at most once).
        BudgetStatus afterStatus = budgetService.getBudgetStatus(teamId);
        if (afterStatus.getUsagePercent() >= 0.8 && !afterStatus.isAlertSent()) {
            budgetService.sendBudgetAlert(teamId, afterStatus.getUsagePercent());
        }
    }

    /**
     * Cost in USD for {@code tokens} tokens on the given model.
     *
     * BUGFIX: the pricing table was rebuilt via Map.of on every call, and
     * Map.of maps are null-hostile — getOrDefault(null, ...) threw an NPE
     * whenever the request had no preferred model (the common case in
     * processRequest). The table is now a constant and a null model id
     * falls back to the default price.
     */
    private double calculateCost(int tokens, String modelId) {
        double pricePerK = modelId == null
                ? DEFAULT_PRICE_PER_K
                : PRICE_PER_K_TOKENS.getOrDefault(modelId, DEFAULT_PRICE_PER_K);
        return tokens / 1000.0 * pricePerK;
    }
}
限流Middleware
/**
 * Rate-limiting middleware.
 *
 * Limits on three dimensions:
 * 1. global — protects the downstream LLM APIs
 * 2. team   — keeps one team from consuming the shared quota
 * 3. user   — keeps a single user from abusing the service
 *
 * NOTE(review): checkRateLimit is check-then-add across several Redis calls,
 * not atomic — under concurrency the window can briefly exceed the limit.
 * A Lua script would make it exact; confirm whether the overshoot matters.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class RateLimitMiddleware implements AiGateway.GatewayMiddleware {
private final RedisTemplate<String, String> redisTemplate;
// Rate-limit settings (requests per minute).
private static final int GLOBAL_RPM = 1000; // global requests per minute
private static final int TEAM_RPM = 200; // per-team requests per minute
private static final int USER_RPM = 20; // per-user requests per minute
@Override
public String getName() { return "rate_limit"; }
@Override
public boolean supports(GatewayContext ctx) { return true; }
/**
 * Applies the three limits in order (global, team, user) and blocks the
 * request as soon as one of them is exceeded.
 */
@Override
public AiGateway.MiddlewareResult processRequest(GatewayContext ctx) {
String teamId = ctx.getRequest().getTeamId();
String userId = ctx.getRequest().getUserId();
// 1. Global limit.
if (!checkRateLimit("global", GLOBAL_RPM)) {
log.warn("全局限流触发");
return AiGateway.MiddlewareResult.block("服务繁忙,请稍后重试");
}
// 2. Per-team limit.
if (!checkRateLimit("team:" + teamId, TEAM_RPM)) {
log.warn("团队限流触发: teamId={}", teamId);
return AiGateway.MiddlewareResult.block("团队请求频率超限,请稍后重试");
}
// 3. Per-user limit (skipped when the request carries no user id).
if (userId != null && !checkRateLimit("user:" + userId, USER_RPM)) {
log.warn("用户限流触发: userId={}", userId);
return AiGateway.MiddlewareResult.block("请求太频繁,请稍后重试");
}
return AiGateway.MiddlewareResult.pass();
}
/**
 * Sliding-window rate limit over the last 60 seconds.
 *
 * Smoother than a fixed window: no burst of 2x the limit at a window
 * boundary. Implemented with a Redis sorted set — score is the request
 * timestamp, member is a random request id; entries older than the window
 * are pruned, and the remaining cardinality is the in-window count.
 *
 * @param key logical bucket ("global", "team:X", "user:Y")
 * @param maxRequests maximum requests allowed per minute for this bucket
 * @return true when the request is allowed (and has been recorded)
 */
private boolean checkRateLimit(String key, int maxRequests) {
String redisKey = "ratelimit:" + key;
long now = System.currentTimeMillis();
long windowStart = now - 60000; // 1-minute window
// Prune entries that fell out of the window, then count what is left.
redisTemplate.opsForZSet().removeRangeByScore(redisKey, 0, windowStart);
Long count = redisTemplate.opsForZSet().zCard(redisKey);
if (count != null && count >= maxRequests) {
return false;
}
// Record this request; the TTL keeps idle keys from piling up in Redis.
redisTemplate.opsForZSet().add(redisKey, UUID.randomUUID().toString(), now);
redisTemplate.expire(redisKey, Duration.ofMinutes(2));
return true;
}
}模型注册中心
/**
 * Model registry.
 *
 * Holds the configuration of every available LLM model: health status,
 * pricing, capability descriptions, and fallback chains.
 */
@Service
@Slf4j
public class ModelRegistryService {
// Model configurations (in production these should come from a config center).
private final Map<String, ModelInfo> models = new ConcurrentHashMap<>();
// Model health, refreshed by the scheduled check below.
private final Map<String, ModelHealthStatus> healthStatuses = new ConcurrentHashMap<>();
/** Registers the built-in model catalogue at startup. */
@PostConstruct
public void initModels() {
// Register every supported model.
registerModel(ModelInfo.builder()
.modelId("gpt-4o")
.provider("openai")
.tier(ModelRouter.ModelTier.LARGE)
.maxContextTokens(128000)
.inputPricePerKToken(0.005)
.outputPricePerKToken(0.015)
.capabilities(Set.of("text", "vision", "json_mode"))
.fallbackModelIds(List.of("claude-3-5-sonnet", "gpt-4o-mini"))
.build());
registerModel(ModelInfo.builder()
.modelId("gpt-4o-mini")
.provider("openai")
.tier(ModelRouter.ModelTier.SMALL)
.maxContextTokens(128000)
.inputPricePerKToken(0.00015)
.outputPricePerKToken(0.0006)
.capabilities(Set.of("text", "json_mode"))
.fallbackModelIds(List.of("claude-3-haiku"))
.build());
registerModel(ModelInfo.builder()
.modelId("claude-3-5-sonnet")
.provider("anthropic")
.tier(ModelRouter.ModelTier.LARGE)
.maxContextTokens(200000)
.inputPricePerKToken(0.003)
.outputPricePerKToken(0.015)
.capabilities(Set.of("text", "vision", "code"))
.fallbackModelIds(List.of("gpt-4o"))
.build());
registerModel(ModelInfo.builder()
.modelId("claude-3-haiku")
.provider("anthropic")
.tier(ModelRouter.ModelTier.SMALL)
.maxContextTokens(200000)
.inputPricePerKToken(0.00025)
.outputPricePerKToken(0.00125)
.capabilities(Set.of("text"))
.fallbackModelIds(List.of("gpt-4o-mini"))
.build());
}
/** Adds a model to the registry with UNKNOWN health. */
public void registerModel(ModelInfo model) {
models.put(model.getModelId(), model);
healthStatuses.put(model.getModelId(), ModelHealthStatus.UNKNOWN);
}
/** Looks a model up by id; null when unknown. */
public ModelInfo findModel(String modelId) {
return models.get(modelId);
}
/** A model counts as available unless its health check marked it DOWN. */
public boolean isAvailable(String modelId) {
ModelHealthStatus status = healthStatuses.getOrDefault(modelId, ModelHealthStatus.UNKNOWN);
return status != ModelHealthStatus.DOWN;
}
/** Cheapest available model of the given tier (by input price); null if none. */
public ModelInfo findCheapestAvailableModel(ModelRouter.ModelTier tier) {
return models.values().stream()
.filter(m -> m.getTier() == tier)
.filter(m -> isAvailable(m.getModelId()))
.min(Comparator.comparingDouble(ModelInfo::getInputPricePerKToken))
.orElse(null);
}
/** Registry-wide default model used as the routing fallback. */
public ModelInfo getDefaultModel() {
return models.get("gpt-4o-mini");
}
/** Configured fallbacks of a model, filtered to the currently available ones. */
public List<ModelInfo> getFallbackModels(String modelId) {
ModelInfo model = models.get(modelId);
if (model == null || model.getFallbackModelIds() == null) return List.of();
return model.getFallbackModelIds().stream()
.map(models::get)
.filter(m -> m != null && isAvailable(m.getModelId()))
.toList();
}
/**
 * Periodic model health check.
 *
 * NOTE(review): placeholder — no probe request is actually sent, so every
 * model is always marked HEALTHY and the catch branch is dead code. Wire
 * in each provider's health endpoint before relying on failover.
 */
@Scheduled(fixedDelay = 30000) // every 30 seconds
public void checkModelHealth() {
models.forEach((modelId, model) -> {
try {
// Send a minimal probe request.
// Real implementations call each provider's health-check endpoint.
healthStatuses.put(modelId, ModelHealthStatus.HEALTHY);
} catch (Exception e) {
log.warn("模型健康检查失败: modelId={}, error={}", modelId, e.getMessage());
healthStatuses.put(modelId, ModelHealthStatus.DOWN);
}
});
}
public enum ModelHealthStatus { UNKNOWN, HEALTHY, DEGRADED, DOWN }
/** Static description of one model: tier, pricing, capabilities, fallbacks. */
@Data
@Builder
public static class ModelInfo {
private String modelId;
private String provider;
private ModelRouter.ModelTier tier;
private int maxContextTokens;
private double inputPricePerKToken; // USD per 1000 input tokens
private double outputPricePerKToken; // USD per 1000 output tokens
private Set<String> capabilities;
private List<String> fallbackModelIds;
/**
 * NOTE(review): stub — always true. It should consult the registry's
 * healthStatuses map; as written, the router's per-model availability
 * checks are no-ops.
 */
public boolean isAvailable() {
return true; // should consult healthStatuses
}
}
}统一可观测性
/**
 * Gateway metrics service.
 *
 * Records the key figures of every call (latency, tokens, status) both as
 * Micrometer metrics for live monitoring and as database rows for billing
 * and offline analysis, sliced by team / model / time.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class GatewayMetricsService {

    private final MeterRegistry meterRegistry;
    private final JdbcTemplate jdbc;

    /** Records a successful call: counters, latency timer, token count, DB row. */
    public void recordSuccess(String teamId, String modelId, int tokensUsed, long latencyMs) {
        // Micrometer metrics for real-time monitoring.
        meterRegistry.counter("gateway.requests.success",
                "team", teamId, "model", modelId).increment();
        meterRegistry.timer("gateway.latency",
                "team", teamId, "model", modelId)
                .record(latencyMs, java.util.concurrent.TimeUnit.MILLISECONDS);
        meterRegistry.counter("gateway.tokens.total",
                "team", teamId, "model", modelId)
                .increment(tokensUsed);
        // Persisted for billing and detailed analysis.
        saveToDatabase(teamId, modelId, tokensUsed, latencyMs, "SUCCESS", null);
    }

    /** Records a request blocked by a middleware (budget, rate limit, safety). */
    public void recordBlocked(String teamId, String reason) {
        meterRegistry.counter("gateway.requests.blocked",
                "team", teamId, "reason", reason).increment();
    }

    /**
     * Records a failed call. {@code latencyMs} is currently unused but kept
     * for signature stability; callers already measure it.
     */
    public void recordError(String teamId, String errorType, long latencyMs) {
        meterRegistry.counter("gateway.requests.error",
                "team", teamId, "error_type", errorType).increment();
    }

    /** Best-effort insert of a usage row; failures are logged, never thrown. */
    private void saveToDatabase(String teamId, String modelId, int tokens,
                                long latencyMs, String status, String error) {
        try {
            jdbc.update("""
                    INSERT INTO gateway_usage_log
                    (team_id, model_id, tokens_used, latency_ms, status, error_msg, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, NOW())
                    """,
                    teamId, modelId, tokens, latencyMs, status, error
            );
        } catch (Exception e) {
            log.warn("使用日志写入失败: {}", e.getMessage());
        }
    }

    /**
     * Team usage report for billing and analysis.
     *
     * BUGFIX: SQL aggregates come back from queryForMap as driver-dependent
     * numeric types (SUM is often BigDecimal, AVG often BigDecimal/Double)
     * and are NULL when no rows match, so the old {@code (long)(Long)} /
     * {@code (double)(Double)} casts threw ClassCastException or
     * NullPointerException. Values are now converted via Number with zero
     * defaults.
     *
     * NOTE(review): BETWEEN with LocalDate bounds against a timestamp
     * column — confirm whether endDate is meant to be inclusive of that
     * whole day.
     */
    public TeamUsageReport getTeamReport(String teamId, LocalDate startDate, LocalDate endDate) {
        Map<String, Object> stats = jdbc.queryForMap("""
                SELECT
                SUM(tokens_used) as total_tokens,
                COUNT(*) as total_requests,
                COUNT(CASE WHEN status = 'SUCCESS' THEN 1 END) as success_count,
                AVG(latency_ms) as avg_latency,
                SUM(tokens_used * m.price_per_k_token / 1000.0) as estimated_cost
                FROM gateway_usage_log l
                LEFT JOIN model_pricing m ON m.model_id = l.model_id
                WHERE l.team_id = ?
                AND l.created_at BETWEEN ? AND ?
                """,
                teamId, startDate, endDate
        );
        return new TeamUsageReport(
                teamId,
                asLong(stats.get("total_tokens")),
                asLong(stats.get("total_requests")),
                asDouble(stats.get("avg_latency")),
                asDouble(stats.get("estimated_cost"))
        );
    }

    /** Null-safe numeric conversion; 0 when the aggregate produced NULL. */
    private static long asLong(Object value) {
        return value instanceof Number n ? n.longValue() : 0L;
    }

    /** Null-safe numeric conversion; 0.0 when the aggregate produced NULL. */
    private static double asDouble(Object value) {
        return value instanceof Number n ? n.doubleValue() : 0.0;
    }

    record TeamUsageReport(String teamId, long totalTokens, long totalRequests,
                           double avgLatencyMs, double estimatedCost) {}
}
实践建议
网关从小做起,不要一开始就搞复杂
很多团队看到AI网关的功能列表就想一次性全部实现,结果工程量大,上线慢,等到功能做完业务早就自己乱搞了一套。我的建议是先实现核心三件套:统一入口(认证)+ 成本记录 + 故障转移,这三个功能两周内就能落地,解决最紧迫的问题,其他功能迭代加。
路由规则要可配置,不要硬编码
任务类型 → 模型的映射关系应该放在配置中心,而不是代码里。模型的能力和价格变化很快,如果硬编码在代码里,每次调整都要发布,很烦。把路由规则做成YAML或数据库配置,业务方可以自助调整。
成本告警要早,不要等超额才发现
我在一个项目里吃过这个亏:设置了10000元/月的预算上限,但告警只在超额时触发。结果某个团队写了个循环测试,一天就把预算打完了,然后全公司的服务都被限流。正确做法是在到达50%、80%、95%时都发告警,让团队有足够的反应时间。
