Microservice Decomposition for AI Applications: The Evolution Path from Monolith to Services
date: 2026-09-18 tags: [Microservices, Service Decomposition, Spring AI, DDD, Java]
Opening Story: The Afternoon When Every Deployment Required a Prayer
Li Ming is an architect at an e-commerce platform. In early 2025 his team spent three months building an intelligent customer-service system as a single monolith: GPT-4o for intent recognition, a locally deployed bge-m3 model for vector recall, and ticket handling integrated with three business systems. At launch the system performed well, handling 80,000 conversations a day with 91% accuracy.
Six months later, the nightmare began.
Every deployment took the whole system down for 20 minutes. The JAR had ballooned to 847MB; the JVM took 4 minutes to start, health checks another 2 minutes, and data warm-up 14 more. Each staging deployment left the product manager fidgeting beside the desk; each production deployment had the whole team glued to the dashboards, hearts skipping beats.
Worse: tuning the RAG retrieval service meant redeploying the entire system. Switching the embedding model to text-embedding-3-large meant redeploying the entire system. Adding one field to the ticket-handling logic still meant redeploying the entire system.
"We just piled all the AI capabilities into one Spring Boot app and assumed it would be enough," Li Ming said in the retrospective. "In less than a year it had turned into a boulder we couldn't move."
After the split, the vector service deployed independently in 58 seconds, intent recognition in 47 seconds, and the ticket service in 31 seconds. Released in parallel, the three services took under a minute in total. The first afternoon they hit a one-minute deployment, the whole team applauded at their desks.
This article is the road Li Ming's team traveled.
1. When to Split, and When Not To
1.1 Scenarios Where a Monolithic AI App Is Perfectly Fine
Not every AI application needs microservices. In the following scenarios, staying monolithic is the right choice:
Scenario 1: Under 100,000 requests per day
Below 100,000 daily requests, a single 8-core/16GB server handles a monolith comfortably; add one standby machine and you get low cost and simple operations. The network latency, service discovery, and distributed tracing that microservices introduce would be pure overhead.
Scenario 2: A team of fewer than 5 people
Microservices are, at heart, a technical mapping of your org structure. A team under 5 has low communication cost and iterates faster on a monolith. Force microservices on it and daily cross-service interface coordination alone will grind people down.
Scenario 3: Business boundaries are not yet stable
Early on, an AI product's direction can change weekly. Draw the boundaries wrong at this stage and the resulting microservices are harder to change than the monolith. Wait until the core business flow has run stably for at least 3 months before considering a split.
Quantified decision criteria:
Thresholds for staying monolithic:
- Lines of code < 50,000
- Daily requests < 100,000
- Team size < 5
- Deployment frequency < twice a week
- Independent scaling needs: none
1.2 Signals That You Must Split
If any of the following appears, start planning the split:
Signal 1: Deployment takes over 10 minutes and hurts the business
AI application JARs are typically 3-5x larger than ordinary business apps (embedded model files, vector libraries), so startup is naturally slower. Once the deployment window exceeds 10 minutes, business tolerance starts to erode.
Signal 2: Different AI capabilities have completely different scaling needs
Vector retrieval is CPU-bound and memory-hungry. Streaming chat is IO-bound and needs high concurrency. Agent orchestration needs long-lived connections. Put all three together and horizontal scaling forces you to buy machines sized for the most demanding workload, which is enormously wasteful.
Signal 3: The tech stack diverges
The team decides to use Python's LangChain for part of the Agent logic while the main service is Java. At that point polyglot microservices are the only way out.
Signal 4: Services have different SLA requirements
Core chat must meet P99 < 500ms, while knowledge-base updates can tolerate minute-level delay. Bind the two together and a problem in either drags down the other.
Split-timing decision tree:
2. DDD Domain Partitioning: Identifying Bounded Contexts in an AI Application
2.1 Domain Analysis of the Customer-Service System
Using Li Ming's customer-service system as the example, here is a complete DDD analysis.
Core business flow:
User sends a message
→ Intent recognition (inquiry / complaint / refund / small talk?)
→ Knowledge retrieval (find answers in the FAQ and product docs)
→ Answer generation (compose a reply using the conversation context)
→ Ticket creation (when human handling is needed)
→ Conversation logging (persist the full session)
Event storming to identify domain events:
Bounded context partitioning (4 core BCs):
| Bounded Context | Core Responsibility | Key Models | External Dependencies |
|---|---|---|---|
| Conversation Management | Session lifecycle, message routing | Conversation, Message, Session | Redis (session state) |
| Intelligent Understanding | Intent recognition, entity extraction, sentiment analysis | Intent, Entity, Sentiment | LLM API |
| Knowledge Service | Vector retrieval, document management, knowledge updates | Document, Embedding, KnowledgeBase | Vector database |
| Ticket Handling | Ticket creation, assignment, tracking | Ticket, Assignment, SLA | CRM system |
2.2 Context Mapping Between Bounded Contexts
Anti-Corruption Layer (ACL) design principle:
For upstream systems such as the CRM and user management, do not let external data models pollute your own domain model. Translate through an anti-corruption layer:
// Anti-corruption layer: translate the CRM user model into the local domain model
@Component
public class UserContextAntiCorruptionLayer {
private final CrmUserClient crmUserClient;
public CustomerProfile translateCrmUser(String userId) {
CrmUserDto crmUser = crmUserClient.getUser(userId);
// external model → local domain model: keep CRM field names out of this domain
return CustomerProfile.builder()
.customerId(crmUser.getUid()) // uid → customerId
.tier(mapTier(crmUser.getMemberLevel())) // memberLevel → Tier enum
.hasActivePurchase(crmUser.getOrderCount() > 0)
.lastPurchaseDate(parseDate(crmUser.getLastOrderTime()))
.build();
}
private CustomerTier mapTier(String memberLevel) {
return switch (memberLevel) {
case "GOLD" -> CustomerTier.PREMIUM;
case "SILVER" -> CustomerTier.STANDARD;
default -> CustomerTier.BASIC;
};
}
}
3. Split Strategy: Partition Services by AI Capability
3.1 Responsibility Boundaries of the Three Core AI Services
Principles for defining service boundaries:
High cohesion: functions inside one service change for the same reason
Low coupling: services communicate only through interfaces and never share a database
Single responsibility: each service does one thing, describable in a single sentence
Embedding service (vectorization):
- Responsibilities: text vectorization, vector storage, similarity search
- Reasons to change: swapping the embedding model, optimizing the retrieval algorithm, extending vector dimensions
- Independent scaling needs: compute-intensive; benefits from GPU acceleration
RAG service (retrieval-augmented generation):
- Responsibilities: knowledge recall, context assembly, answer generation
- Reasons to change: swapping the LLM, tuning the recall strategy, adjusting the context window
- Independent scaling needs: IO-intensive; needs high concurrency
Agent service (intelligent agent):
- Responsibilities: multi-step reasoning, tool calling, task planning
- Reasons to change: adding tools, optimizing the reasoning chain, adjusting the agent strategy
- Independent scaling needs: long-lived connections; needs stateful management
3.2 An Incremental Split Path
Do not do a big-bang rewrite! The right path is the Strangler Fig pattern:
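The gradual cutover above can be sketched as a percentage-based traffic router: a stable hash of the session id sends a configurable slice of requests to the new microservice while the rest stay on the monolith, and the slice is raised stage by stage. This is a minimal illustration with hypothetical class names, not code from the article's system.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/**
 * Strangler-style traffic shifting (hypothetical sketch): route a fixed
 * percentage of sessions to the new service, keeping each session sticky.
 */
public class StranglerRouter {
    private final int newServicePercent; // 0..100, raised per rollout stage

    public StranglerRouter(int newServicePercent) {
        this.newServicePercent = newServicePercent;
    }

    /** A stable hash means the same session always routes the same way. */
    public boolean routeToNewService(String sessionId) {
        CRC32 crc = new CRC32();
        crc.update(sessionId.getBytes(StandardCharsets.UTF_8));
        return (crc.getValue() % 100) < newServicePercent;
    }

    public static void main(String[] args) {
        StranglerRouter canary = new StranglerRouter(10); // stage 1: 10% to the new service
        StranglerRouter full = new StranglerRouter(100);  // final stage: monolith retired
        System.out.println(canary.routeToNewService("session-42"));
        System.out.println(full.routeToNewService("session-42")); // always true at 100%
    }
}
```

Once a capability's traffic reaches 100% and soaks for a release cycle, the corresponding code path in the monolith can be deleted.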
4. Inter-Service Communication: High-Performance AI Microservice Communication over gRPC
4.1 Why AI Microservices Choose gRPC over REST
Communication characteristics of AI services:
- Vector data is a float[] array, and JSON serialization is very inefficient: a 1536-dimension vector packs into about 6KB of binary Protobuf (1536 × 4 bytes) but several times that as JSON text, which must also be parsed
- Streaming responses need server push (gRPC supports server streaming natively)
- These are internal calls; there is no need for HTTP's text-protocol overhead
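The size gap is easy to check with back-of-the-envelope arithmetic. The sketch below compares packed binary size (what Protobuf's `packed=true` encoding uses: 4 bytes per float) against a naively formatted JSON array; the JSON figure depends on the number formatting chosen and is illustrative only.

```java
import java.util.Locale;

/** Back-of-the-envelope size comparison for a 1536-dimension float vector. */
public class VectorSizeDemo {
    /** Packed binary, as in Protobuf packed repeated fields: 4 bytes per float. */
    static int packedBytes(int dims) {
        return dims * Float.BYTES;
    }

    /** JSON text: each float rendered as e.g. "0.12345678" plus separators. */
    static int jsonBytes(int dims) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < dims; i++) {
            sb.append(String.format(Locale.ROOT, "%.8f", 0.12345678));
            if (i < dims - 1) sb.append(",");
        }
        return sb.append("]").length();
    }

    public static void main(String[] args) {
        System.out.println("packed: " + packedBytes(1536) + " bytes"); // 6144 bytes ≈ 6KB
        System.out.println("json:   " + jsonBytes(1536) + " bytes");  // 16897 bytes with this formatting
    }
}
```

On top of the raw size difference, the JSON payload must be parsed as text on every hop, which is where much of the latency gap in the table below comes from.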
Benchmark data (1536-dimension vectors, 1,000 calls):
| Protocol | Avg latency | P99 latency | Throughput | Bandwidth |
|---|---|---|---|---|
| REST/JSON | 12.3ms | 28.7ms | 4,200/s | 6.1MB/s |
| gRPC/Protobuf | 3.1ms | 7.2ms | 18,500/s | 1.8MB/s |
| Improvement | 4x | 4x | 4.4x | 3.4x |
4.2 The Complete Proto Definition
Create ai-service.proto:
syntax = "proto3";
package com.laozhang.ai;
option java_package = "com.laozhang.ai.grpc";
option java_outer_classname = "AiServiceProto";
option java_multiple_files = true;
// =============================================
// Embedding service: text vectorization
// =============================================
service EmbeddingService {
// Single-text embedding
rpc Embed(EmbedRequest) returns (EmbedResponse);
// Batch embedding
rpc BatchEmbed(BatchEmbedRequest) returns (BatchEmbedResponse);
// Similarity search (streamed results)
rpc SearchSimilar(SearchRequest) returns (stream SearchResult);
}
message EmbedRequest {
string text = 1;
string model = 2; // default: bge-m3
int32 dimensions = 3; // default: 1536
}
message EmbedResponse {
repeated float embedding = 1 [packed = true]; // vector data
string model = 2;
int32 usage_tokens = 3;
int64 latency_ms = 4;
}
message BatchEmbedRequest {
repeated string texts = 1;
string model = 2;
int32 batch_size = 3; // default: 32
}
message BatchEmbedResponse {
repeated EmbedResponse embeddings = 1;
int32 total_tokens = 2;
}
message SearchRequest {
repeated float query_embedding = 1 [packed = true];
string collection = 2; // vector collection name
int32 top_k = 3; // default: 5
float min_score = 4; // minimum similarity threshold
map<string, string> filters = 5; // metadata filters
}
message SearchResult {
string doc_id = 1;
string content = 2;
float score = 3;
map<string, string> metadata = 4;
}
// =============================================
// RAG service: retrieval-augmented generation
// =============================================
service RagService {
// Standard RAG query
rpc Query(RagRequest) returns (RagResponse);
// Streaming RAG (chunks returned as they are generated)
rpc StreamQuery(RagRequest) returns (stream RagChunk);
}
message RagRequest {
string question = 1;
string session_id = 2;
repeated Message history = 3;
RagConfig config = 4;
}
message RagConfig {
int32 retrieval_top_k = 1; // number of documents to recall
float min_relevance = 2; // minimum relevance score
string llm_model = 3; // LLM model name
int32 max_tokens = 4; // max generated tokens
bool rerank = 5; // whether to rerank
}
message Message {
string role = 1; // user/assistant/system
string content = 2;
int64 timestamp = 3;
}
message RagResponse {
string answer = 1;
repeated SourceDocument sources = 2;
int32 total_tokens = 3;
int64 latency_ms = 4;
string trace_id = 5;
}
message RagChunk {
string content = 1;
bool is_final = 2;
repeated SourceDocument sources = 3; // populated only when is_final = true
}
message SourceDocument {
string doc_id = 1;
string title = 2;
string snippet = 3;
float relevance_score = 4;
string url = 5;
}
// =============================================
// Agent service: intelligent agent orchestration
// =============================================
service AgentService {
// Execute an agent task
rpc Execute(AgentRequest) returns (stream AgentEvent);
}
message AgentRequest {
string task = 1;
string session_id = 2;
repeated Tool available_tools = 3;
AgentConfig config = 4;
map<string, string> context = 5;
}
message Tool {
string name = 1;
string description = 2;
string parameters_schema = 3; // JSON Schema
}
message AgentConfig {
int32 max_iterations = 1; // max reasoning steps
int32 timeout_seconds = 2;
string strategy = 3; // react/plan_and_execute/etc
}
message AgentEvent {
oneof event {
ThinkingEvent thinking = 1;
ToolCallEvent tool_call = 2;
ToolResultEvent tool_result = 3;
FinalAnswerEvent final_answer = 4;
ErrorEvent error = 5;
}
}
message ThinkingEvent {
string thought = 1;
int32 step = 2;
}
message ToolCallEvent {
string tool_name = 1;
string arguments = 2; // JSON
int32 step = 3;
}
message ToolResultEvent {
string tool_name = 1;
string result = 2;
bool success = 3;
}
message FinalAnswerEvent {
string answer = 1;
int32 total_steps = 2;
int32 total_tokens = 3;
}
message ErrorEvent {
string code = 1;
string message = 2;
bool retryable = 3;
}
4.3 gRPC Server Implementation (Embedding Service)
@GrpcService
@Slf4j
public class EmbeddingGrpcServiceImpl extends EmbeddingServiceGrpc.EmbeddingServiceImplBase {
private final EmbeddingModel embeddingModel;
private final VectorStore vectorStore;
private final MeterRegistry meterRegistry;
// constructor injection
public EmbeddingGrpcServiceImpl(
EmbeddingModel embeddingModel,
VectorStore vectorStore,
MeterRegistry meterRegistry) {
this.embeddingModel = embeddingModel;
this.vectorStore = vectorStore;
this.meterRegistry = meterRegistry;
}
@Override
public void embed(EmbedRequest request, StreamObserver<EmbedResponse> responseObserver) {
long startTime = System.currentTimeMillis();
Timer.Sample sample = Timer.start(meterRegistry);
try {
// validate arguments
if (request.getText().isBlank()) {
responseObserver.onError(
Status.INVALID_ARGUMENT
.withDescription("text cannot be empty")
.asRuntimeException()
);
return;
}
// call Spring AI's EmbeddingModel
EmbeddingResponse embeddingResponse = embeddingModel.embedForResponse(
List.of(request.getText())
);
float[] vector = embeddingResponse.getResults().get(0).getOutput();
// build the response
EmbedResponse.Builder responseBuilder = EmbedResponse.newBuilder();
for (float v : vector) {
responseBuilder.addEmbedding(v);
}
responseBuilder
.setModel(embeddingModel.getClass().getSimpleName())
.setUsageTokens(
embeddingResponse.getMetadata().getUsage() != null
? (int) embeddingResponse.getMetadata().getUsage().getTotalTokens()
: 0
)
.setLatencyMs(System.currentTimeMillis() - startTime);
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
// record metrics
sample.stop(meterRegistry.timer("embedding.latency", "status", "success"));
} catch (Exception e) {
log.error("Embedding failed for text: {}",
request.getText().substring(0, Math.min(50, request.getText().length())), e);
sample.stop(meterRegistry.timer("embedding.latency", "status", "error"));
responseObserver.onError(
Status.INTERNAL
.withDescription("Embedding failed: " + e.getMessage())
.withCause(e)
.asRuntimeException()
);
}
}
@Override
public void batchEmbed(BatchEmbedRequest request,
StreamObserver<BatchEmbedResponse> responseObserver) {
try {
List<String> texts = request.getTextsList();
int batchSize = request.getBatchSize() > 0 ? request.getBatchSize() : 32;
BatchEmbedResponse.Builder batchBuilder = BatchEmbedResponse.newBuilder();
int totalTokens = 0;
// process in batches so a single upstream request does not get too large
for (int i = 0; i < texts.size(); i += batchSize) {
List<String> batch = texts.subList(i, Math.min(i + batchSize, texts.size()));
EmbeddingResponse response = embeddingModel.embedForResponse(batch);
for (Embedding embedding : response.getResults()) {
EmbedResponse.Builder embedBuilder = EmbedResponse.newBuilder();
for (float v : embedding.getOutput()) {
embedBuilder.addEmbedding(v);
}
batchBuilder.addEmbeddings(embedBuilder.build());
}
if (response.getMetadata().getUsage() != null) {
totalTokens += response.getMetadata().getUsage().getTotalTokens();
}
}
batchBuilder.setTotalTokens(totalTokens);
responseObserver.onNext(batchBuilder.build());
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Batch embedding failed", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException()
);
}
}
@Override
public void searchSimilar(SearchRequest request,
StreamObserver<SearchResult> responseObserver) {
try {
// rebuild the query vector from the proto request (accessors match the
// query_embedding field defined in the proto)
float[] queryEmbedding = new float[request.getQueryEmbeddingCount()];
for (int i = 0; i < request.getQueryEmbeddingCount(); i++) {
queryEmbedding[i] = request.getQueryEmbedding(i);
}
// search via Spring AI's VectorStore, fully qualified to avoid the name
// clash between the proto SearchRequest and Spring AI's SearchRequest;
// whether the precomputed queryEmbedding can be passed in directly
// depends on the VectorStore implementation
org.springframework.ai.vectorstore.SearchRequest searchRequest =
org.springframework.ai.vectorstore.SearchRequest.query("")
.withTopK(request.getTopK() > 0 ? request.getTopK() : 5)
.withSimilarityThreshold(
request.getMinScore() > 0 ? request.getMinScore() : 0.7f
);
List<Document> results = vectorStore.similaritySearch(searchRequest);
// stream the results back
for (Document doc : results) {
SearchResult result = SearchResult.newBuilder()
.setDocId(doc.getId())
.setContent(doc.getContent())
.setScore((float) doc.getMetadata().getOrDefault("score", 0.0))
.putAllMetadata(
doc.getMetadata().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> String.valueOf(e.getValue())
))
)
.build();
responseObserver.onNext(result);
}
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Vector search failed", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException()
);
}
}
}
4.4 gRPC Client Configuration (calling from the conversation service)
@Configuration
public class GrpcClientConfig {
@Value("${ai.embedding.service.host:embedding-service}")
private String embeddingHost;
@Value("${ai.embedding.service.port:9090}")
private int embeddingPort;
@Value("${ai.rag.service.host:rag-service}")
private String ragHost;
@Value("${ai.rag.service.port:9091}")
private int ragPort;
@Bean
public EmbeddingServiceGrpc.EmbeddingServiceBlockingStub embeddingServiceStub() {
ManagedChannel channel = ManagedChannelBuilder
.forAddress(embeddingHost, embeddingPort)
.usePlaintext()
// retry policy
.defaultServiceConfig(buildRetryConfig())
// executor for channel callbacks
.executor(Executors.newFixedThreadPool(20))
// max inbound message size (vector payloads are large)
.maxInboundMessageSize(10 * 1024 * 1024) // 10MB
.build();
// NOTE: apply deadlines per call (stub.withDeadlineAfter(...)); a deadline
// attached when the bean is created is absolute and would expire for every
// later call made through this stub
return EmbeddingServiceGrpc.newBlockingStub(channel);
}
@Bean
public RagServiceGrpc.RagServiceStub ragServiceAsyncStub() {
ManagedChannel channel = ManagedChannelBuilder
.forAddress(ragHost, ragPort)
.usePlaintext()
.maxInboundMessageSize(20 * 1024 * 1024)
.build();
// async stub for streaming calls
return RagServiceGrpc.newStub(channel);
}
private Map<String, Object> buildRetryConfig() {
return Map.of(
"methodConfig", List.of(Map.of(
"name", List.of(Map.of()),
"retryPolicy", Map.of(
"maxAttempts", 3,
"initialBackoff", "0.5s",
"maxBackoff", "5s",
"backoffMultiplier", 2.0,
"retryableStatusCodes", List.of("UNAVAILABLE", "DEADLINE_EXCEEDED")
)
))
);
}
}
// Example: the conversation service calling the Embedding service
@Service
@Slf4j
public class ConversationService {
private final EmbeddingServiceGrpc.EmbeddingServiceBlockingStub embeddingStub;
private final RagServiceGrpc.RagServiceStub ragStub;
public ConversationResponse processMessage(String sessionId, String userMessage) {
// Step 1: embed the user's question
EmbedRequest embedRequest = EmbedRequest.newBuilder()
.setText(userMessage)
.setModel("bge-m3")
.build();
EmbedResponse embedResponse = embeddingStub.embed(embedRequest);
log.debug("Embedded in {}ms, tokens: {}",
embedResponse.getLatencyMs(), embedResponse.getUsageTokens());
// Step 2: streaming call to the RAG service
RagRequest ragRequest = RagRequest.newBuilder()
.setQuestion(userMessage)
.setSessionId(sessionId)
.setConfig(RagConfig.newBuilder()
.setRetrievalTopK(5)
.setMinRelevance(0.7f)
.setLlmModel("gpt-4o")
.setMaxTokens(2048)
.setRerank(true)
.build())
.build();
StringBuilder answerBuilder = new StringBuilder();
CountDownLatch latch = new CountDownLatch(1);
List<SourceDocument> sources = new ArrayList<>();
ragStub.streamQuery(ragRequest, new StreamObserver<RagChunk>() {
@Override
public void onNext(RagChunk chunk) {
answerBuilder.append(chunk.getContent());
if (chunk.getIsFinal()) {
sources.addAll(chunk.getSourcesList());
}
}
@Override
public void onError(Throwable t) {
log.error("RAG streaming failed for session: {}", sessionId, t);
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return ConversationResponse.builder()
.answer(answerBuilder.toString())
.sources(sources)
.sessionId(sessionId)
.build();
}
}
5. Data Ownership: One Independent Database per Service
5.1 Database Split Principles
The cardinal rule of microservices: no service may query another service's database directly.
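The rule is easiest to see in code. In the sketch below (class names hypothetical, storage faked with an in-memory map), the ticket service holds no connection to the conversation database; its only way in is the conversation service's public API, so the conversation team can change its schema freely.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Data-ownership sketch: the ticket service depends on an API, not a DataSource.
 */
public class DataOwnershipDemo {
    /** The only legal entry point into conversation data from other services. */
    interface ConversationApi {
        Optional<String> getSummary(String conversationId);
    }

    static class TicketService {
        private final ConversationApi conversationApi; // API client, never a DB handle
        TicketService(ConversationApi api) { this.conversationApi = api; }

        String describeTicketContext(String conversationId) {
            // degrade gracefully when the owning service has no data
            return conversationApi.getSummary(conversationId)
                    .orElse("(conversation summary unavailable)");
        }
    }

    public static void main(String[] args) {
        // fake backing store standing in for the conversation service
        Map<String, String> fakeStore = Map.of("c-1", "User asked about the refund policy");
        ConversationApi api = id -> Optional.ofNullable(fakeStore.get(id));
        TicketService tickets = new TicketService(api);
        System.out.println(tickets.describeTicketContext("c-1"));
        System.out.println(tickets.describeTicketContext("c-2"));
    }
}
```

When the cross-service read is frequent, replace the synchronous API call with an event-fed local replica, as the CQRS section below does.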
5.2 Database Choices per Service
Conversation service: MySQL (primary/replica)
-- conversations table
CREATE TABLE conversations (
id VARCHAR(36) PRIMARY KEY,
user_id VARCHAR(64) NOT NULL,
channel ENUM('web','app','wechat') NOT NULL,
status ENUM('active','ended','archived') DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
metadata JSON,
INDEX idx_user_id (user_id),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- messages table (partitioned by month)
CREATE TABLE messages (
id VARCHAR(36) NOT NULL,
conversation_id VARCHAR(36) NOT NULL,
role ENUM('user','assistant','system') NOT NULL,
content TEXT NOT NULL,
token_count INT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, created_at),
INDEX idx_conversation_id (conversation_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
PARTITION BY RANGE (UNIX_TIMESTAMP(created_at)) (
PARTITION p202601 VALUES LESS THAN (UNIX_TIMESTAMP('2026-02-01')),
PARTITION p202602 VALUES LESS THAN (UNIX_TIMESTAMP('2026-03-01')),
PARTITION p202603 VALUES LESS THAN (UNIX_TIMESTAMP('2026-04-01')),
PARTITION p_max VALUES LESS THAN MAXVALUE
);
Knowledge service: PostgreSQL + pgvector
-- enable the vector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- knowledge base table
CREATE TABLE knowledge_bases (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(200) NOT NULL,
description TEXT,
embedding_model VARCHAR(100) DEFAULT 'bge-m3',
dimensions INT DEFAULT 1536,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- documents table
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
kb_id UUID NOT NULL REFERENCES knowledge_bases(id),
title VARCHAR(500) NOT NULL,
content TEXT NOT NULL,
embedding vector(1536), -- pgvector column type
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- HNSW index (high-performance vector search)
CREATE INDEX ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- full-text index (for hybrid retrieval); note that 'chinese' is not a
-- built-in text search configuration and requires a parser such as zhparser
CREATE INDEX ON documents USING GIN (to_tsvector('chinese', content));
5.3 Cross-Service Data Consistency: CQRS
When the ticket service needs to display conversation context, it must not query the conversation database directly. Use CQRS: the conversation service pushes a read-only view through events:
// the conversation service publishes an event
@Service
public class ConversationEventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
// @TransactionalEventListener alone is enough; stacking @EventListener on the
// same method would make the handler fire twice
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onConversationEnded(ConversationEndedEvent event) {
ConversationSummaryEvent summaryEvent = ConversationSummaryEvent.builder()
.conversationId(event.getConversationId())
.userId(event.getUserId())
.summary(event.getSummary()) // summary already generated by the AI
.messageCount(event.getMessageCount())
.resolvedStatus(event.isResolved())
.endedAt(event.getEndedAt())
.build();
kafkaTemplate.send("conversation.ended",
event.getConversationId(), summaryEvent);
}
}
// the ticket service subscribes and maintains a local read-only replica
@Service
@Slf4j
public class ConversationProjectionService {
private final ConversationSnapshotRepository snapshotRepository;
// @KafkaListener belongs on the handler method (class-level use would
// require @KafkaHandler-annotated methods)
@KafkaListener(topics = "conversation.ended")
public void handleConversationEnded(ConversationSummaryEvent event) {
// keep a conversation-summary snapshot in the ticket service's own DB
ConversationSnapshot snapshot = ConversationSnapshot.builder()
.conversationId(event.getConversationId())
.userId(event.getUserId())
.summary(event.getSummary())
.messageCount(event.getMessageCount())
.resolvedStatus(event.isResolvedStatus())
.endedAt(event.getEndedAt())
.build();
snapshotRepository.save(snapshot);
log.info("Conversation snapshot saved: {}", event.getConversationId());
}
}
6. Service Discovery: Registering AI Microservices with Nacos
6.1 Nacos Configuration
Nacos server deployment (docker-compose):
version: '3.8'
services:
nacos:
image: nacos/nacos-server:v2.3.2
container_name: nacos
environment:
- MODE=standalone
- PREFER_HOST_MODE=hostname
- NACOS_AUTH_ENABLE=true
- NACOS_AUTH_TOKEN=SecretKey012345678901234567890123456789012345678901234567890123456789
- SPRING_DATASOURCE_PLATFORM=mysql
- MYSQL_SERVICE_HOST=mysql
- MYSQL_SERVICE_PORT=3306
- MYSQL_SERVICE_DB_NAME=nacos_config
- MYSQL_SERVICE_USER=nacos
- MYSQL_SERVICE_PASSWORD=nacos_password
ports:
- "8848:8848"
- "9848:9848"
volumes:
- ./nacos/logs:/home/nacos/logs
Nacos configuration for each AI service (application.yml):
# embedding-service/src/main/resources/application.yml
spring:
application:
name: embedding-service
cloud:
nacos:
discovery:
server-addr: nacos:8848
namespace: ai-prod # namespace isolation
group: AI_SERVICE_GROUP
metadata:
version: v2.1.0
grpc-port: "9090" # register the gRPC port in instance metadata
gpu-enabled: "true"
model: "bge-m3"
config:
server-addr: nacos:8848
namespace: ai-prod
group: AI_SERVICE_GROUP
file-extension: yaml
# hot config reload
refresh-enabled: true
server:
port: 8080 # HTTP port (used for health checks)
grpc:
server:
port: 9090 # gRPC port
# health checks
management:
endpoints:
web:
exposure:
include: health,info,metrics
endpoint:
health:
show-details: always
# rag-service/src/main/resources/application.yml
spring:
application:
name: rag-service
cloud:
nacos:
discovery:
server-addr: nacos:8848
namespace: ai-prod
group: AI_SERVICE_GROUP
metadata:
version: v1.8.0
grpc-port: "9091"
llm-model: "gpt-4o"
grpc:
server:
port: 9091
6.2 Dynamic gRPC Service Discovery via Nacos
@Configuration
public class NacosGrpcDiscoveryConfig {
@Autowired
private NacosDiscoveryClient discoveryClient;
@Bean
public EmbeddingServiceGrpc.EmbeddingServiceBlockingStub embeddingStub() {
// resolve service addresses dynamically via Nacos instead of hardcoding
ManagedChannel channel = ManagedChannelBuilder
.forTarget("nacos:///embedding-service") // address by Nacos service name
.usePlaintext()
.nameResolverFactory(new NacosNameResolverProvider(discoveryClient))
.defaultLoadBalancingPolicy("round_robin")
.build();
return EmbeddingServiceGrpc.newBlockingStub(channel);
}
}
// Nacos name resolver (lets gRPC do service discovery through Nacos)
public class NacosNameResolverProvider extends NameResolverProvider {
private final NacosDiscoveryClient discoveryClient;
public NacosNameResolverProvider(NacosDiscoveryClient discoveryClient) {
this.discoveryClient = discoveryClient;
}
@Override
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
String serviceName = targetUri.getHost();
return new NacosNameResolver(serviceName, discoveryClient);
}
private static class NacosNameResolver extends NameResolver {
private final String serviceName;
private final NacosDiscoveryClient discoveryClient;
private Listener2 listener;
private ScheduledFuture<?> refreshTask;
NacosNameResolver(String serviceName, NacosDiscoveryClient discoveryClient) {
this.serviceName = serviceName;
this.discoveryClient = discoveryClient;
}
@Override
public void start(Listener2 listener) {
this.listener = listener;
refresh();
// refresh the instance list every 30 seconds
refreshTask = Executors.newScheduledThreadPool(1)
.scheduleAtFixedRate(this::refresh, 30, 30, TimeUnit.SECONDS);
}
private void refresh() {
List<ServiceInstance> instances =
discoveryClient.getInstances(serviceName);
if (instances.isEmpty()) {
listener.onError(Status.UNAVAILABLE
.withDescription("No instances for: " + serviceName));
return;
}
// read the gRPC port from Nacos instance metadata
List<EquivalentAddressGroup> addresses = instances.stream()
.map(instance -> {
int grpcPort = Integer.parseInt(
instance.getMetadata().getOrDefault("grpc-port", "9090")
);
return new EquivalentAddressGroup(
new InetSocketAddress(instance.getHost(), grpcPort)
);
})
.collect(Collectors.toList());
listener.onResult(ResolutionResult.newBuilder()
.setAddresses(addresses)
.build());
}
@Override
public void shutdown() {
if (refreshTask != null) refreshTask.cancel(false);
}
}
}
7. API Gateway: Aggregating AI Services with Spring Cloud Gateway
7.1 Gateway Configuration
# gateway-service/src/main/resources/application.yml
spring:
application:
name: ai-gateway
cloud:
nacos:
discovery:
server-addr: nacos:8848
namespace: ai-prod
gateway:
discovery:
locator:
enabled: false # disable automatic routes; use explicit configuration
routes:
# conversation service route
- id: conversation-service
uri: lb://conversation-service
predicates:
- Path=/api/v1/conversations/**
filters:
- name: RequestRateLimiter
args:
redis-rate-limiter.replenishRate: 100
redis-rate-limiter.burstCapacity: 200
key-resolver: "#{@userKeyResolver}"
- name: CircuitBreaker
args:
name: conversationCB
fallbackUri: forward:/fallback/conversation
- AddRequestHeader=X-Gateway-Version, v2
- name: Retry
args:
retries: 2
statuses: BAD_GATEWAY,SERVICE_UNAVAILABLE
# RAG query route (larger response bodies allowed)
- id: rag-service
uri: lb://rag-service
predicates:
- Path=/api/v1/knowledge/**
filters:
- name: RequestRateLimiter
args:
redis-rate-limiter.replenishRate: 50
redis-rate-limiter.burstCapacity: 100
key-resolver: "#{@userKeyResolver}"
- name: CircuitBreaker
args:
name: ragCB
fallbackUri: forward:/fallback/rag
# admin route (auth required)
- id: admin-routes
uri: lb://conversation-service
predicates:
- Path=/api/admin/**
- Header=X-Admin-Token, .+
filters:
- AdminAuthFilter
# global filter configuration
default-filters:
- name: Logging
- name: RequestId
- DedupeResponseHeader=Access-Control-Allow-Credentials Access-Control-Allow-Origin
globalcors:
cors-configurations:
'[/**]':
allowedOrigins: "*"
allowedMethods: "*"
allowedHeaders: "*"
7.2 Custom Rate-Limiting and Auth Filters
@Component
@Slf4j
public class JwtAuthGatewayFilter implements GlobalFilter, Ordered {
private final JwtTokenService jwtTokenService;
// allowlisted paths (no auth required)
private static final List<String> WHITE_LIST = List.of(
"/api/v1/auth/login",
"/api/v1/auth/register",
"/actuator/health"
);
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String path = exchange.getRequest().getPath().value();
// let allowlisted paths straight through
if (WHITE_LIST.stream().anyMatch(path::startsWith)) {
return chain.filter(exchange);
}
String token = extractToken(exchange.getRequest());
if (token == null) {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
return jwtTokenService.validateToken(token)
.flatMap(claims -> {
// pass user identity to downstream services
ServerHttpRequest mutatedRequest = exchange.getRequest()
.mutate()
.header("X-User-Id", claims.getSubject())
.header("X-User-Roles", String.join(",", claims.getRoles()))
.build();
return chain.filter(exchange.mutate().request(mutatedRequest).build());
})
.onErrorResume(e -> {
log.warn("JWT validation failed: {}", e.getMessage());
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
});
}
@Override
public int getOrder() {
return -100; // highest priority
}
private String extractToken(ServerHttpRequest request) {
List<String> authHeaders = request.getHeaders().get("Authorization");
if (authHeaders != null && !authHeaders.isEmpty()) {
String bearer = authHeaders.get(0);
if (bearer.startsWith("Bearer ")) {
return bearer.substring(7);
}
}
return null;
}
}
// fallback controller
@RestController
@RequestMapping("/fallback")
public class GatewayFallbackController {
@GetMapping("/conversation")
public ResponseEntity<Map<String, Object>> conversationFallback(ServerWebExchange exchange) {
return ResponseEntity
.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(Map.of(
"code", "SERVICE_UNAVAILABLE",
"message", "The conversation service is temporarily unavailable; please retry later",
"timestamp", System.currentTimeMillis()
));
}
@GetMapping("/rag")
public ResponseEntity<Map<String, Object>> ragFallback() {
return ResponseEntity
.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(Map.of(
"code", "SERVICE_UNAVAILABLE",
"message", "The knowledge service is temporarily unavailable; your request has been recorded and we will reply shortly",
"timestamp", System.currentTimeMillis()
));
}
}
8. Distributed Transactions: The Saga Pattern for Cross-Service AI Operations
8.1 Scenario: A Refund Request Requires Cross-Service Coordination
User complaint → the conversation service records the complaint session
→ the ticket service creates a refund ticket
→ the points service credits points (compensating the user)
→ the notification service sends an SMS
If the ticket is created but the points credit fails, a compensation step must roll the ticket back.
8.2 Implementing an Orchestrated Saga
// Saga orchestrator
@Service
@Slf4j
public class RefundSagaOrchestrator {
private final TicketService ticketService;
private final PointService pointService;
private final NotificationService notificationService;
private final SagaStateRepository sagaStateRepository;
// no @Transactional here: the orchestration spans remote service calls, and
// each step persists its own state rather than holding one long local transaction
public SagaResult executeRefundSaga(RefundSagaContext context) {
String sagaId = UUID.randomUUID().toString();
// persist saga state (for failure recovery)
SagaState sagaState = SagaState.builder()
.sagaId(sagaId)
.sagaType("REFUND")
.status(SagaStatus.STARTED)
.context(JsonUtils.toJson(context))
.build();
sagaStateRepository.save(sagaState);
try {
// Step 1: create the ticket
String ticketId = executeStep(sagaId, "CREATE_TICKET", () -> {
CreateTicketCommand cmd = new CreateTicketCommand(
context.getUserId(),
context.getOrderId(),
context.getReason()
);
return ticketService.createTicket(cmd);
});
// Step 2: credit compensation points
try {
executeStep(sagaId, "COMPENSATE_POINTS", () -> {
CompensatePointsCommand cmd = new CompensatePointsCommand(
context.getUserId(),
context.getRefundAmount() / 100 // amount in cents; 1 yuan = 1 point
);
return pointService.addPoints(cmd);
});
} catch (Exception e) {
// Step 2 failed: compensate Step 1
log.error("Point compensation failed, rolling back ticket: {}", ticketId, e);
compensateStep(sagaId, "CANCEL_TICKET", () -> {
ticketService.cancelTicket(ticketId, "Point compensation failed; ticket auto-cancelled");
});
updateSagaStatus(sagaId, SagaStatus.COMPENSATED);
return SagaResult.failed(sagaId, "Point compensation failed");
}
// Step 3: send the notification (allowed to fail; no compensation needed)
try {
executeStep(sagaId, "SEND_NOTIFICATION", () -> {
notificationService.sendSms(context.getUserId(),
"Your refund request has been accepted. Ticket number: " + ticketId);
return ticketId;
});
} catch (Exception e) {
// a failed notification does not affect the main flow
log.warn("Notification failed for saga: {}, error: {}", sagaId, e.getMessage());
}
updateSagaStatus(sagaId, SagaStatus.COMPLETED);
return SagaResult.success(sagaId, ticketId);
} catch (Exception e) {
updateSagaStatus(sagaId, SagaStatus.FAILED);
log.error("Saga failed: {}", sagaId, e);
return SagaResult.failed(sagaId, e.getMessage());
}
}
private <T> T executeStep(String sagaId, String stepName, Supplier<T> action) {
SagaStep step = SagaStep.builder()
.sagaId(sagaId)
.stepName(stepName)
.status(StepStatus.EXECUTING)
.startedAt(Instant.now())
.build();
sagaStateRepository.saveStep(step);
try {
T result = action.get();
sagaStateRepository.updateStepStatus(sagaId, stepName,
StepStatus.COMPLETED, JsonUtils.toJson(result));
return result;
} catch (Exception e) {
sagaStateRepository.updateStepStatus(sagaId, stepName, StepStatus.FAILED, null);
throw e;
}
}
private void compensateStep(String sagaId, String stepName, Runnable compensation) {
try {
compensation.run();
sagaStateRepository.updateStepStatus(sagaId, stepName, StepStatus.COMPENSATED, null);
} catch (Exception e) {
log.error("Compensation failed: {} - {}", sagaId, stepName, e);
// a failed compensation needs human intervention; raise an alert
sagaStateRepository.updateStepStatus(sagaId, stepName, StepStatus.COMPENSATION_FAILED, null);
}
}
private void updateSagaStatus(String sagaId, SagaStatus status) {
sagaStateRepository.updateStatus(sagaId, status);
}
}
// Saga state entity
@Entity
@Table(name = "saga_states")
@Data
@Builder
public class SagaState {
@Id
private String sagaId;
private String sagaType;
@Enumerated(EnumType.STRING)
private SagaStatus status;
@Column(columnDefinition = "TEXT")
private String context;
private Instant createdAt;
private Instant updatedAt;
}
@Entity
@Table(name = "saga_steps")
@Data
@Builder
public class SagaStep {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String sagaId;
private String stepName;
@Enumerated(EnumType.STRING)
private StepStatus status;
@Column(columnDefinition = "TEXT")
private String result;
private Instant startedAt;
private Instant completedAt;
}
9. Post-Split Testing Strategy: Inter-Service Contract Tests
9.1 Why Contract Tests Are Needed
The core microservice failure mode: an interface change breaks its callers.
Traditional approach: integration tests (slow, complex environments, hard to debug). Contract tests: the consumer declares its expectations, and the provider verifies it can satisfy them (fast, independent, explicit).
9.2 Contract Tests with Pact
Consumer side (conversation service) defines the contract:
@ExtendWith(PactConsumerTestExt.class)
@PactTestFor(providerName = "embedding-service")
class EmbeddingServiceContractTest {
@Pact(consumer = "conversation-service")
public RequestResponsePact embedTextPact(PactDslWithProvider builder) {
return builder
.given("embedding service is available")
.uponReceiving("embed a text")
.path("/api/v1/embed")
.method("POST")
.headers("Content-Type", "application/json")
.body(new PactDslJsonBody()
.stringType("text", "退款申请")
.stringType("model", "bge-m3"))
.willRespondWith()
.status(200)
.headers(Map.of("Content-Type", "application/json"))
.body(new PactDslJsonBody()
.minArrayLike("embedding", 1,
new PactDslJsonRootValue().numberType(0.1), 1536)
.stringType("model", "bge-m3")
.integerType("usageTokens", 5)
.integerType("latencyMs", 50))
.toPact();
}
@Test
@PactTestFor(pactMethod = "embedTextPact")
void testEmbedding(MockServer mockServer) {
EmbeddingClient client = new EmbeddingClient(mockServer.getUrl());
EmbedResult result = client.embed("退款申请", "bge-m3");
assertThat(result.getEmbedding()).hasSize(1536);
assertThat(result.getModel()).isEqualTo("bge-m3");
assertThat(result.getUsageTokens()).isGreaterThan(0);
}
}
Provider side (Embedding service) verifies the contract:
@Provider("embedding-service")
@PactBroker(url = "https://pact-broker.internal.com")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class EmbeddingServiceProviderVerificationTest {
@LocalServerPort
private int port;
@MockBean
private EmbeddingModel embeddingModel;
@BeforeEach
void setupTestTarget(PactVerificationContext context) {
context.setTarget(new HttpTestTarget("localhost", port));
}
@State("embedding service is available")
void embeddingServiceAvailable() {
// prepare test data: mock the EmbeddingModel
float[] mockVector = new float[1536];
Arrays.fill(mockVector, 0.1f);
Embedding mockEmbedding = new Embedding(mockVector, 0);
EmbeddingResponse mockResponse = new EmbeddingResponse(
List.of(mockEmbedding),
new EmbeddingResponseMetadata()
);
when(embeddingModel.embedForResponse(any())).thenReturn(mockResponse);
}
@TestTemplate
@ExtendWith(PactVerificationInvocationContextProvider.class)
void verifyPact(PactVerificationContext context) {
context.verifyInteraction();
}
}
9.3 Test Layering Recommendations
pyramid
title AI Microservice Test Pyramid
"End-to-end tests (E2E)" : 5%
"Contract tests (Pact)" : 15%
"Integration tests (Testcontainers)" : 30%
"Unit tests (JUnit + Mockito)" : 50%
Execution-time targets per layer:
| Layer | Count | Execution time | CI stage |
|---|---|---|---|
| Unit tests | 500+ | < 30s | on PR submission |
| Integration tests | 50+ | < 3min | on PR merge |
| Contract tests | 30+ | < 1min | on PR merge |
| E2E tests | 10+ | < 10min | nightly |
10. Performance Data: Before and After the Split
10.1 Key Metric Comparison
| Metric | Monolith | Microservices | Improvement |
|---|---|---|---|
| Deployment time | 20min | 1min | 20x |
| Cold-start time | 4min | 45s (avg) | 5.3x |
| Memory per instance | 8GB | 2GB (avg) | 4x |
| Independent scaling of the vector service | not possible | can scale 3x on its own | n/a |
| P99 response time | 820ms | 210ms | 3.9x |
| System availability | 99.1% | 99.95% | +0.85pp |
10.2 Distributed Tracing Configuration
# shared tracing configuration for all AI services; this example exports to a
# Zipkin-compatible endpoint (the same pattern applies to SkyWalking receivers)
management:
tracing:
sampling:
probability: 0.1 # sample 10% in production
zipkin:
tracing:
endpoint: http://zipkin:9411/api/v2/spans
# gRPC trace-context propagation
grpc:
server:
interceptors:
- com.laozhang.ai.tracing.TracingServerInterceptor
client:
interceptors:
- com.laozhang.ai.tracing.TracingClientInterceptor
FAQ
Q1: What about the extra network latency after the split?
A: Keep intra-datacenter service calls under 1ms (gRPC on an internal network is typically < 0.5ms). Use gRPC for latency-sensitive synchronous calls, and Kafka for asynchronous decoupling where latency does not matter. In practice overall P99 actually dropped after the split, because independent scaling eliminated resource contention.
Q2: With this many microservices, how do you develop locally?
A: Start the core dependencies (Nacos/Redis/Kafka) with Docker Compose and mock out the remaining services with Testcontainers or WireMock. There is no need to run everything locally. Tilt or Skaffold are recommended for managing the local dev environment.
Q3: How do you manage AI configuration (such as LLM API keys) across services?
A: Keep all secrets in Nacos Config, isolated by namespace (dev/staging/prod). Encrypt sensitive keys additionally (for example with Jasypt). Each service can only read configuration in its own namespace, never another service's.
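The idea of storing only encrypted values in the config center can be sketched with standard JCA primitives. This is an illustration of the pattern, not Jasypt's actual API; the class and method names are hypothetical.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Base64;

/** AES-GCM round-trip for a sensitive config value such as an LLM API key. */
public class ConfigCrypto {
    private static final int GCM_TAG_BITS = 128;
    private static final int IV_BYTES = 12;

    /** Returns Base64(iv || ciphertext), suitable for storing in config. */
    static String encrypt(SecretKey key, String plaintext) throws Exception {
        byte[] iv = new byte[IV_BYTES];
        new SecureRandom().nextBytes(iv); // fresh IV per value
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(GCM_TAG_BITS, iv));
        byte[] ct = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
        byte[] out = new byte[iv.length + ct.length];
        System.arraycopy(iv, 0, out, 0, iv.length);
        System.arraycopy(ct, 0, out, iv.length, ct.length);
        return Base64.getEncoder().encodeToString(out);
    }

    static String decrypt(SecretKey key, String encoded) throws Exception {
        byte[] in = Base64.getDecoder().decode(encoded);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(GCM_TAG_BITS, in, 0, IV_BYTES));
        byte[] pt = cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
        return new String(pt, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        KeyGenerator kg = KeyGenerator.getInstance("AES");
        kg.init(256);
        SecretKey key = kg.generateKey(); // in production, from a KMS or env secret
        String stored = encrypt(key, "sk-example-api-key");
        System.out.println(decrypt(key, stored)); // round-trips to the original
    }
}
```

The decryption key itself must live outside the config center (KMS, mounted secret, or environment), otherwise the encryption adds nothing.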
Q4: Debugging a single request's path becomes complicated after the split. What then?
A: Introduce distributed tracing from day one (SkyWalking or Jaeger recommended). Give every request a globally unique TraceId, propagated through gRPC headers and Kafka message headers. Logs must include the TraceId so they can be aggregated across services.
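The propagation rule reduces to two small operations, sketched below with a plain header map standing in for gRPC Metadata or Kafka headers (the header name is a local convention, not a standard):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * TraceId propagation sketch: reuse the incoming id if present, otherwise mint
 * one, and copy it onto every outgoing header map.
 */
public class TraceContext {
    static final String TRACE_HEADER = "x-trace-id";

    /** Reuse the upstream trace id so the whole call chain shares one id. */
    static String traceIdFrom(Map<String, String> incomingHeaders) {
        return incomingHeaders.getOrDefault(TRACE_HEADER, UUID.randomUUID().toString());
    }

    /** Copy the trace id onto outgoing gRPC/Kafka headers. */
    static Map<String, String> propagate(String traceId, Map<String, String> outgoing) {
        Map<String, String> headers = new HashMap<>(outgoing);
        headers.put(TRACE_HEADER, traceId);
        return headers;
    }

    public static void main(String[] args) {
        String traceId = traceIdFrom(Map.of()); // edge request: mint a new id
        Map<String, String> toRagService = propagate(traceId, Map.of());
        // the downstream service extracts the same id and keeps propagating it
        System.out.println(traceId.equals(traceIdFrom(toRagService))); // prints "true"
    }
}
```

In the real services this logic lives in the gRPC client/server interceptors registered in section 10.2, so application code never touches it directly.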
Q5: How many microservices is the right number?
A: There is no fixed answer; the guiding principle is "one two-pizza team per service" (Amazon's rule). For AI applications, 4-6 core services is a sensible starting point; add more as needed. Too many services inflate operational complexity, so each team should generally own no more than 5.
Summary
Microservice decomposition of an AI application is not really a technical problem; at heart it is an organizational one. Clear boundaries come from a clear division of responsibility.
Core steps, recapped:
- Timing: split only once deployment takes > 10min, daily requests exceed 100,000, and the team is larger than 5 people
- DDD modeling: identify bounded contexts with event storming; model first, then split the code
- Incremental split: Strangler Fig pattern, never a big-bang rewrite
- gRPC communication: use gRPC between internal AI services; roughly 4x faster than REST in our tests
- Data independence: one database per service, with events synchronizing data across services
- Saga transactions: use the Saga pattern for cross-service operations and record the state of every step
- Contract tests: consumer-driven contract tests prevent interface breakage
Li Ming's team took 12 weeks to complete the split, each step taken with bated breath, but the result was worth it. On the afternoon of that first one-minute deployment the whole team applauded, not because the technology was cool, but because they finally no longer had to pray.
