第1909篇:Spring AI的Embedding Pipeline——批量向量化与增量更新的工程设计
第1909篇:Spring AI的Embedding Pipeline——批量向量化与增量更新的工程设计
做 RAG 系统,向量化(Embedding)是绕不过去的基础能力。把一个 PDF 文件里的内容变成向量存进向量数据库,这个过程听起来简单,但工程化做好了是另一回事。
特别是当你面对的不是几个文件,而是几万份文档,而且这些文档还在持续更新……你需要的不是一个 demo 里的 ingestor.ingest(document) 调用,你需要的是一套完整的 Embedding Pipeline。
这篇文章就从工程角度讲这套东西:批量向量化的性能优化、增量更新的正确姿势、向量数据的生命周期管理,以及我在这块踩过的坑。
Embedding Pipeline 的整体架构
先建立一个完整的认知框架。一个生产级的 Embedding Pipeline 包含以下环节:
每个环节都有工程细节,来逐一拆开看。
文档加载:支持多数据源
Spring AI 提供了 DocumentReader 接口,支持多种数据源。生产环境往往需要支持多种格式:
@Component
public class MultiSourceDocumentLoader {
@Autowired
private OssClient ossClient;
@Autowired
private DocumentProcessorRegistry processorRegistry;
/**
* 根据文档类型选择对应的 Reader
*/
public List<Document> loadDocument(DocumentSource source) {
return switch (source.getType()) {
case PDF -> loadPdf(source);
case WORD -> loadWord(source);
case MARKDOWN -> loadMarkdown(source);
case HTML -> loadHtml(source);
case TXT -> loadText(source);
case JSON -> loadJson(source);
default -> throw new UnsupportedDocumentTypeException(source.getType().name());
};
}
private List<Document> loadPdf(DocumentSource source) {
try {
// 从 OSS 下载到临时文件
Path tempFile = downloadToTemp(source);
PagePdfDocumentReader reader = new PagePdfDocumentReader(
new FileSystemResource(tempFile),
PdfDocumentReaderConfig.builder()
.withPageTopMargin(0)
.withPageBottomMargin(0)
.withPageExtractedTextFormatter(
ExtractedTextFormatter.builder()
.withNumberOfTopTextLinesToDelete(0)
.build()
)
.withPagesPerDocument(1) // 每页一个 Document
.build()
);
List<Document> docs = reader.get();
// 注入来源元数据
docs.forEach(doc -> {
doc.getMetadata().put("source_url", source.getUrl());
doc.getMetadata().put("source_name", source.getName());
doc.getMetadata().put("document_type", "PDF");
doc.getMetadata().put("load_time", Instant.now().toString());
});
return docs;
} finally {
cleanupTemp(source);
}
}
private List<Document> loadMarkdown(DocumentSource source) {
String content = ossClient.getObjectContent(source.getUrl());
// Markdown 特殊处理:按标题分割
List<Document> docs = splitByHeadings(content, source);
return docs;
}
private List<Document> splitByHeadings(String markdown, DocumentSource source) {
List<Document> docs = new ArrayList<>();
String[] lines = markdown.split("\n");
StringBuilder currentSection = new StringBuilder();
String currentHeading = "Introduction";
int headingLevel = 0;
for (String line : lines) {
if (line.startsWith("# ")) {
saveSection(docs, currentSection, currentHeading, headingLevel, source);
currentHeading = line.substring(2).trim();
headingLevel = 1;
currentSection = new StringBuilder();
} else if (line.startsWith("## ")) {
saveSection(docs, currentSection, currentHeading, headingLevel, source);
currentHeading = line.substring(3).trim();
headingLevel = 2;
currentSection = new StringBuilder();
} else {
currentSection.append(line).append("\n");
}
}
saveSection(docs, currentSection, currentHeading, headingLevel, source);
return docs;
}
private void saveSection(List<Document> docs, StringBuilder content,
String heading, int level, DocumentSource source) {
String text = content.toString().trim();
if (text.isEmpty()) return;
Map<String, Object> metadata = new HashMap<>();
metadata.put("section_heading", heading);
metadata.put("heading_level", level);
metadata.put("source_name", source.getName());
docs.add(new Document(text, metadata));
}
}批量向量化:性能是关键
向量化是最耗时的步骤,特别是调用远程 Embedding API 时,批量处理和并发控制非常重要。
@Service
public class BatchEmbeddingService {
@Autowired
private EmbeddingModel embeddingModel;
@Autowired
private VectorStore vectorStore;
// Embedding API 的批量大小限制(不同模型不同,OpenAI 最大 2048)
@Value("${embedding.batch-size:100}")
private int batchSize;
// 并发请求数(避免触发 rate limit)
@Value("${embedding.concurrency:3}")
private int concurrency;
@Autowired
@Qualifier("embeddingExecutor")
private Executor embeddingExecutor;
/**
* 批量向量化并存储
* 返回处理结果统计
*/
public EmbeddingResult batchEmbed(List<Document> documents) {
log.info("开始批量向量化,文档数: {}", documents.size());
long startTime = System.currentTimeMillis();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);
List<String> failedDocIds = Collections.synchronizedList(new ArrayList<>());
// 按批次划分
List<List<Document>> batches = partition(documents, batchSize);
log.info("划分为 {} 个批次,每批 {} 个文档", batches.size(), batchSize);
// 使用信号量控制并发,避免触发 API rate limit
Semaphore semaphore = new Semaphore(concurrency);
List<CompletableFuture<Void>> futures = batches.stream()
.map(batch -> CompletableFuture.runAsync(() -> {
try {
semaphore.acquire();
processBatch(batch, successCount, failCount, failedDocIds);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Embedding 被中断", e);
} finally {
semaphore.release();
}
}, embeddingExecutor))
.collect(Collectors.toList());
// 等待所有批次完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();
long duration = System.currentTimeMillis() - startTime;
EmbeddingResult result = EmbeddingResult.builder()
.totalDocuments(documents.size())
.successCount(successCount.get())
.failCount(failCount.get())
.failedDocIds(failedDocIds)
.durationMs(duration)
.throughput((double) successCount.get() / (duration / 1000.0))
.build();
log.info("批量向量化完成: 成功={}, 失败={}, 耗时={}s, 吞吐量={:.1f}文档/秒",
result.getSuccessCount(), result.getFailCount(),
duration / 1000, result.getThroughput());
return result;
}
private void processBatch(List<Document> batch,
AtomicInteger successCount,
AtomicInteger failCount,
List<String> failedDocIds) {
try {
// 带重试的向量存储
int retries = 0;
while (retries < 3) {
try {
vectorStore.add(batch);
successCount.addAndGet(batch.size());
return;
} catch (Exception e) {
retries++;
if (retries >= 3) {
throw e;
}
// 指数退避重试
long waitMs = (long) Math.pow(2, retries) * 1000;
log.warn("批次向量化失败,第 {} 次重试,等待 {}ms。原因: {}",
retries, waitMs, e.getMessage());
Thread.sleep(waitMs);
}
}
} catch (Exception e) {
log.error("批次处理失败,跳过本批次", e);
failCount.addAndGet(batch.size());
batch.forEach(doc -> failedDocIds.add(
(String) doc.getMetadata().getOrDefault("id", "unknown")));
}
}
private <T> List<List<T>> partition(List<T> list, int size) {
List<List<T>> partitions = new ArrayList<>();
for (int i = 0; i < list.size(); i += size) {
partitions.add(list.subList(i, Math.min(i + size, list.size())));
}
return partitions;
}
}增量更新:精准识别哪些文档需要重新索引
全量重建向量索引成本很高,增量更新是必须实现的功能。核心是能判断"文档有没有变化"。
@Entity
@Table(name = "document_index")
public class DocumentIndexRecord {
@Id
private String documentId;
private String sourceUrl;
private String contentHash; // 内容的 MD5/SHA256
private Long contentSize;
private Instant lastModifiedTime;
private Instant lastIndexedTime;
private IndexStatus status;
private Integer chunkCount;
private String errorMessage;
// 版本号,用于乐观锁
@Version
private Long version;
}
@Service
public class IncrementalUpdateService {
@Autowired
private DocumentIndexRepository indexRepo;
@Autowired
private MultiSourceDocumentLoader loader;
@Autowired
private DocumentSplitter splitter;
@Autowired
private BatchEmbeddingService embeddingService;
@Autowired
private VectorStore vectorStore;
/**
* 增量更新:只处理有变化的文档
*/
public IncrementalUpdateResult incrementalUpdate(List<DocumentSource> sources) {
int newCount = 0, updatedCount = 0, skippedCount = 0, deletedCount = 0;
// 获取当前所有已索引文档的 ID
Set<String> currentIndexedIds = indexRepo.findAllIds();
Set<String> processedIds = new HashSet<>();
for (DocumentSource source : sources) {
String docId = source.getId();
processedIds.add(docId);
DocumentIndexRecord existing = indexRepo.findById(docId).orElse(null);
String currentHash = computeContentHash(source);
if (existing == null) {
// 新文档,直接索引
log.info("发现新文档: {}", source.getName());
indexDocument(source, currentHash);
newCount++;
} else if (!existing.getContentHash().equals(currentHash)) {
// 文档有变化,删除旧向量,重新索引
log.info("文档内容变化,重新索引: {}", source.getName());
deleteOldVectors(docId);
indexDocument(source, currentHash);
updatedCount++;
} else {
// 内容未变化,跳过
log.debug("文档未变化,跳过: {}", source.getName());
skippedCount++;
}
}
// 删除已不存在的文档的向量
Set<String> deletedIds = new HashSet<>(currentIndexedIds);
deletedIds.removeAll(processedIds);
for (String deletedId : deletedIds) {
log.info("文档已删除,清理向量: {}", deletedId);
deleteOldVectors(deletedId);
indexRepo.deleteById(deletedId);
deletedCount++;
}
return IncrementalUpdateResult.builder()
.newDocuments(newCount)
.updatedDocuments(updatedCount)
.skippedDocuments(skippedCount)
.deletedDocuments(deletedCount)
.build();
}
private void indexDocument(DocumentSource source, String contentHash) {
try {
// 更新索引状态为"处理中"
indexRepo.save(DocumentIndexRecord.builder()
.documentId(source.getId())
.sourceUrl(source.getUrl())
.contentHash(contentHash)
.status(IndexStatus.INDEXING)
.lastModifiedTime(source.getLastModified())
.build());
// 加载文档
List<Document> rawDocs = loader.loadDocument(source);
// 分块
List<TextSegment> segments = new ArrayList<>();
for (Document doc : rawDocs) {
List<Document> chunks = splitter.split(doc);
// 给每个 chunk 加上文档 ID,方便后续删除
chunks.forEach(chunk -> {
chunk.getMetadata().put("document_id", source.getId());
chunk.getMetadata().put("document_name", source.getName());
});
segments.addAll(chunks.stream()
.map(c -> TextSegment.from(c.getContent(), Metadata.from(c.getMetadata())))
.collect(Collectors.toList()));
}
// 批量向量化存储
embeddingService.batchEmbed(segments.stream()
.map(s -> new Document(s.text(), s.metadata().toMap()))
.collect(Collectors.toList()));
// 更新索引状态为"已完成"
indexRepo.updateStatus(source.getId(), IndexStatus.INDEXED,
segments.size(), null);
} catch (Exception e) {
log.error("文档索引失败: {}", source.getName(), e);
indexRepo.updateStatus(source.getId(), IndexStatus.FAILED, 0, e.getMessage());
}
}
private void deleteOldVectors(String documentId) {
// 通过 metadata 过滤删除该文档的所有向量
// 注意:不是所有向量数据库都支持按 metadata 删除,需要检查你的 VectorStore 实现
vectorStore.delete(
Filter.builder()
.eq("document_id", documentId)
.build()
);
}
private String computeContentHash(DocumentSource source) {
// 结合内容和最后修改时间计算 hash
String hashInput = source.getUrl() + "|" + source.getLastModified();
if (source.getContentSnapshot() != null) {
hashInput += "|" + source.getContentSnapshot();
}
return DigestUtils.md5DigestAsHex(hashInput.getBytes(StandardCharsets.UTF_8));
}
}文档分块策略:不是越小越好
文档分块策略对 RAG 效果影响非常大,这块值得单独说几句。
@Configuration
public class SplitterConfig {
/**
* 通用分块策略:递归字符分割
* 适用于大多数文档类型
*/
@Bean("recursiveSplitter")
public DocumentSplitter recursiveSplitter() {
return DocumentSplitters.recursive(
512, // chunk 大小(token 数)
50, // overlap 大小(保留上下文的重叠部分)
new OpenAiTokenizer()
);
}
/**
* 代码文档的分块策略:按函数/类分割
*/
@Bean("codeSplitter")
public DocumentSplitter codeSplitter() {
// 代码按空行分割,保持代码块完整性
return new RegexDocumentSplitter(
"(?m)^\\s*$", // 空行分割
512, 0
);
}
/**
* 问答对文档的分块策略:每个问答对一个 chunk
*/
@Bean("qaSplitter")
public DocumentSplitter qaSplitter() {
return new SentenceBoundaryDocumentSplitter(
1024, // 最大 chunk 大小
0 // 不需要 overlap(每个问答对是完整的)
);
}
}我在实际项目里的经验:
- chunk 大小 512 token 是相对通用的配置,对大多数场景都还行
- overlap 要有,50-100 token,避免一个完整语义被切断后两半都检索不到
- 代码文档要特殊处理,按函数边界分割,而不是简单按字数截断
- 长文档不要全量 embedding,先按章节分割再 embedding,检索时先粗粒度定位章节再细粒度检索句子
定时任务与手动触发的统一调度
@Service
public class EmbeddingPipelineScheduler {
@Autowired
private IncrementalUpdateService updateService;
@Autowired
private DocumentSourceRepository sourceRepo;
@Autowired
private PipelineJobRepository jobRepo;
/**
* 定时增量更新(每天凌晨 2 点)
*/
@Scheduled(cron = "0 0 2 * * ?")
public void scheduledUpdate() {
log.info("开始定时增量更新 Embedding Pipeline");
runPipeline("SCHEDULED", null);
}
/**
* 手动触发(通过管理接口)
*/
public PipelineJob manualTrigger(String operatorId, List<String> sourceIds) {
log.info("手动触发 Embedding Pipeline, operator={}, sourceCount={}",
operatorId, sourceIds != null ? sourceIds.size() : "ALL");
return runPipeline("MANUAL", sourceIds);
}
private PipelineJob runPipeline(String triggerType, List<String> targetSourceIds) {
// 检查是否有正在运行的任务(避免并发执行)
if (jobRepo.hasRunningJob()) {
log.warn("已有 Embedding Pipeline 任务在运行,跳过本次触发");
return null;
}
PipelineJob job = PipelineJob.builder()
.jobId(UUID.randomUUID().toString())
.triggerType(triggerType)
.status(JobStatus.RUNNING)
.startTime(Instant.now())
.build();
jobRepo.save(job);
// 异步执行,不阻塞调用方
CompletableFuture.runAsync(() -> {
try {
List<DocumentSource> sources;
if (targetSourceIds != null && !targetSourceIds.isEmpty()) {
sources = sourceRepo.findByIds(targetSourceIds);
} else {
sources = sourceRepo.findAllActive();
}
IncrementalUpdateResult result = updateService.incrementalUpdate(sources);
job.setStatus(JobStatus.COMPLETED);
job.setResult(result);
job.setEndTime(Instant.now());
jobRepo.save(job);
log.info("Pipeline 完成: jobId={}, 新增={}, 更新={}, 跳过={}, 删除={}",
job.getJobId(), result.getNewDocuments(),
result.getUpdatedDocuments(), result.getSkippedDocuments(),
result.getDeletedDocuments());
} catch (Exception e) {
log.error("Pipeline 执行失败: jobId={}", job.getJobId(), e);
job.setStatus(JobStatus.FAILED);
job.setErrorMessage(e.getMessage());
job.setEndTime(Instant.now());
jobRepo.save(job);
}
});
return job;
}
}踩坑记录
坑1:向量数据库的删除操作不支持 metadata 过滤
在做增量更新时,需要按 document_id 删除旧的向量 chunks。但并不是所有向量数据库都支持按 metadata 条件删除。我们用的 Milvus,初始版本不支持这个操作,只能按 ID 删除。
解决方案:在索引时记录每个 chunk 在向量数据库里的 vector ID,更新时先按记录的 ID 删除,再重新索引。这需要维护一张 document_id -> [vector_ids] 的映射表。
坑2:Embedding API 的 token 限制
OpenAI 的 text-embedding-3-small 每次请求最多 8191 tokens,每个文本最多 8191 tokens。如果你的 chunk 比较大,或者一批里有一个特别长的 chunk,整个批次可能失败。
解决方案:在分批之前先过滤超长 chunks,对超长 chunks 做额外分割:
// 预处理:过滤/分割超长 chunk
private List<Document> preprocessForEmbedding(List<Document> docs) {
List<Document> processed = new ArrayList<>();
int maxTokens = 8000; // 留点余量
for (Document doc : docs) {
int tokenCount = tokenizer.estimateTokensInText(doc.getContent());
if (tokenCount <= maxTokens) {
processed.add(doc);
} else {
// 超长 chunk 强制再分割
log.warn("发现超长 chunk({} tokens),强制分割: {}",
tokenCount, doc.getContent().substring(0, 50));
List<Document> subChunks = emergencySplitter.split(doc);
processed.addAll(subChunks);
}
}
return processed;
}坑3:重复内容导致向量库膨胀
我们有一批文档来自不同渠道,但内容高度重复(比如政策文件被多个部门分别上传)。每次索引都存进去,向量库越来越大,检索时也会返回很多重复结果。
解决方案:在分块后、向量化之前,先做内容去重:
private List<Document> deduplicateChunks(List<Document> chunks) {
Map<String, Document> uniqueChunks = new LinkedHashMap<>();
for (Document chunk : chunks) {
String contentHash = DigestUtils.sha256Hex(chunk.getContent());
if (!uniqueChunks.containsKey(contentHash)) {
uniqueChunks.put(contentHash, chunk);
} else {
log.debug("跳过重复 chunk,hash={}", contentHash.substring(0, 8));
}
}
int removed = chunks.size() - uniqueChunks.size();
if (removed > 0) {
log.info("去重完成,移除了 {} 个重复 chunk", removed);
}
return new ArrayList<>(uniqueChunks.values());
}坑4:大量文档并发 Embedding 时的内存问题
一次性把几万个文档加载到内存,然后批量向量化,很容易 OOM。要改成流式处理:
// 流式处理大批量文档,避免全量加载到内存
public void streamEmbed(Stream<DocumentSource> sourceStream) {
sourceStream
.map(source -> loader.loadDocument(source))
.flatMap(Collection::stream)
.map(doc -> splitter.split(doc))
.flatMap(Collection::stream)
.collect(Collectors.groupingBy(
doc -> doc.getMetadata().get("batch_group"),
Collectors.toList()
))
.forEach((group, docs) -> embeddingService.batchEmbed(docs));
}小结
Embedding Pipeline 是 RAG 系统的"地基",基础打好了上层的检索效果才能有保障:
- 批量向量化用信号量控制并发,避免触发 API rate limit,批次失败要能重试
- 增量更新靠内容 hash 判断变化,避免全量重建的巨大开销
- 文档分块策略要根据内容类型选择,chunk 大小和 overlap 都影响检索质量
- 向量数据的生命周期管理:记录 vector ID 以便按需删除更新
- 超长 chunk 和重复内容要在流水线里提前处理
