设计一个分布式任务调度系统:XXL-Job vs PowerJob vs Quartz
设计一个分布式任务调度系统:XXL-Job vs PowerJob vs Quartz
适读人群:Java中高级工程师、需要做任务调度的技术人员 | 阅读时长:约18分钟 | 难度:★★★☆☆
开篇故事
曾经有个项目,10个微服务里用了8种不同的定时任务方案:Spring @Scheduled、Quartz、自己写的Redis分布式锁 + cron、xxl-job、甚至还有一个把定时任务写在了数据库的存储过程里。每次要调整任务执行时间,要找到具体是哪个服务、哪种方式实现的,改完还要重启服务发版。
有一次线上出了个bug,某个定时清理任务在凌晨触发时把不该删的数据删掉了。排查发现是8台服务器同时触发了这个任务(Redis锁失效了),执行了8次,删了8倍的数据。
痛定思痛,我花了两个月把所有定时任务统一迁移到了XXL-Job,建立了统一的调度平台。这篇文章就把定时任务调度系统的核心问题、主流框架对比,以及我们自己做调度系统时的关键设计点都讲清楚。
一、需求分析与规模估算
为什么需要分布式任务调度
单机@Scheduled的问题:
- 多实例部署时,每个实例都会执行,任务重复
- 不能在运行时动态修改cron表达式,必须重启
- 没有执行日志,任务失败了不知道
- 没有依赖管理,任务A执行完才能执行任务B
分布式任务调度需要解决的核心问题:
- 只执行一次: 集群中多台机器,同一个任务只让一台执行
- 调度灵活: 支持运行时修改cron、手动触发、延迟执行
- 可靠性: 任务失败自动重试,执行记录可查
- 扩展性: 任务量大时支持分片并行执行
规模估算
以一个中等规模平台为例:
任务规模:
- 注册任务数:500个(各业务系统的定时任务)
- 每天任务执行次数:约10万次(平均每个任务每天200次)
- 最大并发执行任务数:50个(凌晨批处理高峰)
调度性能要求:
- 调度延迟:任务触发时间误差 < 1秒
- 调度节点QPS:不高(调度本身只是触发,不做业务计算)
执行节点规模:
- 执行器集群:20-50台机器(业务应用服务器,兼做执行器)
这个规模下,任何主流框架都能满足,技术选型更多看功能特性和运维成本。
二、三大框架对比
Quartz
最老牌的Java调度框架,20年历史。集群模式下用数据库做分布式锁,所有节点共享一个Quartz数据库,通过行锁保证同一个任务只被一个节点执行。
优点: 成熟稳定,Spring原生支持
缺点: 无管理界面,运维麻烦;数据库行锁在任务量大时性能差;不支持分片
适用场景: 小规模项目,不需要管理界面,任务数量少(< 100个)
XXL-Job
国内最流行的分布式任务调度框架,大众点评开源,目前有26k+ GitHub Star。
架构: 调度中心(xxl-job-admin)+ 执行器。调度中心负责触发任务,执行器负责执行任务。调度中心通过HTTP调用执行器,执行器把结果回调给调度中心。
优点:
- 有成熟的管理界面,可视化操作
- 支持路由策略(第一个、最后一个、轮询、随机、一致性哈希、最不经常使用等)
- 支持分片广播(任务拆成N份,N台机器各执行一份)
- 失败重试、告警通知
缺点:
- 调度中心单点(需要部署多个 + 数据库锁)
- 不支持工作流(任务依赖DAG)
适用场景: 中等规模,需要管理界面,95%的场景
PowerJob
阿里系的新一代调度框架,功能比XXL-Job强大。
优点:
- 支持Map/MapReduce任务(大数据量自动分片)
- 支持工作流DAG(任务依赖编排)
- 调度服务器集群(无单点)
缺点:
- 相对较新,社区小于XXL-Job
- 功能复杂,上手成本高
适用场景: 需要工作流、大数据分片的场景
选型建议
| 场景 | 推荐方案 |
|---|---|
| 简单定时任务,< 50个 | Spring @Scheduled + Redis分布式锁 |
| 中等规模,需要管理界面 | XXL-Job |
| 需要任务工作流、Map/MR | PowerJob |
| 已有Spring Cloud生态 | Spring Cloud Task |
三、系统架构设计(自研简版调度器)
理解原理比用框架更重要。下面我设计一个简版的分布式任务调度系统,把核心机制展示清楚:
四、关键代码实现
4.1 任务触发调度器(核心逻辑)
@Component
@Slf4j
public class TaskScheduler {
@Autowired
private TaskDefinitionMapper taskMapper;
@Autowired
private TaskDispatchService dispatchService;
@Autowired
private StringRedisTemplate redisTemplate;
// 预读时间窗口:提前5秒扫描即将触发的任务
private static final long PRE_READ_WINDOW_MS = 5000;
/**
* 调度主循环:每秒扫描一次待执行任务
* 使用分布式锁保证只有一个调度节点在工作(主备模式)
*/
@Scheduled(fixedDelay = 1000)
public void schedule() {
// 尝试获取调度主节点锁
String lockKey = "scheduler:master:lock";
Boolean acquired = redisTemplate.opsForValue()
.setIfAbsent(lockKey, getLocalIp(), 10, TimeUnit.SECONDS);
if (!Boolean.TRUE.equals(acquired)) {
return; // 不是主节点,跳过
}
try {
doSchedule();
} finally {
// 续期或释放锁(如果任务执行超过10秒会自动释放,
// 此处10秒内完成可以主动续期)
redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
}
}
private void doSchedule() {
long now = System.currentTimeMillis();
long maxTriggerTime = now + PRE_READ_WINDOW_MS;
// 查询触发时间在 [now-5s, now+5s] 内的任务(避免漏触发)
List<TaskDefinition> pendingTasks = taskMapper.findPendingTasks(
now - 5000, maxTriggerTime);
for (TaskDefinition task : pendingTasks) {
long triggerTime = task.getNextTriggerTime();
if (triggerTime <= now + 1000) {
// 立即触发
triggerTask(task);
} else {
// 提前5秒预读到内存时间轮中,到时再触发
scheduleInMemory(task, triggerTime - now);
}
// 计算下次触发时间,更新数据库
long nextTime = calculateNextTriggerTime(task.getCronExpression());
taskMapper.updateNextTriggerTime(task.getId(), nextTime);
}
}
private void triggerTask(TaskDefinition task) {
// 防止重复触发:用任务ID+触发时间做幂等Key
String idempotentKey = "trigger:" + task.getId() + ":" + task.getNextTriggerTime();
Boolean isFirst = redisTemplate.opsForValue()
.setIfAbsent(idempotentKey, "1", 60, TimeUnit.SECONDS);
if (!Boolean.TRUE.equals(isFirst)) {
log.warn("重复触发,跳过。taskId={}", task.getId());
return;
}
// 异步派发,不阻塞调度循环
dispatchService.dispatch(task);
}
private long calculateNextTriggerTime(String cronExpression) {
CronExpression cron = CronExpression.parse(cronExpression);
ZonedDateTime next = cron.next(ZonedDateTime.now());
return next != null ? next.toInstant().toEpochMilli() : Long.MAX_VALUE;
}
private String getLocalIp() {
try {
return InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
return "unknown";
}
}
}4.2 执行器注册与路由
@Service
@Slf4j
public class ExecutorRegistryService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String EXECUTOR_KEY_PREFIX = "executor:";
private static final int EXECUTOR_TTL_SECONDS = 90; // 3倍心跳间隔
/**
* 执行器启动时注册
*/
public void register(String appName, String address) {
String key = EXECUTOR_KEY_PREFIX + appName;
redisTemplate.opsForSet().add(key, address);
redisTemplate.expire(key, EXECUTOR_TTL_SECONDS, TimeUnit.SECONDS);
log.info("执行器注册成功, appName={}, address={}", appName, address);
}
/**
* 心跳续期(每30秒一次)
*/
public void heartbeat(String appName, String address) {
String key = EXECUTOR_KEY_PREFIX + appName;
redisTemplate.opsForSet().add(key, address);
redisTemplate.expire(key, EXECUTOR_TTL_SECONDS, TimeUnit.SECONDS);
}
/**
* 获取可用执行器列表
*/
public List<String> getExecutors(String appName) {
String key = EXECUTOR_KEY_PREFIX + appName;
Set<String> addresses = redisTemplate.opsForSet().members(key);
return addresses != null ? new ArrayList<>(addresses) : Collections.emptyList();
}
/**
* 路由选择:根据策略选择一台执行器
*/
public String route(String appName, RoutingStrategy strategy, String shardingKey) {
List<String> executors = getExecutors(appName);
if (executors.isEmpty()) {
throw new RuntimeException("无可用执行器: " + appName);
}
switch (strategy) {
case FIRST:
return executors.get(0);
case RANDOM:
return executors.get(ThreadLocalRandom.current().nextInt(executors.size()));
case ROUND_ROBIN:
// 轮询:用Redis自增计数器实现
long index = redisTemplate.opsForValue()
.increment("rr:" + appName);
return executors.get((int)(index % executors.size()));
case CONSISTENT_HASH:
// 一致性哈希:相同shardingKey路由到同一台执行器
int hash = Math.abs(shardingKey.hashCode());
return executors.get(hash % executors.size());
default:
return executors.get(0);
}
}
}4.3 分片任务实现(XXL-Job风格)
/**
* 分片广播任务示例
* 场景:全量用户积分到期检查,100万用户分10片并行处理
*/
@Component
@Slf4j
public class PointsExpireCheckJob {
@Autowired
private PointsService pointsService;
@Autowired
private UserMapper userMapper;
/**
* XXL-Job注解:任务处理器
* 支持分片:分片参数由调度中心注入
*/
@XxlJob("pointsExpireCheck")
public void execute() {
// 获取分片参数
int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();
XxlJobHelper.log("开始执行积分到期检查, 分片{}/{}", shardIndex + 1, shardTotal);
// 按分片查询对应的用户ID范围
// 策略:userId % shardTotal == shardIndex
int page = 0;
int batchSize = 1000;
List<Long> userIds;
int processedCount = 0;
do {
// 查询属于本分片的用户(按userId取模)
userIds = userMapper.findByShardIndex(shardIndex, shardTotal, page++, batchSize);
for (Long userId : userIds) {
try {
pointsService.checkAndExpirePoints(userId);
processedCount++;
} catch (Exception e) {
XxlJobHelper.log("用户{}积分检查失败: {}", userId, e.getMessage());
}
}
} while (userIds.size() == batchSize);
XxlJobHelper.log("积分到期检查完成, 处理用户数={}", processedCount);
XxlJobHelper.handleSuccess();
}
}对应的SQL分片查询:
-- 查询属于分片shardIndex的用户(总分片数shardTotal)
SELECT id FROM user
WHERE MOD(id, #{shardTotal}) = #{shardIndex}
AND status = 1
LIMIT #{offset}, #{batchSize}4.4 任务失败重试与告警
@Service
@Slf4j
public class TaskCallbackService {
@Autowired
private TaskExecutionLogMapper logMapper;
@Autowired
private TaskDefinitionMapper taskMapper;
@Autowired
private AlertService alertService;
/**
* 接收执行器的回调
*/
public void handleCallback(TaskCallbackRequest request) {
TaskExecutionLog log = logMapper.findByExecutionId(request.getExecutionId());
if (log == null) return;
if (request.isSuccess()) {
// 成功:更新日志状态
logMapper.updateSuccess(log.getId(), request.getResult());
} else {
// 失败:判断是否需要重试
handleFailure(log, request.getErrorMsg());
}
}
private void handleFailure(TaskExecutionLog executionLog, String errorMsg) {
TaskDefinition task = taskMapper.findById(executionLog.getTaskId());
logMapper.updateFailed(executionLog.getId(), errorMsg);
int retryCount = executionLog.getRetryCount();
int maxRetry = task.getMaxRetryCount();
if (retryCount < maxRetry) {
// 重试:延迟N秒后再次触发
long retryDelay = calculateRetryDelay(retryCount); // 指数退避
scheduleRetry(task, executionLog.getId(), retryDelay);
log.info("任务失败,安排重试, taskId={}, retry={}/{}",
task.getId(), retryCount + 1, maxRetry);
} else {
// 超过最大重试次数:告警
alertService.sendAlert(AlertEvent.builder()
.taskId(task.getId())
.taskName(task.getName())
.errorMsg(errorMsg)
.failCount(maxRetry + 1)
.build());
log.error("任务失败超过最大重试次数, taskId={}", task.getId());
}
}
/**
* 指数退避:第1次重试延迟1分钟,第2次2分钟,第3次4分钟
*/
private long calculateRetryDelay(int retryCount) {
return (long) Math.pow(2, retryCount) * 60 * 1000;
}
}五、扩展性设计
调度中心的高可用
单台调度中心是单点,需要高可用。XXL-Job的方案:多台调度中心共享同一个数据库,通过数据库行锁保证只有一台在调度。这种方案简单可靠,但数据库成为瓶颈(不过调度中心QPS本身不高,通常够用)。
PowerJob的方案:调度中心集群,用Raft协议选主,主节点负责调度,从节点热备。这种方案更健壮,但部署复杂。
任务量过大时的分区调度
如果任务数增加到10万级别,单线程扫描会成为瓶颈。解决方案:按任务ID取模分区,每个调度节点只负责一部分任务的调度,水平扩展。
六、踩坑实录
坑1:时钟偏移导致任务重复触发
多台调度节点的服务器时钟差了2秒,导致同一个任务在A节点触发后,B节点由于时间滞后2秒,以为任务还没触发,又触发了一次。
解决方案:强制NTP同步,容忍1秒内的时钟偏差;同时用Redis实现任务触发幂等Key,同一任务同一触发时间只触发一次。
坑2:执行器宕机后任务卡住不重试
执行器接到任务后宕机了,任务既没有回调成功也没有回调失败,调度中心认为任务"还在执行中",永远不会重试。
解决方案:调度中心对每个执行中的任务设置超时时间(比如最长执行30分钟),超时后强制标记失败并触发重试。
坑3:大批量任务同时触发造成执行器OOM
某天凌晨,有30个任务的cron都设置了"每天0点执行",30个任务同时派发给同一个执行器,每个任务创建了1个线程,执行器线程数暴增到300+,直接OOM。
解决方案:执行器设置任务并发上限(最大运行任务数),超过上限时任务进入本地队列等待。同时建议不要把所有任务都设置整点触发,错开触发时间(比如0:00、0:01、0:02……)。
七、总结
分布式任务调度的核心问题:
| 问题 | 解决方案 |
|---|---|
| 集群中只执行一次 | 分布式锁(Redis或DB行锁) |
| 调度延迟准确 | NTP同步 + 时间轮算法 |
| 执行失败重试 | 回调机制 + 指数退避重试 |
| 大任务分片 | 分片参数注入 + MOD路由 |
| 可观测性 | 执行日志 + 告警通知 |
技术选型上:中小团队首选XXL-Job,开箱即用,文档完善,社区活跃;有大数据分片或工作流需求选PowerJob;简单场景用Spring @Scheduled + Redis分布式锁,成本最低。
