第1743篇:ETL管道的现代化——用Apache Airflow编排AI数据处理流水线
第1743篇:ETL管道的现代化——用Apache Airflow编排AI数据处理流水线
三年前,我们团队的数据处理方式是这样的:一大堆 Python 脚本,用 crontab 调度,出了问题发现不了,失败了没有重试,上下游依赖靠人脑记。有一次凌晨三点,训练数据没跑出来,原因是某个脚本里的数据库连接超时,没人知道,任务静默失败了。
那次之后,我们下决心引入 Apache Airflow,把所有 AI 数据处理流水线统一用 DAG 来编排。这篇文章把我们在这个过程中的实践和踩坑全写出来,特别是 Java 服务和 Airflow 的结合方式。
一、为什么 AI 数据处理流水线需要专门编排
先说说 AI 数据管道和普通 ETL 的区别:
普通 ETL:数据从 A 搬到 B,转换一下格式,基本是线性流程。 AI 数据管道:涉及数据清洗、特征工程、模型预计算、Embedding 生成、向量入库……每个步骤都有可能失败,步骤之间有复杂依赖,不同步骤的执行频率可能不一样(有的每小时,有的每天),还要支持回填历史数据。
用 crontab + 裸脚本来干这件事,迟早翻车。
Airflow 解决的核心问题:
- 依赖管理:Task A 完成后 Task B 才能跑,失败了不继续
- 可视化监控:DAG 图清楚展示每个步骤的状态
- 失败重试:指数退避重试,减少人工干预
- 历史回填:可以对历史时间段重跑,补数据
- 参数化:同一个 DAG 支持不同配置的运行
二、AI 数据流水线的典型 DAG 结构
以一个文本分类模型的训练数据流水线为例:
这个 DAG 里有几个关键设计决策:
- 质量校验设了一个分支,不达标的不继续跑,防止垃圾数据流入训练集
- Embedding 生成和统计特征计算并行执行,缩短总耗时
- 最后一步记录数据版本快照,和上一篇讲的数据版本管理对接
三、Airflow DAG 的实际编写
3.1 基础 DAG 框架
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import json
default_args = {
'owner': 'ai-data-team',
'depends_on_past': False,
'start_date': days_ago(1),
'email': ['data-alert@company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True, # 指数退避重试
'max_retry_delay': timedelta(minutes=30),
}
with DAG(
dag_id='ai_training_data_pipeline',
default_args=default_args,
description='AI训练数据处理流水线',
schedule_interval='0 2 * * *', # 每天凌晨 2 点
catchup=False, # 不回填历史
max_active_runs=1, # 防止并发执行产生冲突
tags=['ai', 'training-data'],
) as dag:
# Task 1: 数据采集
extract_task = PythonOperator(
task_id='extract_raw_data',
python_callable=extract_raw_data,
op_kwargs={
'source_db': 'prod_mysql',
'batch_date': '{{ ds }}', # Airflow 模板变量,当前执行日期
},
)
# Task 2: 数据清洗
clean_task = PythonOperator(
task_id='clean_data',
python_callable=clean_data,
op_kwargs={'batch_date': '{{ ds }}'},
)
# Task 3: 质量校验(带分支)
quality_check_task = BranchPythonOperator(
task_id='quality_check',
python_callable=check_data_quality,
op_kwargs={'batch_date': '{{ ds }}', 'min_quality_score': 0.85},
)
# Task 4a: 质量通过 -> 特征工程
feature_engineering_task = PythonOperator(
task_id='feature_engineering',
python_callable=run_feature_engineering,
op_kwargs={'batch_date': '{{ ds }}'},
)
# Task 4b: 质量不通过 -> 发告警
quality_alert_task = PythonOperator(
task_id='quality_alert',
python_callable=send_quality_alert,
)
# Task 5a 和 5b 并行:Embedding 生成 + 统计特征
embedding_task = PythonOperator(
task_id='generate_embeddings',
python_callable=generate_embeddings,
op_kwargs={'batch_date': '{{ ds }}', 'batch_size': 100},
pool='embedding_api_pool', # 限制并发,防止 API 限流
)
stats_feature_task = PythonOperator(
task_id='compute_stats_features',
python_callable=compute_stats_features,
op_kwargs={'batch_date': '{{ ds }}'},
)
# Task 6: 合并特征
merge_task = PythonOperator(
task_id='merge_features',
python_callable=merge_features,
op_kwargs={'batch_date': '{{ ds }}'},
)
# Task 7: 数据版本快照(调用 Java 服务 API)
snapshot_task = SimpleHttpOperator(
task_id='create_data_version_snapshot',
http_conn_id='data_service_api',
endpoint='/api/v1/data-version/snapshot',
method='POST',
data=json.dumps({
'datasetName': 'training_data_v2',
'batchDate': '{{ ds }}',
'changeDescription': 'daily_pipeline_{{ ds }}'
}),
headers={'Content-Type': 'application/json'},
response_check=lambda response: response.json()['success'],
)
# 设置依赖关系
extract_task >> clean_task >> quality_check_task
quality_check_task >> [feature_engineering_task, quality_alert_task]
feature_engineering_task >> [embedding_task, stats_feature_task]
[embedding_task, stats_feature_task] >> merge_task >> snapshot_task3.2 关键 Task 的实现
def extract_raw_data(batch_date: str, source_db: str, **context):
"""从数据库抽取原始数据,写入临时存储"""
from airflow.hooks.mysql_hook import MySqlHook
import pandas as pd
hook = MySqlHook(mysql_conn_id=source_db)
# 增量抽取:只取当天新增数据
query = f"""
SELECT id, content, label, created_at
FROM training_samples
WHERE DATE(created_at) = '{batch_date}'
AND status = 'ACTIVE'
LIMIT 100000
"""
df = hook.get_pandas_df(query)
if df.empty:
raise ValueError(f"批次 {batch_date} 无数据,任务终止")
# 写入临时位置(可以是 S3、本地文件系统或数据库临时表)
output_path = f"/tmp/raw_data_{batch_date}.parquet"
df.to_parquet(output_path, index=False)
# 通过 XCom 传递路径给下游 Task
context['task_instance'].xcom_push('output_path', output_path)
context['task_instance'].xcom_push('record_count', len(df))
print(f"抽取完成,共 {len(df)} 条记录 -> {output_path}")
def check_data_quality(batch_date: str, min_quality_score: float, **context):
"""质量校验,返回下游分支 task_id"""
import pandas as pd
raw_path = context['task_instance'].xcom_pull(
task_ids='clean_data', key='output_path')
df = pd.read_parquet(raw_path)
# 计算质量分(简化版)
total = len(df)
if total == 0:
return 'quality_alert'
# 标签缺失率
label_missing_rate = df['label'].isna().sum() / total
# 内容过短比例
short_content_rate = (df['content'].str.len() < 10).sum() / total
# 综合质量分
quality_score = 1.0 - label_missing_rate * 0.5 - short_content_rate * 0.5
print(f"质量分: {quality_score:.3f}, 阈值: {min_quality_score}")
if quality_score >= min_quality_score:
return 'feature_engineering' # 返回下游 task_id
else:
return 'quality_alert'
def generate_embeddings(batch_date: str, batch_size: int, **context):
"""调用 Embedding API,批量生成向量"""
import pandas as pd
import requests
import time
input_path = context['task_instance'].xcom_pull(
task_ids='feature_engineering', key='output_path')
df = pd.read_parquet(input_path)
embeddings = []
errors = 0
for i in range(0, len(df), batch_size):
batch = df['content'].iloc[i:i + batch_size].tolist()
for attempt in range(3):
try:
resp = requests.post(
'https://api.openai.com/v1/embeddings',
json={'input': batch, 'model': 'text-embedding-3-small'},
headers={'Authorization': f'Bearer {get_api_key()}'},
timeout=30
)
resp.raise_for_status()
batch_embeddings = [item['embedding']
for item in resp.json()['data']]
embeddings.extend(batch_embeddings)
break
except Exception as e:
if attempt == 2:
# 三次都失败,用零向量占位并记录
embeddings.extend([[0.0] * 1536] * len(batch))
errors += len(batch)
print(f"Embedding 失败,批次 {i}: {e}")
else:
time.sleep(2 ** attempt) # 指数退避
# 限速:每秒最多 10 个请求
time.sleep(0.1)
df['embedding'] = embeddings
output_path = f"/tmp/embeddings_{batch_date}.parquet"
df[['id', 'embedding']].to_parquet(output_path, index=False)
context['task_instance'].xcom_push('output_path', output_path)
context['task_instance'].xcom_push('error_count', errors)
if errors > len(df) * 0.05: # 错误率超 5% 告警
raise ValueError(f"Embedding 错误率过高: {errors}/{len(df)}")四、Java 服务与 Airflow 的集成
在我们的架构里,Airflow 负责编排,Java 服务负责业务逻辑(特别是那些对性能要求高的部分)。两者的集成方式主要有两种:
4.1 HTTP 调用(SimpleHttpOperator)
适合调用已有的 Java RESTful API。上面的 snapshot_task 就是这种方式。
关键是要在 Java 服务里设计幂等接口——Airflow 可能会因为重试多次调用同一个接口,Java 端必须保证重复调用不产生副作用。
@RestController
@RequestMapping("/api/v1/data-version")
public class DataVersionController {
@Autowired
private DataVersionService versionService;
@PostMapping("/snapshot")
public ResponseEntity<ApiResponse> createSnapshot(
@RequestBody SnapshotRequest request) {
// 幂等键:datasetName + batchDate 组合
String idempotencyKey = request.getDatasetName() + "_" + request.getBatchDate();
// 检查是否已经创建过
Optional<DataVersion> existing = versionService
.findByIdempotencyKey(idempotencyKey);
if (existing.isPresent()) {
log.info("快照已存在,跳过重复创建: {}", idempotencyKey);
return ResponseEntity.ok(ApiResponse.success(existing.get()));
}
// 从存储读取当天的数据文件,创建版本快照
DataVersion version = versionService.createSnapshotFromStorage(
request.getDatasetName(),
request.getBatchDate(),
request.getChangeDescription(),
idempotencyKey
);
return ResponseEntity.ok(ApiResponse.success(version));
}
}4.2 通过消息队列解耦
对于耗时较长的 Java 任务,用消息队列(Kafka/RocketMQ)解耦更合适:
@Service
public class DataProcessingConsumer {
@KafkaListener(topics = "ai-data-processing-tasks",
groupId = "data-processing-group")
public void processTask(DataProcessingTask task) {
String correlationId = task.getCorrelationId();
try {
// 更新状态为处理中
taskStatusRepository.updateStatus(correlationId, TaskStatus.PROCESSING);
// 执行实际处理逻辑
ProcessingResult result = doProcess(task);
// 写入结果
taskStatusRepository.updateResult(
correlationId, TaskStatus.COMPLETED, result);
} catch (Exception e) {
log.error("处理任务失败: {}", correlationId, e);
taskStatusRepository.updateStatus(
correlationId, TaskStatus.FAILED, e.getMessage());
}
}
private ProcessingResult doProcess(DataProcessingTask task) {
// 具体的特征工程、数据处理逻辑
// ...
return new ProcessingResult();
}
}对应的 Airflow Python Task:
def trigger_java_processing(task_type: str, payload: dict, **context):
"""触发 Java 服务处理,等待完成"""
import requests
import time
import uuid
correlation_id = str(uuid.uuid4())
# 发送任务消息(通过 Kafka REST Proxy 或直接 HTTP)
requests.post(
'http://java-service/api/v1/tasks/submit',
json={**payload, 'correlationId': correlation_id, 'taskType': task_type},
timeout=10
)
# 轮询等待完成(最多 10 分钟)
for i in range(60):
time.sleep(10)
status_resp = requests.get(
f'http://java-service/api/v1/tasks/{correlation_id}/status')
status = status_resp.json()['status']
if status == 'COMPLETED':
return status_resp.json()['result']
elif status == 'FAILED':
raise Exception(f"Java 任务失败: {status_resp.json()['errorMsg']}")
raise TimeoutError(f"Java 任务超时: {correlation_id}")五、监控与告警设计
5.1 SLA 监控
Airflow 内置了 SLA(Service Level Agreement)机制,某个 Task 超时会自动告警:
with DAG(
dag_id='ai_training_data_pipeline',
sla_miss_callback=sla_miss_callback, # 超时回调
...
) as dag:
embedding_task = PythonOperator(
task_id='generate_embeddings',
python_callable=generate_embeddings,
sla=timedelta(hours=2), # 这个任务必须在 2 小时内完成
...
)
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""SLA 超时告警"""
message = f"DAG {dag.dag_id} SLA 超时:\n"
for sla in slas:
message += f" - Task: {sla.task_id}, 执行日期: {sla.execution_date}\n"
send_dingtalk_alert(message)5.2 数据量异常检测
每次运行后自动检测数据量是否在正常范围内,避免静默失败:
def validate_output_volume(task_id: str, expected_min: int,
expected_max: int, **context):
"""校验 Task 输出数据量是否在合理范围"""
record_count = context['task_instance'].xcom_pull(
task_ids=task_id, key='record_count')
if record_count is None:
raise ValueError(f"Task {task_id} 未上报数据量")
if record_count < expected_min:
raise ValueError(
f"数据量异常偏少: {record_count} < {expected_min},"
f"可能数据采集失败")
if record_count > expected_max:
# 数据量异常偏多,不一定是错误,但需要告警
send_dingtalk_alert(
f"数据量异常偏多: {record_count} > {expected_max},请检查")
print(f"数据量校验通过: {record_count}")六、几个实际踩过的坑
坑一:XCom 不适合传大数据
XCom 是 Airflow 的 Task 间数据传递机制,底层存在数据库里。我们最早直接把 DataFrame 序列化后放到 XCom 里传,结果 Airflow 的元数据库被撑爆了。正确做法是 XCom 只传文件路径或任务 ID,实际数据写到外部存储(S3、HDFS 或本地临时文件)。
坑二:不设 max_active_runs 导致并发冲突
某次上游任务积压,Airflow 补跑历史任务时同时跑了 5 个 DAG 实例,几个实例在写同一个临时文件,数据互相覆盖。加上 max_active_runs=1 就解决了,但要接受补跑会变慢。
坑三:任务超时没有处理清理逻辑
Embedding 生成任务超时被 Airflow 强杀,但已经写了一半的中间文件还在磁盘上。下次重试读到这个损坏的文件,直接报错。后来加了启动时的文件完整性检查,有问题的文件直接删掉重新生成。
坑四:Airflow 调度器不是实时的
Airflow 的调度器默认每分钟检查一次,所以触发时间可能有 1-2 分钟的误差。对于对时效性要求非常高的数据管道,要么调整调度器心跳频率,要么换用 Celery Executor 并配合外部触发。
# 外部触发(比如上游系统数据就绪后主动触发)
# 通过 Airflow REST API
import requests
requests.post(
'http://airflow-server:8080/api/v1/dags/ai_training_data_pipeline/dagRuns',
json={
'conf': {'batch_date': '2024-03-15', 'trigger_reason': 'upstream_ready'},
'logical_date': '2024-03-15T02:00:00Z',
},
auth=('admin', 'password')
)七、小结
Airflow 不是银弹,引入它也带来了新的复杂度:需要维护调度器、学习 DAG 编写规范、处理各种调度边界情况。但对于 AI 数据管道这种多步骤、有依赖、需要监控的场景,它带来的工程收益远远超过维护成本。
关键是别把所有业务逻辑都塞进 Airflow Task 里,Airflow 做的是编排,计算密集型和业务复杂的逻辑交给专门的服务(Java、Spark、Flink)来处理。
