第2138篇:企业AI系统的数据管道设计——从原始数据到可检索知识的工程实践
适读人群:负责企业AI知识库建设的数据工程师和后端工程师 | 阅读时长:约20分钟 | 核心价值:掌握AI数据管道的完整设计,从文档采集、清洗、切分到向量化入库,构建高质量的可检索知识库
我们去年给一家大型制造企业做内部知识库,第一版上线后效果很差。工程师检索技术手册,AI经常返回错误答案,或者找到的内容是过期版本。排查了两周才搞清楚问题出在哪:不是模型不好,不是RAG架构有问题,是数据管道设计得太草率了。
第一版的数据管道是这样的:把所有PDF扔进一个文件夹,跑个脚本提取文本,按固定500字切分,全部向量化入库。完事。
问题在哪?PDF提取出来的文本是乱的(表格变成一行行数字、图片说明文字和正文混在一起);500字切分不考虑章节边界(一个步骤被切成两半);旧版本和新版本文档同时在库里,AI不知道哪个更新;元数据缺失,无法按文档类型过滤。
这篇文章讲的就是如何正确设计这个数据管道,让知识库的数据质量达到生产可用的标准。
数据管道的整体架构
/**
* 企业AI数据管道的整体设计
*
* ===== 数据来源 =====
*
* 1. 文件系统(PDF、Word、Excel、PPT)
* 2. 企业Wiki(Confluence、Notion)
* 3. 代码仓库(README、注释、文档)
* 4. 数据库(产品信息、FAQ表)
* 5. 网页爬取(官方文档、技术博客)
*
* ===== 管道阶段 =====
*
* 采集(Ingest) → 提取(Extract) → 清洗(Clean) → 切分(Chunk) → 增强(Enrich) → 向量化(Embed) → 入库(Index)
*
* 每个阶段都有:
* - 输入/输出格式规范
* - 质量检测(拒绝低质量文档继续流转)
* - 错误处理和恢复
* - 监控指标
*
* ===== 增量更新 =====
*
* 文档变更时,不需要全量重建:
* - 内容哈希检测变化
* - 只重新处理变化的文档
* - 版本管理:旧版本标记为archived
*
* ===== 关键设计决策 =====
*
* Q: 为什么不直接用现成的文档处理库?
* A: 企业文档质量参差不齐,通用库的效果往往不够好。
* 真实场景:某公司的PDF是用Word另存为的,
* 通用库提取的文本顺序是错乱的。需要自定义处理。
*/
文档采集与版本追踪
/**
* 文档采集服务
*
* 负责:
* 1. 从各数据源采集原始文档
* 2. 计算内容哈希,检测变化
* 3. 维护文档版本历史
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class DocumentIngestionService {
private final DocumentRepository documentRepo;
private final DocumentPipelineOrchestrator pipeline;
/**
* 采集单个文档
*
* 判断是新文档还是更新的文档,
* 只有内容真正变化才触发重新处理
*/
public IngestionResult ingestDocument(RawDocument rawDoc) {
// 计算内容哈希(MD5足够,不需要加密级别的强哈希)
String contentHash = DigestUtils.md5Hex(rawDoc.content());
// 查找已有文档记录
Optional<DocumentRecord> existing = documentRepo
.findBySourcePathAndStatus(rawDoc.sourcePath(), DocumentStatus.ACTIVE);
if (existing.isPresent()) {
DocumentRecord record = existing.get();
// 内容没变,跳过
if (record.getContentHash().equals(contentHash)) {
log.debug("文档内容未变化,跳过: path={}", rawDoc.getSourcePath());
return IngestionResult.skipped(rawDoc.getSourcePath());
}
// 内容变了,把旧版本归档
record.setStatus(DocumentStatus.ARCHIVED);
record.setArchivedAt(LocalDateTime.now());
documentRepo.save(record);
log.info("文档有更新,归档旧版本: path={}, oldHash={}",
rawDoc.sourcePath(), record.getContentHash());
}
// 创建新版本记录
DocumentRecord newRecord = DocumentRecord.builder()
.sourceId(generateSourceId(rawDoc.sourcePath()))
.sourcePath(rawDoc.sourcePath())
.sourceType(rawDoc.sourceType())
.title(rawDoc.title())
.contentHash(contentHash)
.contentLength(rawDoc.content().length())
.metadata(rawDoc.metadata())
.status(DocumentStatus.PROCESSING)
.ingestedAt(LocalDateTime.now())
.build();
documentRepo.save(newRecord);
// 触发处理管道
try {
pipeline.process(rawDoc, newRecord.getId());
newRecord.setStatus(DocumentStatus.ACTIVE);
documentRepo.save(newRecord);
return IngestionResult.success(rawDoc.sourcePath(), newRecord.getId());
} catch (Exception e) {
newRecord.setStatus(DocumentStatus.FAILED);
newRecord.setErrorMessage(e.getMessage());
documentRepo.save(newRecord);
log.error("文档处理失败: path={}", rawDoc.getSourcePath(), e);
return IngestionResult.failed(rawDoc.getSourcePath(), e.getMessage());
}
}
/**
* 批量采集目录下的文档
*/
public BatchIngestionReport ingestDirectory(String dirPath, List<String> allowedExtensions) {
log.info("开始批量采集: dir={}, types={}", dirPath, allowedExtensions);
File dir = new File(dirPath);
if (!dir.exists() || !dir.isDirectory()) {
throw new IllegalArgumentException("目录不存在: " + dirPath);
}
// 递归找所有符合条件的文件
List<File> files = findFiles(dir, allowedExtensions);
log.info("发现文档: count={}", files.size());
int succeeded = 0, skipped = 0, failed = 0;
List<String> failedPaths = new ArrayList<>();
for (File file : files) {
try {
RawDocument rawDoc = readFile(file);
IngestionResult result = ingestDocument(rawDoc);
switch (result.status()) {
case SUCCESS -> succeeded++;
case SKIPPED -> skipped++;
case FAILED -> { failed++; failedPaths.add(file.getPath()); }
}
} catch (Exception e) {
failed++;
failedPaths.add(file.getPath());
log.warn("读取文件失败: path={}", file.getPath(), e);
}
}
log.info("批量采集完成: total={}, success={}, skipped={}, failed={}",
files.size(), succeeded, skipped, failed);
return new BatchIngestionReport(files.size(), succeeded, skipped, failed, failedPaths);
}
private List<File> findFiles(File dir, List<String> extensions) {
List<File> result = new ArrayList<>();
File[] children = dir.listFiles();
if (children == null) return result;
for (File f : children) {
if (f.isDirectory()) {
result.addAll(findFiles(f, extensions));
} else {
String ext = getExtension(f.getName()).toLowerCase();
if (extensions.contains(ext)) result.add(f);
}
}
return result;
}
private String getExtension(String filename) {
int dotIdx = filename.lastIndexOf('.');
return dotIdx >= 0 ? filename.substring(dotIdx + 1) : "";
}
private String generateSourceId(String sourcePath) {
return DigestUtils.md5Hex(sourcePath).substring(0, 16);
}
private RawDocument readFile(File file) throws IOException {
// 根据文件类型读取内容(这里简化为直接按纯文本读取;按类型分发的示意见下方extractTextByType)
return RawDocument.builder()
.sourcePath(file.getAbsolutePath())
.sourceType(getSourceType(file))
.title(file.getName())
.content(Files.readString(file.toPath()))
.metadata(Map.of(
"fileName", file.getName(),
"fileSize", String.valueOf(file.length()),
"lastModified", String.valueOf(file.lastModified())
))
.build();
}
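/**
 * 按文件类型提取文本的一个参考实现(示意,假设classpath上有Apache Tika的core与parsers依赖)。
 * 上面的readFile对所有文件直接用Files.readString,对PDF/Word等二进制格式并不适用;
 * 生产中通常按类型分发到对应的解析器,这里用Tika统一兜底,仅作思路参考。
 */
private String extractTextByType(File file) throws IOException {
    String ext = getExtension(file.getName()).toLowerCase();
    // 纯文本类格式直接读取
    if (List.of("txt", "md", "csv").contains(ext)) {
        return Files.readString(file.toPath());
    }
    // PDF/Word/PPT等二进制格式交给Tika解析
    try {
        return new org.apache.tika.Tika().parseToString(file);
    } catch (org.apache.tika.exception.TikaException e) {
        throw new IOException("文档解析失败: " + file.getName(), e);
    }
}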
private String getSourceType(File file) {
return getExtension(file.getName()).toUpperCase();
}
@Builder
public record RawDocument(String sourcePath, String sourceType, String title,
String content, Map<String, String> metadata) {}
public record IngestionResult(String path, IngestionStatus status,
Long documentId, String errorMessage) {
public static IngestionResult success(String path, Long docId) {
return new IngestionResult(path, IngestionStatus.SUCCESS, docId, null);
}
public static IngestionResult skipped(String path) {
return new IngestionResult(path, IngestionStatus.SKIPPED, null, null);
}
public static IngestionResult failed(String path, String error) {
return new IngestionResult(path, IngestionStatus.FAILED, null, error);
}
}
public record BatchIngestionReport(int total, int succeeded, int skipped,
int failed, List<String> failedPaths) {}
enum IngestionStatus { SUCCESS, SKIPPED, FAILED }
}
文档提取与清洗
/**
* 文档提取与清洗服务
*
* 这是整个管道里坑最多的地方。
*
* 常见的数据质量问题:
* 1. PDF提取乱序(两列布局的PDF,内容混成一行)
* 2. 页眉页脚混入正文("第3页共20页"出现在内容里)
* 3. 大量特殊字符(\u0000, \r\n\r\n\r\n...)
* 4. 表格变成随机数字序列
* 5. 标题和正文没有语义区分
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class DocumentExtractionService {
/**
* 提取并清洗文档文本
*
* 返回结构化的ExtractedDocument,包含:
* - 清洗后的正文
* - 识别出的章节结构
* - 提取的元数据(标题、作者、创建时间等)
*/
public ExtractedDocument extractAndClean(
DocumentIngestionService.RawDocument rawDoc) {
String rawText = rawDoc.content();
// 步骤1:基础文本清洗
String cleaned = basicClean(rawText);
// 步骤2:去除噪声(页眉页脚、水印等)
cleaned = removeNoise(cleaned);
// 步骤3:修复格式问题
cleaned = fixFormatting(cleaned);
// 步骤4:质量检测
TextQuality quality = assessQuality(cleaned);
if (quality.score() < 0.4) {
log.warn("文档质量过低,可能影响检索效果: path={}, score={}",
rawDoc.sourcePath(), quality.score());
}
// 步骤5:识别章节结构
List<Section> sections = extractSections(cleaned);
return ExtractedDocument.builder()
.sourceId(rawDoc.sourcePath())
.cleanedText(cleaned)
.sections(sections)
.wordCount(countWords(cleaned))
.quality(quality)
.build();
}
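/**
 * 针对"两列布局PDF提取乱序"的一个参考做法(示意,假设引入了PDFBox 2.x依赖)。
 * 本文的清洗流程默认输入已经是纯文本;如果乱序发生在PDF提取阶段,
 * 可以在提取时按版面坐标排序来缓解,setSortByPosition(true)会按位置重排文本。
 */
private String extractPdfSortedByPosition(java.io.File pdfFile) throws java.io.IOException {
    try (org.apache.pdfbox.pdmodel.PDDocument doc =
            org.apache.pdfbox.pdmodel.PDDocument.load(pdfFile)) {
        org.apache.pdfbox.text.PDFTextStripper stripper =
                new org.apache.pdfbox.text.PDFTextStripper();
        stripper.setSortByPosition(true); // 按页面坐标排序,缓解多列布局的乱序问题
        return stripper.getText(doc);
    }
}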
/**
* 基础文本清洗
*/
private String basicClean(String text) {
if (text == null) return "";
return text
// 移除null字节和其他控制字符(保留换行、制表符)
.replaceAll("[\\x00-\\x08\\x0B\\x0C\\x0E-\\x1F\\x7F]", "")
// 统一换行符
.replace("\r\n", "\n").replace("\r", "\n")
// 压缩连续空行(最多保留2个)
.replaceAll("\n{4,}", "\n\n\n")
// 移除行尾空格
.replaceAll("[ \\t]+\n", "\n")
// 移除行首空格(保留缩进的前4个空格,用于代码)
.replaceAll("(?m)^[ \\t]{5,}", " ")
.trim();
}
/**
* 去除噪声内容
*
* 识别并移除页眉页脚、版权声明等重复噪声
*/
private String removeNoise(String text) {
String[] lines = text.split("\n");
// 统计各行出现频率,频繁出现的短行可能是页眉页脚
Map<String, Long> lineFrequency = Arrays.stream(lines)
.filter(line -> line.trim().length() < 80) // 只看短行
.collect(Collectors.groupingBy(String::trim, Collectors.counting()));
// 出现超过5次的短行,可能是噪声
Set<String> noiseLines = lineFrequency.entrySet().stream()
.filter(e -> e.getValue() > 5 && !e.getKey().isEmpty())
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
if (!noiseLines.isEmpty()) {
log.debug("识别到噪声行: {}", noiseLines);
}
// 过滤掉噪声行
return Arrays.stream(lines)
.filter(line -> !noiseLines.contains(line.trim()))
.collect(Collectors.joining("\n"));
}
/**
* 修复格式问题
*/
private String fixFormatting(String text) {
return text
// 修复:中文句子被错误断行(两个中文字符之间的换行不是段落分隔)
.replaceAll("(?<=[\\u4e00-\\u9fa5,。!?;:])\n(?=[\\u4e00-\\u9fa5])", "")
// 修复:英文单词被断行
.replaceAll("(?<=[a-z])-\n(?=[a-z])", "")
// 修复:多余的空格
.replaceAll("[ ]{3,}", " ");
}
/**
* 文本质量评估
*
* 用几个启发式指标判断文本是否有意义
*/
private TextQuality assessQuality(String text) {
if (text.length() < 50) {
return new TextQuality(0.1, "内容太短");
}
// 指标1:汉字/英文字符比例(正常文档应该有大量字母/汉字)
long alphaCount = text.chars()
.filter(c -> Character.isLetter(c))
.count();
double alphaRatio = (double) alphaCount / text.length();
// 指标2:平均行长度(太短可能是表格乱码,太长可能是提取乱序)
String[] lines = text.split("\n");
double avgLineLength = Arrays.stream(lines)
.mapToInt(String::length)
.average()
.orElse(0);
// 指标3:是否有大量连续特殊字符(乱码的特征)
boolean hasGarbled = text.matches("(?s).*[^\\x20-\\x7E\\u4e00-\\u9fa5\\n\\t]{5,}.*");
double score = 0.5;
if (alphaRatio > 0.4) score += 0.3;
if (avgLineLength > 10 && avgLineLength < 200) score += 0.2;
if (hasGarbled) score -= 0.3;
score = Math.max(0, Math.min(1, score));
String assessment = score > 0.7 ? "良好" : score > 0.4 ? "一般" : "较差";
return new TextQuality(score, assessment);
}
/**
* 识别章节结构
*
* 通过标题特征(数字/中文编号、Markdown井号、短句冒号结尾等)
* 识别文档的章节层次
*/
private List<Section> extractSections(String text) {
List<Section> sections = new ArrayList<>();
String[] lines = text.split("\n");
Section currentSection = null;
StringBuilder currentContent = new StringBuilder();
for (String line : lines) {
String trimmed = line.trim();
// 检测标题:数字编号格式(1. 1.1 一、 等)
boolean isHeading = isHeadingLine(trimmed);
if (isHeading && trimmed.length() > 2) {
// 保存前一个章节
if (currentSection != null) {
sections.add(currentSection.withContent(currentContent.toString().trim()));
currentContent = new StringBuilder();
}
currentSection = new Section(trimmed, detectHeadingLevel(trimmed), "");
} else {
currentContent.append(line).append("\n");
}
}
// 保存最后一个章节
if (currentSection != null) {
sections.add(currentSection.withContent(currentContent.toString().trim()));
} else if (currentContent.length() > 0) {
// 没有识别到章节结构,整个文档作为一个章节
sections.add(new Section("正文", 1, currentContent.toString().trim()));
}
return sections;
}
private boolean isHeadingLine(String line) {
// 常见标题模式
return line.matches("^[一二三四五六七八九十]+[、.].*") || // 中文数字标题
line.matches("^\\d+(\\.\\d+)*\\s+\\S.*") || // 数字标题 1. 1.1
line.matches("^#{1,4}\\s+.*") || // Markdown标题
(line.length() < 50 && line.endsWith(":") && !line.contains("。")); // 短句冒号结尾
}
private int detectHeadingLevel(String line) {
if (line.startsWith("# ")) return 1;
if (line.startsWith("## ")) return 2;
if (line.startsWith("### ")) return 3;
if (line.matches("^[一二三四五六七八九十]+[、.].*")) return 1;
if (line.matches("^\\d+\\.\\d+\\.\\d+.*")) return 3;
if (line.matches("^\\d+\\.\\d+.*")) return 2;
if (line.matches("^\\d+\\..*")) return 1;
return 2;
}
private int countWords(String text) {
// 中文按字符计,英文按单词计
long chineseChars = text.chars()
.filter(c -> c >= 0x4e00 && c <= 0x9fa5)
.count();
long englishWords = Arrays.stream(text.split("\\s+"))
.filter(w -> w.matches("[a-zA-Z]+"))
.count();
return (int) (chineseChars + englishWords);
}
@Builder
public record ExtractedDocument(String sourceId, String cleanedText,
List<Section> sections, int wordCount,
TextQuality quality) {}
public record Section(String title, int level, String content) {
public Section withContent(String newContent) {
return new Section(title, level, newContent);
}
}
public record TextQuality(double score, String assessment) {}
}
语义切分策略
/**
* 语义感知的文档切分服务
*
* 核心原则:切分点不能破坏语义完整性
*
* 不好的切分:
* - 按固定字符数切(可能切断一个步骤的中间)
* - 按固定句子数切(不考虑段落语义)
*
* 好的切分:
* - 沿章节边界切(一个章节是一个chunk)
* - 大章节进一步按段落切
* - 每个chunk有完整语义
* - 相邻chunk有适当重叠(避免检索时丢失上下文)
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class SemanticChunkingService {
// 目标chunk大小(字符数)
private static final int TARGET_CHUNK_SIZE = 800;
// 最大chunk大小
private static final int MAX_CHUNK_SIZE = 1500;
// 相邻chunk重叠大小
private static final int OVERLAP_SIZE = 150;
/**
* 对文档进行语义切分
*
* 优先沿章节边界切分,大章节再按段落切
*/
public List<DocumentChunk> chunk(
DocumentExtractionService.ExtractedDocument doc) {
List<DocumentChunk> chunks = new ArrayList<>();
List<DocumentExtractionService.Section> sections = doc.sections();
if (sections.isEmpty()) {
// 没有章节结构,直接按段落切
chunks.addAll(chunkByParagraph(doc.cleanedText(), doc.sourceId(), "全文", 0));
} else {
int chunkIndex = 0;
for (DocumentExtractionService.Section section : sections) {
if (section.content().length() <= MAX_CHUNK_SIZE) {
// 章节够小,直接作为一个chunk
String chunkText = buildChunkWithTitle(section);
chunks.add(DocumentChunk.builder()
.id(generateChunkId(doc.sourceId(), chunkIndex))
.sourceId(doc.sourceId())
.sectionTitle(section.title())
.sectionLevel(section.level())
.content(chunkText)
.chunkIndex(chunkIndex)
.build());
chunkIndex++;
} else {
// 大章节,按段落细分
List<DocumentChunk> subChunks = chunkByParagraph(
section.content(), doc.sourceId(), section.title(), chunkIndex);
chunks.addAll(subChunks);
chunkIndex += subChunks.size();
}
}
}
// 添加相邻chunk的重叠
addOverlap(chunks);
log.debug("文档切分完成: sourceId={}, chunks={}", doc.sourceId(), chunks.size());
return chunks;
}
/**
* 按段落切分
*
* 把连续的段落合并,直到接近目标大小
*/
private List<DocumentChunk> chunkByParagraph(
String text, String sourceId, String sectionTitle, int startIndex) {
List<DocumentChunk> chunks = new ArrayList<>();
// 按空行分割段落
String[] paragraphs = text.split("\n{2,}");
StringBuilder currentChunk = new StringBuilder();
int chunkIndex = startIndex;
for (String paragraph : paragraphs) {
String trimmed = paragraph.trim();
if (trimmed.isEmpty()) continue;
// 如果加入这个段落后会超出目标大小,先保存当前chunk(单个超长段落仍可能超过目标)
if (currentChunk.length() > 0 &&
currentChunk.length() + trimmed.length() > TARGET_CHUNK_SIZE) {
chunks.add(DocumentChunk.builder()
.id(generateChunkId(sourceId, chunkIndex))
.sourceId(sourceId)
.sectionTitle(sectionTitle)
.content(currentChunk.toString().trim())
.chunkIndex(chunkIndex)
.build());
chunkIndex++;
currentChunk = new StringBuilder();
}
currentChunk.append(trimmed).append("\n\n");
}
// 保存最后一个chunk
if (currentChunk.length() > 0) {
chunks.add(DocumentChunk.builder()
.id(generateChunkId(sourceId, chunkIndex))
.sourceId(sourceId)
.sectionTitle(sectionTitle)
.content(currentChunk.toString().trim())
.chunkIndex(chunkIndex)
.build());
}
return chunks;
}
/**
* 为相邻chunk添加重叠内容
*
* 重叠的目的:检索到一个chunk时,能有足够的前后上下文
* 实现:把前一个chunk的最后几句话加到当前chunk开头
*/
private void addOverlap(List<DocumentChunk> chunks) {
for (int i = 1; i < chunks.size(); i++) {
DocumentChunk prev = chunks.get(i - 1);
DocumentChunk curr = chunks.get(i);
// 只在同一章节内的chunk之间添加重叠
if (!prev.sourceId().equals(curr.sourceId())) continue;
if (!prev.sectionTitle().equals(curr.sectionTitle())) continue;
// 取前一个chunk的最后OVERLAP_SIZE个字符
String prevContent = prev.content();
String overlap = prevContent.length() > OVERLAP_SIZE ?
prevContent.substring(prevContent.length() - OVERLAP_SIZE) : prevContent;
// 找到重叠部分的句子边界
int sentenceBoundary = findSentenceBoundary(overlap);
if (sentenceBoundary > 0) {
overlap = overlap.substring(sentenceBoundary);
}
// 把重叠内容加到当前chunk前面
if (!overlap.isBlank()) {
chunks.set(i, curr.withPrefixOverlap(overlap.trim()));
}
}
}
/**
* 找句子边界(用于重叠内容的起始点)
*/
private int findSentenceBoundary(String text) {
// 从前往后找第一个句子结束标志
for (int i = 0; i < text.length(); i++) {
char c = text.charAt(i);
if (c == '。' || c == '.' || c == '!' || c == '!' || c == '?' || c == '?') {
return i + 1;
}
}
return 0;
}
private String buildChunkWithTitle(DocumentExtractionService.Section section) {
if (section.title().isBlank()) return section.content();
return section.title() + "\n\n" + section.content();
}
private String generateChunkId(String sourceId, int index) {
return sourceId.hashCode() + "-chunk-" + index;
}
@Builder
public record DocumentChunk(String id, String sourceId, String sectionTitle,
int sectionLevel, String content, int chunkIndex,
String prefixOverlap) {
public String fullContent() {
if (prefixOverlap != null && !prefixOverlap.isEmpty()) {
return "[前文摘要] " + prefixOverlap + "\n\n" + content;
}
return content;
}
public DocumentChunk withPrefixOverlap(String overlap) {
return DocumentChunk.builder()
.id(id).sourceId(sourceId).sectionTitle(sectionTitle)
.sectionLevel(sectionLevel).content(content)
.chunkIndex(chunkIndex).prefixOverlap(overlap).build();
}
}
}
向量化与元数据增强
/**
* 向量化与入库服务
*
* 把清洗、切分后的文档向量化,写入向量数据库
*
* 元数据设计很重要:
* 好的元数据让用户可以按类型、部门、时间过滤,
* 大幅减少无关文档干扰。
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class EmbeddingAndIndexingService {
private final EmbeddingModel embeddingModel;
private final VectorStore vectorStore;
private final ChatLanguageModel llm;
// 向量化批次大小(避免OOM)
private static final int EMBEDDING_BATCH_SIZE = 50;
/**
* 向量化并写入索引
*
* 元数据增强在buildChunkMetadata中完成(文本统计类字段);
* 用LLM生成关键词的做法见下方extractKeywords示意(可选,不在主流程里)
*/
public IndexingResult indexChunks(
List<SemanticChunkingService.DocumentChunk> chunks,
Map<String, String> documentMetadata) {
int indexed = 0, failed = 0;
// 分批处理
for (int batchStart = 0; batchStart < chunks.size(); batchStart += EMBEDDING_BATCH_SIZE) {
int batchEnd = Math.min(batchStart + EMBEDDING_BATCH_SIZE, chunks.size());
List<SemanticChunkingService.DocumentChunk> batch =
chunks.subList(batchStart, batchEnd);
try {
List<VectorStore.Document> vectorDocs = prepareBatch(batch, documentMetadata);
vectorStore.addAll(vectorDocs);
indexed += batch.size();
log.debug("批次入库完成: batch={}-{}", batchStart, batchEnd);
} catch (Exception e) {
failed += batch.size();
log.error("批次入库失败: batch={}-{}", batchStart, batchEnd, e);
}
}
log.info("索引完成: total={}, indexed={}, failed={}",
chunks.size(), indexed, failed);
return new IndexingResult(chunks.size(), indexed, failed);
}
/**
* 准备向量化批次
*
* 包含:
* 1. 生成向量
* 2. 构建元数据(过滤字段与文本统计)
*/
private List<VectorStore.Document> prepareBatch(
List<SemanticChunkingService.DocumentChunk> chunks,
Map<String, String> documentMetadata) {
List<VectorStore.Document> result = new ArrayList<>();
for (SemanticChunkingService.DocumentChunk chunk : chunks) {
// 生成向量(用fullContent,包含重叠上下文)
String textToEmbed = chunk.fullContent();
float[] vector = embeddingModel.embed(textToEmbed).content().vector();
// 构建元数据(供过滤使用)
Map<String, Object> metadata = buildChunkMetadata(chunk, documentMetadata);
result.add(VectorStore.Document.builder()
.id(chunk.id())
.content(chunk.content()) // 存储原始内容(不含重叠)
.vector(vector)
.metadata(metadata)
.build());
}
return result;
}
/**
* 构建chunk的元数据
*
* 元数据设计原则:存储所有可能用于过滤的字段
*/
private Map<String, Object> buildChunkMetadata(
SemanticChunkingService.DocumentChunk chunk,
Map<String, String> documentMetadata) {
Map<String, Object> metadata = new HashMap<>();
// 来自文档的元数据
metadata.putAll(documentMetadata);
// chunk自身的元数据
metadata.put("sourceId", chunk.sourceId());
metadata.put("chunkIndex", chunk.chunkIndex());
metadata.put("sectionTitle", chunk.sectionTitle());
metadata.put("sectionLevel", chunk.sectionLevel());
metadata.put("contentLength", chunk.content().length());
// 计算文本统计
metadata.put("hasCode", chunk.content().contains("```") ||
chunk.content().matches("(?s).*\\{[^}]{20,}\\}.*"));
metadata.put("hasTable", chunk.content().contains("|") &&
chunk.content().contains("---"));
// 时间戳(用于时效性过滤)
metadata.put("indexedAt", LocalDateTime.now().toString());
return metadata;
}
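/**
 * 可选的元数据增强:用LLM为chunk生成关键词(示意)。
 * 类中注入的llm在主流程里并未使用,这里给出一个参考用法,
 * 假设llm是langchain4j的ChatLanguageModel,其generate(String)可用;
 * 是否对每个chunk都调用LLM,需要权衡成本(chunk数量很大时费用可观)。
 * 生成的关键词可以放进buildChunkMetadata的结果里(例如"keywords"字段),作为全文检索的补充。
 */
private List<String> extractKeywords(String chunkContent) {
    String prompt = """
            从下面的文本中提取3-5个最能代表其主题的关键词,用逗号分隔,只输出关键词:

            %s
            """.formatted(chunkContent);
    try {
        String response = llm.generate(prompt);
        return Arrays.stream(response.split("[,,]"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toList();
    } catch (Exception e) {
        log.warn("关键词提取失败,跳过该项增强", e);
        return List.of();
    }
}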
/**
* 增量更新:只更新指定sourceId的chunks
*
* 先删除旧版本的所有chunks,再写入新的
*/
public void reindexDocument(String sourceId,
List<SemanticChunkingService.DocumentChunk> newChunks,
Map<String, String> documentMetadata) {
// 删除旧版本
vectorStore.deleteByMetadata("sourceId", sourceId);
log.info("已删除旧版本索引: sourceId={}", sourceId);
// 写入新版本
IndexingResult result = indexChunks(newChunks, documentMetadata);
log.info("新版本索引完成: sourceId={}, indexed={}", sourceId, result.indexed());
}
public record IndexingResult(int total, int indexed, int failed) {}
}
管道编排与监控
/**
* 数据管道编排器
*
* 把采集→提取→切分→向量化串联起来,
* 并收集管道各阶段的质量指标
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class DocumentPipelineOrchestrator {
private final DocumentExtractionService extractionService;
private final SemanticChunkingService chunkingService;
private final EmbeddingAndIndexingService indexingService;
private final PipelineMetricsRepository metricsRepo;
/**
* 处理单个文档(完整管道)
*/
public PipelineResult process(
DocumentIngestionService.RawDocument rawDoc, Long documentRecordId) {
long pipelineStart = System.currentTimeMillis();
// 阶段1:提取与清洗
long extractStart = System.currentTimeMillis();
DocumentExtractionService.ExtractedDocument extracted;
try {
extracted = extractionService.extractAndClean(rawDoc);
} catch (Exception e) {
throw new PipelineException("提取阶段失败: " + e.getMessage(), e);
}
long extractMs = System.currentTimeMillis() - extractStart;
// 质量检查:质量太低则拒绝继续
if (extracted.quality().score() < 0.2) {
log.warn("文档质量过低,终止处理: source={}, score={}",
rawDoc.sourcePath(), extracted.quality().score());
return PipelineResult.rejected(rawDoc.sourcePath(),
"文档质量分 " + extracted.quality().score() + " 低于阈值");
}
// 阶段2:语义切分
long chunkStart = System.currentTimeMillis();
List<SemanticChunkingService.DocumentChunk> chunks;
try {
chunks = chunkingService.chunk(extracted);
} catch (Exception e) {
throw new PipelineException("切分阶段失败: " + e.getMessage(), e);
}
long chunkMs = System.currentTimeMillis() - chunkStart;
if (chunks.isEmpty()) {
return PipelineResult.rejected(rawDoc.sourcePath(), "切分结果为空");
}
// 阶段3:向量化入库
long indexStart = System.currentTimeMillis();
EmbeddingAndIndexingService.IndexingResult indexingResult;
try {
indexingResult = indexingService.indexChunks(chunks, rawDoc.metadata());
} catch (Exception e) {
throw new PipelineException("入库阶段失败: " + e.getMessage(), e);
}
long indexMs = System.currentTimeMillis() - indexStart;
long totalMs = System.currentTimeMillis() - pipelineStart;
// 记录管道指标
PipelineMetrics metrics = PipelineMetrics.builder()
.documentRecordId(documentRecordId)
.sourcePath(rawDoc.sourcePath())
.wordCount(extracted.wordCount())
.qualityScore(extracted.quality().score())
.sectionCount(extracted.sections().size())
.chunkCount(chunks.size())
.indexedChunks(indexingResult.indexed())
.extractMs(extractMs)
.chunkMs(chunkMs)
.indexMs(indexMs)
.totalMs(totalMs)
.createdAt(LocalDateTime.now())
.build();
metricsRepo.save(metrics);
log.info("文档管道完成: source={}, words={}, chunks={}, totalMs={}",
rawDoc.sourcePath(), extracted.wordCount(), chunks.size(), totalMs);
return PipelineResult.success(rawDoc.sourcePath(), chunks.size(), totalMs);
}
/**
* 生成管道健康报告
*
* 定期运行,发现质量问题
*/
public PipelineHealthReport generateHealthReport(LocalDateTime since) {
List<PipelineMetrics> recentMetrics = metricsRepo.findByCreatedAtAfter(since);
if (recentMetrics.isEmpty()) {
return new PipelineHealthReport(0, 0, 0, 0, 0, 0);
}
long totalProcessed = recentMetrics.size();
long lowQualityCount = recentMetrics.stream()
.filter(m -> m.qualityScore() < 0.5)
.count();
double avgQuality = recentMetrics.stream()
.mapToDouble(PipelineMetrics::qualityScore)
.average().orElse(0);
double avgChunksPerDoc = recentMetrics.stream()
.mapToInt(PipelineMetrics::chunkCount)
.average().orElse(0);
double avgProcessMs = recentMetrics.stream()
.mapToLong(PipelineMetrics::totalMs)
.average().orElse(0);
long failedCount = recentMetrics.stream()
.filter(m -> m.indexedChunks() == 0)
.count();
return new PipelineHealthReport(
totalProcessed, lowQualityCount, failedCount,
avgQuality, avgChunksPerDoc, avgProcessMs
);
}
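/**
 * 定时巡检的一个参考写法(示意,假设配置类上启用了Spring的@EnableScheduling,
 * 20%的阈值只是演示数字)。每天凌晨统计过去24小时的管道指标,
 * 低质量或失败比例过高时打告警日志,实际的告警渠道按各自环境接入。
 */
@Scheduled(cron = "0 0 2 * * ?")
public void dailyHealthCheck() {
    PipelineHealthReport report = generateHealthReport(LocalDateTime.now().minusDays(1));
    if (report.totalProcessed() == 0) {
        return;
    }
    double lowQualityRatio = (double) report.lowQualityCount() / report.totalProcessed();
    if (lowQualityRatio > 0.2 || report.failedCount() > 0) {
        log.warn("数据管道健康告警: lowQualityRatio={}, failedCount={}, avgQuality={}",
                String.format("%.2f", lowQualityRatio), report.failedCount(), report.avgQualityScore());
    }
}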
public record PipelineResult(String sourcePath, PipelineStatus status,
int chunkCount, long processingMs, String rejectReason) {
public static PipelineResult success(String path, int chunks, long ms) {
return new PipelineResult(path, PipelineStatus.SUCCESS, chunks, ms, null);
}
public static PipelineResult rejected(String path, String reason) {
return new PipelineResult(path, PipelineStatus.REJECTED, 0, 0, reason);
}
}
@Builder
public record PipelineMetrics(Long documentRecordId, String sourcePath,
int wordCount, double qualityScore,
int sectionCount, int chunkCount, int indexedChunks,
long extractMs, long chunkMs, long indexMs, long totalMs,
LocalDateTime createdAt) {}
public record PipelineHealthReport(long totalProcessed, long lowQualityCount,
long failedCount, double avgQualityScore,
double avgChunksPerDoc, double avgProcessMs) {}
enum PipelineStatus { SUCCESS, REJECTED, FAILED }
static class PipelineException extends RuntimeException {
public PipelineException(String message, Throwable cause) {
super(message, cause);
}
}
}
实践建议
数据清洗是整条管道里投入产出比最高的环节
我们当时花了两周专门做文档清洗,最终知识库的检索准确率从45%提升到78%。同样的时间如果花在调优RAG架构或更换更好的Embedding模型上,不可能有这么大的提升。原因是:垃圾进垃圾出(Garbage In, Garbage Out)——向量模型再强,也没办法从乱码中找到语义。优先做数据质量,再做算法优化。
章节边界是比字符数更好的切分基准
很多RAG教程直接说"按512 token切分",这其实是个懒办法。真实场景:一份操作手册,某个步骤是"2.3.1 配置数据库连接",这个章节只有200字,但包含完整的操作步骤。按固定字符切可能把它切成两半,导致前半截的步骤找不到后续,后半截没有前提条件。按章节切,这200字作为一个独立chunk,检索到后完整呈现给LLM,质量远优于碎片化的切法。
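下面用上文的SemanticChunkingService直观演示这个差别(示意代码:章节内容为虚构,假设chunkingService已注入):
// 构造一个只包含"2.3.1 配置数据库连接"这一小节的文档,观察切分结果
DocumentExtractionService.ExtractedDocument doc = DocumentExtractionService.ExtractedDocument.builder()
        .sourceId("manual-demo")
        .cleanedText("2.3.1 配置数据库连接(示例正文略)")
        .sections(List.of(new DocumentExtractionService.Section(
                "2.3.1 配置数据库连接", 3, "步骤一:……(约200字的完整操作步骤,此处为虚构示例)")))
        .wordCount(200)
        .quality(new DocumentExtractionService.TextQuality(0.9, "良好"))
        .build();
List<SemanticChunkingService.DocumentChunk> chunks = chunkingService.chunk(doc);
// 章节长度小于MAX_CHUNK_SIZE,整节成为一个chunk,标题保留在chunk开头,操作步骤不会被切断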
缺少版本管理是知识库"失忆"问题的根源
在做版本管理之前,我们的知识库有个奇怪现象:有时候给出的是一年前的旧答案。排查后发现:文档更新了,但旧版本的chunks还在向量库里,检索时新旧都能召回,LLM选了旧的。解决方案:文档更新时,先把旧版本的所有chunks从向量库删掉(按sourceId批量删),再写入新版本。加上文档的activeStatus管理,任何时候向量库里只有最新版本的chunks。这个改动把答案时效性问题从高频降到了偶发。
