AI 应用的 Schema 设计——向量和关系型数据的混合存储
AI 应用的 Schema 设计——向量和关系型数据的混合存储
适读人群:有 AI 应用开发经验的后端工程师 | 阅读时长:约15分钟 | 核心价值:搞清楚向量库和关系库同时用时的数据一致性设计
去年我接手一个 RAG 项目,前任开发者留下了一堆烂摊子。知识库文档更新了,向量库里的老向量还在;用户删掉一篇文章,Postgres 里主表记录没了,Qdrant 里的向量点还在幽灵游荡。最惨的一次,AI 引用了一篇三个月前就被删除的内参文件,给客户看到了,差点出事。
那天我坐在椅子上看着这团乱麻,脑子里只有一个念头:这套系统根本没有认真设计过 Schema。
这篇文章就是从那次事故里总结出来的东西。
问题根源:两个世界的数据生命周期不同
关系型数据库和向量数据库本质上是两套完全不同的存储哲学。
关系型数据库(Postgres、MySQL)的核心是:记录业务事实,维护关系约束,支持事务。你创建一条记录、更新它、删除它,数据库会保证一致性。
向量数据库(Qdrant、Weaviate、Pinecone)的核心是:高效检索相似向量。它不关心你的业务逻辑,不管什么外键约束,你扔进去一个向量点,它就给你存着,直到你显式删掉。
两套系统合在一起用的时候,麻烦就来了:
业务操作 关系型数据库 向量数据库
--------- -------------- ----------
插入文档 INSERT upsert 向量
更新文档 UPDATE 需要先删后插(或 upsert)
删除文档 DELETE 需要主动清理向量
批量导入 批量 INSERT 批量 upsert传统关系型的级联删除在向量库里不存在。你 DELETE 掉主表,向量库无动于衷。
我最终用的 Schema 设计
先看关系型这边的表结构:
-- 文档主表
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
title VARCHAR(500) NOT NULL,
source_url TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ, -- 软删除
status VARCHAR(20) NOT NULL DEFAULT 'active',
checksum VARCHAR(64) -- 内容 hash,判断是否变化
);
-- 文档分块表
CREATE TABLE document_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID NOT NULL REFERENCES documents(id),
chunk_index INT NOT NULL,
content TEXT NOT NULL,
token_count INT,
vector_id VARCHAR(100), -- 向量库里对应的点 ID
vector_status VARCHAR(20) NOT NULL DEFAULT 'pending',
-- 'pending' | 'synced' | 'outdated' | 'deleted'
embedding_model VARCHAR(100), -- 记录用哪个 embedding 模型生成的
embedded_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(document_id, chunk_index)
);
-- 向量同步日志
CREATE TABLE vector_sync_log (
id BIGSERIAL PRIMARY KEY,
chunk_id UUID NOT NULL,
operation VARCHAR(20) NOT NULL, -- 'insert' | 'update' | 'delete'
status VARCHAR(20) NOT NULL DEFAULT 'pending',
error_msg TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ
);
CREATE INDEX idx_chunks_document_id ON document_chunks(document_id);
CREATE INDEX idx_chunks_vector_status ON document_chunks(vector_status);
CREATE INDEX idx_sync_log_status ON vector_sync_log(status, created_at);几个关键设计决策说明一下:
1. vector_id 存在关系型这边,不是向量库那边。
向量库的 ID 策略比较混乱,Qdrant 支持 UUID 或整数,Pinecone 用字符串。把 ID 主导权放在关系库这边,保证 ID 是我们自己生成的 UUID,跨库关联不会出问题。
2. vector_status 字段是核心。
这个字段记录这个 chunk 的向量在向量库里的同步状态。任何修改操作,先把状态改成 outdated,然后异步任务来处理真正的向量更新。这是一个典型的"先标记、后处理"模式。
3. 软删除 + 异步清理。
文档不直接物理删除,先打上 deleted_at 时间戳,然后异步任务去清理向量。好处是给了你一个时间窗口处理失败重试,也方便审计。
数据一致性:两阶段写入
写入文档时,我用的是两阶段策略:
import asyncio
from uuid import uuid4
from datetime import datetime
import hashlib
async def upsert_document(content: str, title: str, doc_id: str = None) -> str:
"""
返回 document_id
"""
# Phase 1: 写关系型数据库(同步)
checksum = hashlib.sha256(content.encode()).hexdigest()
async with db.transaction():
if doc_id:
# 更新场景:检查内容是否真的变了
existing = await db.fetchrow(
"SELECT id, checksum FROM documents WHERE id = $1 AND deleted_at IS NULL",
doc_id
)
if existing and existing['checksum'] == checksum:
# 内容没变,不需要重新 embedding
return doc_id
# 内容变了,把旧的 chunks 标记为 outdated
await db.execute(
"""UPDATE document_chunks
SET vector_status = 'outdated', updated_at = NOW()
WHERE document_id = $1 AND vector_status = 'synced'""",
doc_id
)
else:
doc_id = str(uuid4())
# 写文档主表
await db.execute(
"""INSERT INTO documents (id, title, checksum)
VALUES ($1, $2, $3)
ON CONFLICT (id) DO UPDATE
SET title = $2, checksum = $3, updated_at = NOW()""",
doc_id, title, checksum
)
# 分块并写 chunk 表
chunks = split_into_chunks(content)
chunk_ids = []
for i, chunk_text in enumerate(chunks):
chunk_id = str(uuid4())
vector_id = str(uuid4()) # 提前生成,给向量库用
await db.execute(
"""INSERT INTO document_chunks
(id, document_id, chunk_index, content, vector_id, vector_status)
VALUES ($1, $2, $3, $4, $5, 'pending')""",
chunk_id, doc_id, i, chunk_text, vector_id
)
chunk_ids.append((chunk_id, vector_id, chunk_text))
# 写同步日志
await db.execute(
"""INSERT INTO vector_sync_log (chunk_id, operation)
VALUES ($1, 'insert')""",
chunk_id
)
# Phase 2: 触发异步向量写入(不等结果)
asyncio.create_task(sync_chunks_to_vector_db(chunk_ids, doc_id))
return doc_id
async def sync_chunks_to_vector_db(chunk_ids: list, doc_id: str):
"""异步处理向量写入,带重试"""
for chunk_id, vector_id, text in chunk_ids:
try:
embedding = await get_embedding(text)
# 写入向量库
await qdrant_client.upsert(
collection_name="documents",
points=[{
"id": vector_id,
"vector": embedding,
"payload": {
"chunk_id": chunk_id,
"document_id": doc_id,
}
}]
)
# 更新 chunk 状态
await db.execute(
"""UPDATE document_chunks
SET vector_status = 'synced', embedded_at = NOW()
WHERE id = $1""",
chunk_id
)
# 更新同步日志
await db.execute(
"""UPDATE vector_sync_log
SET status = 'done', processed_at = NOW()
WHERE chunk_id = $1 AND status = 'pending'""",
chunk_id
)
except Exception as e:
await db.execute(
"""UPDATE vector_sync_log
SET status = 'failed', error_msg = $2
WHERE chunk_id = $1 AND status = 'pending'""",
chunk_id, str(e)
)删除操作:最容易出问题的地方
删除是最容易搞出幽灵数据的操作。我的做法:
async def delete_document(doc_id: str):
"""软删除文档,并触发向量清理"""
async with db.transaction():
# 软删除主记录
await db.execute(
"""UPDATE documents
SET deleted_at = NOW(), status = 'deleted'
WHERE id = $1 AND deleted_at IS NULL""",
doc_id
)
# 把所有 chunks 标记为待删除
chunks = await db.fetch(
"""SELECT id, vector_id FROM document_chunks
WHERE document_id = $1 AND vector_status != 'deleted'""",
doc_id
)
for chunk in chunks:
await db.execute(
"""UPDATE document_chunks
SET vector_status = 'outdated'
WHERE id = $1""",
chunk['id']
)
# 写删除日志
await db.execute(
"""INSERT INTO vector_sync_log (chunk_id, operation)
VALUES ($1, 'delete')""",
chunk['id']
)
# 异步清理向量
asyncio.create_task(cleanup_vectors_for_document(doc_id))
async def cleanup_vectors_for_document(doc_id: str):
"""清理某个文档的所有向量"""
chunks = await db.fetch(
"""SELECT id, vector_id FROM document_chunks
WHERE document_id = $1""",
doc_id
)
vector_ids = [c['vector_id'] for c in chunks if c['vector_id']]
if vector_ids:
try:
# 批量删除向量
await qdrant_client.delete(
collection_name="documents",
points_selector={"points": vector_ids}
)
# 批量更新状态
await db.execute(
"""UPDATE document_chunks
SET vector_status = 'deleted'
WHERE document_id = $1""",
doc_id
)
except Exception as e:
# 删除失败,记录日志,等定时任务重试
print(f"Vector cleanup failed for doc {doc_id}: {e}")定时修复任务:兜底保障
就算写了两阶段写入,网络抖动、服务重启都可能导致同步失败。所以我还有一个定时巡检任务:
async def reconcile_vector_sync():
"""定时跑,修复不一致的数据,建议每5分钟跑一次"""
# 1. 处理 pending 超过10分钟的 insert 操作
stale_inserts = await db.fetch(
"""SELECT c.id, c.vector_id, c.content, l.id as log_id
FROM vector_sync_log l
JOIN document_chunks c ON l.chunk_id = c.id
WHERE l.operation = 'insert'
AND l.status = 'pending'
AND l.created_at < NOW() - INTERVAL '10 minutes'
LIMIT 100""",
)
for row in stale_inserts:
try:
embedding = await get_embedding(row['content'])
await qdrant_client.upsert(
collection_name="documents",
points=[{"id": row['vector_id'], "vector": embedding,
"payload": {"chunk_id": row['id']}}]
)
await db.execute(
"UPDATE document_chunks SET vector_status = 'synced' WHERE id = $1",
row['id']
)
await db.execute(
"UPDATE vector_sync_log SET status = 'done', processed_at = NOW() WHERE id = $1",
row['log_id']
)
except Exception as e:
await db.execute(
"UPDATE vector_sync_log SET error_msg = $2 WHERE id = $1",
row['log_id'], str(e)
)
# 2. 处理 pending 的 delete 操作
stale_deletes = await db.fetch(
"""SELECT c.vector_id, l.id as log_id
FROM vector_sync_log l
JOIN document_chunks c ON l.chunk_id = c.id
WHERE l.operation = 'delete'
AND l.status = 'pending'
AND l.created_at < NOW() - INTERVAL '10 minutes'
LIMIT 100"""
)
if stale_deletes:
vector_ids = [r['vector_id'] for r in stale_deletes]
try:
await qdrant_client.delete(
collection_name="documents",
points_selector={"points": vector_ids}
)
for row in stale_deletes:
await db.execute(
"UPDATE vector_sync_log SET status = 'done', processed_at = NOW() WHERE id = $1",
row['log_id']
)
except Exception as e:
print(f"Batch delete failed: {e}")检索时的一致性保障
光存的时候一致还不够,查的时候也要加保护:
async def search_documents(query: str, top_k: int = 5) -> list:
"""带一致性过滤的检索"""
query_embedding = await get_embedding(query)
# 向量检索
results = await qdrant_client.search(
collection_name="documents",
query_vector=query_embedding,
limit=top_k * 2, # 多拿一些,后面要过滤
with_payload=True
)
if not results:
return []
# 从 payload 里取 chunk_id
chunk_ids = [r.payload.get('chunk_id') for r in results if r.payload.get('chunk_id')]
# 去关系型数据库验证这些 chunk 还活着
valid_chunks = await db.fetch(
"""SELECT c.id, c.content, d.title, d.source_url
FROM document_chunks c
JOIN documents d ON c.document_id = d.id
WHERE c.id = ANY($1)
AND c.vector_status = 'synced'
AND d.deleted_at IS NULL
AND d.status = 'active'""",
chunk_ids
)
valid_ids = {str(c['id']) for c in valid_chunks}
chunk_map = {str(c['id']): c for c in valid_chunks}
# 过滤掉已删除或失效的向量结果
filtered = []
for r in results:
cid = r.payload.get('chunk_id')
if cid in valid_ids:
filtered.append({
'content': chunk_map[cid]['content'],
'title': chunk_map[cid]['title'],
'score': r.score,
'source_url': chunk_map[cid]['source_url']
})
return filtered[:top_k]这个函数有个额外的 DB 查询,看起来慢了一点,但能保证你绝对不会把已删除的内容返回给 AI。那次事故之后,我宁愿多一次 IO,也不要再出现 AI 引用幽灵数据这种事。
几个踩过的坑
坑一:向量库的 upsert 不是原子的。
Qdrant 的 upsert 如果中间断了,可能部分成功。所以 vector_status 不能用 upsert 完成后再写,要用 sync_log + 定时修复来保证最终一致。
坑二:Embedding 模型换了,旧向量失效。
我在 document_chunks 里记录了 embedding_model 字段。每次换模型,用这个字段做全量重建的过滤条件,只重建旧模型生成的向量。
坑三:分块策略变了,chunk 数量变了。
文档更新时,老的 chunk 可能有10个,新的只有8个。我的做法是:把文档所有老 chunk 全部标记 outdated,重新生成分块,写入新 chunk,再统一清理旧向量。不要尝试"增量更新" chunk,太容易出 off-by-one 错误。
坑四:向量库里的 ID 类型混用。
Qdrant 的 point ID 只支持 UUID 或 uint64,不支持任意字符串。早期我用了字符串 ID,结果调用时一直报错,查了很久。现在统一用 UUID,两边都支持。
总结
混合存储的核心原则就三条:
- 关系型数据库是主数据,向量库是派生数据。向量的 ID、状态、生命周期都由关系型这边掌控。
- 写操作:先写关系型(同步),再写向量库(异步)。用 sync_log 记录中间状态,定时任务兜底。
- 读操作:向量检索结果要回关系型验证有效性,防止幽灵数据。
架构不复杂,但每个细节都是踩坑踩出来的。如果你现在正在设计一个 RAG 系统,这套 Schema 可以直接拿去用,按需改改字段就行。
