Python Celery 深度实战——任务队列、重试策略、优先级、监控
Python Celery 深度实战——任务队列、重试策略、优先级、监控
适读人群:构建异步任务系统的后端工程师 | 阅读时长:约18分钟 | 核心价值:Celery 生产级配置与工程化实践完整指南
一次因任务队列崩溃引发的线上事故
三年前我在一家 SaaS 公司,负责 AI 分析服务的后端。有个功能是用户上传文档,后台异步提取内容、调用大模型分析、生成报告,整个流程可能需要3-5分钟。
当时团队用的是一个自研的简陋任务队列——直接把任务写进 MySQL,一个定时任务每秒扫表取任务执行。这个方案有个致命问题:没有优先级、没有重试、没有并发控制。
某天一个大客户批量上传了3000份文档,系统直接崩了——数据库扫表 QPS 飙高,3000个任务开始被并发执行,MySQL 连接池耗尽,所有其他用户的请求也跟着超时,整个产品停服了将近2小时。
那次事故之后,团队下定决心迁移到 Celery。我花了三周完成了迁移和调优,踩了很多坑,今天把生产级 Celery 的完整实践讲清楚。
一、Celery 架构理解
在讲配置之前,先把架构搞清楚,避免用错方式:
Producer (Django/FastAPI)
↓ 发送任务
Broker (Redis / RabbitMQ) ← 任务队列,消息中间件
↓ 分发任务
Worker (Celery Worker) ← 执行任务的进程
↓ 存储结果
Backend (Redis / DB) ← 任务结果存储二、生产级配置
pip install celery redis celery[redis]# celery_app.py
from celery import Celery
from kombu import Queue, Exchange
app = Celery("myapp")
app.conf.update(
# Broker & Backend
broker_url="redis://:password@redis-host:6379/0",
result_backend="redis://:password@redis-host:6379/1",
# 序列化(JSON 比 pickle 更安全)
task_serializer="json",
result_serializer="json",
accept_content=["json"],
# 时区
timezone="Asia/Shanghai",
enable_utc=True,
# 任务确认策略(可靠性关键配置)
task_acks_late=True, # 任务完成后才确认(worker 崩溃不丢任务)
task_reject_on_worker_lost=True, # worker 意外退出,任务重新入队
# 并发
worker_concurrency=8, # 每个 worker 进程的并发数
worker_prefetch_multiplier=1, # 每次只预取1个任务(配合 acks_late 更安全)
# 任务超时
task_soft_time_limit=300, # 软超时:300秒后任务收到 SoftTimeLimitExceeded
task_time_limit=360, # 硬超时:360秒后强制杀死任务
# 结果过期
result_expires=3600, # 结果保留1小时
# 多队列配置(重要!)
task_queues=(
Queue("high", Exchange("high"), routing_key="high", priority=10),
Queue("default", Exchange("default"), routing_key="default", priority=5),
Queue("low", Exchange("low"), routing_key="low", priority=1),
),
task_default_queue="default",
task_default_exchange="default",
task_default_routing_key="default",
)三、任务定义——重试策略是核心
from celery import Task
from celery.utils.log import get_task_logger
import time
logger = get_task_logger(__name__)
class BaseTask(Task):
"""基础任务类,统一处理日志和监控"""
abstract = True
def on_success(self, retval, task_id, args, kwargs):
logger.info(f"任务成功: {self.name} [{task_id}]")
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.error(
f"任务失败: {self.name} [{task_id}], 错误: {exc}",
exc_info=True
)
# 这里可以发告警(钉钉/飞书/PagerDuty)
def on_retry(self, exc, task_id, args, kwargs, einfo):
logger.warning(f"任务重试: {self.name} [{task_id}], 原因: {exc}")
@app.task(
base=BaseTask,
bind=True,
queue="high",
# 重试配置
max_retries=5,
default_retry_delay=60, # 默认重试间隔60秒
autoretry_for=(ConnectionError, TimeoutError), # 这些异常自动重试
retry_backoff=True, # 指数退避:60s, 120s, 240s...
retry_backoff_max=600, # 最大退避不超过10分钟
retry_jitter=True, # 加随机抖动,避免雪崩
)
def process_document(self, doc_id: int, user_id: int) -> dict:
"""处理文档的核心任务"""
logger.info(f"开始处理文档: doc_id={doc_id}, user_id={user_id}")
try:
# 1. 从数据库获取文档
doc = fetch_document(doc_id)
# 2. 调用 AI 服务(可能超时或失败)
result = call_ai_service(doc.content)
# 3. 保存分析结果
save_result(doc_id, result)
return {"status": "success", "doc_id": doc_id}
except ConnectionError as exc:
# 连接错误,自动重试
raise self.retry(exc=exc, countdown=30)
except ValueError as exc:
# 参数错误,不重试,直接失败
logger.error(f"文档格式错误,不重试: doc_id={doc_id}, {exc}")
raise
except Exception as exc:
# 未知错误,使用指数退避重试
countdown = min(60 * (2 ** self.request.retries), 600)
logger.warning(f"未知错误,{countdown}s 后重试: {exc}")
raise self.retry(exc=exc, countdown=countdown)
@app.task(queue="low", rate_limit="10/m") # 每分钟最多执行10次
def send_notification(user_id: int, message: str) -> bool:
"""发送通知(低优先级,有速率限制)"""
# 推送逻辑
return True
@app.task(queue="default", time_limit=30, soft_time_limit=25)
def quick_task(data: dict) -> dict:
"""快速任务,严格限时"""
from celery.exceptions import SoftTimeLimitExceeded
try:
result = process_quickly(data)
return result
except SoftTimeLimitExceeded:
logger.warning("任务接近超时,保存中间结果")
return {"status": "partial", "data": data}四、任务路由——精细控制任务分发
# 基于任务名的路由配置
app.conf.task_routes = {
"tasks.process_document": {"queue": "high"},
"tasks.send_notification": {"queue": "low"},
"tasks.*": {"queue": "default"}, # 其他任务走默认队列
}
# 基于运行时参数的动态路由
@app.task
def dynamic_route_task(data: dict, priority: str = "normal"):
pass
# 发送时动态指定队列
process_document.apply_async(
args=[doc_id, user_id],
queue="high",
priority=9, # 0-9,数字越大优先级越高
countdown=0, # 立即执行
expires=3600, # 1小时内未执行则丢弃
)
# 链式任务
from celery import chain, group, chord
# 顺序执行:先处理文档,再发通知
task_chain = chain(
process_document.s(doc_id, user_id),
send_notification.s(user_id, "文档处理完成"),
)
result = task_chain.delay()
# 并行执行一组任务,所有完成后执行回调
task_group = group(process_document.s(i, user_id) for i in doc_ids)
callback = send_notification.s(user_id, "所有文档处理完成")
result = chord(task_group)(callback)五、踩坑实录
踩坑实录1:所有任务放同一个队列,VIP 用户任务被低优先级任务堵死
现象:付费用户提交的任务,排在免费用户任务后面,要等30分钟。
原因:单队列 FIFO,先来先服务,没有优先级机制。
解法:建立多队列(high/default/low),VIP 用户任务发到 high 队列,专门的 Worker 专跑 high 队列。
# 启动专门处理高优先级的 Worker
celery -A app worker -Q high --concurrency=4 --hostname=high-worker@%h
# 启动处理普通任务的 Worker
celery -A app worker -Q default,low --concurrency=8 --hostname=default-worker@%h踩坑实录2:任务结果忘记释放,Redis 内存爆了
现象:Redis 内存从几十 MB 增长到了几十 GB,最后触发 OOM 保护。
原因:result_expires 没有设置,每个任务结果永久保存在 Redis 里。
解法:设置 result_expires=3600,任务结果保留1小时后自动清理。
踩坑实录3:worker_prefetch_multiplier 太大导致任务分配不均
现象:有些 Worker 进程排了很长的队列,有些却在空闲,整体吞吐量下降。
原因:默认 worker_prefetch_multiplier=4,每个 Worker 预取4个任务,导致分配不均。
解法:对于执行时间长且不均匀的任务,设置 worker_prefetch_multiplier=1,每次只预取1个,让任务分配更均衡。
六、监控——Flower 是标配
pip install flower
celery -A app flower --port=5555 --address=0.0.0.0
# 带基础认证
celery -A app flower --basic_auth=admin:your-passwordFlower 提供:
- 实时任务状态监控
- Worker 状态和负载
- 任务历史和失败记录
- 支持手动重试失败任务
生产环境还可以集成 Prometheus + Grafana:
pip install celery-exporter
# celery-exporter 会暴露 Prometheus 指标
celery-exporter --broker-url redis://localhost:6379/0七、选型建议
Celery 功能强大,但也有一定运维成本。选型原则:
| 需求 | 方案 |
|---|---|
| 任务量少(<1000/天) | APScheduler 或简单线程池 |
| 中等规模,单机 | Celery 单 Worker |
| 大规模,需要横向扩展 | Celery 多 Worker + Redis Cluster |
| 需要任务依赖关系(DAG) | Celery + Canvas 或 Apache Airflow |
| 极高可靠性要求 | Celery + RabbitMQ(比 Redis 更可靠) |
那次线上事故之后,我们的 Celery 集群稳定支撑了同时处理数千份文档,高峰期也没再出现过类似问题。系统设计的收益,远比代码层面的优化更显著。
