Python 定时任务实战——APScheduler、Celery Beat 分布式任务调度
Python 定时任务实战——APScheduler、Celery Beat 分布式任务调度
适读人群:需要在 Python 项目中实现定时任务的后端开发者 | 阅读时长:约16分钟 | 核心价值:从简单定时到分布式调度的完整工程方案
从 crontab 的噩梦说起
我有个运营数据服务,早期用 Linux crontab 管理定时任务,配了七八个任务,每天定时跑数据同步、报表生成、缓存刷新。
表面上运行得很好。直到有一天,一个任务突然失败了,我根本不知道是哪个任务,因为 crontab 的日志默认就是发邮件(还没配)或者静默失败。找了半天,才发现是一个 Python 脚本依赖的环境变量没有在 crontab 的 shell 环境里设置。
更麻烦的是,随着任务越来越多,crontab 里的配置越来越难维护:任务之间有依赖关系、有些任务需要根据条件跳过、有些任务执行时间会随业务动态调整……crontab 完全无法满足这些需求。
那之后我全面切换到了 APScheduler,对于更复杂的分布式场景则用 Celery Beat。今天来系统讲讲这两套方案。
一、APScheduler——单机定时任务的最佳选择
pip install apschedulerAPScheduler 支持四种触发器:
cron:类似 crontab,按日历时间触发interval:固定间隔触发date:在指定时间点触发一次calendarinterval:按日历间隔(天/周/月)触发
完整的 APScheduler 封装
import logging
import signal
import time
from datetime import datetime
from typing import Callable
from contextlib import contextmanager
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.events import (
EVENT_JOB_EXECUTED,
EVENT_JOB_ERROR,
EVENT_JOB_MISSED,
)
from pytz import timezone
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
CHINA_TZ = timezone("Asia/Shanghai")
def create_scheduler(use_redis: bool = False) -> BackgroundScheduler:
"""
创建配置好的调度器
:param use_redis: 是否使用 Redis 持久化存储(重启后任务不丢失)
"""
jobstores = {
"default": RedisJobStore(host="localhost", port=6379, db=0)
if use_redis
else MemoryJobStore(),
}
executors = {
"default": ThreadPoolExecutor(max_workers=10),
"processpool": ProcessPoolExecutor(max_workers=4), # CPU 密集型任务用进程池
}
job_defaults = {
"coalesce": True, # 如果错过了多次执行,只补跑一次(不堆积)
"max_instances": 1, # 同一任务最多同时运行1个实例(防止重叠)
"misfire_grace_time": 60, # 错过60秒内的任务还是会执行
}
scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=CHINA_TZ,
)
# 监听任务执行事件
def job_listener(event):
if event.exception:
logger.error(
f"任务执行失败: {event.job_id}, 错误: {event.exception}",
exc_info=event.traceback,
)
elif hasattr(event, "missed_fire_time"):
logger.warning(f"任务错过执行: {event.job_id}")
else:
logger.info(f"任务执行成功: {event.job_id}")
scheduler.add_listener(
job_listener,
EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED,
)
return scheduler
class SchedulerManager:
"""调度器管理类,支持动态添加/删除/修改任务"""
def __init__(self, use_redis: bool = False):
self.scheduler = create_scheduler(use_redis)
self._started = False
def start(self):
if not self._started:
self.scheduler.start()
self._started = True
logger.info("调度器已启动")
def stop(self, wait: bool = True):
if self._started:
self.scheduler.shutdown(wait=wait)
self._started = False
logger.info("调度器已停止")
def add_cron_job(
self,
func: Callable,
job_id: str,
cron_expr: str,
**kwargs,
):
"""
添加 cron 风格定时任务
:param cron_expr: 格式 "秒 分 时 日 月 星期",如 "0 0 9 * * 1-5"(工作日早9点)
"""
parts = cron_expr.split()
if len(parts) == 5:
second, minute, hour, day, month, day_of_week = 0, *parts
elif len(parts) == 6:
second, minute, hour, day, month, day_of_week = parts
else:
raise ValueError(f"无效的 cron 表达式: {cron_expr}")
self.scheduler.add_job(
func,
"cron",
id=job_id,
second=second,
minute=minute,
hour=hour,
day=day,
month=month,
day_of_week=day_of_week,
replace_existing=True,
**kwargs,
)
logger.info(f"已添加 Cron 任务: {job_id} ({cron_expr})")
def add_interval_job(
self,
func: Callable,
job_id: str,
seconds: int = None,
minutes: int = None,
hours: int = None,
**kwargs,
):
"""添加固定间隔任务"""
self.scheduler.add_job(
func,
"interval",
id=job_id,
seconds=seconds,
minutes=minutes,
hours=hours,
replace_existing=True,
**kwargs,
)
def pause_job(self, job_id: str):
self.scheduler.pause_job(job_id)
logger.info(f"任务已暂停: {job_id}")
def resume_job(self, job_id: str):
self.scheduler.resume_job(job_id)
logger.info(f"任务已恢复: {job_id}")
def remove_job(self, job_id: str):
self.scheduler.remove_job(job_id)
def list_jobs(self) -> list[dict]:
jobs = []
for job in self.scheduler.get_jobs():
jobs.append({
"id": job.id,
"name": job.name,
"next_run": str(job.next_run_time),
"trigger": str(job.trigger),
})
return jobs
# 业务任务定义
def sync_data_task():
"""数据同步任务"""
logger.info(f"开始数据同步: {datetime.now()}")
# 实际业务逻辑
time.sleep(2)
logger.info("数据同步完成")
def generate_daily_report():
"""每日报表生成"""
logger.info(f"开始生成日报: {datetime.now()}")
time.sleep(5)
logger.info("日报生成完成")
def cleanup_temp_files():
"""清理临时文件"""
from pathlib import Path
import shutil
tmp_dir = Path("/tmp/app_cache")
if tmp_dir.exists():
shutil.rmtree(tmp_dir)
tmp_dir.mkdir()
logger.info("临时文件清理完成")
# 使用示例
def main():
manager = SchedulerManager(use_redis=False)
# 每5分钟同步一次数据
manager.add_interval_job(sync_data_task, "sync_data", minutes=5)
# 每天早上9点生成日报(工作日)
manager.add_cron_job(
generate_daily_report,
"daily_report",
"0 9 * * 1-5", # 周一到周五 09:00
)
# 每天凌晨2点清理临时文件
manager.add_cron_job(cleanup_temp_files, "cleanup", "0 2 * * *")
manager.start()
print("调度器运行中,当前任务:")
for job in manager.list_jobs():
print(f" - {job['id']}: 下次执行 {job['next_run']}")
try:
signal.pause() # Linux/macOS 等待信号
except AttributeError:
while True:
time.sleep(60)
finally:
manager.stop()
if __name__ == "__main__":
main()二、踩坑实录
踩坑实录1:任务执行超时,下一次触发时上一次还没结束
现象:某个每分钟执行的任务,有时候会执行超过1分钟,导致多个实例同时在跑,数据重复处理。
原因:max_instances 默认是1,但如果没有正确配置,APScheduler 会允许同时运行多个实例。
解法:确保 job_defaults 中 max_instances=1,同时给长时间任务加合理的超时保护。
踩坑实录2:进程重启后定时任务配置丢失
现象:每次重启服务,之前通过 API 动态添加的任务都消失了。
原因:使用了默认的 MemoryJobStore,重启后内存清空。
解法:切换到 RedisJobStore 或 SQLAlchemyJobStore,任务配置持久化存储。
踩坑实录3:时区问题导致任务在错误时间执行
现象:配置的早9点日报,实际在下午5点才跑(相差8小时)。
原因:调度器使用了 UTC 时区,而预期是北京时间,相差8小时。
解法:创建调度器时明确指定 timezone=timezone("Asia/Shanghai")。
三、Celery Beat——分布式定时任务
当单机调度不够用,需要多机协同、任务持久化、可视化监控时,Celery Beat 是标准答案:
pip install celery redis django-celery-beat # 或 celery[redis]# celery_app.py
from celery import Celery
from celery.schedules import crontab
app = Celery(
"myapp",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
app.conf.update(
timezone="Asia/Shanghai",
enable_utc=True,
beat_schedule={
# 每天 9:00 发送日报
"send-daily-report": {
"task": "tasks.send_daily_report",
"schedule": crontab(hour=9, minute=0, day_of_week="1-5"),
},
# 每5分钟同步数据
"sync-data-every-5min": {
"task": "tasks.sync_data",
"schedule": 300, # 秒
},
# 每月1号凌晨1点生成月报
"monthly-report": {
"task": "tasks.generate_monthly_report",
"schedule": crontab(hour=1, minute=0, day_of_month=1),
},
},
beat_max_loop_interval=5, # 调度精度(秒)
task_acks_late=True, # 任务完成后才确认,防止丢失
task_reject_on_worker_lost=True,
)
# tasks.py
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def sync_data(self):
"""可重试的数据同步任务"""
try:
# 业务逻辑
pass
except Exception as exc:
raise self.retry(exc=exc, countdown=60)
# 启动命令
# 启动 Worker(可多机部署)
# celery -A celery_app worker --loglevel=info --concurrency=4
# 启动 Beat(只启动一个!)
# celery -A celery_app beat --loglevel=info
# 启动 Flower 监控面板(可选)
# celery -A celery_app flower --port=5555四、选型建议
| 场景 | 推荐方案 |
|---|---|
| 单机,简单定时任务 | APScheduler |
| 单机,需要持久化 | APScheduler + RedisJobStore |
| 多机分布式,需要任务队列 | Celery + Celery Beat |
| 超简单场景,不想引入依赖 | schedule 库(轻量级) |
| 系统级定时任务 | crontab(不依赖 Python 进程) |
选型核心原则:先评估规模。单机+任务数量少选 APScheduler;需要横向扩展或已经在用 Celery 就直接用 Celery Beat。不要过度设计,一个5人团队的内部服务上 Celery 反而增加了运维负担。
