AI 中间件的设计——公司内部的 AI 能力平台怎么做
AI 中间件的设计——公司内部的 AI 能力平台怎么做
两年前,我在一家 500 人的科技公司工作。那时候公司里大概有七八个团队在做跟 AI 相关的事情:客服团队在接 OpenAI 做智能客服,推荐团队在用 Embedding 做个性化推荐,研发效能团队在接 GitHub Copilot 的 API 做代码审查助手,还有两个业务团队在"自研"大模型应用(其实就是套壳调用)。
每个团队都自己管 API Key,自己写 HTTP 调用,自己处理错误重试,自己看账单。我有次开会,财务同事拿着一张账单找到我问:"这个月 OpenAI 的费用比上个月多了 40%,你知道是哪个团队搞的吗?"
我说不知道。她说她也不知道。
这就是没有统一 AI 能力平台的真实写照:钱不知道花在哪,出了问题不知道是谁的锅,同样的能力在七八个地方重复实现了七八遍。
那次开完会我就开始推动建一套 AI 中间件平台。这篇文章把整个设计思路讲清楚。
为什么需要 AI 中间件
这个问题值得先说清楚,不然你去推动的时候,CTO 会问你:直接调 API 不行吗?为什么要多加一层?
理由一:统一密钥管理 各团队自持 API Key,一旦有人离职或者 Key 泄露,排查难度极大。中间件层统一持有所有 Key,业务层只需要内部 Token。
理由二:成本可见性 每个请求必须携带团队标识,中间件记录每笔消耗,财务随时可以出按团队、按项目维度的账单报告。这个在大公司里是刚需。
理由三:模型路由和降级 今天 GPT-4 贵了,切到 Claude?今天 OpenAI 抽风了,自动切到 Azure OpenAI 的备份节点?这些逻辑写在每个团队各自的代码里是噩梦,写在中间件里是常规操作。
理由四:安全审计 某些行业(金融、医疗)对 AI 的输入输出有合规要求,必须留存审计日志。统一中间件让这件事成本趋近于零。
理由五:能力复用 Embedding、向量召回、Prompt 模板管理——这些不是每个团队都要自己做一遍的,统一提供 SDK 就好。
整体架构设计
核心接口设计
中间件对外暴露的 API 要尽量简单,业务团队不需要关心底层复杂性:
/**
* AI 能力平台对外的统一接口
*/
public interface AICapabilityService {
/**
* 文本生成(支持流式)
*/
ChatResponse chat(ChatRequest request);
Flux<String> chatStream(ChatRequest request);
/**
* Embedding 生成
*/
EmbeddingResponse embed(EmbeddingRequest request);
List<EmbeddingResponse> embedBatch(List<EmbeddingRequest> requests);
/**
* 使用 Prompt 模板(避免硬编码 Prompt)
*/
ChatResponse chatWithTemplate(String templateId, Map<String, Object> variables);
/**
* 语义搜索(RAG 召回,中间件内部处理向量检索)
*/
List<SearchResult> semanticSearch(SemanticSearchRequest request);
}
/**
* Chat 请求对象——业务层只需要填这些
*/
@Data
@Builder
public class ChatRequest {
// 必填:调用方标识
private String appId;
private String projectId;
// 必填:消息内容
private String userMessage;
private List<Message> history; // 可选,对话历史
// 可选:模型偏好(不填则走路由策略)
private ModelPreference modelPreference; // BEST / BALANCED / CHEAPEST / FAST
// 可选:自定义参数(中间件有合理默认值)
private Float temperature;
private Integer maxTokens;
// 可选:RAG 上下文(如果业务层自己做了检索)
private List<String> contextDocuments;
public enum ModelPreference {
BEST, // 最佳质量,不计成本
BALANCED, // 质量和成本平衡
CHEAPEST, // 最低成本
FAST // 最低延迟
}
}内部实现的核心流程:
@Service
public class AICapabilityServiceImpl implements AICapabilityService {
@Autowired
private AppAuthService authService;
@Autowired
private QuotaService quotaService;
@Autowired
private ModelRouter modelRouter;
@Autowired
private AuditService auditService;
@Autowired
private SemanticCacheService cacheService;
@Override
public ChatResponse chat(ChatRequest request) {
// 1. 认证鉴权
AppContext appCtx = authService.validateAndLoad(request.getAppId());
// 2. 配额检查
quotaService.checkAndPreDeduct(appCtx, estimateTokens(request));
// 3. 检查语义缓存
Optional<ChatResponse> cached = cacheService.get(request);
if (cached.isPresent()) {
auditService.recordCacheHit(appCtx, request);
return cached.get();
}
// 4. 模型路由选择
ModelEndpoint endpoint = modelRouter.route(request, appCtx);
// 5. 调用(含重试)
ChatResponse response = callWithRetry(endpoint, request, appCtx);
// 6. 更新实际配额消耗
quotaService.reconcile(appCtx, response.getUsage().getTotalTokens());
// 7. 写入缓存
cacheService.put(request, response);
// 8. 审计日志(异步)
auditService.recordAsync(appCtx, request, response);
return response;
}
private ChatResponse callWithRetry(ModelEndpoint endpoint,
ChatRequest request,
AppContext appCtx) {
int maxRetries = 3;
int attempt = 0;
while (attempt < maxRetries) {
try {
return endpoint.getClient().call(buildPrompt(request));
} catch (RateLimitException e) {
// OpenAI 限速,切换到备用节点
endpoint = modelRouter.fallback(endpoint, request, appCtx);
attempt++;
if (attempt >= maxRetries) throw e;
// 指数退避
sleepWithJitter(attempt * 1000L);
} catch (ServiceUnavailableException e) {
// 服务不可用,切换备用模型
endpoint = modelRouter.fallback(endpoint, request, appCtx);
attempt++;
if (attempt >= maxRetries) throw new AIServiceException("All endpoints failed", e);
}
}
throw new AIServiceException("Exhausted all retry attempts");
}
private void sleepWithJitter(long baseMs) {
long jitter = (long)(Math.random() * baseMs * 0.3);
try {
Thread.sleep(baseMs + jitter);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}模型路由器:智能选模型
模型路由器是 AI 中间件里最有价值的模块之一。一个好的路由策略能显著降低成本,同时保证质量:
@Service
public class ModelRouter {
// 模型端点配置(可以热更新)
private final List<ModelEndpoint> endpoints;
// 健康状态缓存
private final Map<String, ModelHealth> healthCache = new ConcurrentHashMap<>();
public ModelEndpoint route(ChatRequest request, AppContext appCtx) {
List<ModelEndpoint> candidates = filterHealthyEndpoints();
// 根据请求的偏好策略选择模型
return switch (request.getModelPreference()) {
case BEST -> selectBest(candidates);
case BALANCED -> selectBalanced(candidates, request);
case CHEAPEST -> selectCheapest(candidates, request);
case FAST -> selectFastest(candidates);
case null -> selectBalanced(candidates, request); // 默认
};
}
private ModelEndpoint selectBalanced(List<ModelEndpoint> candidates, ChatRequest request) {
int estimatedTokens = estimateTokens(request);
// 简单逻辑:短请求用便宜模型,长请求用好模型
if (estimatedTokens < 500) {
return candidates.stream()
.filter(e -> e.getCostPerToken() < 0.002) // 低成本阈值
.findFirst()
.orElse(candidates.get(0));
} else {
return selectBest(candidates);
}
}
private ModelEndpoint selectCheapest(List<ModelEndpoint> candidates, ChatRequest request) {
return candidates.stream()
.min(Comparator.comparingDouble(ModelEndpoint::getCostPerToken))
.orElseThrow();
}
private ModelEndpoint selectFastest(List<ModelEndpoint> candidates) {
return candidates.stream()
.min(Comparator.comparingDouble(e ->
healthCache.getOrDefault(e.getId(), ModelHealth.unknown()).getP50LatencyMs()))
.orElseThrow();
}
private ModelEndpoint selectBest(List<ModelEndpoint> candidates) {
// 质量排名硬编码(或者从配置中心读)
List<String> qualityRanking = List.of("gpt-4o", "claude-3-5-sonnet", "gpt-4-turbo", "gpt-3.5-turbo");
for (String model : qualityRanking) {
Optional<ModelEndpoint> match = candidates.stream()
.filter(e -> e.getModelId().equals(model))
.findFirst();
if (match.isPresent()) return match.get();
}
return candidates.get(0);
}
public ModelEndpoint fallback(ModelEndpoint failed, ChatRequest request, AppContext appCtx) {
// 标记失败节点
healthCache.computeIfPresent(failed.getId(), (k, v) -> v.markFailed());
// 选一个不同提供商的节点
return filterHealthyEndpoints().stream()
.filter(e -> !e.getProvider().equals(failed.getProvider()))
.findFirst()
.orElseThrow(() -> new AIServiceException("No fallback available"));
}
private List<ModelEndpoint> filterHealthyEndpoints() {
return endpoints.stream()
.filter(e -> {
ModelHealth health = healthCache.get(e.getId());
return health == null || health.isHealthy();
})
.collect(Collectors.toList());
}
}审计日志:不是记日志,是出报告
很多人以为审计日志就是 log.info("调用了 AI") 这种级别的。不是。生产环境的 AI 审计日志需要能支撑这些查询:
- 查:这个月客服团队花了多少钱?
- 查:昨天下午 3 点,用户 12345 的那次对话,AI 说了什么?
- 查:我们用了 GPT-4 的请求里,平均 token 消耗是多少?
@Data
@Builder
@Entity
@Table(name = "ai_audit_log")
public class AIAuditLog {
@Id
private String requestId;
// 调用方信息
private String appId;
private String projectId;
private String teamId;
private String userId;
// 请求信息
@Column(columnDefinition = "TEXT")
private String requestContent; // JSON 格式
private String modelId;
private String modelProvider;
// 响应信息
@Column(columnDefinition = "TEXT")
private String responseContent; // JSON 格式
private int promptTokens;
private int completionTokens;
private int totalTokens;
// 性能指标
private long latencyMs;
private long ttftMs; // Time to First Token
private boolean cacheHit;
// 成本信息
private double costUsd;
// 时间
private Instant createdAt;
private Instant completedAt;
// 状态
private String status; // SUCCESS / FAILED / RATE_LIMITED
private String errorCode;
private String errorMessage;
}
// 成本报告服务
@Service
public class CostReportService {
@Autowired
private AIAuditLogRepository logRepo;
public CostReport generateTeamReport(String teamId, YearMonth month) {
LocalDate start = month.atDay(1);
LocalDate end = month.atEndOfMonth();
List<AIAuditLog> logs = logRepo.findByTeamIdAndDateRange(
teamId, start.atStartOfDay(), end.atTime(23, 59, 59)
);
Map<String, TeamProjectCost> projectCosts = logs.stream()
.collect(Collectors.groupingBy(
AIAuditLog::getProjectId,
Collectors.collectingAndThen(
Collectors.toList(),
this::calculateProjectCost
)
));
double totalCost = projectCosts.values().stream()
.mapToDouble(TeamProjectCost::getTotalCostUsd)
.sum();
long totalTokens = logs.stream().mapToLong(AIAuditLog::getTotalTokens).sum();
return CostReport.builder()
.teamId(teamId)
.month(month)
.totalCostUsd(totalCost)
.totalTokens(totalTokens)
.projectBreakdown(projectCosts)
.cacheHitRate(calculateCacheHitRate(logs))
.build();
}
private double calculateCacheHitRate(List<AIAuditLog> logs) {
if (logs.isEmpty()) return 0.0;
long cacheHits = logs.stream().filter(AIAuditLog::isCacheHit).count();
return (double) cacheHits / logs.size();
}
}那次推动 AI 中间件的真实经历
讲一段真实的事情。当我提出要建 AI 中间件的时候,遇到的第一个阻力不是技术,是人。
有个团队的技术 Lead 直接在会上说:"我不需要一个统一平台,我需要的是灵活性。你搞一个中间件出来,我要用一个新模型还得等你排期?"
这个反驳是有道理的。我当时的回答是:"你现在接入一个新模型,需要自己处理:API Key 申请(走采购流程)、错误重试逻辑、速率限制处理、账单对账、审计合规。换到中间件之后,你只需要在配置文件里改一个 model_preference 字段。"
他沉默了一下,问了一个问题:"如果中间件挂了怎么办?"
这是最核心的质疑。我的解决方案是:
- 中间件做成高可用的(多实例 + 负载均衡,同城双活)
- SDK 里内置 Circuit Breaker,中间件不可用时自动降级到直连模式
- 保证中间件的 SLA 不低于上游模型提供商的 SLA
最后这个团队同意了,但有一个条件:他们要保留直连模式的能力作为备用。这个要求是合理的,我接受了。
推动这件事花了三个月,从提方案到上线。第一个季度上线之后,公司的 AI 成本因为语义缓存降低了 28%,因为智能路由(把可以用便宜模型处理的请求切到 GPT-3.5)降低了又 19%。财务部门非常高兴,CTO 批了我组建专门的 AI 平台团队。
这是我做过的最值得的一个技术决策。
SDK 设计:降低业务团队的接入成本
中间件的价值,一半在平台本身,一半在 SDK 的易用性。如果接入要写两百行代码,没人愿意用。
// 目标:业务团队 5 行代码完成接入
@Configuration
public class AIConfig {
@Bean
public AICapabilityService aiService() {
return AIClientBuilder.builder()
.appId("your-app-id")
.appSecret("your-app-secret")
.gatewayUrl("https://ai-gateway.internal.company.com")
.build();
}
}
// 使用:就这么简单
@Service
public class CustomerService {
@Autowired
private AICapabilityService aiService;
public String answerQuestion(String question, String customerId) {
return aiService.chat(ChatRequest.builder()
.appId("customer-service-app")
.projectId("faq-bot")
.userMessage(question)
.modelPreference(ModelPreference.BALANCED)
.build())
.getContent();
}
}监控体系:你必须知道 AI 系统在做什么
中间件层是天然的监控采集点。至少要监控这几个维度:
P50/P95/P99 延迟(按模型、按团队)
Token 消耗速率(实时)
错误率(按错误类型:限速、超时、内容过滤)
缓存命中率
成本消耗速率(USD/hour)
模型可用性(各端点健康状态)我用的是 Micrometer + Prometheus + Grafana 这套,在中间件里埋点:
@Component
public class AIMetrics {
private final MeterRegistry registry;
// 请求计数器(带标签)
private final Counter requestCounter;
// 延迟直方图
private final Timer latencyTimer;
// Token 计数器
private final Counter tokenCounter;
// 成本计数器
private final Counter costCounter;
public AIMetrics(MeterRegistry registry) {
this.registry = registry;
this.requestCounter = Counter.builder("ai.requests.total")
.description("Total AI requests")
.register(registry);
this.latencyTimer = Timer.builder("ai.request.latency")
.description("AI request latency")
.publishPercentiles(0.5, 0.95, 0.99)
.register(registry);
this.tokenCounter = Counter.builder("ai.tokens.total")
.description("Total tokens consumed")
.register(registry);
this.costCounter = Counter.builder("ai.cost.usd")
.description("Total cost in USD")
.register(registry);
}
public void record(AIAuditLog log) {
Tags tags = Tags.of(
"app_id", log.getAppId(),
"team_id", log.getTeamId(),
"model_id", log.getModelId(),
"status", log.getStatus(),
"cache_hit", String.valueOf(log.isCacheHit())
);
requestCounter.increment(tags);
latencyTimer.record(log.getLatencyMs(), TimeUnit.MILLISECONDS, tags);
tokenCounter.increment(log.getTotalTokens(), tags);
costCounter.increment(log.getCostUsd(), tags);
}
}小结
一个完整的 AI 中间件平台,核心要解决四个问题:
- 统一接入:屏蔽底层模型差异,业务团队只和 SDK 打交道
- 智能路由:根据请求特征和模型健康状态,自动选择最合适的端点
- 成本可见:每笔消耗都有记录,按团队/项目维度出报告
- 安全审计:全量留存调用记录,支持合规和事后排查
推动这件事最难的不是技术,是让每个团队相信"一个公共平台比自己搞一套成本更低"。这需要用数据说话,需要把接入成本做得足够低,需要保证平台的可靠性不拖团队的后腿。
