第2387篇:增量更新向量库——实时文档变更同步的工程挑战
大约 6 分钟
第2387篇:增量更新向量库——实时文档变更同步的工程挑战
适读人群:需要实时同步文档变更到向量库的AI工程师 | 阅读时长:约18分钟 | 核心价值:掌握向量库增量更新的完整工程方案,解决实时同步的一致性和性能挑战
做过知识库的人都知道,文档入库只是第一步,更难的是文档更新之后,向量库如何保持同步。
我们遇到过一个真实问题:产品团队在CMS系统里更新了一份产品说明书,改了定价和参数,但向量库里的版本没有及时更新。结果AI助手在接下来的3天里,一直在用旧价格回答用户的报价咨询。
这个问题的根因不是技术难,而是整个文档更新流程里没有触发向量库同步的机制。一旦文档更新和向量库同步解耦了,就会出现这种"悄悄不同步"的情况。
增量更新的核心挑战
/**
* 向量库增量更新面临的挑战
*
* 挑战1:检测变更
* 如何知道哪些文档发生了变化?
* - 轮询:定期扫描所有文档,比较hash,成本高
* - 事件驱动:文档系统发出变更事件,实时处理
*
* 挑战2:处理已分块的文档
* 一个文档可能被切成100个chunk存在向量库里
* 文档更新后,哪些chunk变了?哪些没变?
* 暴力方案:删除所有chunk,重新切片入库
* 精细方案:做文本diff,只更新变化的部分
*
* 挑战3:更新期间的可用性
* 更新一个大文档时,可能需要几秒钟
* 期间用户检索到的是新版本还是旧版本?
*
* 挑战4:更新失败处理
* 网络错误、服务不可用等情况下,如何保证最终一致性?
*/方案一:事件驱动的增量同步
@Service
public class DocumentChangeEventHandler {
private final VectorStore vectorStore;
private final DocumentChunker chunker;
private final EmbeddingModel embeddingModel;
/**
* 监听文档变更事件
* 使用消息队列(Kafka/RocketMQ)保证可靠性
*/
@KafkaListener(topics = "document-changes", groupId = "rag-sync")
public void handleDocumentChange(DocumentChangeEvent event) {
log.info("Received document change: type={}, docId={}",
event.getChangeType(), event.getDocId());
try {
switch (event.getChangeType()) {
case CREATED -> handleDocumentCreated(event);
case UPDATED -> handleDocumentUpdated(event);
case DELETED -> handleDocumentDeleted(event);
case RENAMED -> handleDocumentRenamed(event);
}
} catch (Exception e) {
log.error("Failed to handle document change: {}", event.getDocId(), e);
// 发送到死信队列,稍后重试
deadLetterService.send(event, e);
}
}
/**
* 文档更新:删除旧chunk,添加新chunk
*
* 暴力但可靠。除非文档极大(>100MB),否则这个方案性能足够
*/
private void handleDocumentUpdated(DocumentChangeEvent event) {
String docId = event.getDocId();
// 第一步:删除旧的所有chunk
List<String> oldChunkIds = findChunksByDocId(docId);
if (!oldChunkIds.isEmpty()) {
vectorStore.delete(oldChunkIds);
log.debug("Deleted {} old chunks for doc: {}", oldChunkIds.size(), docId);
}
// 第二步:获取新文档内容
String newContent = event.getNewContent();
if (newContent == null) {
// 内容在事件里没有携带,需要从文档系统获取
newContent = documentService.getContent(docId);
}
// 第三步:切片和向量化
List<TextSegment> segments = chunker.chunk(newContent);
List<Document> newChunks = new ArrayList<>();
for (int i = 0; i < segments.size(); i++) {
Document chunk = Document.builder()
.id(docId + "_chunk_" + i)
.content(segments.get(i).text())
.metadata(Map.of(
"doc_id", docId,
"chunk_index", String.valueOf(i),
"total_chunks", String.valueOf(segments.size()),
"updated_at", LocalDateTime.now().toString(),
"content_hash", calculateHash(segments.get(i).text())
))
.build();
newChunks.add(chunk);
}
vectorStore.add(newChunks);
log.info("Updated {} chunks for doc: {}", newChunks.size(), docId);
// 更新同步状态记录
syncStatusRepository.updateSyncStatus(docId, SyncStatus.SYNCED, LocalDateTime.now());
}
private void handleDocumentDeleted(DocumentChangeEvent event) {
List<String> chunkIds = findChunksByDocId(event.getDocId());
if (!chunkIds.isEmpty()) {
vectorStore.delete(chunkIds);
log.info("Deleted {} chunks for deleted doc: {}", chunkIds.size(), event.getDocId());
}
syncStatusRepository.markAsDeleted(event.getDocId());
}
}精细化的chunk级别Diff
对于大型文档,可以做更精细的chunk级别diff,避免重建所有chunk:
@Service
public class ChunkDiffUpdater {
/**
* 基于内容hash的智能diff更新
*
* 原理:
* 对旧文档的每个chunk计算hash
* 对新文档的每个chunk计算hash
* 只更新hash不同的chunk
*/
public ChunkDiffResult diffAndUpdate(String docId, String newContent) {
// 获取旧版本的chunk信息(hash列表)
List<ChunkInfo> oldChunks = getChunkInfoByDocId(docId);
// 对新内容切片
List<TextSegment> newSegments = chunker.chunk(newContent);
List<ChunkInfo> newChunks = newSegments.stream()
.map(seg -> ChunkInfo.builder()
.content(seg.text())
.hash(calculateHash(seg.text()))
.build())
.collect(Collectors.toList());
// 计算diff
DiffResult diff = calculateDiff(oldChunks, newChunks);
// 执行更新(只处理有变化的chunk)
int added = 0, deleted = 0, unchanged = 0;
// 删除已移除的chunk
if (!diff.getDeletedChunkIds().isEmpty()) {
vectorStore.delete(diff.getDeletedChunkIds());
deleted = diff.getDeletedChunkIds().size();
}
// 添加新增的chunk
if (!diff.getAddedChunks().isEmpty()) {
List<Document> newDocs = diff.getAddedChunks().stream()
.map(c -> chunkToDocument(docId, c))
.collect(Collectors.toList());
vectorStore.add(newDocs);
added = newDocs.size();
}
unchanged = oldChunks.size() - deleted;
log.info("Doc {} diff result: {} added, {} deleted, {} unchanged",
docId, added, deleted, unchanged);
return ChunkDiffResult.builder()
.docId(docId)
.addedChunks(added)
.deletedChunks(deleted)
.unchangedChunks(unchanged)
.build();
}
/**
* 简单的chunk级别diff
* 基于hash匹配,找出新增和删除的chunk
*/
private DiffResult calculateDiff(List<ChunkInfo> oldChunks, List<ChunkInfo> newChunks) {
Set<String> oldHashes = oldChunks.stream()
.map(ChunkInfo::getHash)
.collect(Collectors.toSet());
Set<String> newHashes = newChunks.stream()
.map(ChunkInfo::getHash)
.collect(Collectors.toSet());
// 新版本中存在但旧版本没有的:新增
List<ChunkInfo> added = newChunks.stream()
.filter(c -> !oldHashes.contains(c.getHash()))
.collect(Collectors.toList());
// 旧版本中存在但新版本没有的:删除
List<String> deletedIds = oldChunks.stream()
.filter(c -> !newHashes.contains(c.getHash()))
.map(ChunkInfo::getId)
.collect(Collectors.toList());
return DiffResult.of(added, deletedIds);
}
}同步状态管理和一致性保证
@Service
public class SyncStateManager {
/**
* 同步状态表:记录每个文档的同步状态
*
* 这个表是实现最终一致性的核心
* 当同步失败时,通过这个表重试
*/
public void markPendingSync(String docId, ChangeType changeType) {
SyncRecord record = SyncRecord.builder()
.docId(docId)
.changeType(changeType)
.status(SyncStatus.PENDING)
.createdAt(LocalDateTime.now())
.retryCount(0)
.build();
syncRecordRepository.save(record);
}
public void markSyncSuccess(String docId) {
syncRecordRepository.findLatestByDocId(docId)
.ifPresent(record -> {
record.setStatus(SyncStatus.SUCCESS);
record.setCompletedAt(LocalDateTime.now());
syncRecordRepository.save(record);
});
}
public void markSyncFailed(String docId, String errorMessage) {
syncRecordRepository.findLatestByDocId(docId)
.ifPresent(record -> {
record.setStatus(SyncStatus.FAILED);
record.setErrorMessage(errorMessage);
record.setRetryCount(record.getRetryCount() + 1);
syncRecordRepository.save(record);
});
}
/**
* 定时重试失败的同步任务
* 实现最终一致性的关键
*/
@Scheduled(fixedDelay = 60000) // 每分钟检查
public void retryFailedSyncs() {
// 找出失败且重试次数少于5次的记录
List<SyncRecord> failedRecords = syncRecordRepository
.findByStatusAndRetryCountLessThan(SyncStatus.FAILED, 5);
for (SyncRecord record : failedRecords) {
try {
// 重新触发同步
DocumentChangeEvent retryEvent = DocumentChangeEvent.builder()
.docId(record.getDocId())
.changeType(record.getChangeType())
.build();
documentChangeEventHandler.handleDocumentChange(retryEvent);
} catch (Exception e) {
log.warn("Retry sync failed for doc: {}, attempt: {}",
record.getDocId(), record.getRetryCount() + 1);
}
}
}
/**
* 一致性校验:定期检查向量库和文档系统是否一致
*/
@Scheduled(cron = "0 0 4 * * *") // 每天凌晨4点
public void consistencyCheck() {
List<String> allDocIds = documentService.getAllDocIds();
List<String> syncedDocIds = getSyncedDocIdsFromVectorStore();
// 找出文档系统有但向量库没有的
Set<String> missingInVector = new HashSet<>(allDocIds);
missingInVector.removeAll(syncedDocIds);
if (!missingInVector.isEmpty()) {
log.warn("Found {} documents in doc system but not in vector store",
missingInVector.size());
// 触发补全同步
for (String missingDocId : missingInVector) {
markPendingSync(missingDocId, ChangeType.CREATED);
}
}
}
}更新期间的可用性策略
@Service
public class GracefulUpdateService {
/**
* 更新期间的可用性保证
*
* 使用"读旧写新"策略:
* 1. 先写入新chunk(加版本标记)
* 2. 确认写入成功后,删除旧chunk
* 3. 期间查询会同时看到新旧版本(可以接受短暂不一致)
*/
public void updateWithGrace(String docId, String newContent) {
String newVersion = UUID.randomUUID().toString();
try {
// 第一步:写入新版本chunk(带版本标记)
List<Document> newChunks = prepareChunks(docId, newContent, newVersion);
vectorStore.add(newChunks);
// 第二步:更新"当前版本"标记
// 检索时会优先使用标记为current的版本
markVersionAsCurrent(docId, newVersion);
// 第三步:延迟删除旧版本chunk(给正在处理中的请求完成的时间)
asyncDeleteOldVersions(docId, newVersion, 5000); // 5秒后删除
} catch (Exception e) {
// 新版本写入失败,清理残留
cleanupFailedVersion(docId, newVersion);
throw e;
}
}
private void asyncDeleteOldVersions(String docId, String keepVersion, long delayMs) {
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(delayMs);
List<String> oldChunkIds = findOldVersionChunkIds(docId, keepVersion);
if (!oldChunkIds.isEmpty()) {
vectorStore.delete(oldChunkIds);
}
} catch (Exception e) {
log.error("Failed to delete old chunks for doc: {}", docId, e);
}
});
}
}增量更新向量库看似简单,实际上涉及一致性、可用性、性能的三方权衡。我的建议是:从最简单的"全量重建"开始(删除所有chunk,重新入库),先保证正确性,再根据实际的文档规模和更新频率决定是否需要做更精细的增量方案。大多数场景下,全量重建就够了。
