第2083篇:LangChain4j的EmbeddingStoreIngestor——文档处理流水线的工程实践
大约 11 分钟
第2083篇:LangChain4j的EmbeddingStoreIngestor——文档处理流水线的工程实践
适读人群:正在构建RAG知识库的Java工程师 | 阅读时长:约19分钟 | 核心价值:掌握EmbeddingStoreIngestor的完整工作流,包括文档加载、转换、分块、向量化的pipeline设计和生产级优化
构建RAG知识库时,大多数人把精力放在检索和生成上,却低估了"文档入库"这个环节的复杂度。
我们内部有一句话:垃圾进,垃圾出——无论你的检索算法多精巧,如果文档入库时处理粗糙,结果一定差。EmbeddingStoreIngestor就是LangChain4j提供的文档入库流水线,这篇文章说说怎么把它用好。
EmbeddingStoreIngestor的核心设计
/**
* EmbeddingStoreIngestor:文档入库流水线
*
* 流程:Document → Transform → Split → Embed → Store
*/
EmbeddingStoreIngestor ingestor = EmbeddingStoreIngestor.builder()
// 1. 文档转换:原始文档预处理
.documentTransformer(document -> {
// 可以在这里做清洗、过滤等操作
return document;
})
// 2. 文档分割:把长文档切成chunk
.documentSplitter(DocumentSplitters.recursive(500, 50))
// 3. 文本转换:chunk级别的处理
.textSegmentTransformer(segment -> segment)
// 4. 嵌入模型:chunk → 向量
.embeddingModel(embeddingModel)
// 5. 向量存储
.embeddingStore(embeddingStore)
.build();
// 入库(支持单个或批量)
ingestor.ingest(document);
ingestor.ingest(documents);看起来很简单,但每个环节都有讲究。
文档加载:处理多种格式
/**
* 各种格式的文档加载器
* LangChain4j内置了很多,但生产环境往往需要定制
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class DocumentLoaderService {
/**
* 根据文件类型自动选择加载器
*/
public Document loadDocument(Path filePath) {
String fileName = filePath.getFileName().toString().toLowerCase();
if (fileName.endsWith(".pdf")) {
return loadPdf(filePath);
} else if (fileName.endsWith(".docx")) {
return loadDocx(filePath);
} else if (fileName.endsWith(".md")) {
return loadMarkdown(filePath);
} else if (fileName.endsWith(".txt") || fileName.endsWith(".log")) {
return loadText(filePath);
} else if (fileName.endsWith(".html") || fileName.endsWith(".htm")) {
return loadHtml(filePath);
} else {
throw new UnsupportedDocumentTypeException("不支持的文件类型: " + fileName);
}
}
private Document loadPdf(Path filePath) {
// LangChain4j内置Apache PDFBox解析器
Document doc = FileSystemDocumentLoader.loadDocument(filePath,
new ApachePdfBoxDocumentParser());
// PDF解析后可能有很多换行符和特殊字符,需要清理
String cleanedText = cleanPdfText(doc.text());
return Document.from(cleanedText, doc.metadata()
.add("sourceFile", filePath.toString())
.add("fileType", "pdf"));
}
private Document loadDocx(Path filePath) {
// Apache POI解析Word文档
Document doc = FileSystemDocumentLoader.loadDocument(filePath,
new MsOfficeDocumentParser(OfficeDocumentType.DOCX));
return Document.from(doc.text(), doc.metadata()
.add("sourceFile", filePath.toString())
.add("fileType", "docx"));
}
private Document loadMarkdown(Path filePath) {
// Markdown文档保留结构信息
try {
String content = Files.readString(filePath, StandardCharsets.UTF_8);
return Document.from(content, Metadata.from(Map.of(
"sourceFile", filePath.toString(),
"fileType", "markdown",
// 从文件名推断主题
"topic", extractTopicFromFileName(filePath.getFileName().toString())
)));
} catch (IOException e) {
throw new DocumentLoadException("加载Markdown失败: " + filePath, e);
}
}
private Document loadText(Path filePath) {
return FileSystemDocumentLoader.loadDocument(filePath,
new TextDocumentParser());
}
private Document loadHtml(Path filePath) {
// HTML需要去掉标签,只保留正文
Document doc = FileSystemDocumentLoader.loadDocument(filePath,
new HtmlDocumentParser());
return Document.from(doc.text(), doc.metadata()
.add("sourceFile", filePath.toString())
.add("fileType", "html"));
}
/**
* 清理PDF解析后的文本
* PDF解析常见问题:多余空白、断行、连字符等
*/
private String cleanPdfText(String rawText) {
return rawText
// 处理PDF中的连字符换行(如 "en-\nvironment" → "environment")
.replaceAll("-(\\s*\\n\\s*)", "")
// 多个连续空白行合并为一个
.replaceAll("(\\n\\s*){3,}", "\n\n")
// 去掉行首行尾的多余空白
.lines()
.map(String::strip)
.collect(Collectors.joining("\n"))
.trim();
}
private String extractTopicFromFileName(String fileName) {
return fileName.replaceAll("\\.[^.]+$", "") // 去掉扩展名
.replaceAll("[_\\-]", " ") // 下划线/横线替换为空格
.trim();
}
}文档转换器:入库前的清洗
/**
* 生产级文档转换器
* 多个转换步骤组合使用
*/
@Component
@Slf4j
public class ProductionDocumentTransformer implements DocumentTransformer {
// 质量过滤:太短或太长的文档可能有问题
private static final int MIN_DOCUMENT_LENGTH = 100;
private static final int MAX_DOCUMENT_LENGTH = 500_000; // 50万字符警告
@Override
public Document transform(Document document) {
String text = document.text();
// 1. 长度检查
if (text.length() < MIN_DOCUMENT_LENGTH) {
log.warn("文档过短({}字符),跳过: {}",
text.length(), document.metadata().getString("sourceFile"));
return null; // 返回null表示跳过这个文档
}
if (text.length() > MAX_DOCUMENT_LENGTH) {
log.warn("文档过长({}字符),可能影响处理质量: {}",
text.length(), document.metadata().getString("sourceFile"));
}
// 2. 文本清洗
String cleaned = cleanText(text);
// 3. 语言检测(可选)
String language = detectLanguage(cleaned);
// 4. 生成文档摘要元数据(用于后续过滤)
String docHash = generateDocumentHash(cleaned);
// 返回处理后的文档,保留原始元数据并追加新的
return Document.from(cleaned, document.metadata()
.add("language", language)
.add("docHash", docHash)
.add("originalLength", String.valueOf(text.length()))
.add("processedLength", String.valueOf(cleaned.length()))
.add("ingestTime", LocalDateTime.now().toString()));
}
/**
* 批量转换(可以在这里做去重)
*/
@Override
public List<Document> transformAll(List<Document> documents) {
// 先去重
Map<String, Document> deduped = new LinkedHashMap<>();
for (Document doc : documents) {
String hash = generateDocumentHash(doc.text());
deduped.putIfAbsent(hash, doc);
}
int dupCount = documents.size() - deduped.size();
if (dupCount > 0) {
log.info("发现并去除{}个重复文档", dupCount);
}
// 逐个转换(跳过null)
return deduped.values().stream()
.map(this::transform)
.filter(Objects::nonNull)
.toList();
}
private String cleanText(String text) {
return text
// 去掉零宽字符
.replaceAll("[\\x00-\\x08\\x0B\\x0C\\x0E-\\x1F\\x7F]", "")
// 统一换行符
.replace("\r\n", "\n")
.replace("\r", "\n")
// 合并多余空行
.replaceAll("\\n{3,}", "\n\n")
.trim();
}
private String detectLanguage(String text) {
// 简单的中英文检测:统计中文字符比例
long chineseChars = text.chars()
.filter(c -> c >= 0x4E00 && c <= 0x9FFF)
.count();
double ratio = (double) chineseChars / text.length();
return ratio > 0.1 ? "zh" : "en";
}
private String generateDocumentHash(String text) {
// 用内容哈希做去重(取前50字符+后50字符+长度)
String fingerprint = text.substring(0, Math.min(50, text.length())) +
text.substring(Math.max(0, text.length() - 50)) +
text.length();
return String.valueOf(fingerprint.hashCode());
}
}分割策略:这是最关键的环节
/**
* 针对不同文档类型的分割策略选择
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class AdaptiveDocumentSplitter {
/**
* 根据文档类型选择最适合的分割策略
*/
public DocumentSplitter selectSplitter(Document document) {
String fileType = document.metadata().getString("fileType");
String language = document.metadata().getString("language");
boolean isChinese = "zh".equals(language);
return switch (fileType != null ? fileType : "text") {
case "markdown" -> createMarkdownSplitter(isChinese);
case "pdf" -> createPdfSplitter(isChinese);
case "html" -> createHtmlSplitter(isChinese);
default -> createDefaultSplitter(isChinese);
};
}
/**
* Markdown分割器:按标题层次分割
* 保证每个chunk不跨越大标题
*/
private DocumentSplitter createMarkdownSplitter(boolean isChinese) {
// 中文文档的chunk大小可以稍小,因为中文信息密度更高
int chunkSize = isChinese ? 300 : 500;
int overlap = isChinese ? 50 : 100;
return DocumentSplitters.recursive(chunkSize, overlap,
new HierarchicalChunkingConfig()
.separators(List.of(
"\n# ", // H1
"\n## ", // H2
"\n### ", // H3
"\n\n", // 段落
"\n", // 行
" " // 词
))
);
}
/**
* PDF分割器:考虑PDF的特殊格式问题
* PDF解析后段落可能不完整,用较大的overlap保证连续性
*/
private DocumentSplitter createPdfSplitter(boolean isChinese) {
int chunkSize = isChinese ? 400 : 600;
int overlap = 100; // PDF用较大overlap,防止段落切割导致语义断裂
return DocumentSplitters.recursive(chunkSize, overlap);
}
private DocumentSplitter createHtmlSplitter(boolean isChinese) {
return DocumentSplitters.recursive(
isChinese ? 300 : 500,
isChinese ? 50 : 100
);
}
private DocumentSplitter createDefaultSplitter(boolean isChinese) {
return DocumentSplitters.recursive(
isChinese ? 300 : 500,
isChinese ? 50 : 100
);
}
}TextSegmentTransformer:chunk级别处理
/**
* chunk级别的转换器
* 在向量化之前对每个chunk做进一步处理
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class EnrichingTextSegmentTransformer implements TextSegmentTransformer {
private final ChatLanguageModel llm;
@Override
public TextSegment transform(TextSegment segment) {
String text = segment.text();
// 1. 过滤太短的chunk(可能是标题或噪声)
if (text.trim().length() < 30) {
log.debug("跳过过短的chunk: '{}'", text.substring(0, Math.min(30, text.length())));
return null;
}
// 2. 为chunk生成摘要关键词,提高召回率
// 注意:这会调用LLM,成本较高,只在高价值文档上用
boolean isHighValueDoc = "true".equals(
segment.metadata().getString("highValue"));
String enrichedText;
if (isHighValueDoc) {
enrichedText = enrichWithKeywords(text);
} else {
enrichedText = text;
}
// 3. 清理chunk头部的孤立标题
enrichedText = cleanOrphanedHeaders(enrichedText);
return TextSegment.from(enrichedText, segment.metadata());
}
/**
* 在chunk前面加上关键词摘要
* 提高检索时的语义匹配度
*
* 原理:嵌入向量是对整个chunk的语义压缩
* 在chunk前面加上明确的关键词,能让向量更精准地表达内容
*/
private String enrichWithKeywords(String text) {
if (text.length() < 100) return text; // 太短的不需要
try {
String prompt = String.format("""
从以下文本中提取3-5个核心关键词(用逗号分隔),不需要解释:
%s
关键词:""",
text.substring(0, Math.min(500, text.length())));
String keywords = llm.generate(prompt).trim();
// 把关键词加到chunk开头
return "【关键词:" + keywords + "】\n" + text;
} catch (Exception e) {
log.warn("关键词提取失败,使用原始文本: {}", e.getMessage());
return text;
}
}
/**
* 清理孤立的标题(chunk开头只有标题没有正文的情况)
*/
private String cleanOrphanedHeaders(String text) {
String[] lines = text.split("\n");
if (lines.length <= 2) return text;
// 如果chunk几乎全是标题行(以#开头),说明分割点不好
long headerLines = Arrays.stream(lines)
.filter(l -> l.startsWith("#"))
.count();
if (headerLines > lines.length * 0.5) {
log.debug("检测到标题密集的chunk,可能是分割问题");
}
return text;
}
}完整的入库流水线
/**
* 生产级入库流水线
* 集成文档加载、转换、分割、向量化的完整流程
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductionIngestPipeline {
private final DocumentLoaderService documentLoader;
private final ProductionDocumentTransformer documentTransformer;
private final AdaptiveDocumentSplitter splitterSelector;
private final EmbeddingModel embeddingModel;
private final EmbeddingStore<TextSegment> embeddingStore;
// 并发入库的线程池
private final ExecutorService ingestExecutor = Executors.newFixedThreadPool(4);
/**
* 入库单个文件
*/
public IngestResult ingestFile(Path filePath) {
long startTime = System.currentTimeMillis();
try {
// 1. 加载文档
Document rawDocument = documentLoader.loadDocument(filePath);
log.info("文档加载完成: {}, 长度={}", filePath.getFileName(),
rawDocument.text().length());
// 2. 转换(清洗)
Document cleanedDocument = documentTransformer.transform(rawDocument);
if (cleanedDocument == null) {
return IngestResult.skipped(filePath.toString(), "文档质量不达标");
}
// 3. 选择分割策略
DocumentSplitter splitter = splitterSelector.selectSplitter(cleanedDocument);
// 4. 构建Ingestor并执行
EmbeddingStoreIngestor ingestor = EmbeddingStoreIngestor.builder()
.documentSplitter(splitter)
.embeddingModel(embeddingModel)
.embeddingStore(embeddingStore)
.build();
ingestor.ingest(cleanedDocument);
long elapsed = System.currentTimeMillis() - startTime;
log.info("文档入库完成: {}, 耗时={}ms", filePath.getFileName(), elapsed);
return IngestResult.success(filePath.toString(), elapsed);
} catch (Exception e) {
log.error("文档入库失败: {}", filePath, e);
return IngestResult.failed(filePath.toString(), e.getMessage());
}
}
/**
* 批量入库目录下的所有文档
*/
public BatchIngestResult ingestDirectory(Path dirPath, boolean recursive) {
// 收集所有文件
List<Path> files;
try {
int maxDepth = recursive ? Integer.MAX_VALUE : 1;
files = Files.walk(dirPath, maxDepth)
.filter(Files::isRegularFile)
.filter(this::isSupportedFileType)
.sorted()
.toList();
} catch (IOException e) {
throw new IngestException("遍历目录失败: " + dirPath, e);
}
log.info("开始批量入库: 目录={}, 文件数={}", dirPath, files.size());
// 并发处理,收集结果
List<CompletableFuture<IngestResult>> futures = files.stream()
.map(file -> CompletableFuture.supplyAsync(
() -> ingestFile(file), ingestExecutor))
.toList();
List<IngestResult> results = futures.stream()
.map(CompletableFuture::join)
.toList();
// 统计结果
long successCount = results.stream().filter(r -> r.status() == IngestStatus.SUCCESS).count();
long skipCount = results.stream().filter(r -> r.status() == IngestStatus.SKIPPED).count();
long failCount = results.stream().filter(r -> r.status() == IngestStatus.FAILED).count();
log.info("批量入库完成: 总数={}, 成功={}, 跳过={}, 失败={}",
files.size(), successCount, skipCount, failCount);
return new BatchIngestResult(files.size(), successCount, skipCount, failCount, results);
}
private boolean isSupportedFileType(Path path) {
String name = path.getFileName().toString().toLowerCase();
return name.endsWith(".pdf") || name.endsWith(".docx") ||
name.endsWith(".md") || name.endsWith(".txt") ||
name.endsWith(".html");
}
public enum IngestStatus { SUCCESS, SKIPPED, FAILED }
public record IngestResult(
String filePath, IngestStatus status,
long elapsedMs, String message
) {
public static IngestResult success(String path, long elapsed) {
return new IngestResult(path, IngestStatus.SUCCESS, elapsed, "");
}
public static IngestResult skipped(String path, String reason) {
return new IngestResult(path, IngestStatus.SKIPPED, 0, reason);
}
public static IngestResult failed(String path, String error) {
return new IngestResult(path, IngestStatus.FAILED, 0, error);
}
}
public record BatchIngestResult(
long total, long success, long skipped, long failed,
List<IngestResult> details
) {}
}增量更新:处理文档变更
/**
* 增量更新策略
* 只处理新增和修改的文档,避免全量重建
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class IncrementalIngestService {
private final ProductionIngestPipeline pipeline;
private final EmbeddingStore<TextSegment> embeddingStore;
private final DocumentChangeTracker changeTracker;
/**
* 增量同步目录
* 对比上次同步状态,只处理有变化的文件
*/
public IncrementalSyncResult syncDirectory(Path dirPath) {
// 获取当前目录状态
Map<String, FileState> currentState = scanDirectory(dirPath);
// 获取上次同步状态
Map<String, FileState> lastState = changeTracker.getLastState(dirPath.toString());
List<Path> toAdd = new ArrayList<>();
List<Path> toUpdate = new ArrayList<>();
List<String> toDelete = new ArrayList<>();
// 找出新增和修改的文件
for (Map.Entry<String, FileState> entry : currentState.entrySet()) {
String filePath = entry.getKey();
FileState current = entry.getValue();
FileState last = lastState.get(filePath);
if (last == null) {
toAdd.add(Path.of(filePath));
} else if (!current.contentHash().equals(last.contentHash())) {
toUpdate.add(Path.of(filePath));
}
}
// 找出已删除的文件
for (String filePath : lastState.keySet()) {
if (!currentState.containsKey(filePath)) {
toDelete.add(filePath);
}
}
log.info("增量同步分析: 新增={}, 修改={}, 删除={}",
toAdd.size(), toUpdate.size(), toDelete.size());
// 处理删除
for (String deletedFile : toDelete) {
deleteDocumentFromStore(deletedFile);
}
// 处理更新(先删旧的,再入新的)
for (Path updatedFile : toUpdate) {
deleteDocumentFromStore(updatedFile.toString());
pipeline.ingestFile(updatedFile);
}
// 处理新增
for (Path newFile : toAdd) {
pipeline.ingestFile(newFile);
}
// 更新状态记录
changeTracker.saveState(dirPath.toString(), currentState);
return new IncrementalSyncResult(toAdd.size(), toUpdate.size(), toDelete.size());
}
/**
* 从向量存储中删除指定文件的所有chunk
* 通过元数据过滤找到所有属于这个文件的向量
*/
private void deleteDocumentFromStore(String sourceFile) {
// 通过元数据过滤找到所有属于这个文件的chunk ID
// 注意:不同VectorStore的删除API不同
// pgvector:直接SQL DELETE WHERE metadata->>'sourceFile' = ?
// Qdrant:delete by payload filter
// LangChain4j的统一接口:
// 需要先查出这些chunk的ID,再批量删除
// 这里是简化实现
log.info("删除文档的向量: {}", sourceFile);
// embeddingStore.delete(...)
}
private Map<String, FileState> scanDirectory(Path dirPath) {
Map<String, FileState> state = new HashMap<>();
try {
Files.walk(dirPath)
.filter(Files::isRegularFile)
.forEach(file -> {
try {
byte[] content = Files.readAllBytes(file);
String hash = DigestUtils.md5DigestAsHex(content);
long lastModified = Files.getLastModifiedTime(file).toMillis();
state.put(file.toString(), new FileState(hash, lastModified));
} catch (IOException e) {
log.warn("无法读取文件状态: {}", file);
}
});
} catch (IOException e) {
log.error("扫描目录失败: {}", dirPath, e);
}
return state;
}
public record FileState(String contentHash, long lastModifiedMs) {}
public record IncrementalSyncResult(int added, int updated, int deleted) {}
}嵌入质量监控
/**
* 入库质量监控
* 确保嵌入过程的数据质量
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class IngestQualityMonitor {
private final EmbeddingModel embeddingModel;
/**
* 抽样验证入库质量
* 对一批chunk执行测试查询,验证语义匹配度
*/
public QualityReport validateSample(
List<TextSegment> chunks,
List<String> testQueries,
EmbeddingStore<TextSegment> store) {
List<QueryQuality> queryQualities = new ArrayList<>();
for (String testQuery : testQueries) {
float[] queryEmbedding = embeddingModel.embed(testQuery);
List<EmbeddingMatch<TextSegment>> results = store.search(
EmbeddingSearchRequest.builder()
.queryEmbedding(Embedding.from(queryEmbedding))
.maxResults(3)
.build())
.matches();
if (results.isEmpty()) {
log.warn("测试查询无结果: {}", testQuery);
queryQualities.add(new QueryQuality(testQuery, 0, List.of()));
continue;
}
double topScore = results.get(0).score();
List<String> topResults = results.stream()
.map(m -> m.embedded().text().substring(0,
Math.min(80, m.embedded().text().length())))
.toList();
queryQualities.add(new QueryQuality(testQuery, topScore, topResults));
if (topScore < 0.6) {
log.warn("测试查询匹配度偏低: query='{}', topScore={:.3f}",
testQuery, topScore);
}
}
double avgTopScore = queryQualities.stream()
.mapToDouble(QueryQuality::topScore)
.average()
.orElse(0);
boolean passesQualityBar = avgTopScore >= 0.65;
if (!passesQualityBar) {
log.error("入库质量未达标!平均最高匹配度={:.3f},低于0.65阈值", avgTopScore);
}
return new QualityReport(queryQualities, avgTopScore, passesQualityBar);
}
public record QueryQuality(String query, double topScore, List<String> topResults) {}
public record QualityReport(List<QueryQuality> details, double avgTopScore, boolean passes) {}
}生产部署建议
关于并发控制
批量入库时并发数要谨慎。EmbeddingModel的API有频率限制,太高的并发会触发429错误。我们的经验是:并发4线程,每批500个chunk之间加100ms停顿,对OpenAI的embedding API来说比较稳定。
关于幂等性
文档入库应该是幂等的:同一个文档多次入库,结果应该一样。实现方式是用文档的内容哈希作为去重key,每次入库前检查是否已存在。
关于embedding模型切换
如果你后来换了embedding模型(即使是同一家的不同版本),所有历史向量都需要重新生成——因为不同模型的向量空间不同,用新模型的查询向量去匹配旧模型的存储向量,结果会很差。
这是生产环境里最大的迁移成本之一,选模型要谨慎,确定了就不要轻易换。
文档入库不是一次性的工作,是持续运行的系统。设计好增量更新、质量监控、版本管理,才能让知识库随着业务增长保持高质量。
