CQRS 在 AI 应用里的价值——读写分离的新场景
CQRS 在 AI 应用里的价值——读写分离的新场景
适读人群:做高并发 AI 应用的后端工程师 | 阅读时长:约15分钟 | 核心价值:CQRS 模式在 AI 场景下的完整实践,解决读写争用的架构问题
去年我们的 AI 客服系统出了一个诡异的问题:并发高峰期,用户更新知识库(写操作)会导致 AI 检索响应变慢,P99 延迟从 800ms 涨到 3500ms。
查了两天,发现根源在于:写知识库时要做向量 embedding(重计算操作),embedding 时会抢占 GPU 资源,而 AI 检索(读操作)也在用 GPU 做推理。两个操作打架,读性能被写操作拖累。
这个问题让我想到了 CQRS。
CQRS 是什么,AI 场景下为什么有价值
CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一个把"写操作路径"和"读操作路径"分开设计的架构模式。
在传统 CRUD 应用里,读和写通常走同一套代码、同一个数据库,共用资源。这在低并发场景没问题,但在以下情况会出问题:
- 读和写的负载特性差异很大
- 读需要非常低的延迟,写可以有一定延迟
- 读的数据视图和写的数据模型差异大
AI 应用恰好在这三个方面都很典型:
写操作(Command) 读操作(Query)
-------------------------------- --------------------------------
用户上传文档 → 分块 → 做 embedding 用户对话 → 向量检索 → 生成回答
管理员更新知识库 AI 引用检索结果
用户对话历史写入 推荐内容检索
重计算、高 CPU/GPU 消耗 低延迟要求,P99 < 500ms
允许几秒到几分钟的延迟 每次对话必须即时响应读写的性质完全不同,混在一起必然互相干扰。
整体架构
我最终设计的 CQRS 架构:
+-------------------+ +-------------------+
| Command Side | | Query Side |
| | | |
| [API /commands] | | [API /queries] |
| | | | | |
| [Command Bus] | | [Query Handler] |
| | | | | |
| [Handlers] | | [Read Models] |
| - doc_ingest | | - vector store |
| - kb_update | | - cache layer |
| - feedback_write | | - search index |
| | | | ^ |
| [Write DB] | | [Read DB replicas]|
| (PostgreSQL) | | (read-only) |
| | | | ^ |
+-------|-----------+ +-------|------------+
| |
+---[Event Bus (Redis)]------+
同步写事件到读侧写侧和读侧通过事件总线同步。这意味着读侧的数据是最终一致的,不是强一致的。这个取舍在 AI 应用里是可以接受的——用户更新了知识库,AI 不需要立刻感知到,延迟个10-30秒完全没问题。
写侧(Command Side)的实现
from dataclasses import dataclass
from typing import Any
import asyncio
# Command 定义:每种写操作是一个 Command 对象
@dataclass
class IngestDocumentCommand:
document_id: str
title: str
content: str
source: str
user_id: str
@dataclass
class UpdateKnowledgeBaseCommand:
kb_id: str
document_ids: list[str]
operation: str # 'add' | 'remove' | 'rebuild'
@dataclass
class WriteChatHistoryCommand:
session_id: str
user_id: str
role: str
content: str
metadata: dict
# Command Bus:路由 Command 到对应的 Handler
class CommandBus:
def __init__(self):
self._handlers = {}
def register(self, command_type: type, handler):
self._handlers[command_type] = handler
async def dispatch(self, command: Any) -> Any:
handler = self._handlers.get(type(command))
if not handler:
raise ValueError(f"No handler for {type(command).__name__}")
return await handler.handle(command)
# IngestDocument Handler
class IngestDocumentHandler:
def __init__(self, db, embedding_service, event_bus):
self.db = db
self.embedding_service = embedding_service
self.event_bus = event_bus
async def handle(self, cmd: IngestDocumentCommand) -> dict:
# 1. 写入关系型数据库(同步,快)
async with self.db.transaction():
await self.db.execute(
"""INSERT INTO documents (id, title, content, source, user_id, status)
VALUES ($1, $2, $3, $4, $5, 'processing')
ON CONFLICT (id) DO UPDATE
SET title=$2, content=$3, status='processing', updated_at=NOW()""",
cmd.document_id, cmd.title, cmd.content,
cmd.source, cmd.user_id
)
# 2. 发送事件到事件总线(触发读侧异步更新)
await self.event_bus.publish("document.ingested", {
"document_id": cmd.document_id,
"title": cmd.title,
"content": cmd.content,
"user_id": cmd.user_id,
"timestamp": datetime.utcnow().isoformat()
})
# 命令立即返回,不等 embedding 完成
return {"document_id": cmd.document_id, "status": "processing"}写侧的关键设计:命令处理只负责写主数据库和发事件,不做 embedding,不等向量写入完成。 命令执行时间控制在 50ms 以内。
读侧(Query Side)的实现
# 读模型:专门为查询优化的数据视图
class KnowledgeBaseReadModel:
def __init__(self, qdrant_client, cache, db_read_replica):
self.qdrant = qdrant_client
self.cache = cache
self.db = db_read_replica # 只读副本
async def search(
self,
query: str,
kb_id: str,
top_k: int = 5
) -> list[dict]:
"""向量检索,全程走读侧,不碰写侧数据库"""
# 先查缓存(热门查询)
cache_key = f"kb_search:{kb_id}:{hash(query)}"
cached = await self.cache.get(cache_key)
if cached:
return cached
# 向量检索
query_embedding = await self._get_query_embedding(query)
results = await self.qdrant.search(
collection_name=f"kb_{kb_id}",
query_vector=query_embedding,
limit=top_k * 2,
with_payload=True
)
if not results:
return []
# 回查只读副本验证数据有效性
chunk_ids = [r.payload.get('chunk_id') for r in results]
valid_chunks = await self.db.fetch(
"""SELECT c.id, c.content, c.document_id, d.title
FROM document_chunks c
JOIN documents d ON c.document_id = d.id
WHERE c.id = ANY($1)
AND c.vector_status = 'synced'
AND d.status = 'active'""",
chunk_ids
)
valid_map = {str(c['id']): c for c in valid_chunks}
output = []
for r in results:
cid = r.payload.get('chunk_id')
if cid in valid_map:
output.append({
"content": valid_map[cid]['content'],
"title": valid_map[cid]['title'],
"document_id": str(valid_map[cid]['document_id']),
"score": r.score
})
output = output[:top_k]
# 写缓存,TTL 60秒
await self.cache.setex(cache_key, 60, output)
return output
# Query Handler:处理读请求
class AIQueryHandler:
def __init__(self, read_model: KnowledgeBaseReadModel, llm_client):
self.read_model = read_model
self.llm = llm_client
async def handle_chat_query(
self,
user_message: str,
kb_id: str,
session_history: list
) -> dict:
"""处理对话查询,完全在读侧完成"""
# 1. 检索相关文档
retrieved_docs = await self.read_model.search(
query=user_message,
kb_id=kb_id,
top_k=5
)
# 2. 组装上下文
context = "\n\n".join([
f"[来源: {doc['title']}]\n{doc['content']}"
for doc in retrieved_docs
])
# 3. 调用 LLM
messages = [
{"role": "system", "content": f"""根据以下参考资料回答用户问题。
参考资料:
{context}
要求:只根据资料内容回答,资料里没有的信息请如实说不知道。"""}
]
messages.extend(session_history[-4:]) # 最近2轮对话
messages.append({"role": "user", "content": user_message})
response = await self.llm.chat(messages=messages)
return {
"answer": response.content,
"sources": [{"title": d['title'], "id": d['document_id']}
for d in retrieved_docs]
}事件总线:连接读写两侧
import redis.asyncio as aioredis
import json
class EventBus:
def __init__(self, redis_url: str):
self.redis = aioredis.from_url(redis_url)
async def publish(self, topic: str, payload: dict):
await self.redis.lpush(
f"events:{topic}",
json.dumps(payload)
)
async def subscribe(self, topic: str, handler, batch_size: int = 10):
"""消费事件,处理读侧的更新"""
while True:
# 批量取事件,减少 Redis 调用次数
items = await self.redis.rpop(f"events:{topic}", batch_size)
if not items:
await asyncio.sleep(0.5) # 没有事件就等一会
continue
for item in items:
try:
payload = json.loads(item)
await handler(payload)
except Exception as e:
# 处理失败的事件写到死信队列
await self.redis.lpush(
f"events:dead_letter:{topic}",
json.dumps({"payload": json.loads(item), "error": str(e)})
)
# 读侧的事件处理器:负责更新向量库
class ReadSideProjection:
def __init__(self, qdrant_client, embedding_service, db):
self.qdrant = qdrant_client
self.embedding = embedding_service
self.db = db
async def on_document_ingested(self, event: dict):
"""处理文档摄入事件,更新向量库"""
doc_id = event['document_id']
content = event['content']
# 分块
chunks = self._split_chunks(content)
# 批量生成 embedding
texts = [c['text'] for c in chunks]
embeddings = await self.embedding.batch_embed(texts)
# 写入向量库
points = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
chunk_id = f"{doc_id}_chunk_{i}"
points.append({
"id": chunk_id,
"vector": embedding,
"payload": {
"document_id": doc_id,
"chunk_index": i,
"chunk_id": chunk_id
}
})
await self.qdrant.upsert(
collection_name="documents",
points=points
)
# 更新数据库里的 chunk 状态
await self.db.execute(
"UPDATE documents SET status = 'active' WHERE id = $1",
doc_id
)
def _split_chunks(self, content: str, chunk_size: int = 500) -> list:
# 简单按字数切块,实际项目里用更智能的切块策略
words = content.split()
chunks = []
for i in range(0, len(words), chunk_size):
chunk_words = words[i:i + chunk_size]
chunks.append({"text": " ".join(chunk_words), "index": i // chunk_size})
return chunks效果对比
实施 CQRS 后,同样的并发压力测试结果:
指标 CQRS 前 CQRS 后
----------------------------------------------
AI 查询 P99 延迟 3500ms 620ms
AI 查询 P50 延迟 800ms 280ms
知识库更新耗时 同步阻塞 异步,用户感知 <50ms
写操作对读操作影响 严重互相干扰 完全隔离
峰值 QPS(查询) 约 50 约 300最大的改变是:写操作(知识库更新)完全不再影响读性能了。用户上传文档,系统立刻返回"正在处理",然后异步完成 embedding 和向量写入。读侧只服务检索请求,专注低延迟。
代价和注意事项
CQRS 不是银弹,有明确的代价:
1. 读写不强一致。 用户更新知识库,AI 不会立刻感知到新内容,有 10-30 秒的延迟。对大多数 AI 应用这是可以接受的,但如果你的场景要求"写完立刻能查到",CQRS 不适合。
2. 架构复杂度提升。 事件总线、读侧投影、死信队列,多了很多需要运维的组件。团队规模小的时候要慎重权衡。
3. 调试更难。 出了问题,追链路要同时看写侧日志、事件总线、读侧投影,比单体应用复杂得多。
我的建议是:并发低于 50 QPS 的 AI 应用,先别搞 CQRS,简单的读写分离(主从复制)就够了。并发超过 100 QPS,且有明显的读写资源争用,再考虑 CQRS。
架构设计要解决真实问题,不是炫技。
