ETL 流程智能化——数据管道里的 AI 改造
ETL 流程智能化——数据管道里的 AI 改造
适读人群:做数据仓库或数据管道开发的工程师 | 阅读时长:约 13 分钟 | 核心价值:了解 ETL 流程哪些环节 AI 真的省时间,哪些不值得,避免无效投入
去年我们做了一个数据仓库项目,数据源有 15 个,格式五花八门:有结构化的关系型数据库、半结构化的 JSON 日志、还有几个第三方系统吐出来的 Excel 文件。
项目启动前,技术负责人说:"我们要做 AI 驱动的 ETL,让 AI 自动完成数据映射和清洗。"
我当时保持沉默,心里有些担忧——因为我大概知道哪些能做,哪些是画饼。
做完之后的总结:AI 在某些环节确实帮我们省了大量时间,但也有几个环节是我们走弯路浪费的。
这篇文章写实战结论,不写理论。
环节 1:字段映射(AI 真的好用)
这是整个项目里 AI 帮我们节省时间最多的环节。
问题背景:我们需要把 15 个数据源的字段映射到统一的数仓 Schema。每个数据源的字段命名都不一样:
数据源 A: user_id, user_name, reg_date, last_login_time
数据源 B: userId, userName, registerDate, lastLoginAt
数据源 C: 用户ID, 姓名, 注册日期, 最后登录
数据源 D: member_no, member_name, create_dt, active_dt目标 Schema: user_id, display_name, registration_date, last_active_time
传统做法:DBA 或数据工程师手动对照,写映射文档,然后写 ETL 代码。15 个数据源,这个过程大概要 3-4 天。
AI 做法:
import json
from openai import OpenAI
def generate_field_mapping(
source_fields: list[dict], # [{"name": "user_id", "type": "int", "sample_values": [1, 2, 3]}]
target_schema: list[dict], # [{"name": "user_id", "type": "bigint", "description": "用户唯一标识"}]
source_name: str
) -> dict:
"""
自动生成字段映射关系
"""
client = OpenAI()
prompt = f"""你是一个数据工程师,需要为数据源 "{source_name}" 生成字段映射。
目标 Schema(数据仓库标准字段):
{json.dumps(target_schema, ensure_ascii=False, indent=2)}
数据源字段:
{json.dumps(source_fields, ensure_ascii=False, indent=2)}
请生成字段映射关系。对于每个源字段:
1. 如果能映射到目标字段,给出映射和置信度(0-1)
2. 如果需要类型转换,说明转换方式
3. 如果源字段没有对应的目标字段,标记为 UNMAPPED
4. 如果需要多个源字段合并才能得到目标字段,说明合并逻辑
返回 JSON:
{{
"mappings": [
{{
"source_field": "源字段名",
"target_field": "目标字段名或 UNMAPPED",
"confidence": 0.95,
"transform": "转换说明,如不需要则为 null",
"notes": "备注"
}}
],
"unmapped_targets": ["无法从源数据获取的目标字段列表"],
"confidence_summary": "整体映射置信度评估"
}}"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
temperature=0.1
)
return json.loads(response.choices[0].message.content)实际效果:AI 对 15 个数据源的字段映射,准确率大概是 85%。我们只需要 review 置信度低于 0.8 的映射(大约占 20%),其余直接采纳。总共花了不到 1 天,比手动快了 3 天。
环节 2:清洗规则生成(AI 好用但需要验证)
有了字段映射之后,还需要数据清洗规则。比如日期字段,不同源系统的格式不同:
数据源 A: "2024-01-15" (ISO 格式)
数据源 B: "15/01/2024" (欧式格式)
数据源 C: "20240115" (紧凑格式)
数据源 D: "2024年1月15日" (中文格式)
数据源 E: 1705276800 (Unix 时间戳)手动写这些转换规则很枯燥。我让 AI 看了一批真实样本数据,让它生成清洗代码:
def generate_cleaning_code(
field_name: str,
sample_values: list,
target_format: str
) -> str:
"""
根据样本数据生成字段清洗代码
"""
client = OpenAI()
prompt = f"""生成 Python 代码,把字段 "{field_name}" 的值清洗并转换为目标格式。
目标格式:{target_format}
样本数据(实际出现的值,包括各种格式和异常值):
{sample_values}
要求:
1. 生成一个名为 clean_{field_name}(value) 的函数
2. 处理样本中出现的所有格式变体
3. 对于无法解析的值,返回 None 并记录日志
4. 代码要实际可运行,不要使用不存在的库
5. 只返回代码,不要解释"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
temperature=0.1
)
code = response.choices[0].message.content
# 去掉代码块标记
code = code.replace("```python", "").replace("```", "").strip()
return code这个方案有个坑:AI 生成的代码不能直接用,必须测试。我们有一次 AI 生成了处理中文日期的代码,用了 lunar_calendar 库(不存在的库名),或者处理时区的逻辑有细微错误。
现在的做法是:AI 生成代码 → 跑单元测试(用样本数据)→ 通过测试才纳入正式流程。
环节 3:异常数据修复建议(有限价值)
这是一个我们花了时间但最终效果一般的环节。
我们想让 AI 给出异常数据的"修复建议"——比如手机号少了一位,AI 能不能推断出正确的?
实践下来:大多数情况下,AI 给不出有价值的修复建议。
原因很简单:AI 没有你的业务上下文。它不知道手机号 "1381234567" 到底是 "13812345678" 还是 "13812345679" 少了哪一位。
AI 能做的是:
- 识别出异常("这个值不像是有效手机号")
- 给出修复思路("建议从原始数据源重新拉取")
不能做的是:
- 猜测正确值(除非有非常明确的规律,比如身份证校验码)
我建议:异常数据修复还是走人工审核流程,AI 只负责识别和分类,不要期望它能自动修复。
环节 4:ETL 调度逻辑生成(不值得)
这是我们走弯路最多的环节。
技术负责人想用 AI 来自动生成 Airflow DAG 的调度逻辑。想法是:描述数据依赖关系,AI 自动生成 DAG 代码。
我们花了大约 2 周时间在这上面,最终结论是:不值得。
原因:
- ETL 调度逻辑强依赖业务上下文(哪张表依赖哪张表,什么时候刷新),这些知识存在人脑里,很难用自然语言完整描述
- AI 生成的 DAG 代码质量参差不齐,错误率高,调试成本比手写还高
- Airflow DAG 本身就是 Python 代码,结构固定,手写也不复杂
结论:调度逻辑还是手写,但可以用 AI 做 code review,检查是否有依赖关系缺失或循环依赖。
整体总结
做了这个项目之后,我对 ETL 里 AI 的价值有了清晰的认知:
| ETL 环节 | AI 价值 | 时间节省 | 建议 |
|---|---|---|---|
| 字段映射 | 高 | 60-70% | 重点投入,真实有效 |
| 格式转换代码生成 | 中 | 40-50% | 用 AI 生成初稿,必须测试 |
| 数据质量规则建议 | 中 | 30-40% | 辅助,人工最终决策 |
| 异常数据修复 | 低 | 10% | 只做识别,不做修复 |
| ETL 调度逻辑 | 极低 | -20%(反而浪费时间) | 不推荐 |
AI 在 ETL 里的核心价值是处理命名歧义和格式多样性——这两类问题规则引擎很难处理好,但 LLM 的语义理解能力天然适合。
其他环节,如果你没有强烈的理由相信 AI 能帮上忙,老老实实手写更快。
