第1709篇:CQRS模式在AI服务中的实践——读写分离与投影设计
第1709篇:CQRS模式在AI服务中的实践——读写分离与投影设计
做AI应用久了,你会发现一个规律:写入不频繁,但查询五花八门。
用户发一条消息,触发一次写入。但后续的查询是什么样的?对话历史、Token消耗统计、模型效果分析、用户行为报表……每种查询的需求都不同,有的需要实时性,有的只需要最终一致,有的查询模型和写入模型完全不同。
这就是CQRS(Command Query Responsibility Segregation,命令查询职责分离)要解决的问题。它把写入(Command)和查询(Query)拆成两条独立的路径,各自优化。
这篇文章我会从最简单的单体CQRS实现开始,一步步演进到适合生产的架构,结合AI服务的真实场景讲。
一、为什么AI场景特别适合CQRS
先讲一个真实的痛点。
我们的AI助手有个对话历史页面,需要展示:
- 会话列表(按最后活跃时间排序)
- 每个会话的消息预览
- Token消耗
- 对话轮次
- 用户评分
还有个管理后台,需要展示:
- 各模型的用量统计
- 用户活跃度排行
- 异常对话检测(被过滤次数多的会话)
- 成本趋势
如果用传统方式,这两个需求都走同一个数据库,写个复杂的SQL查询。结果是:
- 一个大表承载所有查询,性能难以针对性优化
- 查询模型和写入模型耦合,加字段要同时考虑读写影响
- 实时查询和统计报表跑在同一个库上,互相影响
CQRS给了一个清晰的解法:写入走命令路径,查询走各自专属的投影/视图。
二、最简单的CQRS:接口层分离
最轻量的CQRS实现,连数据库都不需要分开,只是代码结构上分离Command和Query:
// ===== Command 侧 =====
// 命令基接口(不可变的意图描述)
public sealed interface ConversationCommand
permits ConversationCommand.CreateSession,
ConversationCommand.SendMessage,
ConversationCommand.SwitchModel,
ConversationCommand.ArchiveSession,
ConversationCommand.GiveFeedback {
String commandId();
String sessionId();
String userId();
Instant issuedAt();
record CreateSession(
String commandId, String sessionId, String userId, Instant issuedAt,
String title, String modelName, String systemPrompt, SessionConfig config
) implements ConversationCommand {}
record SendMessage(
String commandId, String sessionId, String userId, Instant issuedAt,
String content, MessageType messageType
) implements ConversationCommand {
public enum MessageType { TEXT, IMAGE, FILE }
}
record SwitchModel(
String commandId, String sessionId, String userId, Instant issuedAt,
String newModel, String reason
) implements ConversationCommand {}
record ArchiveSession(
String commandId, String sessionId, String userId, Instant issuedAt,
String reason
) implements ConversationCommand {}
record GiveFeedback(
String commandId, String sessionId, String userId, Instant issuedAt,
String targetMessageId, FeedbackType type, String comment
) implements ConversationCommand {
public enum FeedbackType { THUMBS_UP, THUMBS_DOWN, REPORT }
}
}
// 命令结果
public sealed interface CommandResult
permits CommandResult.Success, CommandResult.Failure {
String commandId();
record Success(String commandId, Object payload) implements CommandResult {}
record Failure(String commandId, String errorCode, String message)
implements CommandResult {}
}
// 命令处理器接口
public interface CommandHandler<C extends ConversationCommand> {
CommandResult handle(C command);
}
// 命令总线:根据命令类型路由到对应处理器
@Service
public class CommandBus {
private final Map<Class<?>, CommandHandler<?>> handlers;
public CommandBus(
CreateSessionHandler createSessionHandler,
SendMessageHandler sendMessageHandler,
SwitchModelHandler switchModelHandler,
ArchiveSessionHandler archiveSessionHandler,
GiveFeedbackHandler giveFeedbackHandler) {
this.handlers = Map.of(
ConversationCommand.CreateSession.class, createSessionHandler,
ConversationCommand.SendMessage.class, sendMessageHandler,
ConversationCommand.SwitchModel.class, switchModelHandler,
ConversationCommand.ArchiveSession.class, archiveSessionHandler,
ConversationCommand.GiveFeedback.class, giveFeedbackHandler
);
}
@SuppressWarnings("unchecked")
public CommandResult dispatch(ConversationCommand command) {
CommandHandler handler = handlers.get(command.getClass());
if (handler == null) {
return new CommandResult.Failure(
command.commandId(),
"NO_HANDLER",
"没有找到命令处理器: " + command.getClass().getSimpleName()
);
}
try {
return handler.handle(command);
} catch (Exception e) {
log.error("命令处理异常: {}", command.commandId(), e);
return new CommandResult.Failure(
command.commandId(),
"HANDLER_ERROR",
"命令处理失败: " + e.getMessage()
);
}
}
}三、命令处理器实现
// 发送消息的命令处理器
@Service
public class SendMessageHandler implements CommandHandler<ConversationCommand.SendMessage> {
private final SessionRepository sessionRepository;
private final MessageRepository messageRepository;
private final ChatClient chatClient;
private final ApplicationEventPublisher eventPublisher;
@Override
@Transactional
public CommandResult handle(ConversationCommand.SendMessage command) {
// 1. 验证会话存在且活跃
Session session = sessionRepository.findActiveById(command.sessionId())
.orElseThrow(() -> new SessionNotFoundException(command.sessionId()));
// 2. 保存用户消息
Message userMsg = Message.userMessage(
command.sessionId(),
command.content(),
command.userId()
);
messageRepository.save(userMsg);
// 3. 获取历史上下文(只取最近N条,避免超token限制)
List<Message> history = messageRepository.findRecentBySessionId(
command.sessionId(), 20 // 最近20条
);
// 4. 调用AI
long startTime = System.currentTimeMillis();
ChatResponse aiResponse = chatClient.prompt()
.system(session.getSystemPrompt())
.messages(toSpringAIMessages(history))
.call()
.chatResponse();
long latency = System.currentTimeMillis() - startTime;
// 5. 保存AI响应消息
int promptTokens = aiResponse.getMetadata().getUsage().getPromptTokens().intValue();
int completionTokens = aiResponse.getMetadata().getUsage().getCompletionTokens().intValue();
Message assistantMsg = Message.assistantMessage(
command.sessionId(),
aiResponse.getResult().getOutput().getContent(),
session.getModelName(),
promptTokens, completionTokens, latency
);
messageRepository.save(assistantMsg);
// 6. 更新会话统计(原子操作)
sessionRepository.updateStats(
command.sessionId(),
promptTokens + completionTokens,
1
);
// 7. 发布领域事件(供Query侧的投影更新)
eventPublisher.publishEvent(new MessageProcessedEvent(
command.sessionId(), userMsg, assistantMsg, promptTokens + completionTokens
));
return new CommandResult.Success(command.commandId(), assistantMsg);
}
private List<org.springframework.ai.chat.messages.Message> toSpringAIMessages(
List<Message> messages) {
return messages.stream()
.map(m -> switch (m.getRole()) {
case "user" -> new UserMessage(m.getContent());
case "assistant" -> new AssistantMessage(m.getContent());
default -> new UserMessage(m.getContent());
})
.toList();
}
}四、Query侧:专为查询优化的投影
Query侧和Command侧用不同的数据模型,专门为读取优化:
// ===== Query 侧 =====
// 对话历史视图(专为用户端对话界面优化)
public record ConversationHistoryView(
String sessionId,
String title,
String modelName,
String lastMessage, // 最后一条消息的预览
String lastMessageRole,
Instant lastActiveAt,
int totalMessages,
int totalTokens,
int totalTurns,
String status,
List<MessageView> recentMessages // 最近几条消息
) {
public record MessageView(
String messageId,
String role,
String content,
Instant createdAt,
Integer tokens
) {}
// 生成摘要文字
public String getDisplaySummary() {
if (lastMessage == null || lastMessage.isBlank()) return "空对话";
String preview = lastMessage.length() > 50
? lastMessage.substring(0, 50) + "..."
: lastMessage;
return "assistant".equals(lastMessageRole) ? "AI: " + preview : "你: " + preview;
}
}
// Token消耗统计视图(专为管理后台优化)
public record TokenUsageView(
String userId,
LocalDate date,
String modelName,
long totalCalls,
long totalPromptTokens,
long totalCompletionTokens,
double estimatedCostUSD
) {
public long totalTokens() { return totalPromptTokens + totalCompletionTokens; }
}
// 模型效果对比视图
public record ModelPerformanceView(
String modelName,
double avgLatencyMs,
double satisfactionRate, // 点赞率
long totalResponses,
long filteredResponses, // 被内容过滤的次数
double avgTokensPerResponse
) {}
// 查询接口
public interface ConversationQueryService {
// 获取用户的对话历史列表
List<ConversationHistoryView> getUserConversations(
String userId, int page, int size);
// 获取对话详情
Optional<ConversationHistoryView> getConversationDetail(String sessionId);
// Token消耗统计(按日期和模型)
List<TokenUsageView> getTokenUsage(
String userId, LocalDate start, LocalDate end);
// 模型效果对比
List<ModelPerformanceView> getModelPerformance(
LocalDate start, LocalDate end);
// 搜索对话(全文搜索)
List<ConversationHistoryView> searchConversations(
String userId, String keyword, int page, int size);
}五、投影维护:Event-Driven更新读取模型
Command侧写入后,通过领域事件触发Query侧的投影更新:
// 投影更新器:监听领域事件,更新查询视图
@Service
public class ConversationProjectionUpdater {
private final ConversationHistoryProjectionRepository projectionRepo;
private final TokenUsageProjectionRepository tokenUsageRepo;
// 监听消息处理事件,更新对话历史投影
@EventListener
@Async("projectionExecutor") // 异步处理,不阻塞主流程
public void onMessageProcessed(MessageProcessedEvent event) {
// 更新对话历史投影(denormalized数据)
projectionRepo.updateLastActivity(
event.sessionId(),
event.assistantMessage().getContent(),
event.totalTokens(),
LocalDateTime.now()
);
// 更新Token消耗投影
tokenUsageRepo.addTokenUsage(
event.userId(),
LocalDate.now(),
event.modelName(),
event.totalTokens()
);
log.debug("投影更新完成,sessionId: {}", event.sessionId());
}
// 监听模型切换事件
@EventListener
@Async("projectionExecutor")
public void onModelSwitched(ModelSwitchedEvent event) {
projectionRepo.updateModel(event.sessionId(), event.newModel());
}
// 监听反馈事件,更新质量投影
@EventListener
@Async("projectionExecutor")
public void onFeedbackGiven(FeedbackGivenEvent event) {
// 更新对应模型的满意度统计
projectionRepo.updateFeedback(
event.sessionId(),
event.feedbackType()
);
}
}
// 对话历史投影的Repository(专门为读取优化)
@Repository
public interface ConversationHistoryProjectionRepository extends JpaRepository<ConversationHistoryProjection, String> {
// 注意:用了@Query优化,只查需要的字段
@Query("""
SELECT new com.example.query.ConversationHistoryView(
s.id, s.title, s.modelName, s.lastMessage, s.lastMessageRole,
s.lastActiveAt, s.totalMessages, s.totalTokens, s.totalTurns, s.status,
null
)
FROM ConversationHistoryProjection s
WHERE s.userId = :userId AND s.deleted = false
ORDER BY s.lastActiveAt DESC
""")
Page<ConversationHistoryView> findByUserId(@Param("userId") String userId, Pageable pageable);
@Modifying
@Query("""
UPDATE ConversationHistoryProjection p
SET p.lastMessage = :content,
p.totalTokens = p.totalTokens + :tokens,
p.totalMessages = p.totalMessages + 2,
p.totalTurns = p.totalTurns + 1,
p.lastActiveAt = :activeAt
WHERE p.id = :sessionId
""")
void updateLastActivity(@Param("sessionId") String sessionId,
@Param("content") String content,
@Param("tokens") int tokens,
@Param("activeAt") LocalDateTime activeAt);
}六、读写分离:两个数据库
当流量增大时,可以进一步把读写分到不同的数据库实例:
// Spring的多数据源配置
@Configuration
public class CQRSDatabaseConfig {
// 写数据库(主库)
@Bean
@Primary
@ConfigurationProperties("spring.datasource.write")
public DataSource writeDataSource() {
return DataSourceBuilder.create().build();
}
// 读数据库(从库或专用查询库)
@Bean
@ConfigurationProperties("spring.datasource.read")
public DataSource readDataSource() {
return DataSourceBuilder.create().build();
}
// 根据上下文动态路由数据源
@Bean
@Primary
public DataSource routingDataSource(
@Qualifier("writeDataSource") DataSource write,
@Qualifier("readDataSource") DataSource read) {
AbstractRoutingDataSource routing = new AbstractRoutingDataSource() {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContext.isReadOnly() ? "read" : "write";
}
};
routing.setDefaultTargetDataSource(write);
routing.setTargetDataSources(Map.of("write", write, "read", read));
routing.afterPropertiesSet();
return routing;
}
}
// 数据源上下文
public class DataSourceContext {
private static final ThreadLocal<Boolean> READ_ONLY = ThreadLocal.withInitial(() -> false);
public static void setReadOnly(boolean readOnly) { READ_ONLY.set(readOnly); }
public static boolean isReadOnly() { return READ_ONLY.get(); }
public static void clear() { READ_ONLY.remove(); }
}
// 注解驱动的读写路由
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {}
// AOP切面:加了@ReadOnly注解的方法走读库
@Aspect
@Component
public class ReadOnlyRoutingAspect {
@Around("@annotation(com.example.annotation.ReadOnly)")
public Object routeToReadDb(ProceedingJoinPoint pjp) throws Throwable {
DataSourceContext.setReadOnly(true);
try {
return pjp.proceed();
} finally {
DataSourceContext.clear();
}
}
}
// 使用
@Service
public class ConversationQueryServiceImpl implements ConversationQueryService {
@Override
@ReadOnly // 走读库
@Cacheable(value = "conversationHistory", key = "#userId + '_' + #page")
public List<ConversationHistoryView> getUserConversations(
String userId, int page, int size) {
// ... 从读库查询
}
}七、最终一致性与补偿
读写分离后,会有短暂的不一致:用户刚发完消息,立刻查对话历史,可能还看不到刚发的消息(投影还没更新)。
处理这个问题的几种策略:
策略一:写后等待投影更新
// 写入后主动等投影更新,或者返回本地缓存
@PostMapping("/send")
public ResponseEntity<MessageResponse> sendMessage(@RequestBody SendMessageRequest req) {
CommandResult result = commandBus.dispatch(new ConversationCommand.SendMessage(...));
if (result instanceof CommandResult.Success s) {
Message assistantMsg = (Message) s.payload();
// 直接返回刚生成的消息,不依赖投影
return ResponseEntity.ok(MessageResponse.from(assistantMsg));
}
return ResponseEntity.status(500).build();
}策略二:版本号 + 读取重试
// 带版本的查询:等待投影追上指定版本
@ReadOnly
public ConversationHistoryView getConversationDetail(String sessionId, long minVersion) {
// 重试最多3次,等投影更新
for (int i = 0; i < 3; i++) {
ConversationHistoryProjection projection = projectionRepo.findById(sessionId)
.orElseThrow();
if (projection.getVersion() >= minVersion) {
return toView(projection);
}
// 等50ms再试
try { Thread.sleep(50); } catch (InterruptedException e) { break; }
}
// 超时后返回当前投影,可能稍旧
return toView(projectionRepo.findById(sessionId).orElseThrow());
}策略三:用户端乐观更新 前端收到发送成功的响应后,直接在本地状态里添加这条消息,不等服务端查询。大多数前端应用都是这么做的。
八、完整架构图
九、CQRS的适用边界
什么时候不需要CQRS:
- 简单的CRUD应用,查询和写入的模型基本一样
- 团队规模小,代码量本来就不大,加了CQRS反而增加复杂度
- 查询性能还没到瓶颈,过早优化
什么时候值得引入CQRS:
- 查询需求多样,和写入模型差异大
- 查询性能和写入性能需要独立扩展
- 需要历史查询、报表分析等重查询场景
- 写入有复杂业务规则,需要和查询路径解耦
在AI应用里,中等规模以上的产品化AI助手基本上都能从CQRS中受益。写入路径相对简单(发消息→调AI→存结果),但查询需求非常多样:用户端对话历史、管理端报表分析、模型效果监控……这些查询完全可以有各自独立的优化路径。
小结
CQRS在AI服务中的核心价值:
- 命令侧专注业务规则和数据一致性,不受查询复杂度影响
- 查询侧可以自由使用Denormalized视图、缓存、读写分离,不受写入约束
- 密封类建模命令,让命令类型清晰可扩展
- 投影的异步更新 + 最终一致性,是CQRS的主要设计取舍
- 从最简单的代码结构分离开始,按需演进到数据库分离
