Python 分布式任务实战——Celery 在高并发场景下的真实调优经历
Python 分布式任务实战——Celery 在高并发场景下的真实调优经历
适读人群:使用 Celery 处理异步任务的 Python 工程师、遇到 Celery 性能问题的开发者 | 阅读时长:约14分钟 | 核心价值:真实生产案例,Celery 从崩溃到稳定的完整调优过程
我们有个数据处理服务,高峰期每分钟要处理大概 2300 个任务。Celery + Redis 的组合,跑了几个月一直很稳。
直到某次大促,任务量突然上到每分钟 9000 个,然后 Celery 开始出各种奇怪的问题:任务延迟从平均1.2秒涨到了17秒,Redis 连接数告警,偶尔还有任务莫名其妙地重复执行。
排查了整整两天,改了一堆配置,最终稳住了。今天把这个过程完整复盘一遍。
Celery 基本架构回顾
先对齐一下概念,避免后面讲调优时理解有偏差。
Producer(生产者)→ Broker(消息队列)→ Worker(消费者)→ Backend(结果存储)- Broker:消息的中间件,任务先发到这里排队。常用 Redis 或 RabbitMQ
- Worker:实际执行任务的进程,可以有多个
- Backend:存任务结果,如果不需要查结果可以不配
- Concurrency:每个 worker 进程的并发数
默认情况下,Celery worker 用 prefork(多进程),并发数等于 CPU 核心数。
踩坑实录一:Redis 连接数爆了
现象: 大促期间,Redis 开始报 "max number of clients reached",Celery 任务堆积。
原因: 我们有 8 个 worker,每个 worker 启动 8 个子进程(8核机器),每个子进程维护自己的 Redis 连接。加上 Celery Beat、Producer 这边,总连接数达到了 8×8×3 = 192 个(broker 连接 + backend 连接 + 心跳连接),超过了 Redis 默认的 128 个最大连接限制。
解法:
# celery_app.py
from celery import Celery
app = Celery('myapp')
app.conf.update(
broker_url='redis://redis:6379/0',
result_backend='redis://redis:6379/1',
# 关键:限制 broker 连接池大小
broker_pool_limit=10, # 每个 worker 进程最多维持10个 broker 连接
redis_max_connections=20, # Redis backend 连接池大小
# 心跳相关
broker_heartbeat=None, # 禁用心跳(大量 worker 时心跳本身占大量连接)
broker_connection_retry_on_startup=True,
)同时,Redis 的 maxclients 调高:
# redis.conf
maxclients 1000这两个改完,连接数从 192 降到了 60 以内,告警消失。
踩坑实录二:任务重复执行
现象: 某些任务被执行了两次甚至三次,导致数据重复写入。
原因: 这是 Celery 的 ack_late 和 visibility timeout 配合不当的问题。
默认情况下,worker 取到任务立刻 ACK(确认消费),如果 ACK 之后 worker 进程崩溃,任务就丢了。为了避免丢任务,很多人开启了 acks_late=True,这样任务执行完才 ACK。
但问题来了:如果任务执行时间超过了 Redis 的 visibility_timeout(默认1小时),Redis 会认为这个消息没被消费(超时未 ACK),重新投递给其他 worker,于是同一个任务被执行两次。
解法:
app.conf.update(
# visibility_timeout 要大于你最长任务的执行时间
broker_transport_options={
'visibility_timeout': 3600 * 6, # 6小时,比最长任务时间大
},
acks_late=True, # 任务执行完才ACK,避免丢任务
reject_on_worker_lost=True, # worker 崩溃时拒绝任务,让其他 worker 重新执行
)更根本的解法是给任务加幂等保护:
import redis
from celery import Task
from functools import wraps
redis_client = redis.from_url("redis://redis:6379/2")
def idempotent_task(timeout=3600):
"""幂等任务装饰器,同一个 task_id 只执行一次"""
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
lock_key = f"task_lock:{self.request.id}"
# SETNX:只有当 key 不存在时才设置,原子操作
if not redis_client.setnx(lock_key, "1"):
print(f"Task {self.request.id} already processed, skipping")
return None
redis_client.expire(lock_key, timeout)
try:
return func(self, *args, **kwargs)
except Exception:
# 任务失败时删除锁,允许重试
redis_client.delete(lock_key)
raise
return wrapper
return decorator
class MyTask(Task):
abstract = True
@app.task(bind=True, base=MyTask)
@idempotent_task(timeout=7200)
def process_data(self, data_id: int):
"""处理数据任务,保证幂等"""
# 实际处理逻辑
...踩坑实录三:任务堆积,消费速度跟不上
现象: 队列里积压了几千个任务,worker 处理速度明显跟不上生产速度,延迟持续升高。
原因: 我们当时用的是 prefork 并发模式,每个 worker 进程是 Python 进程,进程间切换开销大。加上我们的任务其实主要是 I/O 密集型(调用数据库、Redis),不是 CPU 密集型,所以增加进程数也没太大帮助,线程或协程才更合适。
解法: 切换到 gevent 或 eventlet 并发模式:
# 启动命令
# prefork(默认,CPU密集型用这个):
celery -A myapp worker --concurrency=8 --pool=prefork
# gevent(I/O密集型用这个,并发数可以很高):
celery -A myapp worker --concurrency=200 --pool=gevent
# eventlet(类似 gevent):
celery -A myapp worker --concurrency=200 --pool=eventlet切换到 gevent 后,同样的机器,并发数从 8 提升到 200,任务处理速度提升了约 18 倍,积压迅速清空。
注意:用 gevent 需要在代码最开始打 monkey patch:
# celery_app.py 的最顶部
from gevent import monkey
monkey.patch_all()
from celery import Celery
# ... 后续代码完整的 Celery 生产配置
整理了一份经过生产验证的配置:
# celery_app.py
from celery import Celery
from kombu import Queue, Exchange
app = Celery('myapp')
# 定义不同优先级的队列
default_exchange = Exchange('default', type='direct')
priority_exchange = Exchange('priority', type='direct')
app.conf.update(
# ---- Broker & Backend ----
broker_url='redis://redis:6379/0',
result_backend='redis://redis:6379/1',
result_expires=3600, # 结果保留1小时后自动删除
# ---- 序列化 ----
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Asia/Shanghai',
enable_utc=True,
# ---- 任务执行 ----
task_acks_late=True,
task_reject_on_worker_lost=True,
task_track_started=True, # 记录任务开始时间
# ---- 重试策略 ----
task_max_retries=3,
task_default_retry_delay=60, # 重试间隔60秒
# ---- Worker ----
worker_prefetch_multiplier=1, # 每次只预取1个任务,避免一个 worker 囤积太多
worker_max_tasks_per_child=500, # 每个子进程处理500个任务后重启,防内存泄漏
worker_max_memory_per_child=400000, # 子进程内存超过400MB自动重启(KB)
# ---- 连接池 ----
broker_pool_limit=10,
broker_transport_options={
'visibility_timeout': 21600, # 6小时
},
# ---- 队列定义 ----
task_queues=[
Queue('high_priority', priority_exchange, routing_key='high'),
Queue('default', default_exchange, routing_key='default'),
Queue('low_priority', default_exchange, routing_key='low'),
],
task_default_queue='default',
task_default_exchange='default',
task_default_routing_key='default',
)
# ---- 任务路由 ----
app.conf.task_routes = {
'myapp.tasks.critical.*': {'queue': 'high_priority'},
'myapp.tasks.background.*': {'queue': 'low_priority'},
}
# ---- 任务定义示例 ----
@app.task(
bind=True,
max_retries=3,
default_retry_delay=30,
soft_time_limit=300, # 软超时5分钟,触发 SoftTimeLimitExceeded
time_limit=360, # 硬超时6分钟,强制 kill
queue='default',
name='myapp.tasks.process_order',
)
def process_order(self, order_id: int):
try:
# 实际业务逻辑
result = do_process(order_id)
return result
except SomeTemporaryError as exc:
# 临时性错误,重试
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
except Exception as exc:
# 永久性错误,记录日志,不重试
import logging
logging.error(f"Failed to process order {order_id}: {exc}", exc_info=True)
raise监控 Celery
只靠日志不够,要有指标:
# 用 celery-prometheus-exporter 或自己暴露指标
from celery import Celery
from celery.utils.log import get_task_logger
from prometheus_client import Counter, Histogram, Gauge
logger = get_task_logger(__name__)
TASK_COMPLETED = Counter('celery_tasks_completed_total', 'Completed tasks', ['task_name', 'status'])
TASK_DURATION = Histogram('celery_task_duration_seconds', 'Task duration', ['task_name'])
TASK_QUEUE_LENGTH = Gauge('celery_queue_length', 'Queue length', ['queue_name'])
@app.task(bind=True)
def monitored_task(self, *args, **kwargs):
import time
start = time.time()
status = 'success'
try:
result = actual_logic(*args, **kwargs)
return result
except Exception as exc:
status = 'failed'
raise
finally:
duration = time.time() - start
task_name = self.name
TASK_COMPLETED.labels(task_name=task_name, status=status).inc()
TASK_DURATION.labels(task_name=task_name).observe(duration)总结:关键调优点
| 问题 | 原因 | 解法 |
|---|---|---|
| Redis 连接数爆炸 | 每个 worker 进程都建连接 | broker_pool_limit 限制连接池 |
| 任务重复执行 | visibility_timeout < 任务时间 | 调大 timeout + 幂等锁 |
| 任务积压 | prefork 并发数低 | 切 gevent,concurrency 提高 |
| Worker 内存泄漏 | 进程长期运行 | max_tasks_per_child 定期重启 |
| 长任务超时被杀 | time_limit 设太短 | 拆分任务 + 合理设超时 |
Celery 稳定运行的核心就两点:配置要精细,任务要幂等。 两个里面少了任何一个,早晚出问题。
