Spring AI与Spring Cloud:微服务体系下的AI集成最佳实践
2026/4/30大约 7 分钟
Spring AI与Spring Cloud:微服务体系下的AI集成最佳实践
适读人群:已有Spring Cloud微服务架构、需要接入AI能力的Java工程师 阅读时长:约20分钟
老周的微服务改造难题
老周在一家中型电商公司做架构师,系统是标准的Spring Cloud体系:Nacos注册中心、Gateway网关、十几个业务微服务。
今年公司要把AI能力接进来,老周把任务分给了几个团队。结果一个月后汇报:
- 订单服务团队:直接在OrderService里引入了Spring AI,调OpenAI
- 商品服务团队:也在ProductService里接了一套,调的通义千问
- 用户服务团队:用了个第三方AI SDK,绕开了Spring AI
老周看完汇报,脸有点绿。三个团队各自集成了三套不同的AI调用逻辑,没有统一的配置管理,没有统一的调用监控,API Key散落在各自的配置文件里。更要命的是,LLM调用都是同步的,一旦AI服务慢了,整个调用链路都会被拖垮。
他找到我,问:"我现在怎么办,是全部重写,还是有什么优雅的接入方式?"
我的答案是:微服务体系接入AI,需要专门的架构考量,不能简单地在每个服务里引个依赖就完事。
微服务AI集成的核心挑战
推荐架构:AI Gateway + 异步化
核心设计决策:
- 单独部署一个
AI Gateway Service,所有AI调用都通过它 - 短任务(<30秒)走同步HTTP;长任务走消息队列异步处理
- API Key统一存Nacos配置中心,各业务服务不持有任何密钥
核心代码实现
AI Gateway Service的Feign接口定义
每个业务服务通过Feign接口调用AI Gateway,完全不需要引入Spring AI依赖。
/**
* AI Gateway的OpenFeign客户端接口
* 由ai-gateway-starter提供,各业务服务引入依赖即可使用
*/
@FeignClient(
name = "ai-gateway-service",
configuration = AiGatewayFeignConfig.class,
fallbackFactory = AiGatewayFallbackFactory.class // 降级工厂
)
public interface AiGatewayClient {
/**
* 同步聊天接口(适合实时场景,建议超时<30秒)
*/
@PostMapping("/api/ai/chat")
AiChatResponse chat(@RequestBody AiChatRequest request);
/**
* 提交异步AI任务(适合耗时较长的任务)
* 返回taskId,通过回调或轮询获取结果
*/
@PostMapping("/api/ai/tasks")
AiTaskSubmitResponse submitTask(@RequestBody AiTaskRequest request);
/**
* 查询异步任务状态
*/
@GetMapping("/api/ai/tasks/{taskId}")
AiTaskStatusResponse getTaskStatus(@PathVariable String taskId);
/**
* 文本向量化
*/
@PostMapping("/api/ai/embeddings")
EmbeddingResponse embed(@RequestBody EmbeddingRequest request);
}
/**
* Feign降级工厂
* AI Gateway不可用时的降级处理
*/
@Component
@Slf4j
public class AiGatewayFallbackFactory implements FallbackFactory<AiGatewayClient> {
@Override
public AiGatewayClient create(Throwable cause) {
return new AiGatewayClient() {
@Override
public AiChatResponse chat(AiChatRequest request) {
log.error("AI Gateway不可用,执行降级: {}", cause.getMessage());
return AiChatResponse.fallback(
"AI服务暂时不可用,请稍后重试或联系人工客服");
}
@Override
public AiTaskSubmitResponse submitTask(AiTaskRequest request) {
log.error("AI Gateway不可用,任务提交失败: {}", cause.getMessage());
return AiTaskSubmitResponse.failed("AI服务暂时不可用");
}
@Override
public AiTaskStatusResponse getTaskStatus(String taskId) {
return AiTaskStatusResponse.failed(taskId, "AI服务暂时不可用");
}
@Override
public EmbeddingResponse embed(EmbeddingRequest request) {
log.error("向量化服务不可用: {}", cause.getMessage());
return EmbeddingResponse.empty();
}
};
}
}AI Gateway核心路由实现
/**
* AI Gateway核心控制器
* 处理所有AI请求的路由、限流、监控
*/
@RestController
@RequestMapping("/api/ai")
@RequiredArgsConstructor
@Slf4j
public class AiGatewayController {
private final ChatClient chatClient;
private final RateLimiterService rateLimiter;
private final TokenUsageTracker usageTracker;
private final TaskQueueService taskQueue;
private final MeterRegistry meterRegistry;
@PostMapping("/chat")
public ResponseEntity<AiChatResponse> chat(
@RequestBody @Validated AiChatRequest request,
@RequestHeader("X-Service-Id") String serviceId) {
// 按调用方服务做限流
RateLimitResult rl = rateLimiter.checkServiceLimit(serviceId);
if (!rl.isAllowed()) {
return ResponseEntity.status(429)
.header("Retry-After", String.valueOf(rl.getRetryAfterSeconds()))
.body(AiChatResponse.rateLimited(rl.getMessage()));
}
long startTime = System.currentTimeMillis();
// 记录指标
Counter.builder("ai.gateway.requests")
.tag("service", serviceId)
.tag("scenario", request.getScenarioCode())
.register(meterRegistry)
.increment();
try {
String response = chatClient.prompt()
.system(getSystemPrompt(request.getScenarioCode()))
.user(request.getMessage())
.call()
.content();
long latency = System.currentTimeMillis() - startTime;
// 上报用量(异步,不阻塞响应)
usageTracker.recordAsync(serviceId, request.getScenarioCode(), latency);
// 记录延迟指标
Timer.builder("ai.gateway.latency")
.tag("service", serviceId)
.register(meterRegistry)
.record(latency, TimeUnit.MILLISECONDS);
return ResponseEntity.ok(AiChatResponse.success(response, latency));
} catch (Exception e) {
log.error("AI调用失败: service={}, error={}", serviceId, e.getMessage());
meterRegistry.counter("ai.gateway.errors", "service", serviceId).increment();
throw e;
}
}
/**
* 异步任务接口
* 提交后立即返回taskId,结果通过回调推送
*/
@PostMapping("/tasks")
public ResponseEntity<AiTaskSubmitResponse> submitTask(
@RequestBody @Validated AiTaskRequest request,
@RequestHeader("X-Service-Id") String serviceId) {
String taskId = taskQueue.submit(serviceId, request);
return ResponseEntity.accepted()
.body(AiTaskSubmitResponse.submitted(taskId));
}
private String getSystemPrompt(String scenarioCode) {
// 从Nacos配置中心加载场景对应的Prompt
return promptConfigService.getPrompt(scenarioCode);
}
}异步任务处理(长任务的标准方案)
/**
* AI异步任务处理
* 用于耗时超过30秒的任务(文档总结、大批量处理等)
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class AiTaskProcessor {
private final ChatClient chatClient;
private final AiTaskRepository taskRepo;
private final WebhookNotifier webhookNotifier;
/**
* 消费MQ消息,处理AI任务
*/
@RabbitListener(queues = "ai.task.queue")
public void processTask(AiTaskMessage message) {
log.info("开始处理AI任务: taskId={}, serviceId={}",
message.getTaskId(), message.getServiceId());
// 更新任务状态为处理中
taskRepo.updateStatus(message.getTaskId(), TaskStatus.PROCESSING);
try {
String result = executeTask(message);
// 更新任务结果
taskRepo.updateResult(message.getTaskId(), result, TaskStatus.COMPLETED);
// 如果有回调地址,推送结果
if (message.getCallbackUrl() != null) {
webhookNotifier.notify(message.getCallbackUrl(),
AiTaskResult.success(message.getTaskId(), result));
}
log.info("AI任务完成: taskId={}", message.getTaskId());
} catch (Exception e) {
log.error("AI任务失败: taskId={}", message.getTaskId(), e);
taskRepo.updateStatus(message.getTaskId(), TaskStatus.FAILED);
if (message.getCallbackUrl() != null) {
webhookNotifier.notify(message.getCallbackUrl(),
AiTaskResult.failed(message.getTaskId(), e.getMessage()));
}
}
}
private String executeTask(AiTaskMessage message) {
return switch (message.getTaskType()) {
case DOCUMENT_SUMMARY -> summarizeDocument(message);
case BATCH_CLASSIFY -> batchClassify(message);
case KNOWLEDGE_EXTRACTION -> extractKnowledge(message);
default -> chatClient.prompt()
.user(message.getContent())
.call()
.content();
};
}
private String summarizeDocument(AiTaskMessage message) {
// 长文档分段处理
List<String> chunks = splitDocument(message.getContent(), 3000);
// 先分段摘要,再合并
List<String> chunkSummaries = chunks.stream()
.map(chunk -> chatClient.prompt()
.user("请对以下内容进行简洁摘要(200字以内):\n" + chunk)
.call()
.content())
.collect(Collectors.toList());
// 对所有摘要再做一次汇总
String allSummaries = String.join("\n\n", chunkSummaries);
return chatClient.prompt()
.user("以下是一篇文档各部分的摘要,请综合整理成一篇完整的摘要(500字以内):\n" + allSummaries)
.call()
.content();
}
private List<String> splitDocument(String content, int chunkSize) {
List<String> chunks = new ArrayList<>();
for (int i = 0; i < content.length(); i += chunkSize) {
chunks.add(content.substring(i, Math.min(i + chunkSize, content.length())));
}
return chunks;
}
}Nacos配置中心集成
# ai-gateway-service的application.yml
spring:
application:
name: ai-gateway-service
cloud:
nacos:
config:
server-addr: ${NACOS_ADDR:localhost:8848}
namespace: ${NACOS_NAMESPACE:dev}
group: AI_CONFIG
# 监听动态配置,修改Prompt无需重启
extension-configs:
- data-id: ai-prompts.yaml
group: AI_CONFIG
refresh: true
- data-id: ai-providers.yaml
group: AI_CONFIG
refresh: true
# 这些配置放在Nacos里,支持动态更新
# Nacos中的 ai-providers.yaml:
# spring:
# ai:
# openai:
# api-key: ${OPENAI_API_KEY}
# options:
# model: gpt-4o-mini/**
* 从Nacos动态加载Prompt配置
* 支持在不重启服务的情况下更新Prompt
*/
@Component
@RefreshScope // 支持Nacos配置变更时自动刷新
@RequiredArgsConstructor
@Slf4j
public class PromptConfigService {
@Value("${ai.prompts.customer-service:你是一个专业的客服助手}")
private String customerServicePrompt;
@Value("${ai.prompts.code-review:你是一个代码审查专家}")
private String codeReviewPrompt;
@Value("${ai.prompts.default:你是一个通用AI助手}")
private String defaultPrompt;
public String getPrompt(String scenarioCode) {
return switch (scenarioCode) {
case "customer_service" -> customerServicePrompt;
case "code_review" -> codeReviewPrompt;
default -> {
log.warn("未找到场景{}的Prompt配置,使用默认", scenarioCode);
yield defaultPrompt;
}
};
}
}分布式链路追踪集成
/**
* AI调用链路追踪
* 让每次AI调用都能在SkyWalking/Zipkin里追踪到
*/
@Aspect
@Component
@RequiredArgsConstructor
@Slf4j
public class AiTracingAspect {
private final Tracer tracer; // Micrometer Tracing
@Around("@annotation(AiTraceable)")
public Object traceAiCall(ProceedingJoinPoint pjp) throws Throwable {
Span span = tracer.nextSpan()
.name("ai.llm.call")
.tag("ai.service", "spring-ai")
.start();
try (Tracer.SpanInScope scope = tracer.withSpan(span)) {
Object result = pjp.proceed();
span.tag("ai.status", "success");
return result;
} catch (Exception e) {
span.tag("ai.status", "error");
span.tag("ai.error", e.getMessage());
throw e;
} finally {
span.end();
}
}
}微服务AI集成最佳实践清单
| 实践 | 重要性 | 说明 |
|---|---|---|
| 独立AI Gateway服务 | 高 | 统一入口,便于管控和监控 |
| API Key存配置中心 | 高 | 不在业务代码里硬编码密钥 |
| 同步/异步分离 | 高 | 长任务用MQ,避免同步超时 |
| Feign+降级工厂 | 高 | AI不可用时有兜底响应 |
| 熔断器(Resilience4j) | 高 | 防止级联失败 |
| 链路追踪 | 中 | 快速定位跨服务问题 |
| 按服务限流 | 中 | 防止某个服务独占AI资源 |
| 动态Prompt配置 | 中 | Nacos + @RefreshScope |
