向量数据库的数据治理:如何管理海量Embedding的生命周期
2026/10/31大约 6 分钟向量数据库数据治理Embedding生命周期Java
向量数据库的数据治理:如何管理海量Embedding的生命周期
一、"知识库越来越慢,越来越不准"
老陈是一家法律科技公司的首席架构师,他们的AI法律助手上线18个月了。
系统用PGVector存储法律条文的Embedding,一开始效果很好。但随着时间推移,用户开始反馈:
"它给我找到的是2022年的旧法规,那个条文已经被修订了。" "同一个问题,三个月前回答说可以做,现在又说不确定。" "检索越来越慢了,前几天超时了两次。"
老陈打开监控:向量数据库里已经有230万个Document,其中有相当比例是过时的法规、重复的内容、测试数据没有清理的垃圾数据。
没有人管理这些数据。
大家只顾往里添加,从来没有想过清理、更新、版本管理这件事。
这就是向量数据库的数据治理问题。和传统数据库一样,向量数据库里的数据也会变脏、过期、膨胀,需要主动管理。
二、向量数据库数据治理的四大挑战
三、数据新鲜度管理
3.1 给Document加上生命周期元数据
package com.laozhang.vectordb.governance;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
/**
* 向量数据库数据新鲜度管理
* 每个文档都有明确的有效期,过期自动清理或标记
*/
@Slf4j
@Service
public class VectorDataFreshnessManager {
private final VectorStore vectorStore;
private final DocumentRepository documentRepository;
public VectorDataFreshnessManager(VectorStore vectorStore,
DocumentRepository documentRepository) {
this.vectorStore = vectorStore;
this.documentRepository = documentRepository;
}
/**
* 添加文档时设置完整的生命周期元数据
* 这是治理的起点:从入库就管理好数据
*/
public void addDocumentWithLifecycle(
String content,
String sourceId,
String documentType,
LocalDate effectiveDate,
LocalDate expiryDate) {
Document doc = new Document(content, Map.of(
"source_id", sourceId,
"document_type", documentType,
"effective_date", effectiveDate.format(DateTimeFormatter.ISO_DATE),
"expiry_date", expiryDate != null
? expiryDate.format(DateTimeFormatter.ISO_DATE)
: "permanent",
"version", "1.0",
"status", "active",
"indexed_at", LocalDate.now().format(DateTimeFormatter.ISO_DATE),
"checksum", computeChecksum(content) // 用于变更检测
));
vectorStore.add(List.of(doc));
log.info("文档已入库: sourceId={}, effectiveDate={}, expiryDate={}",
sourceId, effectiveDate, expiryDate);
}
/**
* 每天凌晨3点执行数据新鲜度检查
*/
@Scheduled(cron = "0 0 3 * * ?")
public void dailyFreshnessCheck() {
log.info("开始每日数据新鲜度检查...");
LocalDate today = LocalDate.now();
// 1. 查找已过期的文档
List<String> expiredDocIds = documentRepository.findExpiredDocuments(today);
log.info("发现过期文档: count={}", expiredDocIds.size());
// 2. 软删除(标记为inactive,不立即物理删除)
if (!expiredDocIds.isEmpty()) {
documentRepository.markAsExpired(expiredDocIds);
log.info("已标记{}个文档为过期", expiredDocIds.size());
}
// 3. 检查"临近过期"的文档(7天内),发出警告
List<String> soonToExpireIds = documentRepository.findSoonToExpireDocuments(today, 7);
if (!soonToExpireIds.isEmpty()) {
log.warn("以下文档将在7天内过期,请考虑更新: count={}", soonToExpireIds.size());
}
log.info("每日数据新鲜度检查完成");
}
/**
* 每周日凌晨4点执行物理清理(软删除超过30天的文档)
*/
@Scheduled(cron = "0 0 4 ? * SUN")
public void weeklyPhysicalCleanup() {
log.info("开始每周物理清理...");
List<String> toDelete = documentRepository.findSoftDeletedOlderThan(30);
if (!toDelete.isEmpty()) {
vectorStore.delete(toDelete);
documentRepository.physicalDelete(toDelete);
log.info("物理删除过期文档: count={}", toDelete.size());
}
}
private String computeChecksum(String content) {
try {
var digest = java.security.MessageDigest.getInstance("MD5");
byte[] hash = digest.digest(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
return java.util.HexFormat.of().formatHex(hash);
} catch (Exception e) {
return String.valueOf(content.hashCode());
}
}
}3.2 文档更新的增量同步
/**
* 文档变更监听器
* 当原始文档更新时,自动重新生成并替换向量
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class DocumentChangeListener {
private final VectorStore vectorStore;
private final EmbeddingModel embeddingModel;
private final DocumentRepository documentRepository;
/**
* 监听业务数据库的文档变更事件
* 实际项目中通过MQ消费文档变更消息
*/
@EventListener
@Async
public void onDocumentUpdated(DocumentUpdatedEvent event) {
log.info("文档已更新,开始重新索引: sourceId={}", event.getSourceId());
try {
// 1. 查找老版本向量(通过sourceId关联)
List<String> oldDocIds = documentRepository
.findVectorIdsBySourceId(event.getSourceId());
// 2. 删除老版本
if (!oldDocIds.isEmpty()) {
vectorStore.delete(oldDocIds);
log.info("已删除旧版本向量: count={}", oldDocIds.size());
}
// 3. 重新切分文档(如果内容有更新)
List<String> chunks = chunkDocument(event.getNewContent());
// 4. 生成新向量并入库
List<Document> newDocs = chunks.stream()
.map(chunk -> new Document(chunk, Map.of(
"source_id", event.getSourceId(),
"version", event.getNewVersion(),
"updated_at", LocalDate.now().format(DateTimeFormatter.ISO_DATE),
"status", "active"
)))
.toList();
vectorStore.add(newDocs);
log.info("文档重新索引完成: sourceId={}, newChunks={}", event.getSourceId(), newDocs.size());
} catch (Exception e) {
log.error("文档重新索引失败: sourceId={}, error={}", event.getSourceId(), e.getMessage());
}
}
private List<String> chunkDocument(String content) {
// 按段落或固定长度切分,这里简化处理
int chunkSize = 500;
int overlap = 50;
List<String> chunks = new java.util.ArrayList<>();
for (int i = 0; i < content.length(); i += chunkSize - overlap) {
chunks.add(content.substring(i, Math.min(i + chunkSize, content.length())));
}
return chunks;
}
}四、数据去重:消除向量库的噪音
package com.laozhang.vectordb.governance;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.document.Document;
import org.springframework.ai.embedding.EmbeddingModel;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* 向量数据库去重服务
* 定期检测并合并相似度过高的重复文档
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class VectorDeduplicationService {
private final VectorStore vectorStore;
private final EmbeddingModel embeddingModel;
private final DocumentRepository documentRepository;
// 相似度高于此阈值视为重复文档
private static final float DUPLICATE_THRESHOLD = 0.97f;
/**
* 每月1号凌晨2点执行去重检查
*/
@Scheduled(cron = "0 0 2 1 * ?")
public void monthlyDeduplicationCheck() {
log.info("开始月度去重检查...");
// 分批处理,避免内存溢出
int batchSize = 1000;
int offset = 0;
int totalDuplicates = 0;
while (true) {
List<DocumentRecord> batch = documentRepository.findBatch(offset, batchSize);
if (batch.isEmpty()) break;
int duplicatesInBatch = processDeduplicationBatch(batch);
totalDuplicates += duplicatesInBatch;
offset += batchSize;
log.info("去重进度: processed={}, duplicatesFound={}", offset, totalDuplicates);
}
log.info("月度去重完成,共发现并处理{}个重复文档", totalDuplicates);
}
private int processDeduplicationBatch(List<DocumentRecord> records) {
int duplicateCount = 0;
List<String> toDelete = new ArrayList<>();
for (int i = 0; i < records.size(); i++) {
DocumentRecord record = records.get(i);
if (toDelete.contains(record.getId())) continue; // 已标记删除
// 用当前文档作为查询,找出极相似的文档
List<Document> similar = vectorStore.similaritySearch(
org.springframework.ai.vectorstore.SearchRequest
.query(record.getContent())
.withTopK(5)
.withSimilarityThreshold(DUPLICATE_THRESHOLD)
);
// 排除自身,找出真正的重复项
similar.stream()
.filter(doc -> !doc.getId().equals(record.getId()))
.forEach(dup -> {
// 保留较新版本,删除较旧版本
if (shouldKeep(record, dup)) {
toDelete.add(dup.getId());
} else {
toDelete.add(record.getId());
}
});
}
if (!toDelete.isEmpty()) {
vectorStore.delete(toDelete);
duplicateCount += toDelete.size();
log.info("批次去重完成,删除{}个重复文档", toDelete.size());
}
return duplicateCount;
}
/**
* 判断保留哪个版本
* 规则:优先保留版本号更新、更新时间更近的
*/
private boolean shouldKeep(DocumentRecord existing, Document duplicate) {
String existingVersion = (String) existing.getMetadata().getOrDefault("version", "1.0");
String dupVersion = (String) duplicate.getMetadata().getOrDefault("version", "1.0");
return existingVersion.compareTo(dupVersion) >= 0;
}
}五、性能优化:冷热数据分层
六、治理成效数据
老陈完成数据治理系统建设后的效果(6个月后):
| 指标 | 治理前 | 治理后 | 变化 |
|---|---|---|---|
| 向量数据库总记录数 | 230万 | 87万 | -62% |
| 检索P95延迟 | 820ms | 210ms | -74% |
| 检索结果中过期内容比例 | 23% | <1% | -96% |
| 用户反馈"信息过时"投诉 | 每月47条 | 每月3条 | -94% |
| 存储成本 | 基准 | -58% | 节省大量费用 |
数据治理是向量数据库的长期工程,不是一次性任务。你放进向量库的每一条数据,都需要有人负责它从生到死的全过程。
