第2482篇:AI驱动的数据清洗——自动化数据质量修复的工程实践
第2482篇:AI驱动的数据清洗——自动化数据质量修复的工程实践
适读人群:数据工程师、Java后端工程师、AI工程师 | 阅读时长:约14分钟 | 核心价值:用AI能力替代人工数据清洗,构建自动化数据质量修复流水线
有一次我们做一个电商分析项目,原始数据来自三个不同的业务系统,汇总到数仓之后,数据质量一塌糊涂。
商品类目字段里,同样是"笔记本电脑",有人填的是"笔记本",有人填的是"laptop",有人填的是"note book",有人填的是"笔记本电脑-14寸",还有一条神奇的填成了"小米12"——这显然是复制粘贴错了。
手动清洗?2400万条数据,100多个字段。我们算了一下,如果靠人工加规则,写清洗脚本 + 逐字段处理,至少需要两个月。
于是决定:用 AI 来做。
一、数据质量问题的分类
在动手之前,先把问题分类清楚。数据质量问题大体上可以分成五类:
1. 格式错误:日期格式不统一("2024/1/1" vs "20240101" vs "2024年1月1日"),手机号前缀不规范,等等。这类问题用正则规则就能解决,不需要 AI。
2. 值域违规:年龄字段填了 -5,折扣率填了 200%,这类靠约束检查就能发现。
3. 语义歧义:同一个概念有多种表达方式,比如上面的商品类目。这是 AI 最擅长处理的地方。
4. 缺失值:字段为空。有些可以用统计方法填补,有些需要 AI 根据上下文推断。
5. 逻辑矛盾:同一条记录里,"出生日期"是 2010 年,"工龄"却填了 15 年。这类需要多字段联合判断。
二、整体架构设计
我们设计的系统叫 DataCleaner,核心思路是:规则优先,AI 兜底,人工审核高置信度之外的修复。
为什么不全用 AI?因为 AI 处理简单的格式问题又慢又贵,而规则引擎处理格式问题又快又准。AI 的价值在于处理那些需要"理解语义"才能处理的问题。
架构分三层:
- 快通道:格式类问题,进规则引擎,毫秒级处理
- 慢通道:语义类问题,进 AI 修复管道,秒级处理
- 人工审核队列:AI 置信度低于阈值的修复建议,推给人工确认
三、核心实现代码
3.1 数据质量检测器
@Service
@Slf4j
public class DataQualityDetector {
// 检测一行数据的质量问题
public List<DataQualityIssue> detect(DataRecord record, SchemaDefinition schema) {
List<DataQualityIssue> issues = new ArrayList<>();
for (FieldDefinition field : schema.getFields()) {
Object value = record.get(field.getName());
// 空值检测
if (value == null || isBlank(value)) {
if (field.isRequired()) {
issues.add(DataQualityIssue.builder()
.fieldName(field.getName())
.issueType(IssueType.MISSING_VALUE)
.currentValue(null)
.severity(field.isRequired() ? Severity.HIGH : Severity.LOW)
.build());
}
continue;
}
String strValue = String.valueOf(value);
// 格式检测
if (field.getPattern() != null && !strValue.matches(field.getPattern())) {
issues.add(DataQualityIssue.builder()
.fieldName(field.getName())
.issueType(IssueType.FORMAT_ERROR)
.currentValue(strValue)
.expectedPattern(field.getPattern())
.severity(Severity.MEDIUM)
.build());
}
// 值域检测
if (field.getEnumValues() != null && !field.getEnumValues().isEmpty()) {
if (!field.getEnumValues().contains(strValue)) {
issues.add(DataQualityIssue.builder()
.fieldName(field.getName())
.issueType(IssueType.SEMANTIC_MISMATCH)
.currentValue(strValue)
.validValues(field.getEnumValues())
.severity(Severity.MEDIUM)
.build());
}
}
// 范围检测
if (field.getMinValue() != null || field.getMaxValue() != null) {
try {
double numValue = Double.parseDouble(strValue);
if (field.getMinValue() != null && numValue < field.getMinValue()) {
issues.add(DataQualityIssue.builder()
.fieldName(field.getName())
.issueType(IssueType.RANGE_VIOLATION)
.currentValue(strValue)
.severity(Severity.HIGH)
.build());
}
} catch (NumberFormatException e) {
// 数值字段填了非数字
issues.add(DataQualityIssue.builder()
.fieldName(field.getName())
.issueType(IssueType.TYPE_MISMATCH)
.currentValue(strValue)
.severity(Severity.HIGH)
.build());
}
}
}
// 逻辑一致性检测(跨字段)
issues.addAll(detectLogicalInconsistency(record, schema));
return issues;
}
private List<DataQualityIssue> detectLogicalInconsistency(
DataRecord record, SchemaDefinition schema) {
List<DataQualityIssue> issues = new ArrayList<>();
for (ConsistencyRule rule : schema.getConsistencyRules()) {
if (!rule.evaluate(record)) {
issues.add(DataQualityIssue.builder()
.fieldName(rule.getPrimaryField())
.issueType(IssueType.LOGICAL_INCONSISTENCY)
.description(rule.getDescription())
.relatedFields(rule.getRelatedFields())
.severity(Severity.HIGH)
.build());
}
}
return issues;
}
private boolean isBlank(Object value) {
return value instanceof String && ((String) value).trim().isEmpty();
}
}3.2 AI 语义修复器
@Service
@Slf4j
public class AISemanticRepairService {
private final ChatClient chatClient;
private final RepairCacheService cacheService;
// 修复语义不匹配的字段值
public RepairResult repairSemanticMismatch(DataQualityIssue issue) {
// 先查缓存:同样的字段+同样的错误值,不重复调用 AI
String cacheKey = issue.getFieldName() + ":" + issue.getCurrentValue();
RepairResult cached = cacheService.get(cacheKey);
if (cached != null) {
return cached;
}
String prompt = buildRepairPrompt(issue);
try {
String response = chatClient.call(prompt);
RepairResult result = parseRepairResponse(response, issue.getValidValues());
// 缓存结果
cacheService.put(cacheKey, result, Duration.ofHours(24));
log.info("语义修复: {} -> {} (置信度: {})",
issue.getCurrentValue(), result.getSuggestedValue(), result.getConfidence());
return result;
} catch (Exception e) {
log.error("AI修复失败", e);
return RepairResult.failed(issue.getCurrentValue(), "AI调用失败");
}
}
private String buildRepairPrompt(DataQualityIssue issue) {
return String.format("""
你是一个数据质量专家,负责修复数据中的语义错误。
字段名称:%s
当前值:"%s"
合法的值列表:%s
请判断当前值最可能对应合法列表中的哪个值,返回JSON格式:
{
"suggested_value": "最匹配的合法值",
"confidence": 0.0到1.0之间的置信度,
"reason": "判断理由"
}
如果无法判断,将suggested_value设为null,confidence设为0。
只返回JSON,不要其他内容。
""",
issue.getFieldName(),
issue.getCurrentValue(),
issue.getValidValues()
);
}
private RepairResult parseRepairResponse(String response, List<String> validValues) {
try {
// 解析 JSON 响应
JsonNode node = objectMapper.readTree(response.trim());
String suggestedValue = node.path("suggested_value").asText(null);
double confidence = node.path("confidence").asDouble(0.0);
String reason = node.path("reason").asText("");
// 验证建议值确实在合法列表中
if (suggestedValue != null && validValues != null
&& !validValues.contains(suggestedValue)) {
// 做模糊匹配兜底
suggestedValue = findClosestMatch(suggestedValue, validValues);
confidence = confidence * 0.8; // 降低置信度
}
return RepairResult.builder()
.suggestedValue(suggestedValue)
.confidence(confidence)
.reason(reason)
.repairStatus(confidence >= 0.8 ? RepairStatus.AUTO_REPAIRED
: confidence >= 0.5 ? RepairStatus.NEED_REVIEW
: RepairStatus.CANNOT_REPAIR)
.build();
} catch (Exception e) {
log.error("解析AI响应失败: {}", response, e);
return RepairResult.failed(null, "解析失败");
}
}
// 当AI建议值不在列表中时,做字符串相似度兜底
private String findClosestMatch(String value, List<String> validValues) {
return validValues.stream()
.max(Comparator.comparingDouble(v ->
calculateSimilarity(value.toLowerCase(), v.toLowerCase())))
.orElse(null);
}
private double calculateSimilarity(String s1, String s2) {
// 简单的 Levenshtein 相似度
int maxLen = Math.max(s1.length(), s2.length());
if (maxLen == 0) return 1.0;
int distance = levenshteinDistance(s1, s2);
return 1.0 - (double) distance / maxLen;
}
private int levenshteinDistance(String s1, String s2) {
int[][] dp = new int[s1.length() + 1][s2.length() + 1];
for (int i = 0; i <= s1.length(); i++) dp[i][0] = i;
for (int j = 0; j <= s2.length(); j++) dp[0][j] = j;
for (int i = 1; i <= s1.length(); i++) {
for (int j = 1; j <= s2.length(); j++) {
dp[i][j] = s1.charAt(i-1) == s2.charAt(j-1) ? dp[i-1][j-1]
: 1 + Math.min(dp[i-1][j-1], Math.min(dp[i-1][j], dp[i][j-1]));
}
}
return dp[s1.length()][s2.length()];
}
}3.3 批量处理流水线
@Service
@Slf4j
public class DataCleaningPipeline {
private final DataQualityDetector detector;
private final RuleBasedRepairService ruleRepairService;
private final AISemanticRepairService aiRepairService;
private final HumanReviewQueue reviewQueue;
private final DataCleaningMetrics metrics;
// 处理配置
private static final double AUTO_REPAIR_THRESHOLD = 0.8;
private static final double REVIEW_THRESHOLD = 0.5;
private static final int PARALLEL_WORKERS = 8;
public CleaningReport processDataset(Dataset dataset, SchemaDefinition schema) {
log.info("开始数据清洗,总记录数: {}", dataset.getSize());
CleaningReport.Builder reportBuilder = CleaningReport.builder()
.startTime(Instant.now())
.totalRecords(dataset.getSize());
AtomicInteger processed = new AtomicInteger(0);
AtomicInteger repaired = new AtomicInteger(0);
AtomicInteger needsReview = new AtomicInteger(0);
AtomicInteger failed = new AtomicInteger(0);
// 并行处理,利用多核
ExecutorService executor = Executors.newFixedThreadPool(PARALLEL_WORKERS);
List<CompletableFuture<Void>> futures = new ArrayList<>();
// 批量迭代数据集
for (List<DataRecord> batch : dataset.batches(500)) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
for (DataRecord record : batch) {
try {
CleaningResult result = processRecord(record, schema);
processed.incrementAndGet();
if (result.isRepaired()) repaired.incrementAndGet();
if (result.needsReview()) needsReview.incrementAndGet();
// 每1000条打一次日志
if (processed.get() % 1000 == 0) {
log.info("已处理 {} / {} 条", processed.get(), dataset.getSize());
}
} catch (Exception e) {
log.error("处理记录失败: {}", record.getId(), e);
failed.incrementAndGet();
}
}
}, executor);
futures.add(future);
}
// 等待所有批次完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
executor.shutdown();
return reportBuilder
.endTime(Instant.now())
.processedRecords(processed.get())
.repairedRecords(repaired.get())
.needsReviewRecords(needsReview.get())
.failedRecords(failed.get())
.build();
}
private CleaningResult processRecord(DataRecord record, SchemaDefinition schema) {
// 1. 检测质量问题
List<DataQualityIssue> issues = detector.detect(record, schema);
if (issues.isEmpty()) {
return CleaningResult.clean(record);
}
CleaningResult.Builder resultBuilder = CleaningResult.builder().originalRecord(record);
DataRecord repairedRecord = record.copy();
for (DataQualityIssue issue : issues) {
switch (issue.getIssueType()) {
case FORMAT_ERROR:
// 规则引擎处理格式错误
RepairResult ruleRepair = ruleRepairService.repair(issue);
if (ruleRepair.isSuccess()) {
repairedRecord.set(issue.getFieldName(), ruleRepair.getSuggestedValue());
metrics.incrementRuleRepair();
}
break;
case SEMANTIC_MISMATCH:
// AI 处理语义歧义
RepairResult aiRepair = aiRepairService.repairSemanticMismatch(issue);
if (aiRepair.getConfidence() >= AUTO_REPAIR_THRESHOLD) {
repairedRecord.set(issue.getFieldName(), aiRepair.getSuggestedValue());
resultBuilder.repaired(true);
metrics.incrementAIAutoRepair();
} else if (aiRepair.getConfidence() >= REVIEW_THRESHOLD) {
reviewQueue.submit(ReviewItem.builder()
.record(record)
.issue(issue)
.aiSuggestion(aiRepair)
.build());
resultBuilder.needsReview(true);
metrics.incrementNeedsReview();
} else {
resultBuilder.unrepairable(issue);
metrics.incrementUnrepairable();
}
break;
case MISSING_VALUE:
// 缺失值:尝试从上下文推断
RepairResult inferredRepair = inferMissingValue(record, issue, schema);
if (inferredRepair.isSuccess()
&& inferredRepair.getConfidence() >= AUTO_REPAIR_THRESHOLD) {
repairedRecord.set(issue.getFieldName(), inferredRepair.getSuggestedValue());
resultBuilder.repaired(true);
}
break;
default:
resultBuilder.unrepairable(issue);
}
}
return resultBuilder.repairedRecord(repairedRecord).build();
}
private RepairResult inferMissingValue(DataRecord record, DataQualityIssue issue,
SchemaDefinition schema) {
// 先看有没有统计默认值
FieldDefinition field = schema.getField(issue.getFieldName());
if (field.getDefaultValue() != null) {
return RepairResult.withConfidence(field.getDefaultValue(), 0.9, "使用字段默认值");
}
// 再用 AI 从上下文推断
String prompt = String.format("""
根据以下记录的其他字段,推断 "%s" 字段最可能的值:
记录内容: %s
字段说明: %s
返回JSON: {"value": "推断值", "confidence": 0-1, "reason": "理由"}
如果无法推断,confidence返回0。
""",
issue.getFieldName(),
record.toJsonString(),
field.getDescription()
);
String response = chatClient.call(prompt);
return parseRepairResponse(response, field.getEnumValues());
}
}四、效果评估与上线策略
不要直接在生产数据上做自动修复。正确的上线策略是:
- 先在历史数据的 1% 样本上跑一遍,人工抽查修复结果
- 确认修复准确率 > 95% 后,扩展到 10%
- 全量上线时,保留原始值备份,修复结果写入新字段
- 运行一个月后,由业务方确认修复质量,再决定是否覆盖原始字段
我们项目的最终结果:
- 格式类问题:规则引擎解决了 100%
- 语义歧义:AI 自动修复了 78%,推人工审核 15%,无法修复 7%
- 缺失值:AI 推断补全了 45%(置信度 > 0.8 的部分)
- 整体数据质量评分从 61 分提升到 89 分
最重要的是,原来需要两个月的数据清洗工作,现在在周末跑一个晚上就完成了。
