知识库的增量更新策略——文档改了,不想重建整个索引
知识库的增量更新策略——文档改了,不想重建整个索引
我们有个客户,做法律合规咨询的,知识库里放了大概八千份法律文件——法规、司法解释、部门规章、地方性法规。
这类文件的更新频率出乎意料的高。国家每年新出台几百份规范性文件,现有的文件也在持续修订。他们的文档更新是每天都有的,有时候一天更新十几份。
最开始,他们每周重建一次全量索引。一次全量重建要跑 6-8 小时,跑的时候知识库的查询质量下降(索引一半旧一半新),要占用大量计算资源。
他们找我的时候,说了一句让我印象很深的话:"我们不是在做模型训练,是在维护一个知识库,凭什么每次改一个文件要把所有文件重处理一遍?"
这个问题问得很好。凭什么?
全量重建的问题到底出在哪里
先搞清楚为什么很多团队默认走全量重建。
原因很简单:全量重建最容易实现,没有状态管理,不需要处理"哪些变了哪些没变"的问题。每次直接清库、重新 Embedding、写入,代码逻辑简单,不容易出错。
但代价是:
- 时间代价:文档越多,重建时间越长,线性增长
- 资源代价:Embedding 调用按 Token 计费,全量重建意味着把没变的文档反复重复计费
- 可用性代价:重建期间服务质量下降或不可用
- 频率瓶颈:重建耗时越长,更新频率上限越低
对于文档更新频繁的场景,这几个代价都是无法接受的。
增量更新的核心思路
增量更新要解决一个核心问题:怎么知道哪些文档变了,哪些没变?
这听起来简单,但实际上有几个坑:
坑 1:文件名一样,内容变了
最常见的情况。文件名叫"员工手册-2024版.pdf",但上个月刚悄悄修订了第三章。如果只看文件名,会认为没变,漏掉了更新。
坑 2:内容轻微变化,但 Embedding 不需要重建
有些文件修改的是页眉、页脚、日期,实质内容没变。这种变更没必要重建向量,只需要更新 metadata。
坑 3:一对多关系——一个文件切了多个块
文件更新了,要把原来这个文件的所有文档块都找到并删除,然后用新版本重新切块写入。不能只更新一部分块,否则新旧内容混在一起,答案会错乱。
解决核心问题的方法是:内容哈希(Content Hash)。
对文档内容(不是文件名、不是文件时间)计算哈希值,用哈希值来判断内容是否真的发生了变化。
文档版本管理的数据模型
@Entity
@Table(name = "document_version")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DocumentVersion {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 文档的业务唯一标识(相对路径或文档ID)
*/
@Column(nullable = false, unique = true)
private String documentKey;
/**
* 文档内容的 SHA-256 哈希值
* 只有内容真正变化时哈希才会变
*/
@Column(nullable = false)
private String contentHash;
/**
* 文档的元数据哈希(文件名、标签等)
* 元数据变化时只更新 metadata,不重建向量
*/
@Column(nullable = false)
private String metadataHash;
/**
* 向量数据库中对应块的 ID 列表
*/
@ElementCollection
@CollectionTable(name = "document_chunk_ids")
private List<String> vectorChunkIds;
/**
* 最后一次成功向量化的时间
*/
private LocalDateTime lastIndexedAt;
/**
* 索引状态
*/
@Enumerated(EnumType.STRING)
private IndexStatus indexStatus;
/**
* 文件大小(字节),用于快速初步筛选
*/
private long fileSizeBytes;
public enum IndexStatus {
PENDING, // 待处理
INDEXED, // 已索引
UPDATING, // 更新中
FAILED, // 失败
DELETED // 已删除
}
}@Repository
public interface DocumentVersionRepository extends JpaRepository<DocumentVersion, Long> {
Optional<DocumentVersion> findByDocumentKey(String documentKey);
List<DocumentVersion> findByIndexStatus(DocumentVersion.IndexStatus status);
List<DocumentVersion> findByLastIndexedAtBefore(LocalDateTime threshold);
}变更检测服务
@Service
@Slf4j
public class DocumentChangeDetector {
private final DocumentVersionRepository versionRepo;
/**
* 扫描文档目录,找出需要处理的变更
*/
public DocumentChangeSummary detectChanges(Path documentRoot) {
log.info("Scanning document root: {}", documentRoot);
List<DocumentChange> changes = new ArrayList<>();
try (Stream<Path> files = Files.walk(documentRoot)) {
files.filter(Files::isRegularFile)
.filter(this::isSupportedFormat)
.forEach(filePath -> {
DocumentChange change = checkFile(filePath, documentRoot);
if (change.getChangeType() != ChangeType.UNCHANGED) {
changes.add(change);
}
});
} catch (IOException e) {
throw new RuntimeException("Failed to scan document root", e);
}
// 找出已删除的文件(在 DB 里有记录,但文件不存在了)
List<DocumentChange> deletions = detectDeletions(documentRoot);
changes.addAll(deletions);
DocumentChangeSummary summary = new DocumentChangeSummary(changes);
log.info("Change detection complete: {} new, {} updated, {} deleted, {} metadata-only",
summary.getNewCount(), summary.getUpdatedCount(),
summary.getDeletedCount(), summary.getMetadataOnlyCount());
return summary;
}
private DocumentChange checkFile(Path filePath, Path root) {
String documentKey = root.relativize(filePath).toString();
try {
// 计算内容哈希
String contentHash = computeContentHash(filePath);
String metadataHash = computeMetadataHash(filePath);
Optional<DocumentVersion> existing = versionRepo.findByDocumentKey(documentKey);
if (existing.isEmpty()) {
return DocumentChange.newFile(documentKey, filePath, contentHash, metadataHash);
}
DocumentVersion version = existing.get();
if (!version.getContentHash().equals(contentHash)) {
// 内容真的变了,需要重建向量
return DocumentChange.contentChanged(documentKey, filePath, contentHash, metadataHash, version);
}
if (!version.getMetadataHash().equals(metadataHash)) {
// 只有元数据变化,只需要更新 metadata,不重建向量
return DocumentChange.metadataOnly(documentKey, filePath, metadataHash, version);
}
return DocumentChange.unchanged(documentKey);
} catch (Exception e) {
log.error("Error checking file: {}", filePath, e);
return DocumentChange.error(documentKey, filePath, e.getMessage());
}
}
/**
* 计算文件内容的 SHA-256 哈希
* 注意:对于 PDF、Word 等格式,要先提取文本再计算,
* 因为 PDF 的二进制内容可能因为保存工具不同而不同,但文本内容相同
*/
private String computeContentHash(Path filePath) throws Exception {
String extension = getExtension(filePath.toString());
String textContent;
switch (extension.toLowerCase()) {
case "pdf":
textContent = extractPdfText(filePath);
break;
case "docx":
textContent = extractDocxText(filePath);
break;
case "txt":
case "md":
textContent = Files.readString(filePath, StandardCharsets.UTF_8);
break;
default:
// 对于不支持的格式,直接用文件字节
textContent = Arrays.toString(Files.readAllBytes(filePath));
}
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hashBytes = digest.digest(textContent.getBytes(StandardCharsets.UTF_8));
return HexFormat.of().formatHex(hashBytes);
}
private String computeMetadataHash(Path filePath) throws Exception {
// 元数据包括:文件名、所在目录(对应 category)、文件大小
String metadata = filePath.getFileName() + "|" +
filePath.getParent() + "|" +
Files.size(filePath);
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hashBytes = digest.digest(metadata.getBytes(StandardCharsets.UTF_8));
return HexFormat.of().formatHex(hashBytes).substring(0, 16); // 只取前16位
}
private boolean isSupportedFormat(Path path) {
String name = path.getFileName().toString().toLowerCase();
return name.endsWith(".pdf") || name.endsWith(".docx") ||
name.endsWith(".txt") || name.endsWith(".md");
}
}增量更新执行引擎
@Service
@Slf4j
@Transactional
public class IncrementalIndexingService {
private final DocumentChangeDetector changeDetector;
private final DocumentVersionRepository versionRepo;
private final VectorStore vectorStore;
private final DocumentSplitter splitter;
private final EmbeddingClient embeddingClient;
// 批量写入大小,避免向量数据库单次插入太多
private static final int BATCH_SIZE = 50;
public IncrementalUpdateResult performIncrementalUpdate(Path documentRoot) {
long startTime = System.currentTimeMillis();
log.info("Starting incremental update for: {}", documentRoot);
// Step 1: 检测变更
DocumentChangeSummary changes = changeDetector.detectChanges(documentRoot);
if (changes.isEmpty()) {
log.info("No changes detected, skipping update");
return IncrementalUpdateResult.noChanges();
}
IncrementalUpdateResult result = new IncrementalUpdateResult();
// Step 2: 处理删除
for (DocumentChange deletion : changes.getDeletions()) {
processDeletedDocument(deletion, result);
}
// Step 3: 处理新增文件
for (DocumentChange newFile : changes.getNewFiles()) {
processNewDocument(newFile, result);
}
// Step 4: 处理内容更新
for (DocumentChange updated : changes.getContentUpdated()) {
processUpdatedDocument(updated, result);
}
// Step 5: 处理仅元数据更新
for (DocumentChange metaOnly : changes.getMetadataOnly()) {
processMetadataUpdate(metaOnly, result);
}
long elapsed = System.currentTimeMillis() - startTime;
result.setElapsedMs(elapsed);
log.info("Incremental update complete in {}ms: {} new, {} updated, {} deleted, {} metadata",
elapsed, result.getNewCount(), result.getUpdatedCount(),
result.getDeletedCount(), result.getMetadataOnlyCount());
return result;
}
private void processDeletedDocument(DocumentChange deletion, IncrementalUpdateResult result) {
try {
Optional<DocumentVersion> version = versionRepo.findByDocumentKey(deletion.getDocumentKey());
if (version.isPresent()) {
// 从向量数据库删除所有相关块
vectorStore.delete(version.get().getVectorChunkIds());
// 标记为已删除(保留记录,用于审计)
version.get().setIndexStatus(DocumentVersion.IndexStatus.DELETED);
versionRepo.save(version.get());
log.info("Deleted {} chunks for document: {}",
version.get().getVectorChunkIds().size(), deletion.getDocumentKey());
}
result.incrementDeleted();
} catch (Exception e) {
log.error("Failed to delete document: {}", deletion.getDocumentKey(), e);
result.addError(deletion.getDocumentKey(), e.getMessage());
}
}
private void processNewDocument(DocumentChange newFile, IncrementalUpdateResult result) {
try {
// 标记为处理中
DocumentVersion version = DocumentVersion.builder()
.documentKey(newFile.getDocumentKey())
.contentHash(newFile.getNewContentHash())
.metadataHash(newFile.getNewMetadataHash())
.indexStatus(DocumentVersion.IndexStatus.UPDATING)
.build();
versionRepo.save(version);
// 处理并写入向量数据库
List<String> chunkIds = indexDocument(newFile.getFilePath(), newFile.getDocumentKey());
// 更新版本记录
version.setVectorChunkIds(chunkIds);
version.setLastIndexedAt(LocalDateTime.now());
version.setIndexStatus(DocumentVersion.IndexStatus.INDEXED);
versionRepo.save(version);
result.incrementNew();
} catch (Exception e) {
log.error("Failed to index new document: {}", newFile.getDocumentKey(), e);
result.addError(newFile.getDocumentKey(), e.getMessage());
}
}
private void processUpdatedDocument(DocumentChange updated, IncrementalUpdateResult result) {
try {
DocumentVersion version = updated.getExistingVersion();
// 先标记为更新中(防止更新期间旧数据被查到但新数据还没写完)
version.setIndexStatus(DocumentVersion.IndexStatus.UPDATING);
versionRepo.save(version);
// 删除旧的向量块
if (version.getVectorChunkIds() != null && !version.getVectorChunkIds().isEmpty()) {
vectorStore.delete(version.getVectorChunkIds());
log.debug("Deleted {} old chunks for update: {}",
version.getVectorChunkIds().size(), updated.getDocumentKey());
}
// 用新内容重建向量块
List<String> newChunkIds = indexDocument(updated.getFilePath(), updated.getDocumentKey());
// 更新版本记录
version.setContentHash(updated.getNewContentHash());
version.setMetadataHash(updated.getNewMetadataHash());
version.setVectorChunkIds(newChunkIds);
version.setLastIndexedAt(LocalDateTime.now());
version.setIndexStatus(DocumentVersion.IndexStatus.INDEXED);
versionRepo.save(version);
result.incrementUpdated();
} catch (Exception e) {
log.error("Failed to update document: {}", updated.getDocumentKey(), e);
// 更新失败,尝试恢复旧数据(实际上此时旧数据已被删除,这里只是标记状态)
if (updated.getExistingVersion() != null) {
updated.getExistingVersion().setIndexStatus(DocumentVersion.IndexStatus.FAILED);
versionRepo.save(updated.getExistingVersion());
}
result.addError(updated.getDocumentKey(), e.getMessage());
}
}
private void processMetadataUpdate(DocumentChange metaOnly, IncrementalUpdateResult result) {
// 只更新 metadata,不重建向量
try {
DocumentVersion version = metaOnly.getExistingVersion();
// 更新向量数据库中的 metadata(通过 Milvus 的 update 操作)
updateVectorMetadata(version.getVectorChunkIds(), metaOnly);
version.setMetadataHash(metaOnly.getNewMetadataHash());
version.setLastIndexedAt(LocalDateTime.now());
versionRepo.save(version);
result.incrementMetadataOnly();
} catch (Exception e) {
log.error("Failed to update metadata: {}", metaOnly.getDocumentKey(), e);
result.addError(metaOnly.getDocumentKey(), e.getMessage());
}
}
/**
* 文档向量化的核心流程
*/
private List<String> indexDocument(Path filePath, String documentKey) throws Exception {
// 1. 解析文档
String rawText = parseDocument(filePath);
// 2. 分块
Document rawDoc = new Document(documentKey, rawText,
Map.of("source_file", filePath.getFileName().toString(),
"document_key", documentKey));
List<Document> chunks = splitter.apply(List.of(rawDoc));
// 3. 为每个 chunk 生成唯一 ID
List<Document> chunksWithIds = IntStream.range(0, chunks.size())
.mapToObj(i -> {
String chunkId = documentKey.hashCode() + "_chunk_" + i;
Map<String, Object> metadata = new HashMap<>(chunks.get(i).getMetadata());
metadata.put("chunk_index", i);
metadata.put("total_chunks", chunks.size());
return new Document(chunkId, chunks.get(i).getContent(), metadata);
})
.collect(Collectors.toList());
// 4. 分批写入向量数据库
List<String> chunkIds = new ArrayList<>();
for (int i = 0; i < chunksWithIds.size(); i += BATCH_SIZE) {
List<Document> batch = chunksWithIds.subList(i,
Math.min(i + BATCH_SIZE, chunksWithIds.size()));
vectorStore.add(batch);
batch.forEach(doc -> chunkIds.add(doc.getId()));
log.debug("Indexed batch {}/{} for document: {}",
(i / BATCH_SIZE) + 1,
(int) Math.ceil((double) chunksWithIds.size() / BATCH_SIZE),
documentKey);
}
return chunkIds;
}
}定时任务调度
@Component
@Slf4j
public class IncrementalUpdateScheduler {
private final IncrementalIndexingService indexingService;
@Value("${knowledge.base.root.path:/data/knowledge-base}")
private String knowledgeBaseRootPath;
// 工作日每天早上 6 点执行
@Scheduled(cron = "0 0 6 * * MON-FRI")
public void dailyIncrementalUpdate() {
log.info("Daily incremental update started");
try {
IncrementalUpdateResult result = indexingService.performIncrementalUpdate(
Path.of(knowledgeBaseRootPath)
);
log.info("Daily incremental update finished: {}", result.getSummary());
} catch (Exception e) {
log.error("Daily incremental update failed", e);
// 发送告警
}
}
// 也可以通过 API 触发手动更新
public void triggerManualUpdate(String subPath) {
Path targetPath = Path.of(knowledgeBaseRootPath, subPath);
log.info("Manual update triggered for: {}", targetPath);
indexingService.performIncrementalUpdate(targetPath);
}
}对比数据
这是我们给那个法律合规客户改造后的实测数据(8000+ 文档,每日约 15 个文档更新):
| 指标 | 全量重建 | 增量更新 | 提升 |
|---|---|---|---|
| 每日更新耗时 | 6-8 小时 | 8-15 分钟 | 97% 下降 |
| Embedding API 费用/天 | ¥380 | ¥12 | 97% 下降 |
| 更新期间服务中断 | 是 | 否 | 完全消除 |
| 更新延迟(文件修改到可查询) | 12-24 小时 | 10-20 分钟 | 95% 下降 |
这个结果说实话超过了我的预期。97% 的成本下降主要是因为每天真正发生内容变化的文档比例很低——8000 份文档里每天改 15 份,增量更新只处理这 15 份,而不是全部 8000 份。
一些实践细节
1. 并发安全
如果有多个进程/线程同时触发增量更新,可能导致同一文档被处理两次。需要用数据库行锁或分布式锁来保护。
2. 失败重试机制
网络问题、向量数据库临时故障等会导致部分文档更新失败。要设计重试机制,FAILED 状态的文档下次更新时会重新处理。
3. 回滚能力
重大文档更新(比如整个规范体系的大版本更新)需要支持回滚。可以在向量数据库里保留版本号,支持指向历史版本的查询。
总结
增量更新的核心是:用内容哈希准确识别变更,只处理真正发生变化的文档。
实现起来比全量重建复杂,但在文档更新频繁的场景下,这个复杂度完全值得。每天节省的 97% 成本和时间,足以证明这个投入。
