第2287篇:AI系统的CQRS模式——读写分离在LLM应用中的工程价值
第2287篇:AI系统的CQRS模式——读写分离在LLM应用中的工程价值
适读人群:有DDD/CQRS基础、正在设计复杂AI系统的架构师和高级工程师 | 阅读时长:约15分钟 | 核心价值:理解CQRS在AI应用场景下的特殊价值,掌握读写分离的工程实现
我第一次在AI系统里认真考虑CQRS,是因为一个很具体的问题:我们的AI助手系统里,"问AI一个问题"和"查历史对话记录"这两个操作,无论在资源消耗、一致性要求、还是可用性要求上,都有巨大差异,但我们一开始把它们塞在同一个服务里,共用同一个数据模型。
麻烦接踵而至。查询历史记录要求快(用户看过往对话不能等),但AI对话要求容错强(万一AI服务短暂不可用,用户应该看到合理提示而不是整个页面崩溃)。数据模型为了存储AI对话做了优化,但对于"按时间范围查历史"这类查询来说,索引设计很别扭。
CQRS——命令查询责任分离——在这里能解决实质性问题。
CQRS在AI应用中的特殊价值
传统应用里,CQRS主要解决读写负载不均衡的问题。在AI应用里,它额外解决三个问题:
1. AI对话是写操作,但查询历史是读操作,两者的一致性要求完全不同
"问AI一个问题"是一个有副作用的命令:它消耗AI API配额、产生费用、改变对话状态。如果因为某个错误导致这个命令重复执行,后果严重。
"查历史记录"是纯读取,重复执行没有副作用,可以接受最终一致性。
把它们分开,可以给命令端加更严格的幂等保护,给查询端做更激进的缓存。
2. AI系统的"读模型"需要针对不同查询场景分别优化
AI应用的查询场景很多样:
- 用户查自己的历史对话(按时间排序)
- 管理员查某个时间段的所有对话(按时间范围过滤)
- 数据分析师查某类意图的对话分布(按意图分组统计)
- 运营人员查包含特定关键词的对话(全文搜索)
这些查询场景的数据结构需求完全不同。CQRS允许你为每个场景维护独立的读模型(物化视图),而不是在一个通用模型上做各种妥协。
3. 写操作(AI推理)极慢,读操作要求极快
AI推理动辄10-60秒,而用户查历史记录期望在200ms内返回。共享同一个服务时,慢写影响快读,或者为了保护读性能而限制写并发,都是常见的问题。
架构设计
命令端实现
命令端负责处理用户发起的AI对话,核心是保证幂等性和状态一致性:
// 命令对象:不可变,包含所有必要信息
public record SendMessageCommand(
String conversationId,
String messageId, // 幂等键
String userId,
String content,
Map<String, Object> context
) {}
@Service
public class ConversationCommandHandler {
private final LlmClient llmClient;
private final ConversationRepository repository;
private final EventPublisher eventPublisher;
private final IdempotencyStore idempotencyStore;
@Transactional
public SendMessageResult handle(SendMessageCommand command) {
// 幂等检查
Optional<SendMessageResult> existing =
idempotencyStore.find(command.messageId());
if (existing.isPresent()) {
log.info("重复请求,返回缓存结果: messageId={}", command.messageId());
return existing.get();
}
// 加载对话上下文
Conversation conversation = repository.findById(command.conversationId())
.orElseThrow(() -> new ConversationNotFoundException(command.conversationId()));
// 执行业务规则验证
validateCommand(command, conversation);
// 调用AI
String aiResponse = callAi(command, conversation);
// 更新聚合根状态
conversation.addMessage(
UserMessage.of(command.messageId(), command.content()),
AiMessage.of(UUID.randomUUID().toString(), aiResponse)
);
repository.save(conversation);
// 发布领域事件
eventPublisher.publish(new MessageSentEvent(
command.conversationId(),
command.messageId(),
command.userId(),
command.content(),
aiResponse,
Instant.now()
));
SendMessageResult result = new SendMessageResult(
command.messageId(), aiResponse, conversation.getMessageCount()
);
// 存储幂等结果(TTL 24小时)
idempotencyStore.store(command.messageId(), result, Duration.ofHours(24));
return result;
}
private String callAi(SendMessageCommand command, Conversation conversation) {
// 构建历史对话上下文(命令端只取必要的上下文)
List<ChatMessage> history = conversation.getRecentMessages(20)
.stream()
.map(msg -> new ChatMessage(msg.getRole(), msg.getContent()))
.collect(Collectors.toList());
return llmClient.chat(history, command.content(), command.context());
}
}查询端实现
查询端维护多个针对不同场景的读模型:
// 读模型1:用户对话列表(按时间排序的摘要)
@Document(indexName = "conversation-summaries")
public class ConversationSummaryView {
private String conversationId;
private String userId;
private String lastMessage; // 最后一条消息摘要
private int messageCount;
private Instant createdAt;
private Instant lastMessageAt;
private List<String> topics; // AI提取的话题标签
}
// 读模型2:消息全文搜索索引
@Document(indexName = "messages")
public class MessageSearchView {
private String messageId;
private String conversationId;
private String userId;
private String userContent;
private String aiContent;
private Instant createdAt;
private String intent; // AI识别的意图
private double sentiment; // 情感分析分数
}
// 读模型3:实时对话缓存(Redis)
// 用于快速加载正在进行的对话@Service
public class ConversationQueryHandler {
private final ElasticsearchClient esClient;
private final RedisTemplate<String, Object> redis;
/**
* 查询用户历史对话列表(分页)
*/
public PageResult<ConversationSummaryView> getConversationList(
String userId, int page, int size) {
// 查读模型(Elasticsearch),不走主库
SearchRequest request = SearchRequest.of(sr -> sr
.index("conversation-summaries")
.query(q -> q.term(t -> t.field("userId").value(userId)))
.sort(s -> s.field(f -> f.field("lastMessageAt").order(SortOrder.Desc)))
.from(page * size)
.size(size)
);
SearchResponse<ConversationSummaryView> response =
esClient.search(request, ConversationSummaryView.class);
return PageResult.of(
response.hits().hits().stream()
.map(Hit::source)
.collect(Collectors.toList()),
response.hits().total().value(),
page, size
);
}
/**
* 全文搜索对话内容
*/
public List<MessageSearchView> searchMessages(String userId, String keyword) {
SearchRequest request = SearchRequest.of(sr -> sr
.index("messages")
.query(q -> q.bool(b -> b
.must(m -> m.term(t -> t.field("userId").value(userId)))
.must(m -> m.multiMatch(mm -> mm
.fields("userContent", "aiContent")
.query(keyword)
.fuzziness("AUTO")
))
))
.size(20)
);
SearchResponse<MessageSearchView> response =
esClient.search(request, MessageSearchView.class);
return response.hits().hits().stream()
.map(Hit::source)
.collect(Collectors.toList());
}
/**
* 获取正在进行的对话(实时性要求高,从Redis取)
*/
public List<ChatMessage> getActiveConversation(String conversationId) {
String cacheKey = "active-conv:" + conversationId;
List<ChatMessage> cached = (List<ChatMessage>) redis.opsForValue().get(cacheKey);
if (cached != null) {
return cached;
}
// 缓存未命中,从ES取并重建缓存
List<ChatMessage> messages = loadFromEs(conversationId);
redis.opsForValue().set(cacheKey, messages, Duration.ofMinutes(30));
return messages;
}
}事件处理:读模型同步
写端产生事件后,需要同步更新各个读模型:
@Component
public class MessageSentProjectionBuilder {
private final ElasticsearchClient esClient;
private final RedisTemplate<String, Object> redis;
private final LlmClient llmClient;
@KafkaListener(topics = "conversation-events", groupId = "projection-builder")
public void handleMessageSent(MessageSentEvent event) {
// 更新消息全文搜索索引
updateMessageSearchIndex(event);
// 更新对话摘要
updateConversationSummary(event);
// 更新活跃对话缓存
updateActiveConversationCache(event);
// 异步:提取意图和情感(AI操作,可以稍慢一些)
CompletableFuture.runAsync(() -> enrichMessageWithAiMetadata(event));
}
private void updateMessageSearchIndex(MessageSentEvent event) {
MessageSearchView view = new MessageSearchView();
view.setMessageId(event.getMessageId());
view.setConversationId(event.getConversationId());
view.setUserId(event.getUserId());
view.setUserContent(event.getUserContent());
view.setAiContent(event.getAiContent());
view.setCreatedAt(event.getTimestamp());
esClient.index(i -> i
.index("messages")
.id(event.getMessageId())
.document(view)
);
}
private void enrichMessageWithAiMetadata(MessageSentEvent event) {
// 用轻量AI模型提取意图和情感(异步、非阻塞主流程)
try {
String intent = classifyIntent(event.getUserContent());
double sentiment = analyzeSentiment(event.getUserContent());
esClient.update(u -> u
.index("messages")
.id(event.getMessageId())
.doc(Map.of("intent", intent, "sentiment", sentiment))
);
} catch (Exception e) {
// 元数据提取失败不影响主流程
log.warn("AI元数据提取失败: messageId={}", event.getMessageId(), e);
}
}
}最终一致性的可接受窗口
CQRS引入了最终一致性——命令处理完后,读模型需要一点时间才能同步更新。在实践中,这个延迟通常在100-500毫秒。
对于AI对话应用,这个延迟通常是可以接受的:用户发完消息后,不会立刻去查历史记录列表;就算查了,几百毫秒的延迟在感知上几乎不存在。
但有一个例外需要处理:用户刚发完消息,立刻点开这条消息希望看到它的详情。这时候读模型可能还没更新。解决方案是:在API层缓存刚创建的消息,读取时先查缓存,缓存未命中再查ES:
@RestController
public class ConversationApiController {
// 刚创建的消息缓存(TTL 5秒,足够读模型同步)
private final Cache<String, MessageSearchView> recentMessagesCache =
Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.SECONDS)
.maximumSize(10000)
.build();
@PostMapping("/conversations/{id}/messages")
public SendMessageResult sendMessage(
@PathVariable String id,
@RequestBody SendMessageRequest req) {
SendMessageResult result = commandHandler.handle(
new SendMessageCommand(id, req.getMessageId(), /* ... */)
);
// 把刚创建的消息放入短暂缓存,解决读写一致性的短暂间隙
recentMessagesCache.put(req.getMessageId(), buildView(result));
return result;
}
}CQRS在AI系统里不是过度设计,是真实解决了读写模型差异巨大这个工程问题。
