知识库文档更新策略——增量更新 vs 全量重建
知识库文档更新策略——增量更新 vs 全量重建
适读人群:做RAG系统的AI工程师、技术架构师 | 阅读时长:约13分钟 | 核心价值:掌握大规模知识库文档更新的工程策略,避免生产环境踩坑
去年有段时间,我们的知识库更新任务隔三差五就出问题。
那是一个内容平台的文章知识库,大概 12 万篇文档,每天新增和修改的文档大约 2000-3000 篇。我们用的是全量重建策略——每天凌晨 2 点,把整个向量索引删掉重建一遍。
这个方案在文档量刚过 3 万的时候跑得很好,重建一次大概 40 分钟,凌晨 2 点开始,4 点结束,用户完全感觉不到。
然后文档量涨到 12 万了。重建时间变成了 4-5 个小时。凌晨 2 点开始,早上 7 点还没跑完,早高峰的用户查询到的是昨天的老索引,新文档的内容完全找不到。
更糟的是,有时候重建任务到一半出错了(Embedding API 超时、磁盘写满……各种原因),索引就坏了,整个知识库不可用,要手动干预恢复。
这才逼着我系统想了一下文档更新策略这件事。
先理清两种策略的本质差异
全量重建:把现有的向量索引全部清空,然后对所有文档重新做 Embedding,重新写入索引。
增量更新:只处理有变化的文档(新增的、修改的、删除的),已经在索引里且没有变化的文档不动。
表面上看,增量更新明显更优——处理量小、时间短、风险低。但实际上没那么简单,否则我不会在全量重建上用了那么久。
全量重建的问题
时间复杂度是线性的:文档量翻倍,重建时间也翻倍(近似)。12 万文档重建 5 小时,120 万文档就要 50 小时,完全不可接受。
重建期间的可用性问题:重建过程中,旧索引被删了,新索引还没建好,或者你用了「先建新的再切换」的策略,那在这段时间里需要双份存储空间。
重建失败的恢复成本高:一旦重建中途失败,没有好的恢复点,只能从头来。
但全量重建也有优点:
实现简单:不需要追踪哪些文档变了,不需要处理各种边界情况(文档分块发生了变化怎么办、同一文档分了不同块怎么对齐……)。
索引状态一定是一致的:重建完毕之后,索引里的内容和文档库是完全对齐的,没有「漏更新了某个文档」的风险。
Embedding 模型升级时别无选择:如果你换了一个新的 Embedding 模型,旧的向量和新的向量不能混在一个索引里,必须全量重建。
增量更新的坑
增量更新听起来简单,但工程实现上有几个真实的坑。
坑一:文档变更追踪
你怎么知道哪些文档变了?
最简单的方式是记录每个文档的最后修改时间(updated_at),每次增量更新时找出 updated_at > 上次更新时间 的文档。
但这个有问题:如果文档系统的 updated_at 不可靠(比如有些修改没有更新时间戳),你就会漏掉一些变更。我在一个项目里遇到过,某类文档批量迁移的时候 updated_at 没有更新,导致那批文档的旧向量留在索引里很久。
更可靠的方式是对文档内容做哈希(MD5 或 SHA256),把哈希值存起来,每次比较哈希来判断内容是否变化。
import hashlib
import json
def compute_doc_hash(doc_content: str, doc_metadata: dict) -> str:
"""
计算文档的哈希,基于内容+关键元数据
注意:只包含影响索引的字段,不要包含更新时间本身
"""
hash_source = {
"content": doc_content,
"title": doc_metadata.get("title", ""),
"category": doc_metadata.get("category", "")
# 不包含 updated_at,避免无意义的内容没变但时间变了导致重新索引
}
content_str = json.dumps(hash_source, sort_keys=True, ensure_ascii=False)
return hashlib.md5(content_str.encode()).hexdigest()坑二:文档删除处理
新增和修改很好处理,删除是最容易被遗忘的。
如果一篇文档在源系统里被删除了,但你的增量更新只处理「新增和修改」,那这篇文档的向量会永远留在索引里,变成僵尸数据。
我见过的最典型的问题:产品下架了,文档删除了,但知识库里还有它的向量,用户搜索时还会召回它,AI 根据它回答了一些关于已下架产品的信息,导致误导。
删除处理的方案:在文档表里记录「软删除」标志,增量更新时除了处理新增和修改,还要同步处理软删除的文档——从索引里删掉对应的向量。
坑三:分块 ID 的稳定性
大文档通常会被切成多个块(Chunk)再做 Embedding。如果你的分块策略是基于字符位置或者段落编号,那文档内容稍微变化(比如在开头加了一段),所有后面的块的 ID 都会变,等于整篇文档的所有块都要重新索引,和全量重建没什么区别。
更好的方案是给每个块一个稳定的 ID,基于文档 ID + 块的内容哈希:
def generate_chunk_id(doc_id: str, chunk_content: str) -> str:
"""
生成稳定的 Chunk ID
基于文档ID + 块内容的哈希
只有块内容真的变了,ID 才会变
"""
content_hash = hashlib.md5(chunk_content.encode()).hexdigest()[:8]
return f"{doc_id}_{content_hash}"这样,文档内某个段落变了,只有那个段落对应的块 ID 变了,其他没变的块不需要重新索引。
坑四:索引一致性问题
增量更新的过程中,索引处于「部分更新」状态——有些文档已经更新了,有些还没更新。如果在这个中间状态下有用户查询,可能会出现一篇文档的旧版本和新版本内容同时在索引里的情况(更新过程中旧块还没删、新块已经写入了)。
解决方案:先写新块,再删旧块。这样的最坏情况是有段时间旧块和新块都存在,用户可能召回旧的内容,但不会出现文档部分消失的情况。
def update_doc_in_index(vectorstore, doc_id: str, new_chunks: list, old_chunk_ids: list):
"""
安全的文档更新流程:先写新块,再删旧块
"""
# 步骤1:写入新块(此时旧块仍然存在)
vectorstore.add_texts(
texts=[chunk.content for chunk in new_chunks],
metadatas=[chunk.metadata for chunk in new_chunks],
ids=[chunk.id for chunk in new_chunks]
)
# 步骤2:删除旧块
vectorstore.delete(ids=old_chunk_ids)
# 如果步骤2失败,只是留下了旧的冗余数据,不影响新数据的正确性
# 可以通过定期的清理任务来清除这类僵尸数据我在 12 万文档知识库里的策略演进
说完理论,说一下我们在那个 12 万文档项目里的实际演进过程。
阶段一:纯全量重建(文档量 < 3 万)
每天凌晨全量重建,简单可靠,运行了大约 8 个月。
阶段二:增量更新 + 每周一次全量重建(文档量 3-12 万)
触发原因:全量重建时间从 40 分钟涨到 2.5 小时,开始影响早高峰。
方案:工作日每天增量更新(只处理过去 24 小时有变化的文档),周日凌晨做一次全量重建(保证索引和文档库的对齐,清除可能的增量更新累积误差)。
效果:工作日的更新任务从 2.5 小时降到 15-30 分钟。
阶段三:纯增量更新 + 定期对齐检查(文档量 12 万+)
触发原因:全量重建时间涨到 5 小时,周日凌晨也跑不完了。
方案:
- 完全切换到增量更新(基于文档哈希追踪变更)
- 引入「对齐检查」任务:每周运行一次,对比文档库里的所有文档哈希和索引记录的哈希,找出不一致的文档,做针对性修复
- Embedding 模型升级时,单独做一次全量重建,选在业务低谷期(深夜)分批完成
def run_alignment_check(doc_store, vectorstore, index_meta_store):
"""
对齐检查:找出文档库和向量索引不一致的文档
返回需要重新索引的文档列表
"""
misaligned = []
# 遍历文档库里所有文档(只取 ID 和哈希,不取内容,快)
for doc_id, doc_hash in doc_store.iter_doc_hashes():
indexed_hash = index_meta_store.get_hash(doc_id)
if indexed_hash is None:
# 文档存在但没被索引(可能是增量更新漏了)
misaligned.append(("missing", doc_id))
elif indexed_hash != doc_hash:
# 文档内容变了但索引没更新
misaligned.append(("outdated", doc_id))
# 检查索引里有没有已被删除的文档
for doc_id in index_meta_store.iter_indexed_doc_ids():
if not doc_store.exists(doc_id):
misaligned.append(("zombie", doc_id))
return misaligned这个「对齐检查」是关键——它让我不再依赖全量重建来保证一致性,而是有了一个独立的一致性验证机制。
什么时候必须全量重建
即使你建立了完善的增量更新机制,以下几种情况必须全量重建:
换 Embedding 模型:不同模型的向量空间不兼容,没有任何增量方案能解决这个问题。必须全量重建。建议这类操作建一个新索引,迁移完毕验证正确后再切流量,不要在线更新。
修改分块策略:如果你决定把块的大小从 500 tokens 改成 1000 tokens,或者修改了分块的重叠逻辑,那所有文档的分块都变了,必须全量重建。
索引文件损坏:有时候磁盘问题或者 bug 会导致索引文件损坏,只能重建。
积累了大量僵尸数据:如果因为历史原因索引里积累了大量无效向量(比如几十万条),影响了检索质量,彻底清理比增量修复更简单。
一个完整的更新系统设计
最后给一个我现在用的文档更新系统的核心结构:
class KnowledgeBaseUpdater:
def __init__(self, doc_store, vectorstore, index_meta_store):
self.doc_store = doc_store
self.vectorstore = vectorstore
self.meta_store = index_meta_store # 存储每个文档的索引状态和哈希
def run_incremental_update(self, since: datetime):
"""增量更新:处理自 since 以来有变化的文档"""
changed_docs = self.doc_store.get_changed_since(since)
for doc in changed_docs:
current_hash = compute_doc_hash(doc.content, doc.metadata)
indexed_hash = self.meta_store.get_hash(doc.id)
if current_hash == indexed_hash:
continue # 内容没变,跳过
# 重新分块
new_chunks = chunk_document(doc)
# 获取旧的块 ID
old_chunk_ids = self.meta_store.get_chunk_ids(doc.id)
# 安全更新
update_doc_in_index(self.vectorstore, doc.id, new_chunks, old_chunk_ids)
# 更新元数据
self.meta_store.update(
doc_id=doc.id,
hash=current_hash,
chunk_ids=[c.id for c in new_chunks],
updated_at=datetime.now()
)
# 处理删除
deleted_docs = self.doc_store.get_deleted_since(since)
for doc_id in deleted_docs:
old_chunk_ids = self.meta_store.get_chunk_ids(doc_id)
self.vectorstore.delete(ids=old_chunk_ids)
self.meta_store.delete(doc_id)
def run_alignment_check(self):
"""一致性检查,找出并修复不一致"""
misaligned = run_alignment_check(
self.doc_store, self.vectorstore, self.meta_store
)
for issue_type, doc_id in misaligned:
if issue_type in ("missing", "outdated"):
doc = self.doc_store.get(doc_id)
# 重新索引
self._reindex_doc(doc)
elif issue_type == "zombie":
# 删除索引里的僵尸数据
old_chunk_ids = self.meta_store.get_chunk_ids(doc_id)
self.vectorstore.delete(ids=old_chunk_ids)
self.meta_store.delete(doc_id)
return len(misaligned)这个设计的核心思路是:增量更新保证「新鲜性」,对齐检查保证「一致性」,两者职责分开。
文档更新策略是一个很容易被低估的工程问题。在文档量小的时候怎么做都行,但规模上去了以后,选错了策略就是每天的运维噩梦。早点建立合理的架构,比出了问题再重构要省力得多。
