数据质量 AI 评估——让 AI 做数据的守门人
数据质量 AI 评估——让 AI 做数据的守门人
适读人群:负责数据管道或 ETL 开发的工程师 | 阅读时长:约 12 分钟 | 核心价值:掌握在数据管道里加 AI 数据质量检查的工程方案,发现规则引擎漏掉的问题类型
去年我在做一个电商数据仓库项目,有个 ETL 管道每天从十几个数据源同步数据进来。
项目里有一套规则引擎做数据质量检查:非空校验、格式校验、范围校验、唯一性校验,几十条规则,看起来挺完善的。
直到有一天,分析师过来问我:"老张,为什么这个用户的年龄是 1024?"
我去查了一下,那条数据确实通过了所有规则——年龄字段是 integer 类型,1024 是有效的整数,范围校验设的是 0 到 65535(当时认为"大于 0 就行"),非空也满足。
问题在于,这是一个语义上不合理的值,而规则引擎处理的是格式和范围,不处理语义。
这次问题让我开始研究怎么把 AI 加进数据质量检查流程。这篇文章写的就是那之后的工程实践。
规则引擎 vs AI 检查:各自擅长什么
明确这一点很重要,否则你会做出一个设计上混乱的系统。
规则引擎擅长:
- 格式验证(日期格式、邮箱格式、手机号格式)
- 非空/非零校验
- 精确的范围约束(金额不能为负数)
- 跨字段一致性(结束时间必须晚于开始时间)
- 引用完整性(外键是否存在)
AI 擅长:
- 语义合理性(年龄 1024 不合理,即使格式正确)
- 异常值检测(这个值在历史分布里极端异常)
- 模糊的格式问题(地址字段里混进了手机号)
- 多字段语义一致性(职业是"学生"但月收入是 50 万)
- 枚举值的语义错误("北京" 和 "beijing" 和 "BJ" 应该是同一个城市)
两者是互补关系,不是替代关系。
工程方案设计
我的设计是在规则引擎之后加一个 AI 检查层,规则引擎通过的数据再走 AI 检查。
数据源
|
v
[格式校验层] -- 规则引擎 -- 不通过 -> 错误日志
|
v
[AI 语义检查层] -- 不通过 -> 异常队列(人工审核)
|
v
[数据仓库]关键设计决策:AI 检查层的结果不直接丢弃数据,而是发送到人工审核队列。因为 AI 会有误报,直接丢弃可能导致数据丢失。
核心实现
我把 AI 检查拆成三种类型,分别实现。
类型 1:单条记录的语义检查
import json
from openai import OpenAI
from dataclasses import dataclass
from typing import Optional
@dataclass
class DataQualityIssue:
field: str
value: str
issue_type: str
description: str
severity: str # HIGH / MEDIUM / LOW
def check_record_semantics(record: dict, schema_description: str) -> list[DataQualityIssue]:
"""
对单条记录做语义合理性检查
"""
client = OpenAI()
prompt = f"""你是一个数据质量检查系统。请分析以下数据记录是否存在语义异常。
数据表结构说明:
{schema_description}
待检查的数据记录:
{json.dumps(record, ensure_ascii=False, indent=2)}
请检查以下几类问题:
1. 字段值语义不合理(如年龄 = 1024,身高 = 0.1cm)
2. 字段间语义矛盾(如职业=学生,月收入=50万)
3. 明显的测试数据(如姓名="test",手机号="13800138000")
4. 格式虽然正确但内容错误(如地址字段填写了手机号)
返回 JSON 格式,结构如下:
{{
"has_issues": true/false,
"issues": [
{{
"field": "字段名",
"value": "问题值",
"issue_type": "SEMANTIC_INVALID|FIELD_CONFLICT|TEST_DATA|FORMAT_CONTENT_MISMATCH",
"description": "问题描述",
"severity": "HIGH|MEDIUM|LOW"
}}
]
}}
如果没有问题,返回 {{"has_issues": false, "issues": []}}
只报告明确的问题,不要为了找问题而报告模糊的猜测。"""
response = client.chat.completions.create(
model="gpt-4o-mini", # 单条检查用小模型降低成本
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
temperature=0.1
)
result = json.loads(response.choices[0].message.content)
if not result.get("has_issues"):
return []
return [
DataQualityIssue(
field=issue["field"],
value=str(issue["value"]),
issue_type=issue["issue_type"],
description=issue["description"],
severity=issue["severity"]
)
for issue in result.get("issues", [])
]类型 2:批量异常值检测
单条检查成本太高,对于数值型字段,可以先做批量的统计异常检测:
import numpy as np
from scipy import stats
def detect_statistical_outliers(values: list[float], field_name: str) -> list[dict]:
"""
基于统计方法检测数值异常,然后用 AI 解释
"""
if len(values) < 10:
return []
arr = np.array(values)
# Z-score 方法
z_scores = np.abs(stats.zscore(arr))
outlier_indices = np.where(z_scores > 3)[0]
# IQR 方法
q1, q3 = np.percentile(arr, [25, 75])
iqr = q3 - q1
iqr_outliers = np.where((arr < q1 - 3 * iqr) | (arr > q3 + 3 * iqr))[0]
# 两种方法都判断为异常的
confirmed_outliers = set(outlier_indices) & set(iqr_outliers)
if not confirmed_outliers:
return []
outlier_values = [arr[i] for i in confirmed_outliers]
# 用 AI 给出解释
client = OpenAI()
prompt = f"""字段 "{field_name}" 的数据统计信息:
- 总数:{len(values)} 条
- 均值:{np.mean(arr):.2f}
- 标准差:{np.std(arr):.2f}
- 最小值:{np.min(arr):.2f}
- 最大值:{np.max(arr):.2f}
- 25%分位:{q1:.2f}
- 75%分位:{q3:.2f}
以下值被统计方法判断为异常值:{outlier_values}
请评估这些异常值是否真的有问题,还是可能是合理的极端值。
给出简短的业务解读。返回 JSON:
{{"is_real_issue": true/false, "explanation": "解释"}}"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
temperature=0.1
)
result = json.loads(response.choices[0].message.content)
if not result.get("is_real_issue"):
return []
return [{
"field": field_name,
"outlier_values": outlier_values,
"explanation": result["explanation"]
}]类型 3:枚举值归一化检查
这是我觉得 AI 效果最好的场景:
def check_enum_consistency(values: list[str], field_name: str, expected_values: list[str]) -> dict:
"""
检查枚举字段的语义一致性
比如:"北京", "beijing", "BJ", "北京市" 应该是同一个值
"""
unique_values = list(set(values))
# 如果独特值太多,只取高频的
from collections import Counter
counter = Counter(values)
top_values = [v for v, _ in counter.most_common(50)]
client = OpenAI()
prompt = f"""字段 "{field_name}" 的期望枚举值为:
{expected_values}
实际数据中出现的值(取频率最高的50个):
{top_values}
请找出:
1. 哪些值是同一个概念的不同写法(需要归一化)
2. 哪些值不在预期范围内且无法映射到任何预期值(真正的脏数据)
返回 JSON:
{{
"normalization_map": {{"原始值": "标准值", ...}},
"invalid_values": ["无效值1", "无效值2", ...]
}}"""
response = client.chat.completions.create(
model="gpt-4o", # 这个任务需要更强的语义理解能力
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
temperature=0.1
)
return json.loads(response.choices[0].message.content)实际效果
在那个 ETL 项目里,加了 AI 检查层之后,一个月内发现的问题:
- 语义异常值:年龄 > 120 或 < 0 的(规则引擎漏掉了,因为上限设得太高)
- 测试数据混入:姓名为 "test"、"测试用户"、"admin" 的记录,有 43 条混进了生产数据
- 字段内容错误:地址字段里有 18 条实际上填的是手机号或身份证号
- 枚举归一化:城市字段里 "上海" 和 "上海市" 被统一映射
- 职业-收入不一致:发现了 12 条"学生"但收入超过 30 万的记录,全部是测试数据
其中最有价值的是测试数据混入那条——这类问题规则引擎完全检不出来,但 AI 通过语义理解("test" 显然不是真实姓名)能轻松识别。
成本控制
全量跑 AI 检查成本会比较高。我的实践是:
- 枚举一致性检查:每周跑一次,而不是每条数据都跑
- 语义检查:只对新增或修改的记录跑,已验证过的历史数据不重复跑
- 统计异常检测:先用 Python 的 scipy 做统计筛选,只把统计上的异常值再喂给 AI 判断
这样每天的 AI 调用量从"全量数据量级"降低到"异常数据量级",成本降了两个数量级。
AI 做数据质量检查,核心价值不是替代规则引擎,而是把数据质量的检查维度从"格式+范围"扩展到"语义+常识"。这个扩展,是规则引擎做不到的。
