RAG Incremental Updates: A Real-Time Sync Strategy for a Ten-Million-Document Knowledge Base
The "6-Hour Window" That Broke the Whole Team
Chen Hao is the AI platform lead at a major insurance company. In late 2025 his team launched an internal knowledge base on a standard RAG architecture: document ingestion → chunking → embedding → Milvus storage → retrieval-based Q&A.
In the first week after launch, user feedback was good. The problems arrived soon after.
An insurer updates a large volume of product terms, compliance files, and internal policies every day. The operations team uploaded about 300 new documents daily, with another 100 or so existing documents needing revisions. The original system logic was simple: run a full rebuild job at 2 AM every night, re-embedding and re-ingesting every document.
How long did a full rebuild take? Six hours.
That meant the knowledge base was effectively read-only from 2 AM to 8 AM: the system kept running, but on yesterday's data. Worse, as the corpus grew from the initial 500,000 documents to today's 12 million, the rebuild window ballooned from 1.5 hours to 6, and it was still growing.
"At this rate it will be 12 hours next year," Chen Hao said in an internal retro. "Our knowledge base permanently lags reality by 6 hours. In the insurance business, that is unacceptable."
What finally pushed him to rebuild the pipeline was a major incident in January 2026: a newly published product clause changed the claims rules, but because the change landed inside the rebuild window, the customer service AI kept answering from the old version. Three users acted on the wrong information, and the company had to pay out and apologize.
Chen Hao's team spent eight full weeks converting the full rebuild into an incremental update pipeline. After the new system went live, document sync latency dropped from 6 hours to under 30 seconds, and vector store query performance improved by 40%.
This article is a complete postmortem of that system.
TL;DR
| Approach | Sync latency | Complexity | Best fit |
|---|---|---|---|
| Full rebuild | Hours | Low | <100K documents, infrequent updates |
| Scheduled incremental | Minutes | Low | <1M documents, relaxed freshness requirements |
| CDC-based sync | Seconds | Medium | Documents live in a relational database |
| Message-queue driven | Milliseconds to seconds | Medium | Diverse document sources, message middleware already in place |
| Hybrid | Seconds, plus a fallback sweep | High | Ten-million scale, high reliability requirements |
Key takeaways:
- Past roughly 1 million documents, incremental updates are mandatory
- CDC suits database-driven systems; message queues suit event-driven architectures
- The hardest part of incremental updating is not inserts but keeping modifications and deletions consistent
- Production systems need version management plus idempotent processing
Full Rebuild vs. Incremental Update: An Unfair Race
Why does full rebuild look simple?
The logic of a full rebuild really is almost insultingly simple:
// Full-rebuild pseudocode: deceptively pretty
public void fullRebuild() {
    vectorStore.deleteAll();                    // wipe the vector store
    List<Document> docs = docRepo.findAll();    // load every document
    for (Document doc : docs) {
        List<Chunk> chunks = splitter.split(doc);
        List<float[]> vectors = embedder.embed(chunks);
        vectorStore.batchInsert(chunks, vectors);
    }
}

The problem is just as plain: at 12 million documents, this loop runs for 6 hours.
What does incremental updating have to solve?
Incremental updating sounds like "only process the documents that changed," but in practice it has to answer three core questions:
Question 1: How do you know which documents changed?
This is not a trivial question. The vector database stores post-chunking vectors, in a many-to-one relationship with the source document. One document may be split into 20 chunks, and each chunk is an independent record in Milvus. When a document is modified, you must find every chunk belonging to it and update them all.
Question 2: How do you handle modifications?
Modifying a document is not a simple "update" in the vector store. Because the content changed, the chunking result likely changed too: a document that used to split into 18 chunks may now split into 22, each with different text and a different vector. So at the vector-store level, "modify" is equivalent to "delete the old chunks, then insert the new ones."
Question 3: How do you guarantee deletions are never missed?
If a document is deleted but its vectors are not cleaned up in time, users can retrieve "ghost documents": content from a document that no longer exists in the source database. More dangerously, stale, incorrect information keeps shaping the AI's answers.
The Three Big Challenges of Incremental Updates, in Detail
Challenge 1: handling new documents
Inserts look like the easy case, but they have traps too.
The key design property for inserts is idempotency. If the same document is submitted more than once (network retries, message redelivery, and so on), the system must not index it twice.
@Service
public class DocumentIndexService {
@Autowired
private DocumentRepository docRepo;
@Autowired
private VectorStore vectorStore;
@Autowired
private EmbeddingService embeddingService;
/**
 * Idempotent document indexing.
 * A given documentId + contentHash pair is indexed at most once.
 */
public IndexResult indexDocument(String documentId, String content, Map<String, Object> metadata) {
// 1. Compute the content hash
String contentHash = DigestUtils.sha256Hex(content);
// 2. Check whether identical content has already been indexed
Optional<DocumentIndex> existing = docRepo.findByDocumentIdAndContentHash(documentId, contentHash);
if (existing.isPresent()) {
log.info("Document {} with hash {} already indexed, skipping", documentId, contentHash);
return IndexResult.skipped(documentId);
}
// 3. New content: mark it as "indexing" first (guards against concurrent duplicate processing)
DocumentIndex docIndex = new DocumentIndex();
docIndex.setDocumentId(documentId);
docIndex.setContentHash(contentHash);
docIndex.setStatus(IndexStatus.INDEXING);
docIndex.setCreatedAt(Instant.now());
try {
docRepo.save(docIndex);
} catch (DataIntegrityViolationException e) {
// Another thread won the race and is already indexing this document; skip
log.info("Document {} indexing already in progress by another thread", documentId);
return IndexResult.skipped(documentId);
}
try {
// 4. Chunk
List<TextChunk> chunks = chunkDocument(documentId, content, metadata);
// 5. Embed (batched for throughput)
List<float[]> vectors = embeddingService.embedBatch(
chunks.stream().map(TextChunk::getText).collect(Collectors.toList())
);
// 6. Write to the vector store
vectorStore.batchInsert(chunks, vectors);
// 7. Mark as successfully indexed
docIndex.setStatus(IndexStatus.INDEXED);
docIndex.setChunkCount(chunks.size());
docIndex.setIndexedAt(Instant.now());
docRepo.save(docIndex);
return IndexResult.success(documentId, chunks.size());
} catch (Exception e) {
// 8. On failure, record the state so the document can be retried
docIndex.setStatus(IndexStatus.FAILED);
docIndex.setErrorMessage(e.getMessage());
docRepo.save(docIndex);
throw new IndexException("Failed to index document: " + documentId, e);
}
}
private List<TextChunk> chunkDocument(String documentId, String content, Map<String, Object> metadata) {
RecursiveCharacterSplitter splitter = RecursiveCharacterSplitter.builder()
.chunkSize(512)
.chunkOverlap(50)
.separators(List.of("\n\n", "\n", "。", "!", "?", ".", "!", "?", " "))
.build();
List<String> textChunks = splitter.split(content);
List<TextChunk> result = new ArrayList<>();
for (int i = 0; i < textChunks.size(); i++) {
TextChunk chunk = new TextChunk();
chunk.setId(documentId + "_chunk_" + i);
chunk.setDocumentId(documentId);
chunk.setChunkIndex(i);
chunk.setText(textChunks.get(i));
chunk.setMetadata(new HashMap<>(metadata));
chunk.getMetadata().put("chunk_index", i);
chunk.getMetadata().put("total_chunks", textChunks.size());
result.add(chunk);
}
return result;
}
}

Challenge 2: handling modified documents
Modification is the most complex operation in incremental updating, because it raises the transactionality problem of "delete old, insert new."
The key question: in a naive delete-then-insert flow, what happens if the system crashes after the old chunks are deleted but before the new ones are inserted?
The answer is a version-number mechanism plus soft deletes:
@Service
public class DocumentUpdateService {
@Autowired
private VectorStore vectorStore;
@Autowired
private DocumentVersionRepo versionRepo;
@Autowired
private EmbeddingService embeddingService;
@Autowired
private AsyncChunkCleaner asyncChunkCleaner;
@Transactional
public UpdateResult updateDocument(String documentId, String newContent, Map<String, Object> metadata) {
String newContentHash = DigestUtils.sha256Hex(newContent);
// 1. Fetch the current version
DocumentVersion currentVersion = versionRepo.findLatestByDocumentId(documentId).orElse(null);
if (currentVersion != null && currentVersion.getContentHash().equals(newContentHash)) {
log.info("Document {} content unchanged, skipping update", documentId);
return UpdateResult.unchanged(documentId);
}
// 2. Create the new version record (write metadata first, touch the vector store second)
long newVersionNumber = currentVersion != null ? currentVersion.getVersionNumber() + 1 : 1;
DocumentVersion newVersion = new DocumentVersion();
newVersion.setDocumentId(documentId);
newVersion.setVersionNumber(newVersionNumber);
newVersion.setContentHash(newContentHash);
newVersion.setStatus(VersionStatus.PENDING);
newVersion.setCreatedAt(Instant.now());
versionRepo.save(newVersion);
try {
// 3. Chunk and embed the new content (chunkDocument overload that stamps the version into chunk metadata)
List<TextChunk> newChunks = chunkDocument(documentId, newContent, metadata, newVersionNumber);
List<float[]> newVectors = embeddingService.embedBatch(
newChunks.stream().map(TextChunk::getText).collect(Collectors.toList())
);
// 4. Insert the new version's chunks (the old chunks are still live at this point)
vectorStore.batchInsert(newChunks, newVectors);
// 5. Mark the new version as active
newVersion.setStatus(VersionStatus.ACTIVE);
newVersion.setChunkCount(newChunks.size());
versionRepo.save(newVersion);
// 6. Mark the old version as deprecated (soft delete)
if (currentVersion != null) {
currentVersion.setStatus(VersionStatus.DEPRECATED);
currentVersion.setDeprecatedAt(Instant.now());
versionRepo.save(currentVersion);
// 7. Delete the old chunks asynchronously
asyncChunkCleaner.scheduleCleanup(documentId, currentVersion.getVersionNumber());
}
return UpdateResult.success(documentId, newVersionNumber, newChunks.size());
} catch (Exception e) {
newVersion.setStatus(VersionStatus.FAILED);
versionRepo.save(newVersion);
throw new UpdateException("Failed to update document: " + documentId, e);
}
}
}

The key property of this design: until the new version is activated, queries keep hitting the old version. Even if the system crashes mid-update, users see the old content, not corrupted content.
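One thing not shown here is the query side of that guarantee. Between activation of the new version and the async cleanup, chunks of both versions coexist in the vector store, so retrieval has to filter by version. A minimal sketch of one way to do it; the search helper, SearchHit type, and isActiveVersion lookup are illustrative names, not the team's actual API:

// Hedged sketch: post-filter search hits against the version table so chunks
// of deprecated versions never reach the LLM. Helper names are assumptions.
public List<SearchHit> searchActiveOnly(float[] queryVector, int topK) {
    List<SearchHit> hits = vectorStore.search(queryVector, topK * 2); // over-fetch, then filter
    return hits.stream()
        .filter(hit -> {
            String docId = (String) hit.getMetadata().get("document_id");
            int version = (int) hit.getMetadata().get("version_number");
            return versionRepo.isActiveVersion(docId, version); // cacheable lookup
        })
        .limit(topK)
        .collect(Collectors.toList());
}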
Challenge 3: handling deleted documents
@Service
public class DocumentDeleteService {
@Autowired
private VectorStore vectorStore;
@Autowired
private DocumentVersionRepo versionRepo;
@Autowired
private DeleteAuditRepo deleteAuditRepo;
@Autowired
private FailedDeleteRepo failedDeleteRepo;
public DeleteResult deleteDocument(String documentId, String reason, String operatorId) {
List<DocumentVersion> versions = versionRepo.findAllByDocumentId(documentId);
if (versions.isEmpty()) {
log.warn("Attempting to delete non-existent document: {}", documentId);
return DeleteResult.notFound(documentId);
}
// 1. Write a delete audit record
DeleteAudit audit = new DeleteAudit();
audit.setDocumentId(documentId);
audit.setReason(reason);
audit.setOperatorId(operatorId);
audit.setDeletedAt(Instant.now());
deleteAuditRepo.save(audit);
// 2. Mark every version as deleted (soft delete at the metadata layer)
versions.forEach(v -> {
v.setStatus(VersionStatus.DELETED);
v.setDeletedAt(Instant.now());
});
versionRepo.saveAll(versions);
// 3. Hard-delete from the vector store immediately (so nothing stale stays retrievable)
int deletedCount = vectorStore.deleteByFilter(
Filter.builder().eq("document_id", documentId).build()
);
log.info("Deleted {} chunks for document {}", deletedCount, documentId);
return DeleteResult.success(documentId, deletedCount);
}
/**
 * Batch delete: process in fixed-size batches so one huge delete doesn't hurt vector store performance.
 */
public void batchDeleteDocuments(List<String> documentIds, String reason, String operatorId) {
int batchSize = 100;
List<List<String>> batches = Lists.partition(documentIds, batchSize);
for (List<String> batch : batches) {
try {
vectorStore.deleteByFilter(Filter.builder().in("document_id", batch).build());
versionRepo.markAsDeletedByDocumentIds(batch, Instant.now());
log.info("Batch deleted {} documents", batch.size());
Thread.sleep(100); // brief pause between batches
} catch (Exception e) {
log.error("Failed to delete batch: {}", batch, e);
failedDeleteRepo.recordFailedBatch(batch, e.getMessage());
}
}
}
}
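For reference, the deleteByFilter call used above maps naturally onto a Milvus boolean delete expression over the scalar document_id field. A sketch with milvus-sdk-java under that assumption (collection name taken from the application.yml shown later), not the team's actual VectorStore implementation:

// Hedged sketch: delete every chunk of one document via a Milvus boolean expression.
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.MutationResult;
import io.milvus.param.R;
import io.milvus.param.dml.DeleteParam;

public class MilvusChunkDeleter {
    public long deleteChunksByDocumentId(MilvusServiceClient milvusClient, String documentId) {
        DeleteParam param = DeleteParam.newBuilder()
            .withCollectionName("knowledge_base_vectors")
            .withExpr("document_id == \"" + documentId + "\"")
            .build();
        R<MutationResult> resp = milvusClient.delete(param);
        if (resp.getStatus() != R.Status.Success.getCode()) {
            throw new IllegalStateException("Milvus delete failed: " + resp.getMessage());
        }
        return resp.getData().getDeleteCnt();
    }
}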
Approach 1: CDC-Based Incremental Sync
CDC (Change Data Capture) captures data changes by tailing the database's change log, such as MySQL's binlog.
Overall architecture
The flow is: MySQL binlog → Debezium → Kafka → CdcEventConsumer → chunk + embed → Milvus.
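If Debezium runs as a Kafka Connect connector (the topic consumed below, mysql-server.knowledge_base.documents, follows Debezium 2.x's <topic.prefix>.<database>.<table> naming), connector registration looks roughly like this; the values are illustrative, not the team's actual config:

{
  "name": "knowledge-base-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "******",
    "database.server.id": "184054",
    "topic.prefix": "mysql-server",
    "database.include.list": "knowledge_base",
    "table.include.list": "knowledge_base.documents",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schema-changes.knowledge_base"
  }
}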
pom.xml dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.laozhang.rag</groupId>
<artifactId>rag-incremental-sync</artifactId>
<version>1.0.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.5</version>
</parent>
<properties>
<java.version>21</java.version>
<spring-ai.version>1.0.0</spring-ai.version>
<milvus.version>2.4.1</milvus.version>
<debezium.version>2.6.0.Final</debezium.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
<version>${spring-ai.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>${milvus.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.2.1-jre</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

application.yml
spring:
application:
name: rag-incremental-sync
datasource:
url: jdbc:mysql://localhost:3306/knowledge_base?useSSL=false&serverTimezone=Asia/Shanghai
username: ${DB_USERNAME:root}
password: ${DB_PASSWORD:}
kafka:
bootstrap-servers: ${KAFKA_SERVERS:localhost:9092}
consumer:
group-id: rag-sync-consumer
auto-offset-reset: earliest
max-poll-records: 100
ai:
openai:
api-key: ${OPENAI_API_KEY}
embedding:
options:
model: text-embedding-3-small
data:
redis:
host: ${REDIS_HOST:localhost}
port: 6379
milvus:
host: ${MILVUS_HOST:localhost}
port: 19530
collection-name: knowledge_base_vectors
dimension: 1536
rag:
sync:
cdc:
mysql-host: ${CDC_MYSQL_HOST:localhost}
mysql-port: 3306
mysql-user: ${CDC_MYSQL_USER:debezium}
mysql-password: ${CDC_MYSQL_PASSWORD:}
database-name: knowledge_base
table-name: documents
embedding-batch-size: 50
concurrent-threads: 8

CDC event consumer
package com.laozhang.rag.cdc;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Consumes CDC events produced by Debezium and drives incremental updates of the vector store.
 */
@Slf4j
@Component
public class CdcEventConsumer {
private final DocumentIndexService indexService;
private final DocumentUpdateService updateService;
private final DocumentDeleteService deleteService;
private final ObjectMapper objectMapper;
public CdcEventConsumer(DocumentIndexService indexService,
DocumentUpdateService updateService,
DocumentDeleteService deleteService,
ObjectMapper objectMapper) {
this.indexService = indexService;
this.updateService = updateService;
this.deleteService = deleteService;
this.objectMapper = objectMapper;
}
@KafkaListener(
topics = "${rag.sync.cdc.kafka-topic:mysql-server.knowledge_base.documents}",
containerFactory = "cdcKafkaListenerContainerFactory"
)
public void handleCdcEvent(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment acknowledgment) {
try {
JsonNode event = objectMapper.readTree(message);
String operation = event.path("op").asText();
log.debug("Processing CDC event: op={}, partition={}, offset={}",
operation, partition, offset);
switch (operation) {
case "c" -> handleCreate(event.path("after"));
case "u" -> handleUpdate(event.path("before"), event.path("after"));
case "d" -> handleDelete(event.path("before"));
case "r" -> handleSnapshot(event.path("after"));
default -> log.warn("Unknown CDC operation: {}", operation);
}
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Failed to process CDC event at partition={}, offset={}", partition, offset, e);
throw new RuntimeException("CDC event processing failed", e);
}
}
private void handleCreate(JsonNode after) {
String documentId = after.path("id").asText();
String content = after.path("content").asText();
String title = after.path("title").asText();
String category = after.path("category").asText();
if (content.isBlank()) { // asText() returns "" when the field is absent
log.warn("Document {} has empty content, skipping", documentId);
return;
}
Map<String, Object> metadata = Map.of(
"title", title,
"category", category,
"source", "mysql_cdc",
"created_at", after.path("created_at").asText()
);
indexService.indexDocument(documentId, content, metadata);
log.info("Indexed new document: {}", documentId);
}
private void handleUpdate(JsonNode before, JsonNode after) {
String documentId = after.path("id").asText();
String beforeHash = before.path("content_hash").asText();
String afterHash = after.path("content_hash").asText();
if (beforeHash.equals(afterHash)) {
log.debug("Document {} metadata changed but content unchanged, skipping reindex", documentId);
return;
}
String newContent = after.path("content").asText();
Map<String, Object> metadata = Map.of(
"title", after.path("title").asText(),
"category", after.path("category").asText(),
"source", "mysql_cdc",
"updated_at", after.path("updated_at").asText()
);
updateService.updateDocument(documentId, newContent, metadata);
log.info("Updated document in vector store: {}", documentId);
}
private void handleDelete(JsonNode before) {
String documentId = before.path("id").asText();
deleteService.deleteDocument(documentId, "CDC_DELETE", "system");
log.info("Deleted document from vector store: {}", documentId);
}
private void handleSnapshot(JsonNode after) {
String documentId = after.path("id").asText();
String content = after.path("content").asText();
if (!content.isBlank()) {
Map<String, Object> metadata = Map.of(
"title", after.path("title").asText(),
"source", "mysql_cdc_snapshot"
);
indexService.indexDocument(documentId, content, metadata);
}
}
}
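The listener above references a cdcKafkaListenerContainerFactory bean that is not shown. A minimal sketch of what it has to provide: manual acknowledgment mode (to match the Acknowledgment parameter) and an error handler for failed records. The retry counts are illustrative:

// Hedged sketch: the container factory the CDC listener refers to.
// MANUAL ack mode is required because the listener calls acknowledgment.acknowledge().
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class CdcKafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> cdcKafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        // Re-attempt a failed record 3 times, 1 second apart, before giving up
        factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 3)));
        return factory;
    }
}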
Approach 2: Message-Queue-Driven Updates
When documents come from many sources (web apps, mobile clients, crawlers, third-party systems), a message-queue approach is more flexible.
Event definitions
package com.laozhang.rag.event;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.Instant;
import java.util.Map;
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "eventType")
@JsonSubTypes({
@JsonSubTypes.Type(value = DocumentCreatedEvent.class, name = "CREATED"),
@JsonSubTypes.Type(value = DocumentUpdatedEvent.class, name = "UPDATED"),
@JsonSubTypes.Type(value = DocumentDeletedEvent.class, name = "DELETED")
})
public abstract class DocumentChangeEvent {
private String eventId;
private String documentId;
private String tenantId;
private Instant occurredAt;
private String operatorId;
}

@Data
@EqualsAndHashCode(callSuper = true)
public class DocumentCreatedEvent extends DocumentChangeEvent {
private String title;
private String content;
private String category;
private Map<String, String> metadata;
}

@Data
@EqualsAndHashCode(callSuper = true)
public class DocumentUpdatedEvent extends DocumentChangeEvent {
private String title;
private String newContent;
private String contentHash;
private Map<String, String> metadata;
}

@Data
@EqualsAndHashCode(callSuper = true)
public class DocumentDeletedEvent extends DocumentChangeEvent {
private String reason;
}

Event consumer service
package com.laozhang.rag.event;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Service
@RequiredArgsConstructor // generates the constructor that injects the final fields below
public class DocumentEventProcessor {
private final DocumentIndexService indexService;
private final DocumentUpdateService updateService;
private final DocumentDeleteService deleteService;
private final IdempotencyChecker idempotencyChecker;
@KafkaListener(
topics = "document-changes",
groupId = "rag-sync-group",
concurrency = "4"
)
public void processDocumentEvent(
DocumentChangeEvent event,
Acknowledgment acknowledgment) {
if (idempotencyChecker.isAlreadyProcessed(event.getEventId())) {
log.info("Event {} already processed, skipping", event.getEventId());
acknowledgment.acknowledge();
return;
}
try {
processEvent(event);
idempotencyChecker.markAsProcessed(event.getEventId());
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Failed to process event {}: {}", event.getEventId(), e.getMessage(), e);
throw e;
}
}
// NOTE: @Retryable only takes effect when the call goes through the Spring proxy.
// Invoked directly from processDocumentEvent above (self-invocation) it will not retry;
// move this method to a separate bean, or inject the processor into itself.
@Retryable(
    retryFor = {TransientIndexException.class},
    maxAttempts = 3,
    backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void processEvent(DocumentChangeEvent event) {
switch (event) {
case DocumentCreatedEvent created -> {
indexService.indexDocument(
created.getDocumentId(),
created.getContent(),
buildMetadata(created)
);
}
case DocumentUpdatedEvent updated -> {
updateService.updateDocument(
updated.getDocumentId(),
updated.getNewContent(),
buildMetadata(updated)
);
}
case DocumentDeletedEvent deleted -> {
deleteService.deleteDocument(
deleted.getDocumentId(),
deleted.getReason(),
deleted.getOperatorId()
);
}
default -> log.warn("Unknown event type: {}", event.getClass().getSimpleName());
}
}
private Map<String, Object> buildMetadata(DocumentCreatedEvent event) {
    Map<String, Object> metadata = new HashMap<>();
    metadata.put("title", event.getTitle());
    metadata.put("category", event.getCategory());
    metadata.put("tenant_id", event.getTenantId());
    metadata.put("created_at", event.getOccurredAt().toString());
    if (event.getMetadata() != null) {
        metadata.putAll(event.getMetadata());
    }
    return metadata;
}

// Overload for update events, which carry no category field
private Map<String, Object> buildMetadata(DocumentUpdatedEvent event) {
    Map<String, Object> metadata = new HashMap<>();
    metadata.put("title", event.getTitle());
    metadata.put("tenant_id", event.getTenantId());
    metadata.put("updated_at", event.getOccurredAt().toString());
    if (event.getMetadata() != null) {
        metadata.putAll(event.getMetadata());
    }
    return metadata;
}
}
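The IdempotencyChecker used above is also left to the reader. The usual trick is a Redis key with a TTL; a minimal sketch (key prefix and TTL are arbitrary choices):

// Hedged sketch: event-level idempotency backed by Redis keys with a TTL.
import java.time.Duration;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class IdempotencyChecker {

    private final StringRedisTemplate redisTemplate;

    public boolean isAlreadyProcessed(String eventId) {
        return Boolean.TRUE.equals(redisTemplate.hasKey("evt_processed:" + eventId));
    }

    public void markAsProcessed(String eventId) {
        // The TTL bounds the key space; it must exceed the broker's max redelivery window
        redisTemplate.opsForValue().set("evt_processed:" + eventId, "1", Duration.ofDays(7));
    }
}

Note that check-then-mark is not atomic; with at-least-once delivery that is usually acceptable, because the downstream indexing is itself idempotent on documentId + contentHash.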
Document Version Management: Tracking Every Change
Version table design
-- Document version table
CREATE TABLE document_versions (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
document_id VARCHAR(64) NOT NULL,
version_number INT NOT NULL,
content_hash VARCHAR(64) NOT NULL,
chunk_count INT NOT NULL DEFAULT 0,
status ENUM('PENDING', 'ACTIVE', 'DEPRECATED', 'DELETED', 'FAILED', 'CLEANED') NOT NULL,
created_at DATETIME(3) NOT NULL,
activated_at DATETIME(3),
deprecated_at DATETIME(3),
deleted_at DATETIME(3),
error_message TEXT,
UNIQUE KEY uk_doc_version (document_id, version_number),
INDEX idx_document_id (document_id),
INDEX idx_status (status),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Vector chunk tracking table
CREATE TABLE vector_chunks (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
chunk_id VARCHAR(128) NOT NULL UNIQUE,
document_id VARCHAR(64) NOT NULL,
version_number INT NOT NULL,
chunk_index INT NOT NULL,
content_hash VARCHAR(64) NOT NULL,
token_count INT,
created_at DATETIME(3) NOT NULL,
deleted_at DATETIME(3),
INDEX idx_document_version (document_id, version_number),
INDEX idx_document_id (document_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Version management service
@Service
public class DocumentVersionService {
@Autowired
private DocumentVersionRepo versionRepo;
@Autowired
private VectorChunkRepo chunkRepo;
@Autowired
private VectorStore vectorStore;
@Autowired
private DocumentStorage documentStorage;
@Autowired
private DocumentUpdateService updateService;
public List<VersionSummary> getVersionHistory(String documentId, int limit) {
return versionRepo.findByDocumentIdOrderByVersionNumberDesc(documentId,
PageRequest.of(0, limit))
.stream()
.map(v -> VersionSummary.builder()
.versionNumber(v.getVersionNumber())
.contentHash(v.getContentHash())
.status(v.getStatus())
.chunkCount(v.getChunkCount())
.createdAt(v.getCreatedAt())
.build())
.collect(Collectors.toList());
}
@Transactional
public RollbackResult rollbackToVersion(String documentId, int targetVersionNumber) {
DocumentVersion targetVersion = versionRepo.findByDocumentIdAndVersionNumber(
documentId, targetVersionNumber)
.orElseThrow(() -> new VersionNotFoundException(documentId, targetVersionNumber));
if (targetVersion.getStatus() == VersionStatus.DELETED) {
throw new IllegalStateException("Cannot rollback to a deleted version");
}
String originalContent = documentStorage.getContentByHash(targetVersion.getContentHash());
return updateService.updateDocument(documentId, originalContent, Map.of(
"rollback_from_version", targetVersionNumber,
"rollback_at", Instant.now().toString()
));
}
@Scheduled(cron = "0 0 2 * * *") // daily at 2 AM
public void cleanupOldVersions() {
Instant cutoffTime = Instant.now().minus(30, ChronoUnit.DAYS);
List<DocumentVersion> oldVersions = versionRepo.findDeprecatedVersionsBefore(cutoffTime);
for (DocumentVersion version : oldVersions) {
try {
long activeVersionCount = versionRepo.countActiveVersions(version.getDocumentId());
if (activeVersionCount <= 1) continue;
List<String> chunkIds = chunkRepo.findChunkIdsByDocumentVersion(
version.getDocumentId(), version.getVersionNumber());
vectorStore.deleteByIds(chunkIds);
chunkRepo.deleteByDocumentVersion(version.getDocumentId(), version.getVersionNumber());
version.setStatus(VersionStatus.CLEANED);
versionRepo.save(version);
} catch (Exception e) {
log.error("Failed to cleanup version {}/{}: {}",
version.getDocumentId(), version.getVersionNumber(), e.getMessage());
}
}
log.info("Cleaned up {} old document versions", oldVersions.size());
}
}

Vector Consistency Guarantees
@Service
public class VectorConsistencyChecker {
@Autowired
private DocumentVersionRepo versionRepo;
@Autowired
private VectorChunkRepo chunkRepo;
@Autowired
private VectorStore vectorStore;
@Autowired
private DocumentStorage documentStorage;
@Autowired
private DocumentIndexService indexService;
@Data
public static class ConsistencyReport {
private int totalDocuments;
private int consistentDocuments;
private List<String> missingInVectorStore;
private List<String> orphanInVectorStore; // chunks present in the vector store with no active owner (reverse scan not shown)
private Instant checkedAt;
}
public ConsistencyReport checkConsistency() {
ConsistencyReport report = new ConsistencyReport();
report.setCheckedAt(Instant.now());
List<String> activeDocumentIds = versionRepo.findAllActiveDocumentIds();
report.setTotalDocuments(activeDocumentIds.size());
List<String> missingInVectorStore = new ArrayList<>();
for (List<String> batch : Lists.partition(activeDocumentIds, 1000)) {
for (String docId : batch) {
List<VectorChunk> expectedChunks = chunkRepo.findActiveChunksByDocumentId(docId);
if (expectedChunks.isEmpty()) {
missingInVectorStore.add(docId);
continue;
}
List<String> expectedChunkIds = expectedChunks.stream()
.map(VectorChunk::getChunkId)
.collect(Collectors.toList());
List<String> actualChunkIds = vectorStore.existingIds(expectedChunkIds);
if (actualChunkIds.size() < expectedChunkIds.size()) {
log.warn("Document {} missing {} chunks in vector store",
docId, expectedChunkIds.size() - actualChunkIds.size());
missingInVectorStore.add(docId);
}
}
}
report.setMissingInVectorStore(missingInVectorStore);
report.setConsistentDocuments(activeDocumentIds.size() - missingInVectorStore.size());
return report;
}
public void repairInconsistentDocuments(List<String> documentIds) {
for (String documentId : documentIds) {
try {
log.info("Repairing inconsistent document: {}", documentId);
DocumentVersion activeVersion = versionRepo.findLatestActiveByDocumentId(documentId)
.orElseThrow();
String content = documentStorage.getContentByHash(activeVersion.getContentHash());
Map<String, Object> metadata = versionRepo.getMetadataByVersion(
documentId, activeVersion.getVersionNumber());
indexService.indexDocument(documentId, content, metadata);
log.info("Successfully repaired document: {}", documentId);
} catch (Exception e) {
log.error("Failed to repair document {}: {}", documentId, e.getMessage(), e);
}
}
}
}
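This checker is exactly the "fallback sweep" leg of the hybrid row in the TL;DR table. A minimal sketch of wiring it into a nightly job (the schedule is illustrative):

// Hedged sketch: nightly consistency sweep as the safety net behind real-time sync.
import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class NightlyConsistencyJob {

    private final VectorConsistencyChecker checker;

    @Scheduled(cron = "0 0 3 * * *") // 3 AM daily, after the version cleanup job
    public void run() {
        VectorConsistencyChecker.ConsistencyReport report = checker.checkConsistency();
        if (!report.getMissingInVectorStore().isEmpty()) {
            checker.repairInconsistentDocuments(report.getMissingInVectorStore());
        }
    }
}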
Conflict Resolution for Concurrent Updates
@Service
public class ConflictResolver {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private DocumentUpdateService updateService;
// Updates that arrived while the document was locked; replayed after release
private final ConcurrentHashMap<String, PendingUpdate> pendingUpdates = new ConcurrentHashMap<>();
private static final String UNLOCK_SCRIPT =
    "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

/** Returns a lock token on success, or null if another worker holds the lock. */
public String tryAcquireLock(String documentId, Duration timeout) {
    String lockKey = "doc_index_lock:" + documentId;
    String lockValue = UUID.randomUUID().toString();
    Boolean acquired = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, timeout);
    return Boolean.TRUE.equals(acquired) ? lockValue : null;
}

/** Compare-and-delete via Lua, so we never release a lock another worker has since acquired. */
public void releaseLock(String documentId, String lockValue) {
    redisTemplate.execute(new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class),
        List.of("doc_index_lock:" + documentId), lockValue);
}
public UpdateResult updateWithLock(String documentId, String content,
                                   Map<String, Object> metadata) {
    String lockToken = tryAcquireLock(documentId, Duration.ofSeconds(30));
    if (lockToken == null) {
        log.info("Document {} is being processed, queuing update", documentId);
        pendingUpdates.put(documentId, new PendingUpdate(documentId, content, metadata));
        return UpdateResult.queued(documentId);
    }
    try {
        return updateService.updateDocument(documentId, content, metadata);
    } finally {
        releaseLock(documentId, lockToken);
PendingUpdate pending = pendingUpdates.remove(documentId);
if (pending != null) {
CompletableFuture.runAsync(() ->
updateWithLock(pending.getDocumentId(), pending.getContent(), pending.getMetadata())
);
}
}
}
public boolean shouldApplyUpdate(String documentId, Instant updateTimestamp) {
String key = "doc_last_update:" + documentId;
String existingTimestamp = redisTemplate.opsForValue().get(key);
if (existingTimestamp == null) {
redisTemplate.opsForValue().set(key, updateTimestamp.toString(), Duration.ofDays(7));
return true;
}
Instant existing = Instant.parse(existingTimestamp);
if (updateTimestamp.isAfter(existing)) {
redisTemplate.opsForValue().set(key, updateTimestamp.toString(), Duration.ofDays(7));
return true;
}
log.warn("Discarding stale update for document {}: update={}, existing={}",
documentId, updateTimestamp, existing);
return false;
}
}

Production Story: An Incremental Update System at 5,000 Documents a Day
System monitoring metrics
@Component
public class SyncMetrics {
private final MeterRegistry meterRegistry;
private final Counter documentsIndexed;
private final Counter documentsUpdated;
private final Counter documentsDeleted;
private final Timer indexDuration;
private final Gauge queueDepth;
public SyncMetrics(MeterRegistry meterRegistry, BackgroundIndexingService indexingService) {
this.meterRegistry = meterRegistry;
this.documentsIndexed = Counter.builder("rag.sync.documents.indexed")
.description("Total documents indexed")
.register(meterRegistry);
this.documentsUpdated = Counter.builder("rag.sync.documents.updated")
.description("Total documents updated")
.register(meterRegistry);
this.documentsDeleted = Counter.builder("rag.sync.documents.deleted")
.description("Total documents deleted")
.register(meterRegistry);
this.indexDuration = Timer.builder("rag.sync.duration")
.description("Time to index a document")
.percentilePrecision(2)
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
this.queueDepth = Gauge.builder("rag.sync.queue.depth",
indexingService, service -> service.getQueueSize())
.description("Current indexing queue depth")
.register(meterRegistry);
}
public void recordIndexed(long durationMs) {
documentsIndexed.increment();
indexDuration.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordError(String errorType) {
    // Micrometer counters are immutable once registered, so resolve a tagged
    // counter per error type at call time rather than tagging increment()
    meterRegistry.counter("rag.sync.errors", "error_type", errorType).increment();
}
}

Key Performance Numbers (Measured by Chen Hao's Team)
| Metric | Full rebuild (before) | Incremental update (after) |
|---|---|---|
| Max sync latency | 6 hours | 30 seconds |
| Documents processed per day | N/A (batch) | 5,000+ |
| Average time per document | N/A | 1.2 seconds |
| Vector store query QPS | 200 (degraded during rebuilds) | 350 (unaffected) |
| System availability | 99.2% (6-hour rebuild every day) | 99.95% |
| Monthly embedding cost | $1,200 (full) | $280 (incremental) |
Production Notes
1. Guaranteeing message order
// Send with a key so all events for one document route to the same partition
kafkaTemplate.send(new ProducerRecord<>(
"document-changes",
event.getDocumentId(), // key = documentId
event
));

2. Handling large documents
if (content.length() > 1_000_000) { // 1MB
log.info("Large document detected: {} bytes, processing asynchronously", content.length());
largeDocumentQueue.submit(documentId, content, metadata);
return IndexResult.queued(documentId);
}
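The largeDocumentQueue referenced above is not shown either. A minimal sketch, assuming a small bounded executor so oversized documents cannot starve the regular pipeline (pool size and queue depth are illustrative):

// Hedged sketch: a bounded, low-parallelism lane for oversized documents.
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component;

@Component
public class LargeDocumentQueue {

    // Two workers and a bounded queue: large documents are rare but expensive
    private final ExecutorService executor = new ThreadPoolExecutor(
        2, 2, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(200),
        new ThreadPoolExecutor.CallerRunsPolicy()); // back-pressure instead of dropping

    private final DocumentIndexService indexService;

    public LargeDocumentQueue(DocumentIndexService indexService) {
        this.indexService = indexService;
    }

    public void submit(String documentId, String content, Map<String, Object> metadata) {
        executor.submit(() -> indexService.indexDocument(documentId, content, metadata));
    }
}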
3. Rate-limiting the embedding service
Use a token-bucket algorithm to cap the request rate, and add exponential-backoff retries (a backoff sketch follows the rate limiter below).
// Cap embedding request throughput with a Guava RateLimiter (token bucket)
private final RateLimiter embeddingRateLimiter = RateLimiter.create(100.0); // 100 permits/second
public List<float[]> embedBatchWithRateLimit(List<String> texts) {
embeddingRateLimiter.acquire(texts.size()); // one permit per text
return embeddingClient.embedBatch(texts);
}
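The exponential backoff mentioned above is the other half; a minimal sketch layered on the rate-limited call (attempt counts and delays are illustrative):

// Hedged sketch: exponential backoff around the rate-limited embedding call.
public List<float[]> embedWithBackoff(List<String> texts) {
    long delayMs = 1000;
    for (int attempt = 1; ; attempt++) {
        try {
            return embedBatchWithRateLimit(texts);
        } catch (Exception e) {
            if (attempt >= 3) {
                throw new IndexException("Embedding failed after " + attempt + " attempts", e);
            }
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IndexException("Interrupted during embedding retry", ie);
            }
            delayMs *= 2; // 1s, then 2s, then give up
        }
    }
}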
FAQ
Q1: How should I choose between CDC and a message queue?
If your documents live mainly in a relational database (MySQL/PostgreSQL) and you must capture 100% of changes, use CDC. If you already run message middleware and documents arrive from many sources, use the message queue. The two don't conflict and can be combined.
Q2: How is the "increment" actually detected?
Mainly by comparing content hashes (SHA-256). Content changes → hash changes → re-index. Note: hash only the effective content, with timestamps and other irrelevant fields stripped, as sketched below.
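A minimal sketch of that normalization; the volatile-line prefixes are made up for illustration:

// Hedged sketch: hash only the normalized "effective" content so volatile
// fields never trigger a pointless re-index. Prefixes here are illustrative.
public String effectiveContentHash(String rawContent) {
    String normalized = rawContent
        .replaceAll("(?m)^(Last updated|Exported at):.*$", "") // strip volatile lines
        .replaceAll("\\s+", " ")                               // collapse whitespace
        .strip();
    return DigestUtils.sha256Hex(normalized);
}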
Q3: What if the message queue backs up?
First, scale out consumers (more Kafka partitions and consumer instances). Second, degrade temporarily (process only high-priority documents). Third, fast-path the backlog in bulk (trade embedding precision for speed).
Q4: Soft-delete or hard-delete vector data?
Recommended: hard-delete in the vector store immediately (guarantees nothing stale is retrievable) and soft-delete at the metadata layer (preserves the audit trail). Handle the two separately.
Q5: How do I monitor the health of incremental sync?
Key metrics: message queue lag (backlog size), average sync latency, error rate, and consistency check results. Suggested alerts: lag above 1,000 messages, sync latency above 5 minutes, error rate above 1%.
Q6: Won't version data blow up the database?
Set a sensible retention policy: keep the last 30 days plus the last 10 versions, and clean up the rest on a schedule. In practice each document retained about 3 versions on average, so 12 million documents meant roughly 36 million rows in the version table. Acceptable.
Summary
Incremental updating is the road every ten-million-document knowledge base must take to become production-ready.
Action checklist:
- Measure your current rebuild window; past roughly 1 million documents, plan the move to incremental updates
- Pick a change-capture path: CDC for database-resident documents, a message queue for diverse sources
- Treat every modification as "insert new version, then deprecate the old one," backed by version numbers and soft deletes
- Make indexing idempotent (documentId + content hash) end to end
- Hard-delete vectors immediately; soft-delete metadata for the audit trail
- Add monitoring (queue lag, sync latency, error rate) and a scheduled consistency check as the fallback
Going from 6 hours to 30 seconds is not black magic; it is the accumulation of engineering details. Every design decision has its costs and benefits, so pick the approach that fits your own business.
