AI应用架构演进:从单体到分布式AI系统的迁移路径
2026/10/11大约 12 分钟架构演进单体到微服务Spring AIJava迁移
AI应用架构演进:从单体到分布式AI系统的迁移路径
开篇故事:张扬的"成长的烦恼"
2025年初,某AI写作辅助工具的创始人兼技术负责人张扬,站在一个让他既痛苦又兴奋的时刻:
产品发展到了月活50万用户,但原来一周内搭建的单体架构撑不住了。
用户反映:
- 高峰期响应慢(P99延迟>8秒)
- 偶尔"全站崩溃"(单体一挂全挂)
- 某些功能上线后影响到了看似不相关的其他功能
他们的单体Spring Boot应用里,混杂着:
- AI文章生成
- AI写作建议(流式)
- AI配图推荐
- AI SEO优化
- 用户管理
- 内容审核
- 计费系统
所有功能共享一个数据库,一套部署。
迁移到微服务势在必行,但挑战也是巨大的:
- 不能停服迁移(用户等不起)
- AI服务的会话状态需要保持(不能简单拆分)
- 部分功能的调用链路很深
- 团队只有8个工程师
张扬的团队用了8个月完成了迁移:
- P99延迟从8秒降到1.2秒
- 系统可用性从99.2%提升到99.97%
- 发版频率从每月2次提升到每天多次
- 团队大小没变,人效却提升了3倍
这就是AI系统架构演进的价值。本文将带你学习他们的完整迁移路径。
TL;DR
- AI单体的典型问题:AI服务抢占资源、会话状态难以扩展、发布相互影响
- Strangler Fig模式:新功能直接在新服务中实现,老功能逐步迁移
- 流量平滑切换:通过API网关的权重路由,灰度切换流量
- AI状态管理:会话记忆的外置化(Redis)是拆分的前提
- 回滚保障:Feature Flag + 双写,任何时候都能回滚
一、单体AI应用的痛点分析
1.1 AI单体的典型资源竞争
单体AI应用资源竞争示意:
用户请求 → 单体服务
├── 文章生成(CPU密集型,占用2s CPU)
│ └── 阻塞了...
├── 写作建议(高内存,加载上下文)
│ └── OOM了...
├── 配图推荐(调外部API,IO等待)
│ └── 线程等待...
└── 用户注册(快速,0.01s)
└── 也慢了!(线程池满)
问题:
- AI推理占满线程池 → 用户注册也变慢
- 上下文缓存占满内存 → OOM导致全站崩溃
- 某个AI功能更新 → 需要重启整个服务1.2 迁移的核心挑战
| 挑战 | 普通微服务迁移 | AI服务特有挑战 |
|---|---|---|
| 状态管理 | 无状态API | AI对话有会话历史 |
| 数据分离 | 按业务领域切 | AI向量数据与业务数据混杂 |
| 性能 | 提取快速路径 | AI推理是慢路径,需要异步 |
| 版本兼容 | 接口版本控制 | 提示词变更影响行为 |
| 测试 | 确定性测试 | AI输出非确定,测试复杂 |
二、迁移策略:Strangler Fig模式
2.1 Strangler Fig的核心思路
Strangler Fig(绞杀藤)模式:
第1步:在单体之外创建新服务
┌─────────────┐
│ 单体应用 │ ← 所有流量
└─────────────┘
第2步:部分流量路由到新服务(灰度)
┌─────────────┐
API网关─►│ 单体应用 │ ← 90%流量
└─────────────┘
┌─────────────┐
─►│ 新微服务 │ ← 10%流量
└─────────────┘
第3步:逐步切换,最终停用单体中的旧实现
┌─────────────┐
│ 单体应用 │ ← 0%流量(最终下线)
└─────────────┘
┌─────────────┐
API网关─►│ 新微服务 │ ← 100%流量
└─────────────┘2.2 迁移顺序策略
对于AI单体,推荐的拆分顺序:
优先级1(最先拆分):
└── 无状态、高频的AI服务
├── AI文章生成(无状态,调用量大)
└── AI配图推荐(完全无状态)
优先级2(其次拆分):
└── 有状态但可以外置化的AI服务
└── AI对话(会话历史迁移到Redis)
优先级3(最后拆分):
└── 与数据库耦合深的服务
└── AI内容审核(需要访问用户数据)
暂不拆分:
└── 非核心服务
└── 用户管理(AI业务无关,暂维持在单体)三、第一阶段:AI状态外置化
在拆分服务之前,必须先解决状态问题:
3.1 将AI会话历史迁移到Redis
// 迁移前:会话历史存在内存/本地缓存中
@Service
public class OldChatService {
// ❌ 本地缓存:无法在多个实例间共享
private final Map<String, List<Message>> sessionCache = new ConcurrentHashMap<>();
public String chat(String sessionId, String userMessage) {
List<Message> history = sessionCache.getOrDefault(sessionId, new ArrayList<>());
history.add(new UserMessage(userMessage));
String response = chatClient.prompt()
.messages(history)
.call()
.content();
history.add(new AssistantMessage(response));
sessionCache.put(sessionId, history);
return response;
}
}
// 迁移后:使用Redis存储会话历史
@Service
@Slf4j
public class NewChatService {
private final ChatClient chatClient;
private final RedisTemplate<String, Object> redisTemplate;
private final ObjectMapper objectMapper;
@Value("${ai.session.ttl-minutes:60}")
private int sessionTtlMinutes;
public String chat(String sessionId, String userMessage) {
// 从Redis加载历史
List<Message> history = loadHistory(sessionId);
history.add(new UserMessage(userMessage));
String response = chatClient.prompt()
.messages(history)
.call()
.content();
history.add(new AssistantMessage(response));
// 保存到Redis(TTL自动管理会话过期)
saveHistory(sessionId, history);
return response;
}
private List<Message> loadHistory(String sessionId) {
String key = "session:" + sessionId;
List<Object> rawList = redisTemplate.opsForList().range(key, 0, -1);
if (rawList == null || rawList.isEmpty()) {
return new ArrayList<>();
}
return rawList.stream()
.map(item -> deserializeMessage(item.toString()))
.toList();
}
private void saveHistory(String sessionId, List<Message> history) {
String key = "session:" + sessionId;
// 清除旧数据,重写整个历史
redisTemplate.delete(key);
List<String> serialized = history.stream()
.map(this::serializeMessage)
.toList();
redisTemplate.opsForList().rightPushAll(key, serialized.toArray());
redisTemplate.expire(key, Duration.ofMinutes(sessionTtlMinutes));
}
private String serializeMessage(Message message) {
return objectMapper.writeValueAsString(Map.of(
"type", message.getMessageType().name(),
"content", message.getContent()
));
}
private Message deserializeMessage(String json) {
Map<String, String> data = objectMapper.readValue(json,
new TypeReference<Map<String, String>>() {});
return switch (MessageType.valueOf(data.get("type"))) {
case SYSTEM -> new SystemMessage(data.get("content"));
case USER -> new UserMessage(data.get("content"));
case ASSISTANT -> new AssistantMessage(data.get("content"));
default -> throw new IllegalArgumentException("未知消息类型");
};
}
}四、第二阶段:第一个微服务
4.1 AI文章生成服务的提取
// 新的独立微服务:ai-article-service
// ArticleGenerationController.java
@RestController
@RequestMapping("/api/v1/articles")
@Slf4j
public class ArticleGenerationController {
private final ArticleGenerationService service;
@PostMapping("/generate")
public ResponseEntity<ArticleGenerationResult> generate(
@RequestBody ArticleGenerationRequest request,
@RequestHeader("X-Tenant-ID") String tenantId,
@RequestHeader("X-Request-ID") String requestId) {
// 异步生成(文章生成耗时长)
String taskId = service.submitGeneration(tenantId, requestId, request);
return ResponseEntity.accepted()
.header("X-Task-ID", taskId)
.body(ArticleGenerationResult.pending(taskId));
}
// 流式接口
@GetMapping(value = "/generate/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> generateStream(
@RequestParam String topic,
@RequestParam(required = false, defaultValue = "1000") int wordCount) {
return service.generateStream(topic, wordCount)
.map(token -> ServerSentEvent.<String>builder()
.event("token")
.data(token)
.build());
}
}
// ArticleGenerationService.java
@Service
@Slf4j
public class ArticleGenerationService {
private final ChatClient chatClient;
private final TaskRepository taskRepository;
private final EventPublisher eventPublisher;
@Async("aiTaskExecutor")
public String submitGeneration(String tenantId, String requestId,
ArticleGenerationRequest request) {
String taskId = UUID.randomUUID().toString();
// 创建任务记录
taskRepository.create(taskId, tenantId, TaskStatus.PENDING);
// 异步执行
CompletableFuture.runAsync(() -> {
try {
taskRepository.updateStatus(taskId, TaskStatus.PROCESSING);
String article = doGenerate(request);
taskRepository.saveResult(taskId, article);
taskRepository.updateStatus(taskId, TaskStatus.COMPLETED);
// 通知完成(SSE/WebPush)
eventPublisher.publishTaskCompleted(tenantId, taskId, article);
} catch (Exception e) {
log.error("文章生成失败 [taskId={}]: {}", taskId, e.getMessage());
taskRepository.updateStatus(taskId, TaskStatus.FAILED);
taskRepository.saveError(taskId, e.getMessage());
}
});
return taskId;
}
public Flux<String> generateStream(String topic, int wordCount) {
return chatClient.prompt()
.user(String.format("请写一篇关于\"%s\"的文章,字数约%d字。", topic, wordCount))
.stream()
.chatResponse()
.mapNotNull(r -> r.getResult().getOutput().getContent())
.filter(content -> !content.isEmpty());
}
private String doGenerate(ArticleGenerationRequest request) {
return chatClient.prompt()
.system("你是专业的内容创作者,擅长写作各类文章。")
.user(buildGenerationPrompt(request))
.call()
.content();
}
}4.2 API网关配置(Spring Cloud Gateway)
# gateway-config.yaml
spring:
cloud:
gateway:
routes:
# AI文章生成:已迁移到新服务
- id: ai-article-service
uri: lb://ai-article-service
predicates:
- Path=/api/v1/articles/**
filters:
# 权重路由:初始10%流量到新服务
- name: WeightRoute
args:
group: article-generation
weight: 10 # 10%流量到新服务
# 单体:还处理90%的文章生成请求
- id: monolith-article
uri: lb://monolith-service
predicates:
- Path=/api/v1/articles/**
filters:
- name: WeightRoute
args:
group: article-generation
weight: 90 # 90%流量到单体
# 其他请求继续路由到单体
- id: monolith-default
uri: lb://monolith-service
predicates:
- Path=/**五、数据迁移策略
5.1 AI向量数据的分离
// VectorDataMigrationService.java
@Service
@Slf4j
public class VectorDataMigrationService {
private final JdbcTemplate monolithJdbc; // 单体数据库
private final VectorStore newVectorStore; // 新向量存储服务
private final MigrationStateRepository stateRepo;
// 增量迁移:每批次迁移未迁移的向量数据
@Scheduled(fixedRate = 60000) // 每分钟运行
public void migrateIncrementally() {
MigrationState state = stateRepo.getState("vector_migration");
long lastMigratedId = state.getLastMigratedId();
// 获取下一批数据
List<Map<String, Object>> batch = monolithJdbc.queryForList(
"SELECT id, content, embedding, metadata FROM ai_embeddings " +
"WHERE id > ? ORDER BY id LIMIT 100",
lastMigratedId
);
if (batch.isEmpty()) {
log.debug("向量数据迁移:没有新数据");
return;
}
// 迁移数据
List<Document> documents = batch.stream()
.map(row -> {
Document doc = new Document(
(String) row.get("content"),
parseMetadata(row.get("metadata"))
);
// 如果有预计算的embedding,直接使用
doc.setId(row.get("id").toString());
return doc;
})
.toList();
newVectorStore.add(documents);
// 更新迁移状态
long lastId = (Long) batch.get(batch.size() - 1).get("id");
stateRepo.updateLastMigratedId("vector_migration", lastId);
log.info("向量数据迁移:批次完成,最后ID={}", lastId);
}
}5.2 双写策略保证数据一致性
// DualWriteService.java
@Service
@Slf4j
public class DualWriteService {
private final OldDatabase oldDb;
private final NewDatabase newDb;
@Value("${migration.dual-write.enabled:true}")
private boolean dualWriteEnabled;
// 双写:同时写入新旧数据库
public void saveUserConversation(UserConversation conversation) {
// 1. 始终写入旧数据库(保证不丢数据)
oldDb.save(conversation);
if (!dualWriteEnabled) return;
// 2. 同时写入新数据库(异步,失败不影响主流程)
CompletableFuture.runAsync(() -> {
try {
newDb.save(convertToNewFormat(conversation));
} catch (Exception e) {
log.error("双写到新数据库失败: {}", e.getMessage());
// 记录失败,供后续补偿
missingDataQueue.add(conversation.getId());
}
});
}
// 数据比对:验证新旧数据库一致性
@Scheduled(cron = "0 0 4 * * *") // 每天凌晨4点
public void verifyDataConsistency() {
long totalCount = oldDb.count();
long newCount = newDb.count();
if (totalCount != newCount) {
log.error("数据不一致:旧库={}, 新库={}", totalCount, newCount);
// 触发人工告警
}
// 抽样验证内容一致性
sampleAndVerify(100);
}
}六、流量平滑切换
6.1 基于权重的渐进切换
// TrafficSwitchController.java
@RestController
@RequestMapping("/api/admin/migration")
@Slf4j
public class TrafficSwitchController {
private final GatewayWeightManager weightManager;
private final MetricsService metricsService;
// 当前灰度状态
@GetMapping("/status")
public ResponseEntity<MigrationStatus> getStatus() {
return ResponseEntity.ok(MigrationStatus.builder()
.serviceName("ai-article-service")
.currentWeight(weightManager.getWeight("ai-article-service"))
.newServiceErrorRate(metricsService.getErrorRate("ai-article-service"))
.oldServiceErrorRate(metricsService.getErrorRate("monolith-service"))
.recommendation(generateRecommendation())
.build());
}
// 增加新服务流量比例
@PostMapping("/advance")
public ResponseEntity<String> advanceTraffic(
@RequestParam(defaultValue = "10") int increment) {
int currentWeight = weightManager.getWeight("ai-article-service");
int newWeight = Math.min(100, currentWeight + increment);
// 检查新服务健康状态
double errorRate = metricsService.getErrorRate("ai-article-service");
if (errorRate > 0.01) {
return ResponseEntity.badRequest()
.body("新服务错误率(" + errorRate*100 + "%)过高,不建议增加流量");
}
weightManager.setWeight("ai-article-service", newWeight);
weightManager.setWeight("monolith-service", 100 - newWeight);
log.info("流量切换:ai-article-service={}%, monolith={}%",
newWeight, 100 - newWeight);
return ResponseEntity.ok(String.format(
"切换成功:新服务=%d%%, 旧服务=%d%%", newWeight, 100 - newWeight));
}
// 紧急回滚
@PostMapping("/rollback")
public ResponseEntity<String> rollback() {
weightManager.setWeight("ai-article-service", 0);
weightManager.setWeight("monolith-service", 100);
log.error("紧急回滚:所有流量切回单体");
return ResponseEntity.ok("回滚完成,所有流量已切回单体服务");
}
}七、回滚保障机制
7.1 Feature Flag控制
// FeatureFlagService.java
@Service
public class FeatureFlagService {
private final RedisTemplate<String, Object> redisTemplate;
// 动态Feature Flag(可实时修改,无需重启)
public boolean isEnabled(String featureName) {
Object value = redisTemplate.opsForValue().get("feature:" + featureName);
return Boolean.TRUE.equals(value);
}
// AI服务是否使用新微服务
public boolean useNewArticleService() {
return isEnabled("new-article-service");
}
// 按用户ID的灰度
public boolean isEnabledForUser(String featureName, String userId) {
// 基于用户ID哈希确定是否在灰度范围内
int userHash = Math.abs(userId.hashCode()) % 100;
Object threshold = redisTemplate.opsForValue()
.get("feature:" + featureName + ":rollout");
if (threshold == null) return false;
return userHash < Integer.parseInt(threshold.toString());
}
}
// 在API网关或业务层使用Feature Flag
@Service
public class ArticleService {
private final OldArticleService oldService;
private final NewArticleServiceClient newClient;
private final FeatureFlagService featureFlags;
public String generateArticle(String topic) {
if (featureFlags.useNewArticleService()) {
try {
return newClient.generate(topic);
} catch (Exception e) {
log.error("新服务失败,回退到旧服务: {}", e.getMessage());
return oldService.generate(topic); // 自动降级
}
}
return oldService.generate(topic);
}
}八、迁移完成后的架构
8.1 最终的服务拓扑
# docker-compose.yml(简化版展示最终架构)
services:
# API网关(Spring Cloud Gateway)
api-gateway:
image: gateway:latest
ports:
- "80:8080"
environment:
SPRING_CLOUD_GATEWAY_ROUTES: "[
{id: articles, uri: lb://ai-article-service, predicates: [Path=/api/v1/articles/**]},
{id: chat, uri: lb://ai-chat-service, predicates: [Path=/api/v1/chat/**]},
{id: users, uri: lb://user-service, predicates: [Path=/api/v1/users/**]},
{id: billing, uri: lb://billing-service, predicates: [Path=/api/v1/billing/**]}
]"
# AI文章生成服务
ai-article-service:
image: ai-article:latest
replicas: 3
environment:
SPRING_AI_OPENAI_API_KEY: ${OPENAI_API_KEY}
REDIS_URL: redis://redis:6379
# AI对话服务
ai-chat-service:
image: ai-chat:latest
replicas: 2
environment:
SPRING_AI_OPENAI_API_KEY: ${OPENAI_API_KEY}
REDIS_URL: redis://redis:6379
VECTOR_STORE_URL: http://vector-store:8080
# 用户服务(非AI,从单体拆出)
user-service:
image: user-service:latest
replicas: 2
# 计费服务
billing-service:
image: billing:latest
replicas: 2
# 共享基础设施
redis:
image: redis:7-alpine
vector-store:
image: qdrant/qdrant:latest九、常见问题 FAQ
Q1:单体迁移微服务,团队规模多小时就要开始?
A:参考指导原则:
- 单体代码行数>10万 AND 团队>5人:可以考虑
- 出现资源争用问题(AI服务影响其他功能):该拆了
- 发版频率受阻(别人的代码影响你发布):该拆了
- 单人团队:不要拆,单体就是最佳选择
Q2:AI会话历史迁移到Redis,数据安全吗?
A:
- Redis配置密码认证 + TLS传输加密
- 敏感对话数据在存储前加密(AES-256)
- 设置合理的TTL(60分钟无操作自动清除)
- 合规场景(医疗/法律)需要评估是否能在Redis存储用户数据
Q3:如何处理迁移期间的服务间调用延迟增加?
A:
- 服务间调用用HTTP/2(Spring WebClient默认支持)
- 核心路径用gRPC(延迟<1ms)
- 用本地缓存减少跨服务调用次数
- 使用服务网格(Istio)优化服务间连接复用
Q4:数据库怎么拆?AI服务需要自己的数据库吗?
A:推荐分阶段:
- 初期:共享数据库,但AI服务用独立的Schema
- 中期:数据库实例分离(读写分离)
- 长期:完全独立的数据库(按服务)
AI服务通常需要独立的向量数据库(Qdrant/Pinecone),但关系型数据可以先共享。
Q5:如何评估迁移是否成功?
A:关键指标:
- P99延迟:是否下降?
- 服务可用性:各服务是否独立可用?
- 发布频率:是否可以独立发版?
- 故障隔离:AI服务宕机是否影响其他服务?
- 成本:资源利用率是否提升?
十、总结
AI单体到分布式的迁移路径:
| 阶段 | 核心工作 | 典型时长 | 风险 |
|---|---|---|---|
| 准备 | 状态外置化(Redis)+ 监控建立 | 1个月 | 低 |
| 第一阶段 | 拆出最简单的无状态AI服务 | 1-2个月 | 低-中 |
| 第二阶段 | 拆出核心AI对话服务 | 2-3个月 | 中 |
| 第三阶段 | 拆出数据层(向量库分离) | 2个月 | 高 |
| 完成 | 数据库拆分 + 单体下线 | 1-2个月 | 高 |
张扬的8个月迁移之所以成功,关键在于三个原则:
- 永远不停服:Strangler Fig模式
- 永远能回滚:Feature Flag + 权重路由
- 永远可验证:每步都有数据证明效果
这三个原则,是所有大型系统演进的基石。
