文档预处理流水线——清洗、标准化、增强的完整链路
文档预处理流水线——清洗、标准化、增强的完整链路
很多人说 RAG 的核心是向量检索,这话没错,但有一个前提没人提:向量检索的质量,高度依赖被向量化的文本质量。
你把一段乱七八糟的文本塞进 embedding 模型,得到的向量也是乱的。乱的向量放进向量库,检索出来的就是不相关的内容。
我自己踩过一个坑:接了一个客户的项目,文档全是从各种系统导出的,有 PDF、有 Word、有从网页抓下来的 HTML。我直接用 LangChain 的 loader 把内容提取出来,分块、embedding、存向量库,上线测试,效果烂得一塌糊涂。
后来逐条检查才发现:很多 chunk 里面夹着页眉页脚的内容("第 3 页,共 12 页"、"机密文件,仅供内部使用"),有的 chunk 是一张表格被硬塞成一行文本,行列关系全没了,有的段落里有好几个相同的标题因为 PDF 分栏被拼在一起。
那次之后我就认认真真地把文档预处理当成一个独立工程问题来对待。
为什么说预处理是让文档"更适合被 AI 理解"
这是一个角度问题。
很多人把文档预处理理解成"文本提取",就是把 PDF、Word 里的文字读出来。这是必要条件,但远远不够。
真正的目标是:让每一个被向量化的文本片段,都能让 LLM 在没有其他上下文的情况下正确理解其含义。
这句话拆开来看有几层含义:
1. 完整性:每个 chunk 要能独立成意。"详见上节" 这种表述在 chunk 化后就成了废话,因为"上节"可能不在同一个 chunk 里。
2. 准确性:格式转换不能引入噪音。表格、代码块、有序列表,这些有结构的内容,要以 LLM 能理解的形式保留,而不是被拍平成乱文本。
3. 语义增强:原始文档里有些信息是"隐含"的。比如一份产品手册,正文里提到"此功能",但没说是哪个功能,因为读者在翻书的时候看得到章节标题。chunk 化后这个上下文就丢了。增强处理要把这些隐含信息显式化。
完整处理步骤详解
我把整个流水线分成四个阶段:格式清洗、章节识别、标题层级恢复、关键信息标注。
阶段一:格式清洗
格式清洗的目标是去除噪音,保留有效内容。
需要清理的内容:
- 页眉页脚(页码、文档标题重复、机密标记等)
- 水印文字("机密"、"草稿"等文字水印被识别为正文)
- 装饰性字符(过多的分隔线、重复的符号)
- 无意义的空白字符(多余的空行、制表符混乱)
- 链接和图片的 alt 文字(如果与上下文无关)
需要保留并转换的内容:
- 表格:转成 Markdown 表格格式,保留行列关系
- 代码块:保留代码,加上语言标记
- 有序列表:保持有序列表格式
- 公式:尽量保留,至少保留文字描述
阶段二:章节识别
这一步要把文档的层级结构识别出来,建立"章节树"。
为什么重要:知道每个段落属于哪个章节,在后续生成 chunk 的时候可以把章节标题作为上下文注入,大幅提升 chunk 的语义自给能力。
阶段三:标题层级恢复
PDF 转文本后,标题的层级信息通常丢失。需要根据字体大小、位置、内容模式来重建标题层级。
阶段四:关键信息标注
这是最容易被忽视的一步,也是增益最大的一步。
对文档中的关键信息(定义、警告、重要结论)进行显式标注,方便后续 RAG 时优先召回这些高信息密度的内容。
基于 Spring Batch 的文档处理流水线
为什么选 Spring Batch?因为文档处理天然是批处理场景:
- 大量文档需要处理,需要分批并发
- 处理失败需要重试和跳过
- 需要记录每个文档的处理状态
- 需要支持重新处理(文档更新后重跑)
Spring Batch 对这些场景的支持非常成熟。
依赖配置
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<!-- PDF处理 -->
<dependency>
<groupId>org.apache.pdfbox</groupId>
<artifactId>pdfbox</artifactId>
<version>3.0.1</version>
</dependency>
<!-- Word处理 -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>5.2.5</version>
</dependency>
<!-- HTML处理 -->
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.17.2</version>
</dependency>
</dependencies>核心数据模型
@Data
@Entity
@Table(name = "document_processing_record")
public class DocumentProcessingRecord {
@Id
private String documentId;
private String originalPath;
private String documentType; // PDF, WORD, HTML, TXT
@Enumerated(EnumType.STRING)
private ProcessingStatus status;
private String processedContent; // 处理后的内容
private String structureJson; // 章节结构 JSON
private LocalDateTime processedAt;
private String errorMessage;
private int retryCount;
// 处理质量指标
private double qualityScore;
private int chunkCount;
public enum ProcessingStatus {
PENDING, PROCESSING, COMPLETED, FAILED, SKIPPED
}
}
@Data
public class ProcessedDocument {
private String documentId;
private String title;
private String cleanedContent;
private List<DocumentSection> sections;
private List<DocumentChunk> chunks;
private Map<String, String> metadata;
@Data
public static class DocumentSection {
private String sectionId;
private String title;
private int level; // 1=一级标题, 2=二级标题
private String content;
private String parentSectionId;
private List<String> childSectionIds;
}
@Data
public static class DocumentChunk {
private String chunkId;
private String content;
private String contextualContent; // 包含上下文增强的内容
private String sectionPath; // 所属章节路径,如 "产品概述 > 功能说明 > 搜索功能"
private Map<String, Object> metadata;
private int startChar;
private int endChar;
}
}Job 配置
@Configuration
@EnableBatchProcessing
public class DocumentProcessingJobConfig {
@Bean
public Job documentProcessingJob(
JobRepository jobRepository,
Step extractStep,
Step cleanStep,
Step structureStep,
Step chunkStep,
Step vectorizeStep) {
return new JobBuilder("documentProcessingJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(extractStep)
.next(cleanStep)
.next(structureStep)
.next(chunkStep)
.next(vectorizeStep)
.build();
}
@Bean
public Step extractStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
DocumentExtractItemReader reader,
DocumentExtractItemProcessor processor,
DocumentExtractItemWriter writer) {
return new StepBuilder("extractStep", jobRepository)
.<DocumentProcessingRecord, DocumentProcessingRecord>chunk(10, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.retryLimit(3)
.retry(IOException.class)
.skipLimit(100)
.skip(DocumentProcessingException.class)
.build();
}
// 其他 Step 配置类似...
}文本提取处理器(支持多种格式)
@Component
public class DocumentExtractProcessor implements ItemProcessor<DocumentProcessingRecord, DocumentProcessingRecord> {
private final Map<String, DocumentExtractor> extractors;
public DocumentExtractProcessor(
PdfDocumentExtractor pdfExtractor,
WordDocumentExtractor wordExtractor,
HtmlDocumentExtractor htmlExtractor) {
this.extractors = Map.of(
"PDF", pdfExtractor,
"WORD", wordExtractor,
"HTML", htmlExtractor
);
}
@Override
public DocumentProcessingRecord process(DocumentProcessingRecord record) throws Exception {
DocumentExtractor extractor = extractors.get(record.getDocumentType());
if (extractor == null) {
record.setStatus(DocumentProcessingRecord.ProcessingStatus.SKIPPED);
record.setErrorMessage("不支持的文档类型: " + record.getDocumentType());
return record;
}
String rawContent = extractor.extract(record.getOriginalPath());
record.setProcessedContent(rawContent);
record.setStatus(DocumentProcessingRecord.ProcessingStatus.PROCESSING);
return record;
}
}
@Component
public class PdfDocumentExtractor implements DocumentExtractor {
@Override
public String extract(String filePath) throws IOException {
try (PDDocument document = Loader.loadPDF(new File(filePath))) {
PDFTextStripper stripper = new PDFTextStripper();
// 保留段落结构
stripper.setParagraphStart("<PARA>");
stripper.setParagraphEnd("</PARA>");
return stripper.getText(document);
}
}
}
@Component
public class WordDocumentExtractor implements DocumentExtractor {
@Override
public String extract(String filePath) throws IOException {
try (XWPFDocument document = new XWPFDocument(new FileInputStream(filePath))) {
StringBuilder sb = new StringBuilder();
for (IBodyElement element : document.getBodyElements()) {
if (element instanceof XWPFParagraph) {
XWPFParagraph para = (XWPFParagraph) element;
String style = para.getStyle();
String text = para.getText();
if (text.trim().isEmpty()) continue;
// 根据样式判断是否为标题
if (style != null && style.startsWith("Heading")) {
int level = Integer.parseInt(style.replace("Heading", "").trim());
sb.append("#".repeat(level)).append(" ").append(text).append("\n\n");
} else {
sb.append(text).append("\n\n");
}
} else if (element instanceof XWPFTable) {
// 表格转Markdown
sb.append(convertTableToMarkdown((XWPFTable) element));
sb.append("\n\n");
}
}
return sb.toString();
}
}
private String convertTableToMarkdown(XWPFTable table) {
StringBuilder sb = new StringBuilder();
List<XWPFTableRow> rows = table.getRows();
if (rows.isEmpty()) return "";
// 第一行作为表头
XWPFTableRow headerRow = rows.get(0);
sb.append("| ");
headerRow.getTableCells().forEach(cell ->
sb.append(cell.getText().trim()).append(" | ")
);
sb.append("\n|");
headerRow.getTableCells().forEach(cell -> sb.append("---|"));
sb.append("\n");
// 数据行
for (int i = 1; i < rows.size(); i++) {
sb.append("| ");
rows.get(i).getTableCells().forEach(cell ->
sb.append(cell.getText().trim()).append(" | ")
);
sb.append("\n");
}
return sb.toString();
}
}清洗处理器(去噪)
@Component
public class ContentCleanProcessor implements ItemProcessor<DocumentProcessingRecord, DocumentProcessingRecord> {
// 页眉页脚常见模式
private static final List<Pattern> HEADER_FOOTER_PATTERNS = Arrays.asList(
Pattern.compile("第\\s*\\d+\\s*页[,,]?\\s*共\\s*\\d+\\s*页"),
Pattern.compile("Page\\s+\\d+\\s+of\\s+\\d+", Pattern.CASE_INSENSITIVE),
Pattern.compile("^(机密|保密|内部资料|Confidential)\\s*$", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE),
Pattern.compile("\\d{4}[-/]\\d{1,2}[-/]\\d{1,2}\\s+版权所有"),
Pattern.compile("Copyright\\s*©?\\s*\\d{4}", Pattern.CASE_INSENSITIVE)
);
// 连续重复字符(装饰性分隔线)
private static final Pattern DECORATIVE_PATTERN = Pattern.compile("[=\\-_\\*~]{5,}");
@Override
public DocumentProcessingRecord process(DocumentProcessingRecord record) {
String content = record.getProcessedContent();
// 1. 清理页眉页脚
for (Pattern pattern : HEADER_FOOTER_PATTERNS) {
content = pattern.matcher(content).replaceAll("");
}
// 2. 清理装饰性字符
content = DECORATIVE_PATTERN.matcher(content).replaceAll("\n---\n");
// 3. 清理多余空白
content = content.replaceAll("\n{3,}", "\n\n"); // 最多保留两个换行
content = content.replaceAll("[ \t]+\n", "\n"); // 行尾空白
content = content.replaceAll("\n[ \t]+", "\n"); // 行首空白(注意不能用此规则处理代码块)
// 4. 清理<PARA>标记(PDF提取时添加的,现在用不到了)
content = content.replace("<PARA>", "").replace("</PARA>", "\n\n");
record.setProcessedContent(content.trim());
return record;
}
}章节识别与标题层级恢复
@Component
public class StructureAnalysisProcessor implements ItemProcessor<DocumentProcessingRecord, DocumentProcessingRecord> {
// Markdown标题模式
private static final Pattern MD_HEADING = Pattern.compile("^(#{1,6})\\s+(.+)$", Pattern.MULTILINE);
// 数字编号标题(1. 章节名、1.1 子章节名)
private static final Pattern NUMBERED_HEADING = Pattern.compile(
"^(\\d+(?:\\.\\d+)*)\\s+([\\u4e00-\\u9fa5A-Za-z].{2,50})$", Pattern.MULTILINE
);
@Override
public DocumentProcessingRecord process(DocumentProcessingRecord record) throws JsonProcessingException {
String content = record.getProcessedContent();
List<ProcessedDocument.DocumentSection> sections = parseStructure(content);
ObjectMapper mapper = new ObjectMapper();
record.setStructureJson(mapper.writeValueAsString(sections));
return record;
}
private List<ProcessedDocument.DocumentSection> parseStructure(String content) {
List<ProcessedDocument.DocumentSection> sections = new ArrayList<>();
String[] lines = content.split("\n");
ProcessedDocument.DocumentSection currentSection = null;
StringBuilder currentContent = new StringBuilder();
int sectionIndex = 0;
for (String line : lines) {
// 尝试识别为标题
HeadingInfo heading = detectHeading(line);
if (heading != null) {
// 保存上一个章节
if (currentSection != null) {
currentSection.setContent(currentContent.toString().trim());
sections.add(currentSection);
}
// 开始新章节
currentSection = new ProcessedDocument.DocumentSection();
currentSection.setSectionId("sec-" + (++sectionIndex));
currentSection.setTitle(heading.getTitle());
currentSection.setLevel(heading.getLevel());
currentContent = new StringBuilder();
// 建立父子关系
establishParentChild(currentSection, sections);
} else if (currentSection != null) {
currentContent.append(line).append("\n");
}
}
// 保存最后一个章节
if (currentSection != null) {
currentSection.setContent(currentContent.toString().trim());
sections.add(currentSection);
}
return sections;
}
private HeadingInfo detectHeading(String line) {
// 优先检测 Markdown 格式标题
Matcher mdMatcher = MD_HEADING.matcher(line);
if (mdMatcher.matches()) {
return new HeadingInfo(mdMatcher.group(2).trim(), mdMatcher.group(1).length());
}
// 检测数字编号格式标题
Matcher numMatcher = NUMBERED_HEADING.matcher(line);
if (numMatcher.matches()) {
String numbering = numMatcher.group(1);
int level = numbering.split("\\.").length;
return new HeadingInfo(line.trim(), Math.min(level, 6));
}
return null;
}
private void establishParentChild(ProcessedDocument.DocumentSection newSection,
List<ProcessedDocument.DocumentSection> existingSections) {
// 找到最近的、层级比当前低的章节作为父节点
for (int i = existingSections.size() - 1; i >= 0; i--) {
ProcessedDocument.DocumentSection candidate = existingSections.get(i);
if (candidate.getLevel() < newSection.getLevel()) {
newSection.setParentSectionId(candidate.getSectionId());
if (candidate.getChildSectionIds() == null) {
candidate.setChildSectionIds(new ArrayList<>());
}
candidate.getChildSectionIds().add(newSection.getSectionId());
break;
}
}
}
@Data
@AllArgsConstructor
private static class HeadingInfo {
private String title;
private int level;
}
}关键信息标注 + 语义增强 Chunk 生成
@Component
public class ChunkGenerationProcessor implements ItemProcessor<DocumentProcessingRecord, DocumentProcessingRecord> {
private static final int CHUNK_SIZE = 500; // 目标chunk大小(字符)
private static final int CHUNK_OVERLAP = 100; // 相邻chunk重叠
private static final int CONTEXT_HEADER_MAX = 200; // 上下文头信息最大长度
// 关键信息标记
private static final Pattern WARNING_PATTERN = Pattern.compile("(?i)(注意|警告|重要|⚠|Warning|Important|Note)[::]");
private static final Pattern DEFINITION_PATTERN = Pattern.compile("(?i)(定义|术语|概念|Definition)[::]");
@Override
public DocumentProcessingRecord process(DocumentProcessingRecord record) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
List<ProcessedDocument.DocumentSection> sections = mapper.readValue(
record.getStructureJson(),
new TypeReference<List<ProcessedDocument.DocumentSection>>() {}
);
List<ProcessedDocument.DocumentChunk> chunks = new ArrayList<>();
for (ProcessedDocument.DocumentSection section : sections) {
if (section.getContent() == null || section.getContent().trim().isEmpty()) {
continue;
}
// 构建章节路径(用于上下文注入)
String sectionPath = buildSectionPath(section, sections);
// 对章节内容进行分块
List<String> rawChunks = splitIntoChunks(section.getContent());
for (int i = 0; i < rawChunks.size(); i++) {
ProcessedDocument.DocumentChunk chunk = new ProcessedDocument.DocumentChunk();
chunk.setChunkId(section.getSectionId() + "-chunk-" + i);
chunk.setContent(rawChunks.get(i));
chunk.setSectionPath(sectionPath);
// 语义增强:在内容前加上章节路径上下文
String contextualContent = buildContextualContent(
sectionPath,
rawChunks.get(i),
i > 0 ? rawChunks.get(i - 1) : null // 前一个chunk的最后一句作为衔接
);
chunk.setContextualContent(contextualContent);
// 标注关键信息类型
Map<String, Object> metadata = new HashMap<>();
metadata.put("sectionPath", sectionPath);
metadata.put("sectionLevel", section.getLevel());
metadata.put("chunkIndex", i);
metadata.put("totalChunks", rawChunks.size());
// 是否包含关键信息
if (WARNING_PATTERN.matcher(rawChunks.get(i)).find()) {
metadata.put("containsWarning", true);
metadata.put("priority", "HIGH");
}
if (DEFINITION_PATTERN.matcher(rawChunks.get(i)).find()) {
metadata.put("containsDefinition", true);
metadata.put("priority", "HIGH");
}
chunk.setMetadata(metadata);
chunks.add(chunk);
}
}
// 将chunks序列化存入record(后续vectorize step使用)
record.setProcessedContent(mapper.writeValueAsString(chunks));
record.setChunkCount(chunks.size());
return record;
}
private String buildSectionPath(ProcessedDocument.DocumentSection section,
List<ProcessedDocument.DocumentSection> allSections) {
List<String> pathParts = new ArrayList<>();
pathParts.add(section.getTitle());
// 向上追溯父节点
String parentId = section.getParentSectionId();
while (parentId != null) {
final String pid = parentId;
ProcessedDocument.DocumentSection parent = allSections.stream()
.filter(s -> pid.equals(s.getSectionId()))
.findFirst()
.orElse(null);
if (parent == null) break;
pathParts.add(0, parent.getTitle());
parentId = parent.getParentSectionId();
}
return String.join(" > ", pathParts);
}
private String buildContextualContent(String sectionPath, String content, String previousChunk) {
StringBuilder sb = new StringBuilder();
// 添加章节路径作为上下文头
sb.append("[章节:").append(sectionPath).append("]\n\n");
// 如果有前一个chunk,添加最后一句作为衔接上下文
if (previousChunk != null && !previousChunk.trim().isEmpty()) {
String lastSentence = extractLastSentence(previousChunk);
if (lastSentence.length() > 10) {
sb.append("[上文末尾:").append(lastSentence).append("]\n\n");
}
}
sb.append(content);
return sb.toString();
}
private String extractLastSentence(String text) {
// 找最后一个句号、问号、感叹号
int lastEnd = Math.max(
Math.max(text.lastIndexOf('。'), text.lastIndexOf('.')),
Math.max(text.lastIndexOf('?'), text.lastIndexOf('!'))
);
if (lastEnd < 0 || lastEnd == text.length() - 1) {
// 没有句子结束符,返回最后50字
return text.substring(Math.max(0, text.length() - 50));
}
// 找倒数第二个句子结束符
int secondLastEnd = Math.max(
Math.max(text.lastIndexOf('。', lastEnd - 1), text.lastIndexOf('.', lastEnd - 1)),
Math.max(text.lastIndexOf('?', lastEnd - 1), text.lastIndexOf('!', lastEnd - 1))
);
return text.substring(secondLastEnd + 1, lastEnd + 1).trim();
}
private List<String> splitIntoChunks(String content) {
List<String> chunks = new ArrayList<>();
if (content.length() <= CHUNK_SIZE) {
chunks.add(content);
return chunks;
}
// 按段落分割,尽量在段落边界处分块
String[] paragraphs = content.split("\n\n");
StringBuilder currentChunk = new StringBuilder();
for (String paragraph : paragraphs) {
if (currentChunk.length() + paragraph.length() > CHUNK_SIZE && currentChunk.length() > 0) {
chunks.add(currentChunk.toString().trim());
// 保留重叠部分(最后CHUNK_OVERLAP字符)
String overlap = currentChunk.length() > CHUNK_OVERLAP
? currentChunk.substring(currentChunk.length() - CHUNK_OVERLAP)
: currentChunk.toString();
currentChunk = new StringBuilder(overlap).append("\n\n");
}
currentChunk.append(paragraph).append("\n\n");
}
if (currentChunk.length() > 0) {
chunks.add(currentChunk.toString().trim());
}
return chunks;
}
}向量化写入 Step
@Component
public class VectorizeItemWriter implements ItemWriter<DocumentProcessingRecord> {
private final VectorStore vectorStore;
private final ObjectMapper objectMapper;
@Override
public void write(Chunk<? extends DocumentProcessingRecord> items) throws Exception {
List<Document> documentsToStore = new ArrayList<>();
for (DocumentProcessingRecord record : items) {
if (record.getStatus() == DocumentProcessingRecord.ProcessingStatus.SKIPPED) {
continue;
}
List<ProcessedDocument.DocumentChunk> chunks = objectMapper.readValue(
record.getProcessedContent(),
new TypeReference<List<ProcessedDocument.DocumentChunk>>() {}
);
for (ProcessedDocument.DocumentChunk chunk : chunks) {
// 使用contextualContent进行向量化(包含章节上下文)
// 但存储的content是原始内容(LLM生成答案时使用)
Document doc = new Document(
chunk.getContextualContent(), // 用于embedding
chunk.getMetadata()
);
doc.getMetadata().put("originalContent", chunk.getContent());
doc.getMetadata().put("documentId", record.getDocumentId());
doc.getMetadata().put("chunkId", chunk.getChunkId());
documentsToStore.add(doc);
}
}
if (!documentsToStore.isEmpty()) {
vectorStore.add(documentsToStore);
}
}
}文档处理流水线各阶段 Mermaid 图
处理效果验证
流水线上线后,我对比了处理前后的效果。验证方式:用同一组问题,分别在"原始文档入库"和"预处理流水线入库"的知识库上测试。
格式清洗的效果:
以一份 60 页的产品手册为例,原始提取后的 chunk 数量是 342 个(因为格式问题导致很多小碎片),预处理后是 167 个 chunk,数量减少了一半,但信息覆盖率更高。
章节上下文注入的效果:
这个最直观。有一个测试问题:"系统的默认超时时间是多少?",原始知识库找到的 chunk 是:"默认超时时间为 30 秒。",这个 chunk 单独看完全不知道是哪个系统哪个操作的超时时间。注入上下文后,chunk 是:"[章节:系统配置 > 网络设置 > 连接参数] 默认超时时间为 30 秒。",LLM 就能正确回答了。
总结
文档预处理是 RAG 工程里最容易被低估、但实际收益最大的环节之一。
它不只是"把文件读出来",而是一个系统性的工程:格式清洗去噪音,章节识别建结构,上下文注入补语义。每一步都在为最终的检索和生成质量打基础。
Spring Batch 的分步骤、可重试、可监控的特性,非常适合这类批量数据处理场景。一旦流水线搭好,后续文档批量导入就变成了全自动的事情。
下一篇说半结构化数据(Excel、CSV)的处理,那又是另一套逻辑了。
