第1813篇:流式RAG——实时更新知识库而不重建整个索引
第1813篇:流式RAG——实时更新知识库而不重建整个索引
我们有个内部知识问答系统,基于RAG做的,用来回答员工问题。
刚上线那段时间用得还不错,但慢慢地问题来了:公司政策更新了,产品文档改了,知识库里的内容是旧的,系统给出的答案就开始出错。怎么更新知识库?最简单的办法是全量重建索引——把所有文档重新向量化,写入向量数据库。
这在小规模时没问题。但我们的知识库有20万+文档,全量重建一次要3个多小时,期间要么服务不可用,要么用户还在读旧数据。产品根本无法接受。
这就是流式RAG要解决的核心问题:知识库的增量更新,不停服,不重建。
为什么全量重建是个坑
全量重建的问题不只是慢,还有几个隐藏成本:
向量化费用:用OpenAI的text-embedding-3-small,每次全量重建20万文档大约花费12美元。每天更新一次就是360美元/月,只是索引成本。
索引不一致窗口:重建期间,新文档的向量还没建好,旧文档的向量可能已经被删除。这个窗口期内,查询结果是残缺的。
冷热切换复杂:为了避免不一致,很多团队会做双写——新索引建完了再切换。这需要维护两份数据,运维成本翻倍。
流式RAG的思路是:文档变更作为事件流进入系统,增量更新对应的向量,不影响其他文档。
整体架构
版本追踪是一个关键组件。每次更新,都记录文档的版本号和最后更新时间,这样RAG查询时可以知道当前知识库的"数据鲜度",对不确定性高的回答可以附加说明"该信息最后更新于XX时间"。
文档变更事件模型
// 文档变更事件
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DocumentChangeEvent {
private String eventId;
private String documentId;
private ChangeType changeType;
private String documentTitle;
private String rawContent; // 变更后的完整内容(DELETE时为null)
private String previousVersion; // 前一个版本的内容摘要(用于对比)
private String source; // 文档来源系统
private String category; // 文档分类
private Map<String, String> metadata;
private long changeTimestamp;
public enum ChangeType {
CREATE, // 新文档
UPDATE, // 内容更新
DELETE, // 文档删除
METADATA_ONLY // 只有元数据变化,内容不变
}
}
// 文档分块
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DocumentChunk {
private String chunkId; // documentId + "_" + chunkIndex
private String documentId;
private int chunkIndex;
private int totalChunks;
private String content;
private String title;
private Map<String, String> metadata;
private long documentVersion; // 用于版本管理
}
// 向量化后的分块
@Data
@Builder
public class EmbeddedChunk {
private DocumentChunk chunk;
private float[] embedding;
private String embeddingModel;
private long embeddedAt;
}文档变更处理流水线
@Service
@Slf4j
public class DocumentChangeProcessor {
private final TextSplitter textSplitter;
private final EmbeddingModel embeddingModel;
private final VectorStoreService vectorStoreService;
private final DocumentVersionTracker versionTracker;
// 文档切分策略:重叠滑动窗口
private static final int CHUNK_SIZE = 512; // 每块字符数
private static final int CHUNK_OVERLAP = 64; // 块间重叠
@KafkaListener(topics = "doc-change-events", groupId = "doc-processor")
public void processDocumentChange(DocumentChangeEvent event) {
log.info("Processing document change: {} type: {}",
event.getDocumentId(), event.getChangeType());
switch (event.getChangeType()) {
case CREATE, UPDATE -> handleCreateOrUpdate(event);
case DELETE -> handleDelete(event);
case METADATA_ONLY -> handleMetadataUpdate(event);
}
}
private void handleCreateOrUpdate(DocumentChangeEvent event) {
// Step 1: 切分文档
List<DocumentChunk> chunks = splitDocument(event);
// Step 2: 如果是UPDATE,先删除旧版本的所有chunks
if (event.getChangeType() == DocumentChangeEvent.ChangeType.UPDATE) {
vectorStoreService.deleteByDocumentId(event.getDocumentId());
log.info("Deleted old chunks for document: {}", event.getDocumentId());
}
// Step 3: 批量向量化(批量调用更经济)
List<EmbeddedChunk> embeddedChunks = embeddingInBatch(chunks);
// Step 4: 写入向量数据库
vectorStoreService.upsertBatch(embeddedChunks);
// Step 5: 更新版本追踪
versionTracker.updateVersion(event.getDocumentId(), event.getChangeTimestamp());
log.info("Successfully processed document: {}, chunks: {}",
event.getDocumentId(), chunks.size());
}
private void handleDelete(DocumentChangeEvent event) {
vectorStoreService.deleteByDocumentId(event.getDocumentId());
versionTracker.markDeleted(event.getDocumentId());
log.info("Deleted document: {}", event.getDocumentId());
}
private void handleMetadataUpdate(DocumentChangeEvent event) {
// 只更新元数据,不重新向量化
vectorStoreService.updateMetadata(event.getDocumentId(), event.getMetadata());
}
private List<DocumentChunk> splitDocument(DocumentChangeEvent event) {
String content = event.getRawContent();
List<DocumentChunk> chunks = new ArrayList<>();
// 先按段落分割,保持语义完整性
List<String> paragraphs = Arrays.asList(content.split("\n\n+"));
StringBuilder currentChunk = new StringBuilder();
int chunkIndex = 0;
for (String paragraph : paragraphs) {
// 如果当前块加上新段落超过限制,先保存当前块
if (currentChunk.length() + paragraph.length() > CHUNK_SIZE
&& currentChunk.length() > 0) {
chunks.add(buildChunk(event, chunkIndex++, currentChunk.toString()));
// 保留最后CHUNK_OVERLAP个字符作为重叠(保持上下文连贯)
String overlap = currentChunk.length() > CHUNK_OVERLAP
? currentChunk.substring(currentChunk.length() - CHUNK_OVERLAP)
: currentChunk.toString();
currentChunk = new StringBuilder(overlap);
}
currentChunk.append(paragraph).append("\n\n");
}
// 最后一块
if (currentChunk.length() > 0) {
chunks.add(buildChunk(event, chunkIndex, currentChunk.toString().trim()));
}
// 更新totalChunks
int total = chunks.size();
chunks.forEach(c -> c.setTotalChunks(total));
return chunks;
}
private DocumentChunk buildChunk(DocumentChangeEvent event, int index, String content) {
return DocumentChunk.builder()
.chunkId(event.getDocumentId() + "_" + index)
.documentId(event.getDocumentId())
.chunkIndex(index)
.content(content)
.title(event.getDocumentTitle())
.metadata(event.getMetadata())
.documentVersion(event.getChangeTimestamp())
.build();
}
private List<EmbeddedChunk> embeddingInBatch(List<DocumentChunk> chunks) {
// 批量向量化,减少API调用次数
int batchSize = 50;
List<EmbeddedChunk> results = new ArrayList<>();
for (int i = 0; i < chunks.size(); i += batchSize) {
List<DocumentChunk> batch = chunks.subList(i,
Math.min(i + batchSize, chunks.size()));
List<String> texts = batch.stream()
.map(c -> c.getTitle() + "\n" + c.getContent()) // 标题+内容一起向量化
.collect(Collectors.toList());
// 批量获取embedding
List<Embedding> embeddings = embeddingModel.embedAll(
texts.stream().map(TextSegment::from).collect(Collectors.toList())
).content();
for (int j = 0; j < batch.size(); j++) {
results.add(EmbeddedChunk.builder()
.chunk(batch.get(j))
.embedding(embeddings.get(j).vector())
.embeddingModel(embeddingModel.getClass().getSimpleName())
.embeddedAt(System.currentTimeMillis())
.build());
}
}
return results;
}
}向量数据库服务封装
这里以Milvus为例,但逻辑对其他向量库(Weaviate、Qdrant)也适用:
@Service
@Slf4j
public class VectorStoreService {
private final MilvusServiceClient milvusClient;
private static final String COLLECTION_NAME = "knowledge_base";
private static final int VECTOR_DIM = 1536; // text-embedding-3-small的维度
/**
* 批量upsert向量 - 流式RAG的核心操作
* Milvus原生支持upsert,相同ID的记录会被覆盖
*/
public void upsertBatch(List<EmbeddedChunk> chunks) {
if (chunks.isEmpty()) return;
List<String> chunkIds = new ArrayList<>();
List<String> documentIds = new ArrayList<>();
List<String> contents = new ArrayList<>();
List<String> titles = new ArrayList<>();
List<List<Float>> vectors = new ArrayList<>();
List<Long> versions = new ArrayList<>();
for (EmbeddedChunk embedded : chunks) {
DocumentChunk chunk = embedded.getChunk();
chunkIds.add(chunk.getChunkId());
documentIds.add(chunk.getDocumentId());
contents.add(chunk.getContent());
titles.add(chunk.getTitle() != null ? chunk.getTitle() : "");
// float[]转List<Float>
float[] emb = embedded.getEmbedding();
List<Float> embList = new ArrayList<>(emb.length);
for (float f : emb) embList.add(f);
vectors.add(embList);
versions.add(chunk.getDocumentVersion());
}
InsertParam insertParam = InsertParam.newBuilder()
.withCollectionName(COLLECTION_NAME)
.withFields(Arrays.asList(
new InsertParam.Field("chunk_id", chunkIds),
new InsertParam.Field("document_id", documentIds),
new InsertParam.Field("content", contents),
new InsertParam.Field("title", titles),
new InsertParam.Field("embedding", vectors),
new InsertParam.Field("version", versions)
))
.build();
R<MutationResult> result = milvusClient.upsert(UpsertParam.newBuilder()
.withCollectionName(COLLECTION_NAME)
.withFields(insertParam.getFields())
.build());
if (result.getStatus() != R.Status.Success.getCode()) {
throw new RuntimeException("Milvus upsert failed: " + result.getMessage());
}
log.info("Upserted {} chunks to Milvus", chunks.size());
}
/**
* 按documentId删除所有相关chunks
* UPDATE操作前必须先调这个
*/
public void deleteByDocumentId(String documentId) {
String expr = String.format("document_id == \"%s\"", documentId);
R<MutationResult> result = milvusClient.delete(DeleteParam.newBuilder()
.withCollectionName(COLLECTION_NAME)
.withExpr(expr)
.build());
if (result.getStatus() != R.Status.Success.getCode()) {
log.warn("Failed to delete chunks for document: {}", documentId);
}
}
/**
* 语义检索 - RAG查询的核心
*/
public List<RetrievedChunk> search(float[] queryVector, int topK,
Map<String, Object> filters) {
SearchParam.Builder searchBuilder = SearchParam.newBuilder()
.withCollectionName(COLLECTION_NAME)
.withMetricType(MetricType.IP) // 内积相似度
.withTopK(topK)
.withVectors(Collections.singletonList(toFloatList(queryVector)))
.withVectorFieldName("embedding")
.withOutFields(Arrays.asList("chunk_id", "document_id", "content", "title", "version"));
// 添加过滤条件(如只查某个分类的文档)
if (filters != null && !filters.isEmpty()) {
String filterExpr = buildFilterExpression(filters);
searchBuilder.withExpr(filterExpr);
}
R<SearchResults> results = milvusClient.search(searchBuilder.build());
if (results.getStatus() != R.Status.Success.getCode()) {
throw new RuntimeException("Milvus search failed: " + results.getMessage());
}
return parseSearchResults(results.getData());
}
private List<Float> toFloatList(float[] arr) {
List<Float> list = new ArrayList<>(arr.length);
for (float f : arr) list.add(f);
return list;
}
private String buildFilterExpression(Map<String, Object> filters) {
return filters.entrySet().stream()
.map(e -> String.format("%s == \"%s\"", e.getKey(), e.getValue()))
.collect(Collectors.joining(" && "));
}
private List<RetrievedChunk> parseSearchResults(SearchResults data) {
// 解析Milvus结果,省略细节
List<RetrievedChunk> chunks = new ArrayList<>();
// ... 实际解析逻辑
return chunks;
}
public void updateMetadata(String documentId, Map<String, String> metadata) {
// Milvus不支持直接更新字段,需要先查再删再插
// 对于只更新元数据的场景,这里用Redis缓存元数据覆盖,避免重新向量化
log.info("Metadata update for document: {} (handled by Redis overlay)", documentId);
}
}版本追踪与数据一致性
这是流式RAG里最容易被忽略的部分:
@Service
public class DocumentVersionTracker {
private final RedisTemplate<String, String> redisTemplate;
private static final String VERSION_KEY_PREFIX = "doc:version:";
private static final String DELETED_SET_KEY = "doc:deleted";
public void updateVersion(String documentId, long timestamp) {
redisTemplate.opsForValue().set(
VERSION_KEY_PREFIX + documentId,
String.valueOf(timestamp),
Duration.ofDays(90)
);
}
public void markDeleted(String documentId) {
redisTemplate.opsForSet().add(DELETED_SET_KEY, documentId);
redisTemplate.delete(VERSION_KEY_PREFIX + documentId);
}
public Optional<Long> getDocumentVersion(String documentId) {
String version = redisTemplate.opsForValue().get(VERSION_KEY_PREFIX + documentId);
return version != null ? Optional.of(Long.parseLong(version)) : Optional.empty();
}
public boolean isDeleted(String documentId) {
return Boolean.TRUE.equals(redisTemplate.opsForSet().isMember(DELETED_SET_KEY, documentId));
}
/**
* 获取知识库整体的数据鲜度
* 用于在回答中告知用户知识库最后更新时间
*/
public LocalDateTime getLastUpdateTime() {
String key = "doc:last_update";
String ts = redisTemplate.opsForValue().get(key);
if (ts == null) return null;
return LocalDateTime.ofEpochSecond(Long.parseLong(ts) / 1000, 0, ZoneOffset.UTC);
}
}支持流式更新的RAG查询服务
最后把查询链路也写完整:
@Service
@Slf4j
public class StreamingRagService {
private final EmbeddingModel embeddingModel;
private final VectorStoreService vectorStoreService;
private final ChatLanguageModel chatModel;
private final DocumentVersionTracker versionTracker;
public RagAnswer query(String question, QueryContext context) {
long startTime = System.currentTimeMillis();
// Step 1: 向量化问题
float[] questionEmbedding = embeddingModel.embed(question).content().vector();
// Step 2: 检索相关文档
List<RetrievedChunk> retrievedChunks = vectorStoreService.search(
questionEmbedding,
5, // top 5
context.getFilters()
);
// Step 3: 过滤已删除的文档(防止竞态条件:向量库删除有延迟)
retrievedChunks = retrievedChunks.stream()
.filter(chunk -> !versionTracker.isDeleted(chunk.getDocumentId()))
.collect(Collectors.toList());
if (retrievedChunks.isEmpty()) {
return RagAnswer.builder()
.answer("抱歉,我暂时没有找到相关信息。")
.sources(Collections.emptyList())
.confidence(0.0)
.build();
}
// Step 4: 构造上下文Prompt
String contextPrompt = buildContextPrompt(question, retrievedChunks);
// Step 5: 调用LLM生成回答
String answer = chatModel.generate(contextPrompt);
// Step 6: 组装来源信息
List<DocumentSource> sources = retrievedChunks.stream()
.map(chunk -> DocumentSource.builder()
.documentId(chunk.getDocumentId())
.title(chunk.getTitle())
.relevanceScore(chunk.getScore())
.lastUpdated(versionTracker.getDocumentVersion(chunk.getDocumentId())
.map(v -> LocalDateTime.ofEpochSecond(v/1000, 0, ZoneOffset.UTC))
.orElse(null))
.build())
.distinct()
.collect(Collectors.toList());
long elapsed = System.currentTimeMillis() - startTime;
log.info("RAG query completed in {}ms, retrieved {} chunks", elapsed, retrievedChunks.size());
return RagAnswer.builder()
.answer(answer)
.sources(sources)
.confidence(calculateConfidence(retrievedChunks))
.queryTimeMs(elapsed)
.knowledgeBaseUpdatedAt(versionTracker.getLastUpdateTime())
.build();
}
private String buildContextPrompt(String question, List<RetrievedChunk> chunks) {
StringBuilder sb = new StringBuilder();
sb.append("根据以下知识库内容回答问题。如果知识库中没有相关信息,请明确说明。\n\n");
sb.append("## 知识库相关内容\n\n");
for (int i = 0; i < chunks.size(); i++) {
RetrievedChunk chunk = chunks.get(i);
sb.append(String.format("【文档%d】%s\n", i+1, chunk.getTitle()));
sb.append(chunk.getContent()).append("\n\n");
}
sb.append("## 用户问题\n");
sb.append(question).append("\n\n");
sb.append("请根据以上知识库内容,给出准确、简洁的回答:");
return sb.toString();
}
private double calculateConfidence(List<RetrievedChunk> chunks) {
if (chunks.isEmpty()) return 0.0;
return chunks.stream()
.mapToDouble(RetrievedChunk::getScore)
.average()
.orElse(0.0);
}
}处理UPDATE操作的竞态问题
这是流式RAG里最难处理的问题之一,我专门讲一下。
场景:文档A更新了。处理流程是:先删除旧向量 → 再插入新向量。但在删除和插入之间的时间窗口里,用户发起了查询,就会得到一个"空洞"——文档A的内容消失了。
方案一:先插入后删除(Swap模式)
不先删除旧版本,而是先把新版本的chunks插入(用带版本号的新ID),插入成功后再删除旧版本。这样查询请求会看到新旧版本共存,但不会看到空洞。
代价是需要在ID上带版本号,而且短暂的新旧数据共存可能导致重复检索。可以通过在查询时按版本号去重来解决。
方案二:软删除+版本过滤
不真正删除,而是给旧版本的chunks打上deleted=true的标记,在查询时过滤掉。删除和插入操作变成:打标记 → 插入新版本 → 异步清理旧版本。
代价是数据量增长,需要定期清理。但对于小规模系统,这是最简单可靠的方案。
我们最终选择了方案二,因为实现简单,运维成本低。
监控:流式更新延迟
上线后,有个指标我每天必看:文档变更到可检索的延迟(P95)。这个指标反映了流式更新的实际效果。
我们目前的数字:文档变更事件到Kafka的延迟约30ms(CDC捕获),Kafka到向量库写入完成约2-5秒(取决于文档大小和embedding API延迟),合计P95在8秒以内。
跟之前3小时的全量重建比,这已经是质的飞跃了。
