实时数据接入知识库——Kafka + 向量数据库的流式更新
实时数据接入知识库——Kafka + 向量数据库的流式更新
大多数 RAG 教程讲的是静态知识库:把一堆文档处理好,存进向量库,然后开始查询。这没问题,但现实业务里有很多场景的知识是实时变化的。
我见过最典型的几个:
- 产品文档更新了,新功能刚上线,用户问相关问题,AI 还在用旧答案
- 公司政策调整,HR 文档改了,但知识库还是三个月前的
- 竞品情报、市场价格每天都在变,靠定时任务全量更新太慢
这些场景都有一个共同特征:知识有生命周期,而且变化不可预测。
靠定时任务全量重建知识库是最简单的方案,但代价很高:1 万份文档每次全量重建至少要几个小时,期间知识是滞后的,而且计算成本高。
更合理的方案是:监听文档变更事件,触发增量更新。
这就是 Kafka + 向量数据库流式更新的场景。
知识库更新的几种模式
先理清楚,不是所有场景都需要实时更新。
静态知识库:文档基本不变,偶尔手动更新。适合:产品说明书、技术规范、历史档案。
周期批量更新:按天/按小时全量或增量更新。适合:新闻、报告、不太频繁变化的政策文档。
事件驱动实时更新:文档一旦变更,立即触发更新流水线。适合:产品参数、库存信息、实时政策、竞品信息。
今天说的是第三种。
实时更新的触发时机和处理方式
实时更新需要解决三个问题:
问题一:怎么知道文档变了
有几种方式:
- 业务系统主动发消息:文档管理系统在文档保存时主动发 Kafka 消息,最可靠
- 数据库变更监听(CDC):用 Debezium 监听数据库 binlog,自动捕获文档表的增删改
- 文件系统监听:监听某个目录的文件变动(适合简单场景)
- 定时轮询:不算真正的实时,但实现最简单
生产环境推荐方式 1 或方式 2,方式 1 侵入性强但可控,方式 2 零侵入但架构复杂一点。
问题二:更新的原子性
向量库的更新不是简单的覆盖。一份文档可能对应多个 chunk,每个 chunk 对应一个向量。文档更新时,要:
- 删除该文档的所有旧 chunk 向量
- 对新内容分块、embedding
- 插入新的 chunk 向量
这三步如果不是原子的,中间出错会导致知识库里存在文档的"部分旧版本 + 部分新版本",查询结果会混乱。
问题三:并发更新的冲突
同一份文档如果短时间内被多次修改(比如用户快速保存),多个更新消息同时进入处理流水线,可能导致乱序。
需要对同一文档的更新做幂等处理,或者按文档 ID 做顺序处理(Kafka 分区 key 用文档 ID)。
全链路设计:Kafka Consumer -> 文档处理 -> 向量写入
Spring Kafka + Spring AI 的实现
消息格式定义
@Data
public class DocumentChangeEvent {
private String messageId; // 消息唯一ID(用于幂等)
private String documentId; // 文档唯一ID
private ChangeType changeType; // 变更类型
private String content; // 新内容(DELETE时为null)
private String documentName;
private String documentType; // PDF, WORD, TEXT, HTML
private Map<String, String> metadata;
private LocalDateTime changeTime;
private String sourceSystem; // 来源系统
public enum ChangeType {
CREATE, // 新建文档
UPDATE, // 更新文档
DELETE // 删除文档
}
}Kafka Consumer 配置
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, DocumentChangeEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "knowledge-base-updater");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// 手动提交offset,确保处理完成后再提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 每次最多拉取10条,避免一次性处理太多导致超时
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
// 处理超时时间:给文档处理流水线足够的时间
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5分钟
JsonDeserializer<DocumentChangeEvent> deserializer = new JsonDeserializer<>(DocumentChangeEvent.class);
deserializer.setRemoveTypeHeaders(false);
deserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DocumentChangeEvent>
kafkaListenerContainerFactory(ConsumerFactory<String, DocumentChangeEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, DocumentChangeEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 手动提交
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// 并发消费(3个线程,对应3个partition)
factory.setConcurrency(3);
// 错误处理:重试3次后发送到死信队列
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate()),
new FixedBackOff(5000L, 3L)
));
return factory;
}
}核心 Consumer
@Component
@Slf4j
public class DocumentChangeEventConsumer {
private final DocumentUpdateService updateService;
private final IdempotencyService idempotencyService;
public DocumentChangeEventConsumer(DocumentUpdateService updateService,
IdempotencyService idempotencyService) {
this.updateService = updateService;
this.idempotencyService = idempotencyService;
}
@KafkaListener(
topics = "${knowledge.kafka.topic.document-changes}",
containerFactory = "kafkaListenerContainerFactory",
groupId = "knowledge-base-updater"
)
public void handleDocumentChange(
@Payload DocumentChangeEvent event,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment ack) {
log.info("收到文档变更事件: documentId={}, changeType={}, messageId={}",
event.getDocumentId(), event.getChangeType(), event.getMessageId());
try {
// 1. 幂等检查
if (idempotencyService.isProcessed(event.getMessageId())) {
log.info("消息 {} 已处理,跳过", event.getMessageId());
ack.acknowledge();
return;
}
// 2. 标记处理中(防止并发重复处理)
if (!idempotencyService.markProcessing(event.getMessageId(), event.getDocumentId())) {
log.warn("文档 {} 正在被另一个线程处理,稍后重试", event.getDocumentId());
// 不提交offset,让消息重新被消费
return;
}
// 3. 处理变更
UpdateResult result = updateService.processChange(event);
// 4. 记录处理完成
idempotencyService.markCompleted(event.getMessageId(), result);
// 5. 提交offset(处理成功才提交)
ack.acknowledge();
log.info("文档 {} 更新完成,影响chunk数: {}", event.getDocumentId(), result.getAffectedChunkCount());
} catch (Exception e) {
log.error("处理文档变更事件失败: documentId={}, error={}", event.getDocumentId(), e.getMessage(), e);
// 不提交offset,触发重试
// 达到最大重试次数后,DefaultErrorHandler会发送到死信队列
throw e;
}
}
}文档更新服务(核心逻辑)
@Service
@Slf4j
public class DocumentUpdateService {
private final VectorStore vectorStore;
private final DocumentProcessingService processingService;
private final DocumentMetadataRepository metadataRepository;
@Transactional
public UpdateResult processChange(DocumentChangeEvent event) {
return switch (event.getChangeType()) {
case CREATE -> handleCreate(event);
case UPDATE -> handleUpdate(event);
case DELETE -> handleDelete(event);
};
}
private UpdateResult handleCreate(DocumentChangeEvent event) {
log.info("处理文档新建: {}", event.getDocumentId());
// 处理文档,生成chunks
List<ProcessedDocument.DocumentChunk> chunks = processingService.process(
event.getContent(),
event.getDocumentType(),
buildMetadata(event)
);
// 存入向量库
List<Document> vectorDocs = chunks.stream()
.map(chunk -> buildVectorDocument(chunk, event))
.collect(Collectors.toList());
vectorStore.add(vectorDocs);
// 记录文档元数据
saveDocumentMetadata(event, chunks.size());
return UpdateResult.created(event.getDocumentId(), chunks.size());
}
private UpdateResult handleUpdate(DocumentChangeEvent event) {
log.info("处理文档更新: {}", event.getDocumentId());
// 步骤1:删除旧向量(这是原子性的关键)
int deletedCount = deleteDocumentVectors(event.getDocumentId());
log.info("删除旧向量 {} 条", deletedCount);
// 步骤2:处理新内容
List<ProcessedDocument.DocumentChunk> newChunks = processingService.process(
event.getContent(),
event.getDocumentType(),
buildMetadata(event)
);
// 步骤3:写入新向量
List<Document> vectorDocs = newChunks.stream()
.map(chunk -> buildVectorDocument(chunk, event))
.collect(Collectors.toList());
vectorStore.add(vectorDocs);
// 更新元数据
updateDocumentMetadata(event, newChunks.size());
return UpdateResult.updated(event.getDocumentId(), deletedCount, newChunks.size());
}
private UpdateResult handleDelete(DocumentChangeEvent event) {
log.info("处理文档删除: {}", event.getDocumentId());
int deletedCount = deleteDocumentVectors(event.getDocumentId());
deleteDocumentMetadata(event.getDocumentId());
return UpdateResult.deleted(event.getDocumentId(), deletedCount);
}
private int deleteDocumentVectors(String documentId) {
// 根据向量库实现的不同,删除方式会有差异
// 以 Milvus 为例:
// vectorStore.delete(List.of(Filter.eq("documentId", documentId)));
// 以 PgVector 为例(自定义删除):
// 需要通过 metadata 过滤删除
// 这里用一个通用的方式:先查再删
List<Document> existingDocs = vectorStore.similaritySearch(
SearchRequest.query("*")
.withTopK(1000) // 最多1000个chunk
.withFilterExpression("documentId == '" + documentId + "'")
);
if (!existingDocs.isEmpty()) {
List<String> idsToDelete = existingDocs.stream()
.map(Document::getId)
.collect(Collectors.toList());
vectorStore.delete(idsToDelete);
}
return existingDocs.size();
}
private Document buildVectorDocument(ProcessedDocument.DocumentChunk chunk, DocumentChangeEvent event) {
Map<String, Object> metadata = new HashMap<>(chunk.getMetadata());
metadata.put("documentId", event.getDocumentId());
metadata.put("documentName", event.getDocumentName());
metadata.put("sourceSystem", event.getSourceSystem());
metadata.put("lastUpdated", event.getChangeTime().toString());
metadata.put("chunkId", chunk.getChunkId());
return new Document(chunk.getContextualContent(), metadata);
}
private Map<String, String> buildMetadata(DocumentChangeEvent event) {
Map<String, String> metadata = new HashMap<>();
metadata.put("documentId", event.getDocumentId());
metadata.put("documentName", event.getDocumentName());
if (event.getMetadata() != null) {
metadata.putAll(event.getMetadata());
}
return metadata;
}
}幂等服务(防止重复处理)
@Service
public class IdempotencyService {
private final StringRedisTemplate redisTemplate;
private static final String PROCESSED_KEY_PREFIX = "doc:processed:";
private static final String PROCESSING_KEY_PREFIX = "doc:processing:";
private static final Duration PROCESSED_TTL = Duration.ofDays(7);
private static final Duration PROCESSING_TTL = Duration.ofMinutes(10);
public boolean isProcessed(String messageId) {
return Boolean.TRUE.equals(
redisTemplate.hasKey(PROCESSED_KEY_PREFIX + messageId)
);
}
public boolean markProcessing(String messageId, String documentId) {
// 用 SetNX 保证原子性:只有当key不存在时才设置成功
// 这样可以防止两个线程同时处理同一个文档
Boolean result = redisTemplate.opsForValue().setIfAbsent(
PROCESSING_KEY_PREFIX + documentId,
messageId,
PROCESSING_TTL
);
return Boolean.TRUE.equals(result);
}
public void markCompleted(String messageId, UpdateResult result) {
// 记录处理完成
redisTemplate.opsForValue().set(
PROCESSED_KEY_PREFIX + messageId,
result.toString(),
PROCESSED_TTL
);
// 清除处理中标记
redisTemplate.delete(PROCESSING_KEY_PREFIX + result.getDocumentId());
}
}业务系统发送变更事件
@Service
public class DocumentChangePublisher {
private final KafkaTemplate<String, DocumentChangeEvent> kafkaTemplate;
@Value("${knowledge.kafka.topic.document-changes}")
private String topic;
public void publishCreate(String documentId, String content, String documentName,
String documentType, Map<String, String> metadata) {
DocumentChangeEvent event = new DocumentChangeEvent();
event.setMessageId(UUID.randomUUID().toString());
event.setDocumentId(documentId);
event.setChangeType(DocumentChangeEvent.ChangeType.CREATE);
event.setContent(content);
event.setDocumentName(documentName);
event.setDocumentType(documentType);
event.setMetadata(metadata);
event.setChangeTime(LocalDateTime.now());
event.setSourceSystem("cms");
// 以documentId作为partition key,保证同一文档的消息顺序处理
kafkaTemplate.send(topic, documentId, event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("发送文档变更事件失败: documentId={}", documentId, ex);
// 降级:写入本地重试队列
saveToRetryQueue(event);
} else {
log.debug("文档变更事件已发送: documentId={}, partition={}, offset={}",
documentId, result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
public void publishUpdate(String documentId, String newContent, String documentName,
String documentType, Map<String, String> metadata) {
DocumentChangeEvent event = new DocumentChangeEvent();
event.setMessageId(UUID.randomUUID().toString());
event.setDocumentId(documentId);
event.setChangeType(DocumentChangeEvent.ChangeType.UPDATE);
event.setContent(newContent);
event.setDocumentName(documentName);
event.setDocumentType(documentType);
event.setMetadata(metadata);
event.setChangeTime(LocalDateTime.now());
event.setSourceSystem("cms");
kafkaTemplate.send(topic, documentId, event);
}
public void publishDelete(String documentId) {
DocumentChangeEvent event = new DocumentChangeEvent();
event.setMessageId(UUID.randomUUID().toString());
event.setDocumentId(documentId);
event.setChangeType(DocumentChangeEvent.ChangeType.DELETE);
event.setChangeTime(LocalDateTime.now());
event.setSourceSystem("cms");
kafkaTemplate.send(topic, documentId, event);
}
private void saveToRetryQueue(DocumentChangeEvent event) {
// 保存到数据库重试队列,由定时任务重新发送
// ...
}
}性能优化:批量处理和背压控制
当文档变更频率很高时(比如批量导入 1000 份文档),需要做批量处理和背压控制,避免把 Embedding API 和向量库打垮。
批量 Embedding(减少 API 调用次数)
@Service
public class BatchEmbeddingService {
private final EmbeddingModel embeddingModel;
private static final int BATCH_SIZE = 50; // 每批50个chunk
public List<float[]> batchEmbed(List<String> texts) {
List<float[]> allEmbeddings = new ArrayList<>();
// 分批处理
for (int i = 0; i < texts.size(); i += BATCH_SIZE) {
List<String> batch = texts.subList(i, Math.min(i + BATCH_SIZE, texts.size()));
// 限流:避免触发API的QPS限制
try {
EmbeddingResponse response = embeddingModel.embedForResponse(batch);
response.getResults().forEach(result ->
allEmbeddings.add(result.getOutput())
);
// 小批量之间加入延迟,避免触发限流
if (i + BATCH_SIZE < texts.size()) {
Thread.sleep(100);
}
} catch (Exception e) {
log.error("Embedding失败,批次 {}-{}", i, Math.min(i + BATCH_SIZE, texts.size()), e);
throw new EmbeddingException("批量Embedding失败", e);
}
}
return allEmbeddings;
}
}令牌桶限流
@Component
public class DocumentUpdateRateLimiter {
// 每秒最多处理5个文档更新(根据Embedding API限制设置)
private final RateLimiter rateLimiter = RateLimiter.create(5.0);
public void acquire() {
rateLimiter.acquire();
}
public boolean tryAcquire(long timeout, TimeUnit unit) {
return rateLimiter.tryAcquire(timeout, unit);
}
}监控:知道系统健不健康
实时更新系统必须有监控,否则出问题了根本不知道。
@Component
@Slf4j
public class KnowledgeBaseHealthMetrics {
private final MeterRegistry meterRegistry;
// 更新延迟(从事件产生到向量库写入完成的时间)
private final Timer updateLatencyTimer;
// 处理队列深度(积压的更新数量)
private final Gauge pendingUpdatesGauge;
// 成功/失败计数
private final Counter successCounter;
private final Counter failureCounter;
public KnowledgeBaseHealthMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.updateLatencyTimer = Timer.builder("knowledge.update.latency")
.description("文档更新延迟")
.register(meterRegistry);
this.successCounter = Counter.builder("knowledge.update.success")
.description("成功更新计数")
.register(meterRegistry);
this.failureCounter = Counter.builder("knowledge.update.failure")
.description("失败更新计数")
.register(meterRegistry);
this.pendingUpdatesGauge = Gauge.builder("knowledge.update.pending",
this, KnowledgeBaseHealthMetrics::getPendingCount)
.description("待处理更新数量")
.register(meterRegistry);
}
public void recordUpdateSuccess(Duration latency) {
updateLatencyTimer.record(latency);
successCounter.increment();
}
public void recordUpdateFailure() {
failureCounter.increment();
}
private double getPendingCount() {
// 从 Kafka consumer lag 获取
return 0; // 实际实现需要查 Kafka consumer lag
}
}一个坑:向量库的删除不是即时的
这是我在实际项目里踩到的坑。
我以为向量库的删除和写入是原子的,删完旧的再写新的,中间用户查询不会看到旧内容。
实际上并不是这样。以 Milvus 为例,删除操作是"软删除"(标记删除),不是立即物理删除。在 compact 操作执行之前,被标记删除的向量仍然可能出现在搜索结果里(取决于一致性级别配置)。
更坑的是:在删除和写入的中间窗口期,如果有查询进来,查询到的要么是旧内容(删除还没完成),要么是空(删除完成但新内容还没写入)。
解决方案一:版本号机制。新内容写入时带上新版本号,查询时用最新版本过滤。
// 写入时带版本号
metadata.put("version", event.getChangeTime().toEpochSecond(ZoneOffset.UTC));
// 查询时过滤最新版本
// 先查所有相关文档的最大版本号,再用该版本号过滤解决方案二:蓝绿切换。维护两份向量库(蓝/绿),更新时先在备用库写入,写入完成后切换指针,再删旧库的数据。复杂但彻底解决问题。
对于大多数场景,方案一够用了,中间窗口期通常只有几秒,业务可以接受。
总结
Kafka + 向量数据库的实时知识库更新,核心思路不复杂,但工程细节很多:
- 幂等性:用 Redis SetNX 保证同一消息不被重复处理
- 顺序性:Kafka partition key 用文档 ID,保证同一文档的消息顺序处理
- 原子性:删旧写新要尽量原子,中间窗口期用版本号机制兜底
- 限流:批量更新时要做限流,别把 Embedding API 打挂
- 监控:延迟、积压、成功率,三个指标必须有
实时知识库更新不是一件小事,但做好了,RAG 系统的时效性会有质的飞跃,用户体验完全不同。
