微服务架构下的 AI 能力共享——横跨多个服务的 AI 调用怎么管理
微服务架构下的 AI 能力共享——横跨多个服务的 AI 调用怎么管理
三个月前我接到一个 Code Review 请求,看一个同事写的代码。打开一看,好家伙,一个电商系统,四个不同的微服务——商品服务、订单服务、客服服务、推荐服务——每个服务里都有一个几乎一样的 ChatGPTClient.java,每个都自己配了 API Key,每个都自己搞了一套重试逻辑,代码复制粘贴的痕迹非常明显。
我问他:"为什么不做成共享服务?"
他说:"每个服务的需求不一样,抽出来太复杂了。"
这个回答我理解,但不认同。AI 能力共享不只是代码复用的问题,更核心的是治理问题:
- API Key 分散在各服务,一旦泄露,要改 N 个地方
- 各服务各自限流,整体对 LLM 的调用量难以控制
- 成本无法按服务分摊,不知道哪个服务在烧钱
- 模型升级时,要修改 N 个服务
今天这篇文章,我来系统讲微服务架构下 AI 能力共享的架构设计、实现方案,以及什么情况下该共享、什么情况下该嵌入。
每个微服务都自己接大模型的问题
我把这种模式叫"分散式 AI 接入",它的问题不只是代码重复:
安全问题:API Key 出现在多个服务的配置文件里,任何一个服务被突破都会导致 API Key 泄露。
限流失控:每个服务自己做限流,但对 LLM 提供商来说是同一个账号。商品服务和客服服务同时高峰,加起来的请求量可能超过整体配额,导致双方都被限速。
成本盲区:月底算账发现 LLM 成本超了预算,但不知道是哪个服务用的。
运维噩梦:模型从 gpt-4 升级到更新的版本,需要修改 4 个服务的代码、测试、发布,每个服务还可能有不同的 Prompt 版本管理问题。
共享 AI 服务 vs 嵌入式 AI 的取舍
在讨论怎么做之前,先说清楚什么时候不应该做共享服务。
嵌入式 AI 更合适的场景
- 服务有强隔离要求:比如金融系统里,不同业务线的 AI 功能需要严格隔离,不能走同一个服务
- AI 功能高度专一:某个服务的 AI 功能非常定制化,几乎不会被复用
- 团队边界清晰:不同团队负责不同服务,共享服务会带来跨团队协作成本
- 系统规模还小:两三个服务的早期阶段,过度设计是浪费
共享 AI 服务更合适的场景
- 多个服务有相似的 AI 需求:文本理解、内容审核、语义搜索这类通用能力
- 需要统一的成本管理和配额控制
- 需要集中的监控和告警
- 团队有专门的 AI 平台能力建设意愿
共享 AI 服务的架构设计
推荐的架构是:建立一个独立的 AI Gateway Service,作为所有微服务访问 LLM 的统一入口。
这个架构的核心设计决策:
AI Gateway 不是 API 网关的一部分:AI Gateway 有自己的业务逻辑(配额管理、模型路由、语义缓存),职责比 API 网关重得多,要单独部署。
下游服务用服务身份(Service Identity)认证:不是用户的 JWT,而是服务级别的认证 token,用来追踪哪个服务消耗了多少。
配额按服务分配:每个服务有独立的 token 配额,超了自动限流,不影响其他服务。
代码:共享 AI 服务的接口设计
AI Gateway 服务的 REST 接口
// AI Gateway 的请求模型
@Data
@Builder
public class AIGatewayRequest {
private String requestId; // 客户端生成的请求ID,用于幂等
private String serviceId; // 调用方服务ID
private String scenarioCode; // 业务场景代码(用于路由和配额)
private List<ChatMessage> messages;
private AIRequestOptions options;
@Data
@Builder
public static class AIRequestOptions {
private Integer maxTokens;
private Double temperature;
private Boolean stream;
private String preferredModel; // 可选,优先使用的模型
private Map<String, Object> metadata;
}
}
@Data
@Builder
public class AIGatewayResponse {
private String requestId;
private String content;
private String model; // 实际使用的模型
private TokenUsage usage;
private String finishReason;
@Data
@Builder
public static class TokenUsage {
private int inputTokens;
private int outputTokens;
private double estimatedCost; // 估算成本(美元)
}
}AI Gateway Controller
@RestController
@RequestMapping("/api/v1/ai")
@Slf4j
public class AIGatewayController {
@Autowired
private AIGatewayService gatewayService;
@Autowired
private ServiceAuthenticator serviceAuthenticator;
/**
* 同步调用接口
*/
@PostMapping("/chat")
public ResponseEntity<AIGatewayResponse> chat(
@RequestHeader("X-Service-Token") String serviceToken,
@RequestBody AIGatewayRequest request) {
// 服务身份验证
ServiceIdentity identity = serviceAuthenticator.authenticate(serviceToken);
if (identity == null) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(null);
}
// 配额检查
QuotaCheckResult quotaCheck = gatewayService.checkQuota(
identity.getServiceId(), request.getScenarioCode());
if (!quotaCheck.isAllowed()) {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
.header("X-Quota-Reset", String.valueOf(quotaCheck.getResetTimeMs()))
.build();
}
try {
AIGatewayResponse response = gatewayService.process(identity, request);
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("AI Gateway 处理失败: serviceId={}, error={}",
identity.getServiceId(), e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
/**
* 流式调用接口(SSE)
*/
@PostMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatStream(
@RequestHeader("X-Service-Token") String serviceToken,
@RequestBody AIGatewayRequest request) {
ServiceIdentity identity = serviceAuthenticator.authenticate(serviceToken);
if (identity == null) {
return Flux.just(ServerSentEvent.<String>builder()
.event("error")
.data("{\"error\": \"Unauthorized\"}")
.build());
}
return gatewayService.processStream(identity, request)
.map(token -> ServerSentEvent.<String>builder()
.event("token")
.data(token)
.build())
.onErrorResume(e -> Flux.just(ServerSentEvent.<String>builder()
.event("error")
.data("{\"error\": \"" + e.getMessage() + "\"}")
.build()));
}
}服务认证和配额管理
@Service
public class ServiceAuthenticator {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String SERVICE_TOKEN_PREFIX = "service:token:";
/**
* 验证服务 Token,返回服务身份信息
*/
public ServiceIdentity authenticate(String serviceToken) {
if (serviceToken == null || !serviceToken.startsWith("svc_")) {
return null;
}
String key = SERVICE_TOKEN_PREFIX + serviceToken;
String serviceInfoJson = redisTemplate.opsForValue().get(key);
if (serviceInfoJson == null) {
return null;
}
// 反序列化服务身份信息
return parseServiceIdentity(serviceInfoJson);
}
private ServiceIdentity parseServiceIdentity(String json) {
// JSON 解析逻辑
// ...
return new ServiceIdentity();
}
}
@Service
public class QuotaManager {
@Autowired
private RedisTemplate<String, String> redisTemplate;
// 各服务每小时 token 配额配置
private final Map<String, Long> serviceHourlyQuota = Map.of(
"product-service", 500_000L, // 50万 tokens/小时
"order-service", 200_000L,
"customer-service", 800_000L, // 客服场景用量大
"recommend-service", 1_000_000L // 推荐场景批量处理
);
/**
* 检查并消耗 token 配额(原子操作)
*/
public QuotaCheckResult consumeQuota(String serviceId, int estimatedTokens) {
Long hourlyLimit = serviceHourlyQuota.getOrDefault(serviceId, 100_000L);
String quotaKey = buildQuotaKey(serviceId);
// 使用 Lua 脚本原子地检查和消耗配额
String luaScript = """
local current = redis.call('GET', KEYS[1])
local limit = tonumber(ARGV[1])
local consume = tonumber(ARGV[2])
if current == false then
current = 0
else
current = tonumber(current)
end
if current + consume > limit then
return {0, current, limit}
end
local newValue = redis.call('INCRBY', KEYS[1], consume)
redis.call('EXPIRE', KEYS[1], 3600)
return {1, newValue, limit}
""";
List<Long> result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, List.class),
List.of(quotaKey),
hourlyLimit.toString(),
String.valueOf(estimatedTokens)
);
boolean allowed = result.get(0) == 1;
long currentUsage = result.get(1);
long limit = result.get(2);
return QuotaCheckResult.builder()
.allowed(allowed)
.currentUsage(currentUsage)
.limit(limit)
.resetTimeMs(getNextHourResetTime())
.build();
}
private String buildQuotaKey(String serviceId) {
// 按小时分桶
String hourBucket = LocalDateTime.now()
.format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
return "ai:quota:" + serviceId + ":" + hourBucket;
}
private long getNextHourResetTime() {
LocalDateTime nextHour = LocalDateTime.now()
.truncatedTo(ChronoUnit.HOURS)
.plusHours(1);
return nextHour.toInstant(ZoneOffset.UTC).toEpochMilli();
}
}模型路由策略
@Component
public class ModelRouter {
private static final Logger log = LoggerFactory.getLogger(ModelRouter.class);
// 场景到模型的路由配置
private static final Map<String, String> SCENARIO_MODEL_MAP = Map.of(
"customer_service_simple", "claude-haiku-3", // 简单客服用小模型
"customer_service_complex", "claude-sonnet-4-5", // 复杂问题用中等模型
"code_analysis", "claude-opus-4-5", // 代码分析用最强模型
"content_moderation", "claude-haiku-3", // 内容审核用快速模型
"recommendation", "claude-sonnet-4-5", // 推荐场景
"default", "claude-sonnet-4-5"
);
/**
* 根据业务场景和请求特征选择合适的模型
*/
public String selectModel(String scenarioCode,
AIGatewayRequest request,
ServiceIdentity identity) {
// 如果调用方明确指定了模型,在权限允许的情况下使用
if (request.getOptions() != null &&
request.getOptions().getPreferredModel() != null) {
if (hasModelPermission(identity, request.getOptions().getPreferredModel())) {
return request.getOptions().getPreferredModel();
}
}
// 根据场景和 token 估算选择模型
String baseModel = SCENARIO_MODEL_MAP.getOrDefault(
scenarioCode, SCENARIO_MODEL_MAP.get("default"));
// 自动降级:如果估算的请求成本超过阈值,降到更小的模型
int estimatedInputTokens = estimateTokens(request.getMessages());
if (estimatedInputTokens < 500 &&
baseModel.equals("claude-opus-4-5")) {
// 短请求不需要最强模型
log.debug("请求较短,自动降级到 Sonnet 模型");
return "claude-sonnet-4-5";
}
return baseModel;
}
private boolean hasModelPermission(ServiceIdentity identity, String model) {
// 检查服务是否有权限使用指定模型
// 某些服务只能用便宜的模型,需要审批才能用贵的
return true; // 简化实现
}
private int estimateTokens(List<ChatMessage> messages) {
return messages.stream()
.mapToInt(m -> m.getContent().length() / 3)
.sum();
}
}下游服务如何调用 AI Gateway
// 在商品服务中引入 AI Gateway 客户端
@Component
public class AIGatewayClient {
private final RestTemplate restTemplate;
private final String gatewayBaseUrl;
private final String serviceToken;
public AIGatewayClient(@Value("${ai.gateway.url}") String gatewayBaseUrl,
@Value("${ai.gateway.service-token}") String serviceToken) {
this.gatewayBaseUrl = gatewayBaseUrl;
this.serviceToken = serviceToken;
this.restTemplate = new RestTemplate();
}
public String callAI(String scenarioCode, String prompt) {
AIGatewayRequest request = AIGatewayRequest.builder()
.requestId(UUID.randomUUID().toString())
.serviceId("product-service") // 写死服务ID
.scenarioCode(scenarioCode)
.messages(List.of(new ChatMessage("user", prompt)))
.options(AIGatewayRequest.AIRequestOptions.builder()
.maxTokens(2048)
.temperature(0.7)
.build())
.build();
HttpHeaders headers = new HttpHeaders();
headers.set("X-Service-Token", serviceToken);
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<AIGatewayRequest> entity = new HttpEntity<>(request, headers);
try {
ResponseEntity<AIGatewayResponse> response = restTemplate.postForEntity(
gatewayBaseUrl + "/api/v1/ai/chat",
entity,
AIGatewayResponse.class
);
if (response.getStatusCode().is2xxSuccessful() && response.getBody() != null) {
return response.getBody().getContent();
}
} catch (HttpClientErrorException e) {
if (e.getStatusCode().value() == 429) {
throw new AIQuotaExceededException("AI配额已用完,请稍后再试");
}
throw e;
}
return null;
}
}
// 商品服务的配置
# product-service application.yml
ai:
gateway:
url: http://ai-gateway-service
service-token: ${AI_GATEWAY_SERVICE_TOKEN} # 从环境变量注入,不写死成本统计和分摊
共享 AI 服务的一个核心价值是统一的成本视图:
@Service
public class AICostTracker {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 记录每次 AI 调用的成本
*/
public void recordCost(String serviceId, String scenarioCode,
String model, int inputTokens, int outputTokens) {
double cost = calculateCost(model, inputTokens, outputTokens);
String dailyKey = "ai:cost:" + serviceId + ":" +
LocalDate.now().format(DateTimeFormatter.ISO_LOCAL_DATE);
String scenarioKey = dailyKey + ":" + scenarioCode;
// 记录到 Redis Hash
redisTemplate.opsForHash().increment(dailyKey, "total_cost", (long)(cost * 100000));
redisTemplate.opsForHash().increment(dailyKey, "total_tokens",
(long)(inputTokens + outputTokens));
redisTemplate.opsForHash().increment(scenarioKey, "cost", (long)(cost * 100000));
redisTemplate.expire(dailyKey, 90, TimeUnit.DAYS);
redisTemplate.expire(scenarioKey, 90, TimeUnit.DAYS);
}
private double calculateCost(String model, int inputTokens, int outputTokens) {
// 各模型定价(每百万tokens,单位美元)
Map<String, double[]> pricing = Map.of(
"claude-haiku-3", new double[]{0.25, 1.25},
"claude-sonnet-4-5", new double[]{3.0, 15.0},
"claude-opus-4-5", new double[]{15.0, 75.0}
);
double[] modelPricing = pricing.getOrDefault(model, new double[]{3.0, 15.0});
return (inputTokens * modelPricing[0] + outputTokens * modelPricing[1]) / 1_000_000.0;
}
}总结
微服务架构下 AI 能力共享的核心价值在于治理,而不只是代码复用。
统一的 AI Gateway 解决了 API Key 安全、整体限流、成本分摊、模型路由这四个关键问题。代价是增加了一个服务依赖和网络跳转,以及需要额外维护这个服务。
什么时候值得建共享服务:超过 3 个服务有 AI 调用需求,且月 LLM 成本超过 $500,这时候共享服务的治理价值就开始体现了。
规模小的时候,共享服务是过度设计。规模大了之后,没有共享服务是技术债。你需要在合适的时机做这个决策。
