第2106篇:非结构化数据的AI提取流水线——合同、报告、邮件的批量结构化处理
2026/4/30大约 9 分钟
第2106篇:非结构化数据的AI提取流水线——合同、报告、邮件的批量结构化处理
适读人群:需要从大量文档中提取信息的工程师 | 阅读时长:约19分钟 | 核心价值:掌握LLM信息提取的管道设计,包括字段定义、批量处理、准确率评估和人工纠错的闭环
企业里有大量"信息困在文档里"的场景:几千份采购合同、几万封客服邮件、每年积累的财报和报告。
这些信息本来很有价值,但因为是非结构化的,无法直接用于数据分析、风控模型或自动化决策。人工录入效率低、出错率高。LLM天然适合做这件事。
这篇文章的核心是:如何让AI信息提取达到"可以用于业务流程"的质量要求,而不是"看上去还行"的演示级别。
信息提取的质量要求
/**
* 演示级别 vs 生产级别的差距
*
* 演示级别(很多人停在这里):
* - 在清晰标准的文档上,90%的字段提取正确
* - 错误会导致界面展示异常,但用户可以手动纠正
*
* 生产级别(驱动业务流程的要求):
* - 关键字段(金额、日期、合同主体)准确率≥99%
* - 非关键字段准确率≥95%
* - 对不确定的字段,能识别出来让人工复核(而不是给一个错误答案)
* - 对格式异常的文档,能优雅降级而不是崩溃
*
* 差距的原因:
* 1. 真实文档格式千变万化(手写、扫描、格式混乱)
* 2. 边缘案例:缺少某字段、字段有多个值
* 3. LLM在不确定时会猜(而不是说"不知道")
* 4. 批量处理时的稳定性要求
*/提取Schema定义
/**
* 提取字段的声明式定义
*
* 把"要提取什么"和"如何提取"分离
*/
@Data
@Builder
public class ExtractionSchema {
private String schemaName; // 如:purchase_contract
private List<FieldDefinition> fields;
@Data
@Builder
public static class FieldDefinition {
private String fieldName;
private String description; // 字段的自然语言描述,给LLM看
private FieldType type; // 字段类型
private boolean required; // 是否必填
private boolean critical; // 是否关键字段(关键字段需要更高置信度)
private String validationPattern; // 正则校验(可选)
private List<String> enumValues; // 如果是枚举类型,列出所有值
private String extractionHint; // 额外提示(告诉LLM在文档哪里可能找到这个字段)
public enum FieldType {
STRING, NUMBER, DATE, BOOLEAN, ENUM, LIST_OF_STRINGS
}
}
/**
* 采购合同的Schema定义
*/
public static ExtractionSchema purchaseContractSchema() {
return ExtractionSchema.builder()
.schemaName("purchase_contract")
.fields(List.of(
FieldDefinition.builder()
.fieldName("contractNumber")
.description("合同编号,通常在文档开头或右上角")
.type(FieldDefinition.FieldType.STRING)
.required(true)
.critical(true)
.extractionHint("格式通常为:年份-序号,如2024-PC-001")
.build(),
FieldDefinition.builder()
.fieldName("partyA")
.description("甲方(采购方/买方)的公司全称")
.type(FieldDefinition.FieldType.STRING)
.required(true)
.critical(true)
.build(),
FieldDefinition.builder()
.fieldName("partyB")
.description("乙方(供应商/卖方)的公司全称")
.type(FieldDefinition.FieldType.STRING)
.required(true)
.critical(true)
.build(),
FieldDefinition.builder()
.fieldName("totalAmount")
.description("合同总金额(不含税)")
.type(FieldDefinition.FieldType.NUMBER)
.required(true)
.critical(true)
.extractionHint("注意:可能有含税金额和不含税金额,取不含税")
.build(),
FieldDefinition.builder()
.fieldName("currency")
.description("合同币种")
.type(FieldDefinition.FieldType.ENUM)
.required(false)
.critical(false)
.enumValues(List.of("CNY", "USD", "EUR", "JPY"))
.build(),
FieldDefinition.builder()
.fieldName("signDate")
.description("合同签署日期")
.type(FieldDefinition.FieldType.DATE)
.required(true)
.critical(false)
.validationPattern("\\d{4}-\\d{2}-\\d{2}")
.build(),
FieldDefinition.builder()
.fieldName("deliveryItems")
.description("交付物清单(产品/服务名称列表)")
.type(FieldDefinition.FieldType.LIST_OF_STRINGS)
.required(false)
.critical(false)
.build()
))
.build();
}
}LLM提取执行器
/**
* 基于Schema的LLM提取执行器
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class LlmExtractionExecutor {
private final ChatLanguageModel llm;
/**
* 执行提取
*/
public ExtractionResult extract(String documentText, ExtractionSchema schema) {
String schemaDescription = buildSchemaDescription(schema);
// 长文档分段处理
if (documentText.length() > 8000) {
return extractLongDocument(documentText, schema, schemaDescription);
}
return extractSingleChunk(documentText, schema, schemaDescription);
}
private ExtractionResult extractSingleChunk(
String text, ExtractionSchema schema, String schemaDesc) {
String prompt = """
请从以下文档中提取结构化信息。
**提取字段说明**:
%s
**文档内容**:
---
%s
---
**重要规则**:
1. 如果某字段在文档中不存在,返回null,不要猜测
2. 如果某字段存在但不确定,用 "__UNCERTAIN__" 标记
3. 日期统一格式为:YYYY-MM-DD
4. 金额只返回数字(不要货币符号和逗号)
5. 公司名称提取完整全称(不要简称)
返回JSON(严格按字段名),不要其他内容。
""".formatted(schemaDesc, truncate(text, 6000));
try {
String response = llm.generate(prompt);
return parseExtractionResult(response, schema);
} catch (Exception e) {
log.error("提取失败: {}", e.getMessage());
return ExtractionResult.failed(e.getMessage());
}
}
/**
* 长文档分段提取后合并
*
* 策略:先从文档开头(通常包含关键信息)提取,
* 如果有缺失字段,再从后续段落补充
*/
private ExtractionResult extractLongDocument(
String text, ExtractionSchema schema, String schemaDesc) {
// 策略1:先用前4000字符提取(合同关键信息通常在前几页)
ExtractionResult firstChunkResult = extractSingleChunk(
text.substring(0, Math.min(4000, text.length())), schema, schemaDesc);
// 检查关键字段是否都提取到了
List<String> missingCritical = getMissingCriticalFields(firstChunkResult, schema);
if (missingCritical.isEmpty()) {
return firstChunkResult;
}
// 策略2:从后续段落补充缺失的关键字段
log.debug("关键字段缺失,尝试从后续段落提取: missing={}", missingCritical);
// 取中间段落
int midStart = Math.min(4000, text.length() / 3);
int midEnd = Math.min(midStart + 4000, text.length());
String midSection = midStart < midEnd ? text.substring(midStart, midEnd) : "";
if (!midSection.isBlank()) {
ExtractionResult midResult = extractSingleChunk(midSection, schema, schemaDesc);
firstChunkResult = mergeResults(firstChunkResult, midResult);
}
return firstChunkResult;
}
private String buildSchemaDescription(ExtractionSchema schema) {
return schema.getFields().stream()
.map(f -> {
StringBuilder desc = new StringBuilder();
desc.append("- **").append(f.getFieldName()).append("**");
desc.append(" (").append(f.getType().name());
if (f.isRequired()) desc.append(", 必填");
if (f.isCritical()) desc.append(", 关键字段");
desc.append("): ").append(f.getDescription());
if (f.getExtractionHint() != null) {
desc.append("。提示:").append(f.getExtractionHint());
}
if (f.getEnumValues() != null && !f.getEnumValues().isEmpty()) {
desc.append("。可选值:").append(String.join("/", f.getEnumValues()));
}
return desc.toString();
})
.collect(Collectors.joining("\n"));
}
private ExtractionResult parseExtractionResult(String json, ExtractionSchema schema) {
try {
String cleaned = extractJson(json);
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(cleaned);
Map<String, FieldExtractionResult> fields = new LinkedHashMap<>();
for (ExtractionSchema.FieldDefinition fieldDef : schema.getFields()) {
JsonNode value = node.get(fieldDef.getFieldName());
if (value == null || value.isNull()) {
fields.put(fieldDef.getFieldName(), FieldExtractionResult.notFound());
continue;
}
String rawValue = value.asText("");
boolean isUncertain = "__UNCERTAIN__".equals(rawValue);
// 校验和标准化
FieldExtractionResult result = isUncertain
? FieldExtractionResult.uncertain(rawValue)
: validateAndNormalize(fieldDef, rawValue);
fields.put(fieldDef.getFieldName(), result);
}
return new ExtractionResult(true, fields, null);
} catch (Exception e) {
log.warn("提取结果解析失败: {}", e.getMessage());
return ExtractionResult.failed("结果解析失败: " + e.getMessage());
}
}
private FieldExtractionResult validateAndNormalize(
ExtractionSchema.FieldDefinition fieldDef, String rawValue) {
// 类型校验
if (fieldDef.getType() == ExtractionSchema.FieldDefinition.FieldType.NUMBER) {
try {
double num = Double.parseDouble(rawValue.replaceAll("[,,]", ""));
return FieldExtractionResult.ok(String.valueOf(num));
} catch (NumberFormatException e) {
return FieldExtractionResult.invalid("不是有效的数字: " + rawValue);
}
}
if (fieldDef.getType() == ExtractionSchema.FieldDefinition.FieldType.DATE) {
if (!rawValue.matches("\\d{4}-\\d{2}-\\d{2}")) {
return FieldExtractionResult.invalid("日期格式不正确: " + rawValue);
}
}
if (fieldDef.getType() == ExtractionSchema.FieldDefinition.FieldType.ENUM) {
if (fieldDef.getEnumValues() != null &&
!fieldDef.getEnumValues().contains(rawValue.toUpperCase())) {
return FieldExtractionResult.invalid("枚举值不在范围内: " + rawValue);
}
}
if (fieldDef.getValidationPattern() != null) {
if (!rawValue.matches(fieldDef.getValidationPattern())) {
return FieldExtractionResult.invalid("格式校验失败: " + rawValue);
}
}
return FieldExtractionResult.ok(rawValue);
}
private List<String> getMissingCriticalFields(
ExtractionResult result, ExtractionSchema schema) {
return schema.getFields().stream()
.filter(ExtractionSchema.FieldDefinition::isCritical)
.filter(f -> {
FieldExtractionResult r = result.fields().get(f.getFieldName());
return r == null || r.status() == FieldExtractionResult.Status.NOT_FOUND;
})
.map(ExtractionSchema.FieldDefinition::getFieldName)
.toList();
}
private ExtractionResult mergeResults(ExtractionResult primary, ExtractionResult supplement) {
Map<String, FieldExtractionResult> merged = new LinkedHashMap<>(primary.fields());
supplement.fields().forEach((fieldName, supplementResult) -> {
FieldExtractionResult primaryResult = merged.get(fieldName);
if (primaryResult == null ||
primaryResult.status() == FieldExtractionResult.Status.NOT_FOUND) {
merged.put(fieldName, supplementResult);
}
});
return new ExtractionResult(true, merged, null);
}
private String extractJson(String s) {
int start = s.indexOf('{');
int end = s.lastIndexOf('}');
return (start >= 0 && end > start) ? s.substring(start, end + 1) : s;
}
private String truncate(String text, int maxLen) {
return text.length() > maxLen ? text.substring(0, maxLen) : text;
}
// 数据类
public record ExtractionResult(
boolean success,
Map<String, FieldExtractionResult> fields,
String errorMessage
) {
static ExtractionResult failed(String error) {
return new ExtractionResult(false, Map.of(), error);
}
}
public record FieldExtractionResult(Status status, String value, String message) {
enum Status { OK, NOT_FOUND, UNCERTAIN, INVALID }
static FieldExtractionResult ok(String value) {
return new FieldExtractionResult(Status.OK, value, null);
}
static FieldExtractionResult notFound() {
return new FieldExtractionResult(Status.NOT_FOUND, null, null);
}
static FieldExtractionResult uncertain(String value) {
return new FieldExtractionResult(Status.UNCERTAIN, value, "不确定");
}
static FieldExtractionResult invalid(String message) {
return new FieldExtractionResult(Status.INVALID, null, message);
}
}
}准确率评估和人工纠错闭环
/**
* 提取质量评估和人工纠错
*
* 核心思路:
* 1. 自动标记低置信度的结果(UNCERTAIN/INVALID)
* 2. 抽样验证(随机抽10%让人工核实)
* 3. 错误模式分析(哪类文档/字段最容易出错)
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ExtractionQualityManager {
private final ExtractionResultRepository resultRepo;
/**
* 评估提取结果是否需要人工审核
*/
public ReviewDecision assessForReview(
LlmExtractionExecutor.ExtractionResult result,
ExtractionSchema schema) {
List<String> reviewReasons = new ArrayList<>();
// 1. 关键字段缺失
schema.getFields().stream()
.filter(ExtractionSchema.FieldDefinition::isCritical)
.filter(f -> {
LlmExtractionExecutor.FieldExtractionResult r = result.fields().get(f.getFieldName());
return r == null || r.status() != LlmExtractionExecutor.FieldExtractionResult.Status.OK;
})
.forEach(f -> reviewReasons.add("关键字段缺失或异常: " + f.getFieldName()));
// 2. 有不确定字段
result.fields().entrySet().stream()
.filter(e -> e.getValue().status() ==
LlmExtractionExecutor.FieldExtractionResult.Status.UNCERTAIN)
.forEach(e -> reviewReasons.add("字段不确定: " + e.getKey()));
// 3. 有校验失败
result.fields().entrySet().stream()
.filter(e -> e.getValue().status() ==
LlmExtractionExecutor.FieldExtractionResult.Status.INVALID)
.forEach(e -> reviewReasons.add("字段校验失败: " + e.getKey() +
" - " + e.getValue().message()));
if (!reviewReasons.isEmpty()) {
return ReviewDecision.needsReview(reviewReasons);
}
// 随机抽样(10%)
if (Math.random() < 0.1) {
return ReviewDecision.randomSample();
}
return ReviewDecision.autoApprove();
}
/**
* 接收人工纠错结果,更新准确率统计
*/
public void submitCorrection(
String documentId,
Map<String, String> corrections) {
// 保存纠错数据
resultRepo.saveCorrections(documentId, corrections);
// 更新字段准确率统计(用于持续评估)
updateFieldAccuracyStats(corrections);
log.info("收到人工纠错: documentId={}, correctedFields={}",
documentId, corrections.keySet());
}
private void updateFieldAccuracyStats(Map<String, String> corrections) {
// 每次纠错都更新字段的历史准确率
// 准确率低的字段,下次提取时可以在Prompt里加更多提示
}
public record ReviewDecision(boolean needsReview, List<String> reasons, boolean isRandomSample) {
static ReviewDecision autoApprove() {
return new ReviewDecision(false, List.of(), false);
}
static ReviewDecision needsReview(List<String> reasons) {
return new ReviewDecision(true, reasons, false);
}
static ReviewDecision randomSample() {
return new ReviewDecision(true, List.of("随机抽样审核"), true);
}
}
}批量处理管道
/**
* 批量文档提取管道
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class BatchExtractionPipeline {
private final LlmExtractionExecutor extractor;
private final ExtractionQualityManager qualityManager;
private final HumanReviewQueue reviewQueue;
private final ExtractionResultRepository resultRepo;
// 控制并发(避免LLM API限流)
private final Semaphore rateLimiter = new Semaphore(5);
public BatchExtractionJob process(List<DocumentToExtract> documents,
ExtractionSchema schema) {
String jobId = "batch-extract-" + System.currentTimeMillis();
log.info("批量提取任务启动: jobId={}, 文档数={}", jobId, documents.size());
AtomicInteger processed = new AtomicInteger(0);
AtomicInteger autoApproved = new AtomicInteger(0);
AtomicInteger sentToReview = new AtomicInteger(0);
AtomicInteger failed = new AtomicInteger(0);
List<CompletableFuture<Void>> futures = documents.stream()
.map(doc -> CompletableFuture.runAsync(() -> {
try {
rateLimiter.acquire();
try {
// 提取
LlmExtractionExecutor.ExtractionResult result =
extractor.extract(doc.getContent(), schema);
// 评估是否需要人工审核
ExtractionQualityManager.ReviewDecision decision =
qualityManager.assessForReview(result, schema);
// 保存结果
resultRepo.save(doc.getDocumentId(), result, decision);
// 需要审核的送到队列
if (decision.needsReview()) {
reviewQueue.enqueue(doc.getDocumentId(), result,
decision.reasons());
sentToReview.incrementAndGet();
} else {
autoApproved.incrementAndGet();
}
} finally {
rateLimiter.release();
}
} catch (Exception e) {
log.error("文档提取失败: documentId={}", doc.getDocumentId(), e);
failed.incrementAndGet();
} finally {
int total = processed.incrementAndGet();
if (total % 50 == 0) {
log.info("进度: {}/{}, 自动通过={}, 待审核={}, 失败={}",
total, documents.size(), autoApproved.get(),
sentToReview.get(), failed.get());
}
}
}))
.toList();
// 等待所有完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
log.info("批量提取完成: 总文档={}, 自动通过={}, 待审核={}, 失败={}",
documents.size(), autoApproved.get(),
sentToReview.get(), failed.get());
});
return new BatchExtractionJob(jobId, documents.size(), "RUNNING");
}
public record DocumentToExtract(String documentId, String content) {}
public record BatchExtractionJob(String jobId, int totalDocuments, String status) {}
}实践建议
先定义质量标准,再构建系统
信息提取项目最常见的失败模式是:花三个月构建系统,最后发现关键字段准确率只有85%,而业务要求是99%。正确的顺序是:先用100份文档做人工测试,确认LLM在这类文档上能达到目标准确率,再开始工程化。如果达不到,先用Prompt工程优化,还达不到就考虑微调。
先用高质量文档,再扩展到难文档
不要一开始就用最混乱的文档。按文档质量分类:标准格式的数字合同→略有格式变化的合同→扫描版合同→手写合同。每类分别调优,逐步扩展支持范围。很多项目把所有文档混在一起测试,结果被最难的10%拉低了整体准确率,误判系统能力。
UNCERTAIN标记是你最好的朋友
让LLM在不确定时输出UNCERTAIN,而不是猜一个答案,这是提升系统可靠性最重要的设计。起初工程师会抱怨"UNCERTAIN太多了",但这比静默地输出错误答案要好得多。随着Prompt优化和文档质量提升,UNCERTAIN比例会自然下降。
