AI中台架构设计:从零构建企业级AI能力共享平台
2026/4/30大约 8 分钟
AI中台架构设计:从零构建企业级AI能力共享平台
适读人群:架构师、技术负责人,需要在公司层面推进AI能力建设的工程师 阅读时长:约22分钟
那场让我反思整晚的复盘会
去年年底,我们公司做了一次AI项目年度复盘。复盘会上,各个业务团队汇报了一年的AI建设成果:
- 客服团队:自研了一套FAQ问答系统,用的OpenAI
- 研发团队:接了一个代码审查工具,用的Claude
- 数据团队:做了个报表解读功能,用的文心一言
- 营销团队:搞了个文案生成器,用的通义千问
一年下来,公司总共上线了7个AI功能。然后CTO问了一个问题,把大家都问沉默了:
"我们这七个功能,有哪些是可以复用的?"
沉默了二十秒,没有人说话。
因为大家心里都清楚:这七个功能几乎是独立建设的,各自有各自的LLM调用逻辑、各自的向量库、各自的会话管理,代码之间没有任何复用。更糟的是,这七个功能分散在七个不同的代码库里,连监控都是分开的,完全不知道全公司一个月花了多少钱在LLM上。
那天晚上,我反思了很久。当AI功能数量超过3个,就必须考虑AI中台了。不然就是在重复造轮子,而且是高成本的轮子。
今天,我来聊聊如何从零设计一个企业级AI中台。
AI中台要解决什么问题
AI中台核心价值:能力复用、成本可控、风险统一管控。
AI中台整体架构设计
核心模块实现
1. LLM路由服务
/**
* LLM路由服务
* 根据策略自动选择最优的LLM提供商
* 支持:成本优先、速度优先、质量优先、故障自动切换
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class LlmRouterService {
private final Map<String, ChatClient> chatClients; // Bean名称 -> ChatClient
private final LlmProviderConfigRepository providerConfigRepo;
private final LlmHealthChecker healthChecker;
private final CostCalculator costCalculator;
@Getter
@Builder
public static class RouteRequest {
private String tenantId; // 租户ID(哪个业务系统)
private String scenarioCode; // 场景码(如:customer_service, code_review)
private RoutingStrategy strategy; // 路由策略
private String requiredCapability; // 必需能力(如:function_calling, vision)
private Integer estimatedTokens; // 预估Token数
}
public enum RoutingStrategy {
COST_OPTIMAL, // 成本最优
SPEED_OPTIMAL, // 速度最优
QUALITY_OPTIMAL, // 质量最优
SCENARIO_BOUND // 场景绑定(由配置决定)
}
/**
* 核心路由逻辑:根据策略选择ChatClient
*/
public ChatClient route(RouteRequest request) {
// 1. 获取场景绑定配置
if (request.getStrategy() == RoutingStrategy.SCENARIO_BOUND) {
String boundProvider = getScenarioBoundProvider(
request.getTenantId(), request.getScenarioCode());
if (boundProvider != null) {
ChatClient client = chatClients.get(boundProvider);
if (client != null && healthChecker.isHealthy(boundProvider)) {
log.debug("场景绑定路由: scenario={}, provider={}",
request.getScenarioCode(), boundProvider);
return client;
}
}
}
// 2. 过滤掉不健康的提供商
List<String> availableProviders = chatClients.keySet().stream()
.filter(healthChecker::isHealthy)
.filter(p -> meetsCapabilityRequirements(p, request.getRequiredCapability()))
.collect(Collectors.toList());
if (availableProviders.isEmpty()) {
throw new NoAvailableProviderException("所有LLM提供商不可用");
}
// 3. 按策略选择
String selectedProvider = switch (request.getStrategy()) {
case COST_OPTIMAL -> selectCheapest(availableProviders, request.getEstimatedTokens());
case SPEED_OPTIMAL -> selectFastest(availableProviders);
case QUALITY_OPTIMAL -> selectHighestQuality(availableProviders);
default -> availableProviders.get(0);
};
log.info("路由决策: tenant={}, scenario={}, strategy={}, selected={}",
request.getTenantId(), request.getScenarioCode(),
request.getStrategy(), selectedProvider);
return chatClients.get(selectedProvider);
}
private String selectCheapest(List<String> providers, Integer estimatedTokens) {
return providers.stream()
.min(Comparator.comparingDouble(p ->
costCalculator.estimateCost(p, estimatedTokens != null ? estimatedTokens : 1000)))
.orElse(providers.get(0));
}
private String selectFastest(List<String> providers) {
return providers.stream()
.min(Comparator.comparingDouble(healthChecker::getAverageLatency))
.orElse(providers.get(0));
}
private String selectHighestQuality(List<String> providers) {
// 质量评分来自配置,可以基于benchmark定期更新
return providers.stream()
.max(Comparator.comparingDouble(providerConfigRepo::getQualityScore))
.orElse(providers.get(0));
}
}2. 统一AI服务门面
/**
* AI中台统一服务门面
* 所有业务系统通过这个接口调用AI能力
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class AiPlatformFacade {
private final LlmRouterService routerService;
private final PromptVersionManager promptManager;
private final VectorStoreService vectorService;
private final SessionService sessionService;
private final CostTrackingService costTracker;
private final RateLimiterService rateLimiter;
private final AuditLogService auditLogger;
/**
* 通用问答接口
* 业务系统调用这个接口,无需关心底层用哪个模型
*/
public AiResponse chat(AiChatRequest request) {
// 1. 鉴权和限流
validateAndCheckLimits(request);
// 2. 选择路由策略
LlmRouterService.RouteRequest routeRequest = LlmRouterService.RouteRequest.builder()
.tenantId(request.getTenantId())
.scenarioCode(request.getScenarioCode())
.strategy(determineStrategy(request))
.estimatedTokens(estimateTokens(request))
.build();
ChatClient chatClient = routerService.route(routeRequest);
// 3. 加载Prompt模板
String systemPrompt = promptManager.getActivePrompt(
request.getTenantId(), request.getScenarioCode());
// 4. 如果需要RAG,执行检索
String contextStr = "";
if (request.isEnableRag() && request.getKnowledgeBaseId() != null) {
List<Document> docs = vectorService.search(
request.getKnowledgeBaseId(), request.getMessage(), 5);
contextStr = docs.stream()
.map(Document::getContent)
.collect(Collectors.joining("\n\n"));
}
// 5. 构建最终Prompt
String finalSystemPrompt = buildFinalSystemPrompt(systemPrompt, contextStr);
// 6. 获取会话历史(多轮对话)
List<Message> historyMessages = sessionService.getHistory(
request.getSessionId(), request.getMaxHistoryTurns());
// 7. 执行调用
long startTime = System.currentTimeMillis();
try {
ChatClient.ChatClientRequestSpec spec = chatClient.prompt()
.system(finalSystemPrompt)
.messages(historyMessages)
.user(request.getMessage());
String responseContent = spec.call().content();
long latencyMs = System.currentTimeMillis() - startTime;
// 8. 保存会话历史
sessionService.appendMessage(request.getSessionId(),
request.getMessage(), responseContent);
// 9. 成本统计
costTracker.record(request.getTenantId(), request.getScenarioCode(),
routeRequest, estimateActualTokens(request.getMessage(), responseContent));
// 10. 审计日志
auditLogger.log(request, responseContent, latencyMs);
return AiResponse.success(responseContent, latencyMs);
} catch (Exception e) {
log.error("AI调用失败: tenant={}, scenario={}",
request.getTenantId(), request.getScenarioCode(), e);
auditLogger.logFailure(request, e.getMessage());
throw new AiPlatformException("AI服务调用失败", e);
}
}
/**
* 批量处理接口(适合非实时场景,成本更低)
*/
@Async("aiTaskExecutor")
public CompletableFuture<List<AiResponse>> batchProcess(List<AiChatRequest> requests) {
List<AiResponse> results = requests.parallelStream()
.map(req -> {
try {
return chat(req);
} catch (Exception e) {
return AiResponse.failed(e.getMessage());
}
})
.collect(Collectors.toList());
return CompletableFuture.completedFuture(results);
}
/**
* 流式接口
*/
public Flux<String> streamChat(AiChatRequest request) {
validateAndCheckLimits(request);
ChatClient chatClient = routerService.route(
LlmRouterService.RouteRequest.builder()
.tenantId(request.getTenantId())
.scenarioCode(request.getScenarioCode())
.strategy(LlmRouterService.RoutingStrategy.SCENARIO_BOUND)
.build()
);
String systemPrompt = promptManager.getActivePrompt(
request.getTenantId(), request.getScenarioCode());
return chatClient.prompt()
.system(systemPrompt)
.user(request.getMessage())
.stream()
.content()
.doOnComplete(() -> log.debug("流式响应完成: session={}", request.getSessionId()))
.doOnError(e -> log.error("流式响应失败", e));
}
private void validateAndCheckLimits(AiChatRequest request) {
RateLimitResult result = rateLimiter.check(request.getTenantId(), request.getScenarioCode());
if (!result.isAllowed()) {
throw new RateLimitExceededException(result.getMessage());
}
}
}3. 成本追踪与预算管理
/**
* AI成本追踪服务
* 实现按租户、按场景的精细化成本管理
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class CostTrackingService {
private final RedisTemplate<String, String> redisTemplate;
private final CostRecordRepository costRecordRepo;
private final BudgetAlertService budgetAlertService;
// 各模型的单价(美元/1M tokens)
private static final Map<String, ModelCost> MODEL_COSTS = Map.of(
"gpt-4o", new ModelCost(5.0, 15.0),
"gpt-4o-mini", new ModelCost(0.15, 0.6),
"claude-3-5-sonnet", new ModelCost(3.0, 15.0),
"qwen-max", new ModelCost(0.04, 0.12),
"ernie-4.0", new ModelCost(0.012, 0.012)
);
record ModelCost(double inputPricePer1M, double outputPricePer1M) {}
/**
* 记录一次调用的成本
*/
public void record(String tenantId, String scenarioCode,
LlmRouterService.RouteRequest routeRequest,
TokenUsage usage) {
String modelName = "gpt-4o-mini"; // 从route信息获取实际模型
ModelCost cost = MODEL_COSTS.getOrDefault(modelName, new ModelCost(0.15, 0.6));
double inputCost = usage.getPromptTokens() * cost.inputPricePer1M() / 1_000_000;
double outputCost = usage.getCompletionTokens() * cost.outputPricePer1M() / 1_000_000;
double totalCost = inputCost + outputCost;
// 实时累计到Redis(用于快速查询当前用量)
String dayKey = String.format("cost:daily:%s:%s:%s",
tenantId, LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")), scenarioCode);
redisTemplate.opsForValue().increment(dayKey + ":tokens",
usage.getPromptTokens() + usage.getCompletionTokens());
// 成本存为微分(避免浮点数精度问题)
redisTemplate.opsForValue().increment(dayKey + ":cost_microdollar",
(long)(totalCost * 1_000_000));
redisTemplate.expire(dayKey + ":tokens", 7, TimeUnit.DAYS);
redisTemplate.expire(dayKey + ":cost_microdollar", 7, TimeUnit.DAYS);
// 异步持久化到数据库
costRecordRepo.save(CostRecord.builder()
.tenantId(tenantId)
.scenarioCode(scenarioCode)
.modelName(modelName)
.promptTokens(usage.getPromptTokens())
.completionTokens(usage.getCompletionTokens())
.costUsd(totalCost)
.recordedAt(LocalDateTime.now())
.build());
// 检查是否超出预算
checkBudget(tenantId, scenarioCode, totalCost);
}
/**
* 查询租户今日用量
*/
public CostSummary getTodayCost(String tenantId) {
String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
// 汇总所有场景的今日成本
// ... 实现省略
return CostSummary.empty();
}
private void checkBudget(String tenantId, String scenarioCode, double newCost) {
// 获取当月累计成本
double monthlyTotal = costRecordRepo.getMonthlyTotal(tenantId);
double budget = getBudgetLimit(tenantId);
if (monthlyTotal > budget * 0.9) {
budgetAlertService.sendWarning(tenantId,
String.format("AI费用已达月度预算的%.0f%%,请注意控制", monthlyTotal / budget * 100));
}
if (monthlyTotal > budget) {
log.warn("租户{}超出月度预算: used={}, budget={}", tenantId, monthlyTotal, budget);
// 可以选择自动降级到更便宜的模型
}
}
private double getBudgetLimit(String tenantId) {
// 从配置中心获取预算限制
return 1000.0; // 默认1000美元
}
}能力市场设计
AI中台的高阶功能:把沉淀的AI能力封装成可复用的"能力组件",其他业务系统直接订阅使用。
一个业务团队要用"文档摘要"能力,不需要自己写Prompt、自己选模型、自己处理Token限制,直接订阅中台的"文档摘要"能力,一行代码搞定。这就是AI中台最大的价值。
落地建议:分三阶段推进
| 阶段 | 目标 | 周期 | 关键产出 |
|---|---|---|---|
| 阶段一:统一接入 | 所有AI调用走统一网关 | 1-2个月 | 成本可见、统一鉴权 |
| 阶段二:能力沉淀 | 抽取通用能力,建能力库 | 2-3个月 | 能力复用率提升 |
| 阶段三:中台完善 | 能力市场、自助配置、全面监控 | 3-6个月 | 新功能上线提速3倍以上 |
最重要的原则:不要一开始就想把中台做成大而全,先解决"成本不透明"这一个问题,就能赢得管理层支持,后续才能持续投入。
