Spring AI增量更新:文档变更时向量库的高效更新策略
2026/4/30大约 8 分钟
Spring AI增量更新:文档变更时向量库的高效更新策略
适读人群:已经有RAG系统在运营、每天需要更新知识库文档的工程师 阅读时长:约18分钟
小明的知识库更新噩梦
小明做的是一个政策法规问答系统,向量库里存着3万多条政策条款。
系统上线后第一个月,只要有政策更新,他的操作流程是:
- 手动下载新版PDF
- 手动删掉向量库里旧版相关的所有chunk
- 手动分块、手动调Embedding API生成新向量
- 手动插入向量库
每次政策更新,他至少要花3-4个小时做这些。一个月下来,他发现自己花在"维护知识库"上的时间,比写业务代码还多。
更头疼的是,有一次他在删除旧向量时操作失误,把整个文档目录下的所有chunk都删了,结果那个政策文件完全从知识库里消失了,用户反馈问了相关问题没有任何结果。他花了两小时才把数据恢复回来。
他找到我,问:"难道RAG系统的知识库更新只能这么麻烦吗?"
当然不是。文档变更触发增量更新,是RAG系统工程化必须要做的事情。 做好了,知识库可以像代码仓库一样自动同步;做不好,就是小明的噩梦。
增量更新的核心挑战
完整解决方案:文档变更监控 + 增量同步流水线
系统架构
文档版本管理
/**
* 文档版本追踪
* 核心:用文档内容Hash来判断文档是否真正发生了变化
*/
@Entity
@Table(name = "doc_versions")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DocVersion {
@Id
private String docId; // 文档唯一ID(可以是文件路径的hash)
private String docPath; // 文档原始路径/URL
private String docName; // 文档名称
@Column(nullable = false)
private String contentHash; // 文档内容的MD5/SHA256
private Long fileSize;
private String mimeType;
@Column(nullable = false)
private Integer version; // 版本号,每次更新+1
private LocalDateTime firstIndexedAt;
private LocalDateTime lastUpdatedAt;
@Enumerated(EnumType.STRING)
private DocStatus status; // ACTIVE / DELETED / PENDING
// 关联的chunk列表(存chunk ID前缀,便于批量删除)
@Column(name = "chunk_id_prefix")
private String chunkIdPrefix; // 例如:docId + "_v" + version
public enum DocStatus {
ACTIVE, DELETED, PENDING_UPDATE
}
/**
* 判断文档内容是否发生了变化
*/
public boolean hasChanged(String newContentHash) {
return !this.contentHash.equals(newContentHash);
}
/**
* 新chunk的ID前缀(更新时用新前缀,方便区分新旧版本)
*/
public String getNewChunkIdPrefix() {
return docId + "_v" + (version + 1);
}
}核心:增量更新处理器
/**
* 文档增量更新处理器
* 实现"先写新,再删旧"的安全更新策略
*/
@Service
@RequiredArgsConstructor
@Slf4j
@Transactional
public class DocumentIncrementalUpdater {
private final VectorStore vectorStore;
private final DocVersionRepository docVersionRepo;
private final DocumentParser documentParser;
private final DocumentChunker chunker;
private final EmbeddingModel embeddingModel;
/**
* 处理文档变更事件
* 这是整个增量更新的主流程
*/
public UpdateResult processDocumentChange(DocumentChangeEvent event) {
String docId = event.getDocId();
byte[] docContent = event.getContent();
log.info("处理文档变更: docId={}, source={}", docId, event.getSource());
// 1. 计算内容Hash
String newHash = computeHash(docContent);
// 2. 检查是否真的有变化
DocVersion existingVersion = docVersionRepo.findById(docId).orElse(null);
if (existingVersion != null && !existingVersion.hasChanged(newHash)) {
log.info("文档内容未变化,跳过更新: docId={}", docId);
return UpdateResult.skipped(docId, "内容无变化");
}
// 3. 解析文档
String parsedText = documentParser.parse(docContent, event.getMimeType());
// 4. 分块
List<DocumentChunk> newChunks = chunker.chunk(parsedText, docId);
// 5. 生成新版本的chunk ID前缀
int newVersion = existingVersion != null ? existingVersion.getVersion() + 1 : 1;
String newChunkPrefix = docId + "_v" + newVersion;
// 6. 向量化并插入(先插入新版本)
List<Document> documents = newChunks.stream()
.map(chunk -> Document.builder()
.id(newChunkPrefix + "_" + chunk.getChunkIndex())
.content(chunk.getContent())
.metadata(Map.of(
"docId", docId,
"docName", event.getDocName(),
"version", newVersion,
"chunkIndex", chunk.getChunkIndex(),
"chunkTotal", newChunks.size()
))
.build())
.collect(Collectors.toList());
vectorStore.add(documents);
log.info("新版本向量已写入: docId={}, version={}, chunks={}",
docId, newVersion, documents.size());
// 7. 更新元数据(记录新版本信息)
DocVersion newDocVersion = DocVersion.builder()
.docId(docId)
.docPath(event.getDocPath())
.docName(event.getDocName())
.contentHash(newHash)
.version(newVersion)
.status(DocVersion.DocStatus.ACTIVE)
.chunkIdPrefix(newChunkPrefix)
.lastUpdatedAt(LocalDateTime.now())
.build();
if (existingVersion == null) {
newDocVersion.setFirstIndexedAt(LocalDateTime.now());
} else {
newDocVersion.setFirstIndexedAt(existingVersion.getFirstIndexedAt());
}
docVersionRepo.save(newDocVersion);
// 8. 删除旧版本的向量(关键:在新版本确认写入成功之后再删)
if (existingVersion != null) {
deleteOldVersionChunks(docId, existingVersion.getChunkIdPrefix());
}
log.info("文档更新完成: docId={}, version={} -> {}",
docId, existingVersion != null ? existingVersion.getVersion() : 0, newVersion);
return UpdateResult.updated(docId, newVersion, documents.size());
}
/**
* 处理文档删除
*/
public UpdateResult processDocumentDeletion(String docId) {
DocVersion version = docVersionRepo.findById(docId)
.orElseThrow(() -> new DocumentNotFoundException("文档不存在: " + docId));
// 删除向量
deleteOldVersionChunks(docId, version.getChunkIdPrefix());
// 更新文档状态
version.setStatus(DocVersion.DocStatus.DELETED);
version.setLastUpdatedAt(LocalDateTime.now());
docVersionRepo.save(version);
log.info("文档已删除: docId={}", docId);
return UpdateResult.deleted(docId);
}
/**
* 删除指定版本的所有chunk向量
* 使用chunk ID前缀过滤
*/
private void deleteOldVersionChunks(String docId, String chunkIdPrefix) {
try {
// Milvus支持按ID前缀过滤删除
// 使用filter表达式:id like "prefix%"
vectorStore.delete(Collections.singletonList(
"id like '" + chunkIdPrefix + "%'"
));
log.info("旧版本向量已删除: docId={}, prefix={}", docId, chunkIdPrefix);
} catch (Exception e) {
// 删除失败不影响主流程,记录日志后续清理
log.error("旧版本向量删除失败,需人工处理: docId={}, prefix={}",
docId, chunkIdPrefix, e);
}
}
private String computeHash(byte[] content) {
try {
MessageDigest md = MessageDigest.getInstance("SHA-256");
byte[] hash = md.digest(content);
return HexFormat.of().formatHex(hash);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("SHA-256不可用", e);
}
}
}文件系统监听(本地文档)
/**
* 本地文件系统监听
* 文件变更时自动触发向量库更新
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class LocalFileWatcher implements ApplicationRunner {
private final DocumentChangeEventPublisher eventPublisher;
private WatchService watchService;
@Value("${app.knowledge.watch-dir:./knowledge-docs}")
private String watchDir;
@Override
public void run(ApplicationArguments args) throws Exception {
Path watchPath = Paths.get(watchDir);
if (!Files.exists(watchPath)) {
Files.createDirectories(watchPath);
}
watchService = FileSystems.getDefault().newWatchService();
watchPath.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_DELETE
);
log.info("开始监听文档目录: {}", watchDir);
// 异步监听
Thread.ofVirtual().name("file-watcher").start(() -> {
while (true) {
try {
WatchKey key = watchService.take();
for (WatchEvent<?> event : key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
Path filename = (Path) event.context();
Path fullPath = watchPath.resolve(filename);
// 只处理支持的文档格式
if (!isSupportedDocument(filename.toString())) continue;
log.info("检测到文件变更: {}, 类型: {}", filename, kind.name());
if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
eventPublisher.publishDeletion(fullPath.toString());
} else {
// 新建或修改,稍等文件写入完成
Thread.sleep(500);
eventPublisher.publishChange(fullPath);
}
}
key.reset();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("文件监听异常", e);
}
}
});
}
private boolean isSupportedDocument(String filename) {
String lower = filename.toLowerCase();
return lower.endsWith(".pdf") || lower.endsWith(".docx") ||
lower.endsWith(".txt") || lower.endsWith(".md");
}
}批量初始化与增量对比
/**
* 批量扫描和增量同步
* 用于初始建库,以及定期全量校验
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class BulkSyncService {
private final DocVersionRepository docVersionRepo;
private final DocumentIncrementalUpdater updater;
/**
* 全量扫描目录,与向量库进行增量对比
* 找出新增/修改/删除的文档,只更新有变化的部分
*/
@Async
public void fullScan(Path directory) throws IOException {
log.info("开始全量扫描: {}", directory);
// 1. 扫描目录,得到当前所有文件
Map<String, String> currentFiles = new HashMap<>();
Files.walk(directory)
.filter(Files::isRegularFile)
.filter(p -> isSupportedDocument(p.toString()))
.forEach(path -> {
try {
String docId = generateDocId(path);
String hash = computeFileHash(path);
currentFiles.put(docId, hash);
} catch (IOException e) {
log.error("计算文件Hash失败: {}", path, e);
}
});
// 2. 获取向量库中已有的文档版本
Map<String, String> indexedFiles = docVersionRepo.findAll().stream()
.filter(v -> v.getStatus() == DocVersion.DocStatus.ACTIVE)
.collect(Collectors.toMap(DocVersion::getDocId, DocVersion::getContentHash));
// 3. 分析差异
Set<String> toAdd = new HashSet<>(currentFiles.keySet());
toAdd.removeAll(indexedFiles.keySet());
Set<String> toDelete = new HashSet<>(indexedFiles.keySet());
toDelete.removeAll(currentFiles.keySet());
Set<String> toUpdate = currentFiles.entrySet().stream()
.filter(e -> indexedFiles.containsKey(e.getKey())
&& !indexedFiles.get(e.getKey()).equals(e.getValue()))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
log.info("扫描完成: 新增={}, 删除={}, 更新={}",
toAdd.size(), toDelete.size(), toUpdate.size());
// 4. 执行更新(分批处理,避免一次处理太多)
int batchSize = 10;
// 处理新增和更新
Stream.concat(toAdd.stream(), toUpdate.stream())
.collect(Collectors.partitioningBy(id -> id.hashCode() % batchSize == 0))
.values()
.forEach(batch -> batch.forEach(docId -> {
try {
Path docPath = getPathFromDocId(docId, directory);
updater.processDocumentChange(buildChangeEvent(docId, docPath));
} catch (Exception e) {
log.error("文档更新失败: docId={}", docId, e);
}
}));
// 处理删除
toDelete.forEach(docId -> {
try {
updater.processDocumentDeletion(docId);
} catch (Exception e) {
log.error("文档删除失败: docId={}", docId, e);
}
});
log.info("全量同步完成");
}
private String generateDocId(Path path) {
return DigestUtils.md5DigestAsHex(path.toString().getBytes());
}
private String computeFileHash(Path path) throws IOException {
return DigestUtils.md5DigestAsHex(Files.readAllBytes(path));
}
}更新策略选型对比
| 更新策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 删旧插新(同步) | 实现简单 | 删除期间无法查询该文档 | 更新不频繁的场景 |
| 先插新,再删旧 | 无停机,安全 | 短暂内存占用翻倍 | 推荐,生产首选 |
| 版本化存储 | 支持回滚 | 空间占用多 | 需要回滚能力的场景 |
| 影子写入 | 零感知切换 | 实现复杂 | 高可用要求极高的场景 |
踩过的几个坑
坑1:没有Hash比对,每次都全量重建
早期我们没有内容Hash比对,结果定时任务触发全量重建,把没有变化的文档也重新Embedding了一遍,浪费了大量Embedding API费用。
坑2:删除旧向量用"文档名"过滤
用文档名做删除过滤,结果两个子目录下有同名文件,一次删除操作把不该删的向量也清掉了。后来改成用内容Hash生成的唯一docId。
坑3:更新中途服务重启
新向量插入一半,服务挂了,重启后旧向量还在但新向量不完整,两个版本混在向量库里。现在在元数据里加了PENDING_UPDATE状态,重启后自动检测并补全未完成的更新。
