Python ETL 管道实战——Prefect/Airflow 数据工作流构建完整方案
Python ETL 管道实战——Prefect/Airflow 数据工作流构建完整方案
适读人群:需要构建自动化数据管道的工程师 | 阅读时长:约18分钟 | 核心价值:学会用 Prefect 和 Airflow 构建可靠的 ETL 管道,解决任务依赖、失败重试、监控告警等生产痛点
小吴在一家金融科技公司做数据工程,他们有个每天凌晨跑的数据任务:从几个业务数据库里抽取数据,做清洗和转换,最后写入数仓。
最开始他用 cron + Python 脚本搞定的,能跑。但问题慢慢多了:某天某个数据库连接超时,整个任务失败了,但没有任何告警,直到第二天早上开会他才发现报表数据是错的。然后他手动重跑,但不知道哪步成功了哪步没成功,只能把整个流程重跑一遍,浪费了两个小时。
更糟的是,任务越来越多,各种依赖关系缠在一起,几十个脚本相互依赖,一个人离职了,别人根本看不懂哪个先跑哪个后跑。
他找到我说:"老张,感觉数据管道越来越不可控,有什么好的方案?"
我说:你现在的问题有个专有名词叫"数据管道失控",解决方案是用一个专门的工作流编排工具。
一、两个主流方案对比
Prefect:
- 定位:现代化的数据流编排平台,Python 优先设计
- 特点:代码即配置,装饰器风格,上手容易,本地和云都能跑
- 适合:中小团队,Python 项目为主,快速迭代
Apache Airflow:
- 定位:企业级数据编排平台,生态最成熟
- 特点:DAG 有向无环图,Web UI 强大,插件丰富
- 适合:大型团队,多语言数据任务,对稳定性要求高
我的建议:中小项目用 Prefect,大型企业项目用 Airflow。Prefect 的开发体验更好,Airflow 的运维工具链更完整。
二、Prefect 实战——构建 ETL 管道
2.1 安装与基础概念
pip install prefect pandas sqlalchemy psycopg2-binaryPrefect 核心概念:
- Task:最小执行单元(用
@task装饰器) - Flow:任务的有向无环图(用
@flow装饰器) - Deployment:Flow 的运行计划(类似 cron job)
2.2 完整的 ETL Pipeline
import os
import pandas as pd
import sqlalchemy as sa
from datetime import datetime, timedelta
from prefect import task, flow, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.notifications import SlackWebhook
# =====================
# Extract(抽取)层
# =====================
@task(
name="抽取订单数据",
retries=3, # 失败自动重试3次
retry_delay_seconds=30, # 每次重试间隔30秒
cache_key_fn=task_input_hash, # 相同输入缓存结果(避免重复抽取)
cache_expiration=timedelta(hours=1) # 缓存1小时
)
def extract_orders(start_date: str, end_date: str) -> pd.DataFrame:
"""从订单数据库抽取指定日期范围的订单"""
logger = get_run_logger()
logger.info(f"抽取订单数据:{start_date} 到 {end_date}")
engine = sa.create_engine(os.environ.get("ORDER_DB_URL"))
query = """
SELECT
order_id, user_id, product_id, amount, status,
created_at, updated_at
FROM orders
WHERE DATE(created_at) BETWEEN :start_date AND :end_date
AND status != 'deleted'
"""
df = pd.read_sql(
query,
engine,
params={"start_date": start_date, "end_date": end_date}
)
logger.info(f"抽取完成,共 {len(df):,} 行")
return df
@task(name="抽取用户数据", retries=3, retry_delay_seconds=30)
def extract_users() -> pd.DataFrame:
"""从用户数据库抽取活跃用户信息"""
logger = get_run_logger()
engine = sa.create_engine(os.environ.get("USER_DB_URL"))
df = pd.read_sql(
"SELECT user_id, username, city, register_date, user_level FROM users WHERE is_active = 1",
engine
)
logger.info(f"抽取用户数据:{len(df):,} 行")
return df
@task(name="抽取商品数据", retries=2)
def extract_products() -> pd.DataFrame:
"""抽取商品信息"""
engine = sa.create_engine(os.environ.get("PRODUCT_DB_URL"))
df = pd.read_sql("SELECT product_id, product_name, category, cost_price FROM products", engine)
return df
# =====================
# Transform(转换)层
# =====================
@task(name="数据清洗与合并", log_prints=True)
def transform(
orders: pd.DataFrame,
users: pd.DataFrame,
products: pd.DataFrame
) -> pd.DataFrame:
"""合并三张表,计算指标,处理异常数据"""
logger = get_run_logger()
initial_count = len(orders)
# 1. 过滤异常订单
orders = orders[orders["amount"] > 0]
orders = orders[orders["amount"] < 1_000_000] # 过滤明显异常的超大金额
removed = initial_count - len(orders)
if removed > 0:
logger.warning(f"过滤了 {removed} 条异常订单")
# 2. 合并用户和商品信息
df = orders.merge(users, on="user_id", how="left")
df = df.merge(products, on="product_id", how="left")
# 3. 填充空值(商品下架等情况)
df["category"] = df["category"].fillna("未知分类")
df["city"] = df["city"].fillna("未知城市")
df["user_level"] = df["user_level"].fillna("普通用户")
# 4. 计算衍生指标
df["profit"] = df["amount"] - df["cost_price"]
df["profit_margin"] = (df["profit"] / df["amount"] * 100).round(2)
df["order_date"] = pd.to_datetime(df["created_at"]).dt.date
df["order_hour"] = pd.to_datetime(df["created_at"]).dt.hour
# 5. 标准化状态值
status_map = {
"completed": "已完成",
"refunded": "已退款",
"pending": "待处理",
"cancelled": "已取消"
}
df["status_cn"] = df["status"].map(status_map).fillna(df["status"])
logger.info(f"转换完成:{len(df):,} 行,{len(df.columns)} 列")
return df
# =====================
# Load(加载)层
# =====================
@task(name="加载到数仓", retries=2)
def load_to_dw(df: pd.DataFrame, table_name: str):
"""将处理后的数据写入数据仓库"""
logger = get_run_logger()
engine = sa.create_engine(os.environ.get("DW_DB_URL"))
# 使用 upsert 策略:当天数据先删后插(保证幂等性)
date_col = "order_date"
dates = df[date_col].unique()
with engine.begin() as conn:
# 删除当天已有数据
conn.execute(
sa.text(f"DELETE FROM {table_name} WHERE {date_col} = ANY(:dates)"),
{"dates": [str(d) for d in dates]}
)
# 写入新数据
df.to_sql(
table_name,
conn,
if_exists="append",
index=False,
method="multi",
chunksize=5000
)
logger.info(f"写入 {table_name}:{len(df):,} 行")
@task(name="发送完成通知")
def send_notification(summary: dict):
"""发送 Slack 通知"""
logger = get_run_logger()
message = (
f"✅ ETL 任务完成\n"
f"日期:{summary['date']}\n"
f"订单数:{summary['order_count']:,}\n"
f"总金额:¥{summary['total_amount']:,.0f}\n"
f"耗时:{summary['elapsed_seconds']:.0f} 秒"
)
logger.info(message)
# 实际发送 Slack 通知
# slack_webhook = SlackWebhook.load("your-webhook-block")
# slack_webhook.notify(message)
# =====================
# 主 Flow
# =====================
@flow(
name="每日订单ETL",
description="抽取、清洗、转换订单数据并加载到数仓",
log_prints=True
)
def daily_order_etl(
start_date: str = None,
end_date: str = None
):
"""
主 ETL 流程
默认处理昨天的数据
"""
import time
start_time = time.time()
logger = get_run_logger()
# 默认处理昨天的数据
if not start_date:
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
start_date = end_date = yesterday
logger.info(f"开始处理:{start_date} 到 {end_date}")
# 并行抽取(Prefect 自动检测无依赖的任务并并行执行)
orders_future = extract_orders.submit(start_date, end_date)
users_future = extract_users.submit()
products_future = extract_products.submit()
# 等待所有抽取完成,进行转换
df_clean = transform(
orders_future.result(),
users_future.result(),
products_future.result()
)
# 并行加载到不同的目标表
load_to_dw.submit(df_clean, "dw_order_detail")
# 计算汇总指标并加载
daily_summary = (
df_clean
.groupby("order_date")
.agg(order_count=("order_id", "count"), total_amount=("amount", "sum"))
.reset_index()
)
load_to_dw.submit(daily_summary, "dw_daily_summary")
# 发送通知
elapsed = time.time() - start_time
send_notification({
"date": start_date,
"order_count": len(df_clean),
"total_amount": df_clean["amount"].sum(),
"elapsed_seconds": elapsed
})
logger.info(f"ETL 完成,耗时 {elapsed:.1f} 秒")
return len(df_clean)
# 直接运行
if __name__ == "__main__":
daily_order_etl()2.3 部署定时任务
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
# 创建定时部署
deployment = Deployment.build_from_flow(
flow=daily_order_etl,
name="daily-order-etl-prod",
schedule=CronSchedule(cron="0 2 * * *"), # 每天凌晨2点执行
tags=["production", "etl", "orders"],
parameters={} # 默认参数
)
deployment.apply()三、Airflow 实战——企业级 DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import pandas as pd
# DAG 默认参数
default_args = {
"owner": "data-team",
"depends_on_past": False, # 不依赖上次运行是否成功
"start_date": days_ago(1),
"email": ["data-team@company.com"],
"email_on_failure": True, # 失败时发邮件
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(hours=2)
}
# 定义 DAG
with DAG(
dag_id="daily_order_etl",
default_args=default_args,
description="每日订单数据 ETL",
schedule_interval="0 2 * * *", # 凌晨2点
catchup=False, # 不补跑历史任务
tags=["etl", "orders", "daily"]
) as dag:
def extract_data(**context):
"""抽取数据并通过 XCom 传递给下一个任务"""
execution_date = context["execution_date"]
date_str = execution_date.strftime("%Y-%m-%d")
# 实际抽取逻辑
print(f"抽取 {date_str} 的数据...")
row_count = 15000 # 模拟
# 通过 XCom 传递数据(适合传小数据,大数据存到文件/数据库)
context["task_instance"].xcom_push(key="row_count", value=row_count)
context["task_instance"].xcom_push(key="date_str", value=date_str)
return row_count
def transform_data(**context):
"""转换数据"""
row_count = context["task_instance"].xcom_pull(
task_ids="extract_data", key="row_count"
)
print(f"转换 {row_count} 行数据...")
def load_data(**context):
"""加载数据"""
row_count = context["task_instance"].xcom_pull(
task_ids="extract_data", key="row_count"
)
date_str = context["task_instance"].xcom_pull(
task_ids="extract_data", key="date_str"
)
print(f"加载 {date_str} 的 {row_count} 行数据...")
def validate_data(**context):
"""数据质量检查"""
row_count = context["task_instance"].xcom_pull(
task_ids="extract_data", key="row_count"
)
# 检查数据量是否合理
if row_count < 1000:
raise ValueError(f"数据量异常:仅 {row_count} 行,可能存在数据丢失")
print(f"数据质量检查通过:{row_count} 行")
# 定义任务
extract_task = PythonOperator(
task_id="extract_data",
python_callable=extract_data,
)
transform_task = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
)
load_task = PythonOperator(
task_id="load_data",
python_callable=load_data,
)
validate_task = PythonOperator(
task_id="validate_data",
python_callable=validate_data,
)
# 设置依赖关系(DAG 结构)
extract_task >> transform_task >> load_task >> validate_task四、踩坑实录一:Prefect 任务结果过大导致内存溢出
现象:ETL 流程中,把大 DataFrame 从一个任务传给下一个任务,内存快速消耗,最终 OOM。
原因:Prefect 默认把任务结果缓存在内存中,大型数据集会占用大量内存。
解法:
from prefect.filesystems import LocalFileSystem
import pickle, tempfile, os
@task
def extract_large_data() -> str:
"""返回数据文件路径,而不是 DataFrame"""
df = pd.read_sql("SELECT * FROM huge_table", engine)
# 保存到临时文件
temp_path = f"/tmp/etl_data_{datetime.now().strftime('%Y%m%d%H%M%S')}.parquet"
df.to_parquet(temp_path, index=False)
return temp_path # 只传路径,不传数据
@task
def transform_large_data(data_path: str) -> str:
"""从文件加载、处理、保存"""
df = pd.read_parquet(data_path)
# 处理...
df_clean = df[df["amount"] > 0]
output_path = data_path.replace(".parquet", "_clean.parquet")
df_clean.to_parquet(output_path, index=False)
# 清理输入文件
os.remove(data_path)
return output_path五、踩坑实录二:Airflow XCom 传大数据
现象:通过 XCom 传 DataFrame,报 OperationalError: value too large。
原因:XCom 把数据存在 Airflow 的元数据库里,大小有限制(默认 48KB)。
解法:
# 通过共享存储(S3/本地文件系统)传递大数据
import boto3, pickle
def extract_and_save(**context):
df = extract_data_from_db()
# 保存到 S3
s3 = boto3.client("s3")
key = f"etl/temp/{context['ds']}/extracted.parquet"
df.to_parquet("/tmp/extracted.parquet", index=False)
s3.upload_file("/tmp/extracted.parquet", "your-bucket", key)
# XCom 只传 S3 路径
context["task_instance"].xcom_push(key="data_path", value=f"s3://your-bucket/{key}")六、踩坑实录三:时区问题导致数据重叠或遗漏
现象:ETL 任务每天凌晨2点跑,但有时候会重复处理昨天的数据,有时候又遗漏了一些数据,边界不清晰。
原因:数据库存的是 UTC 时间,但业务是按北京时间(UTC+8)统计,两者差8小时,导致"昨天"的边界不一致。
解法:
from datetime import timezone
import pytz
def get_date_range_for_etl(execution_date: datetime) -> tuple:
"""
根据业务时区计算正确的数据抽取范围
"""
BJ_TZ = pytz.timezone("Asia/Shanghai")
# 业务日期是北京时间的昨天
bj_date = execution_date.astimezone(BJ_TZ).date() - timedelta(days=1)
# 转换为 UTC 时间范围
start_utc = BJ_TZ.localize(
datetime.combine(bj_date, datetime.min.time())
).astimezone(timezone.utc)
end_utc = BJ_TZ.localize(
datetime.combine(bj_date, datetime.max.time())
).astimezone(timezone.utc)
return start_utc, end_utc, bj_date
# 使用
start_utc, end_utc, bj_date = get_date_range_for_etl(datetime.now())
print(f"业务日期:{bj_date}")
print(f"UTC 范围:{start_utc} 到 {end_utc}")ETL 管道的本质是数据契约——你保证数据每天在约定的时间以约定的格式出现在约定的位置。工作流编排工具是帮你履行这个契约的基础设施。从 cron 脚本升级到 Prefect 或 Airflow,是数据工程专业化的第一步,也是最值得投资的一步。
