延迟消息的4种实现:RocketMQ定时、RabbitMQ TTL+死信、Redis ZSet、时间轮
延迟消息的4种实现:RocketMQ定时、RabbitMQ TTL+死信、Redis ZSet、时间轮
适读人群:需要实现订单超时、定时通知等延迟任务的Java工程师 | 阅读时长:约18分钟
开篇故事
订单超时自动取消,是每个电商系统都有的需求。用户下单后30分钟内未支付,订单自动取消并释放库存。
这个需求看起来简单,实现起来却是个坑。
我第一次实现这个功能时,用了数据库定时扫描:每分钟扫一次订单表,找出超时未支付的订单执行取消。这个方案在日订单量10万以下还能用,到了100万日单后,每分钟扫表就开始把数据库CPU打高,慢查询告警不断。
后来公司业务规模扩大,在技术架构升级时,我研究了四种延迟消息的实现方案,从原理到实践全面对比,最终选了RocketMQ定时消息作为主方案,Redis ZSet作为补充。今天把这四种方案全部讲透。
一、延迟消息的技术本质
延迟消息的核心问题是:如何精确地在指定时间点触发一个任务,同时保证高可靠性和高性能。
四种方案的核心差异:
- 存储位置:消息存在哪里(MQ Broker、Redis、内存)
- 时间精度:能精确到秒还是分钟
- 可靠性:宕机后任务是否丢失
- 容量上限:能存多少延迟任务
二、四种方案原理对比
2.1 方案一:RocketMQ原生定时消息
RocketMQ 4.x支持18个固定延迟级别(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),RocketMQ 5.x支持任意延迟时间(精确到秒)。
原理:消息发到Broker后,先存入SCHEDULE_TOPIC_XXXX这个内部Topic(按延迟级别有18个队列),ScheduleMessageService每隔100ms扫描,到期后把消息投递到真正的业务Topic。
优点:原生支持,使用简单,高可靠(消息持久化)
缺点:4.x只有固定18个级别(不能精确到任意秒),5.x已支持任意延迟
2.2 方案二:RabbitMQ TTL+死信队列
上一期(第437期)详细讲了死信队列,这里说延迟消息的用法:
创建一个不带消费者的"延迟等待队列",设置TTL=延迟时间,设置DLX(死信交换机),消息过期后自动路由到业务队列处理。
优点:无需引入新中间件,RabbitMQ本身支持
缺点:队列级TTL不支持混合不同延迟时间(一个队列只能一个TTL),消息级TTL有堆积问题(详见第437期踩坑)
2.3 方案三:Redis ZSet延迟队列
利用Redis的有序集合(Sorted Set),以消息到期时间戳作为score,以消息ID/内容作为member。用定时任务轮询,ZRANGEBYSCORE key 0 当前时间戳 LIMIT 0 100取出到期的消息处理。
优点:简单,延迟精度高(秒级),任意时间都能支持
缺点:Redis不能100%保证持久性,集群故障可能丢失任务;需要轮询(空轮询浪费CPU);单Redis QPS有限
2.4 方案四:内存时间轮(HashedWheelTimer)
Netty自带HashedWheelTimer,或者Kafka也内置了时间轮算法。把延迟任务按到期时间分布到环形缓冲区的各个槽位(Slot),定时转动轮子,到达某个槽位时触发该槽位的所有任务。
优点:内存操作,延迟精度极高(毫秒级),吞吐量极大(百万级)
缺点:纯内存,进程重启任务全丢;不适合分布式场景(每个节点独立,无法共享)
三、完整Java代码实现
3.1 RocketMQ 5.x任意延迟消息
/**
* RocketMQ 5.x 任意延迟时间消息
* 支持精确到秒的任意延迟
*/
@Service
@Slf4j
public class RocketMQDelayService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送30分钟后触发的订单超时取消消息
*/
public void sendOrderTimeoutMsg(String orderId, int delaySeconds) {
OrderTimeoutEvent event = new OrderTimeoutEvent();
event.setOrderId(orderId);
event.setCreateTime(LocalDateTime.now());
event.setTimeoutAt(LocalDateTime.now().plusSeconds(delaySeconds));
// RocketMQ 5.x 使用 deliverTimeMs 设置精确投递时间
long deliverTimeMs = System.currentTimeMillis() + delaySeconds * 1000L;
Message<OrderTimeoutEvent> message = MessageBuilder
.withPayload(event)
.setHeader(RocketMQHeaders.KEYS, orderId)
.setHeader("deliverTimeMs", deliverTimeMs) // 精确投递时间
.build();
// RocketMQ 5.x API
rocketMQTemplate.asyncSend(
"order-timeout-topic",
message,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("超时消息发送成功: orderId={}, deliverAt={}, msgId={}",
orderId,
new Date(deliverTimeMs),
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("超时消息发送失败: orderId={}", orderId, e);
// 降级:写入DB由补偿任务处理
saveToDbForCompensation(orderId, deliverTimeMs);
}
}
);
}
/**
* RocketMQ 4.x 固定延迟级别版本
* 延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 对应级别:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
*/
public void sendOrderTimeoutMsg4x(String orderId) {
OrderTimeoutEvent event = new OrderTimeoutEvent();
event.setOrderId(orderId);
SendResult result = rocketMQTemplate.syncSend(
"order-timeout-topic",
MessageBuilder.withPayload(event).build(),
3000, // 超时时间3秒
14 // delayLevel=14 对应 10分钟
);
log.info("4.x延迟消息发送成功: orderId={}, delayLevel=14(10min), msgId={}",
orderId, result.getMsgId());
}
private void saveToDbForCompensation(String orderId, long deliverTimeMs) {
// 兜底逻辑
}
}
/**
* 订单超时消费者
*/
@Component
@RocketMQMessageListener(
topic = "order-timeout-topic",
consumerGroup = "order-timeout-consumer-group"
)
@Slf4j
public class OrderTimeoutConsumer implements RocketMQListener<OrderTimeoutEvent> {
private final OrderService orderService;
@Override
public void onMessage(OrderTimeoutEvent event) {
String orderId = event.getOrderId();
log.info("收到订单超时事件: orderId={}", orderId);
// 检查订单是否已支付(幂等)
Order order = orderService.findById(orderId);
if (order == null) {
log.warn("订单不存在: orderId={}", orderId);
return;
}
if (order.getStatus() == OrderStatus.PAID) {
log.info("订单已支付,无需取消: orderId={}", orderId);
return;
}
if (order.getStatus() == OrderStatus.CANCELLED) {
log.info("订单已取消,幂等跳过: orderId={}", orderId);
return;
}
// 取消订单,释放库存
orderService.cancelOrderByTimeout(orderId);
log.info("订单超时取消完成: orderId={}", orderId);
}
}3.2 Redis ZSet延迟队列实现
/**
* Redis ZSet延迟队列
* 适用场景:中小规模,需要任意延迟时间,对可靠性要求不是极高
*/
@Component
@Slf4j
public class RedisDelayQueue {
private static final String DELAY_QUEUE_KEY = "delay:orders";
private static final int BATCH_SIZE = 100;
private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
private volatile boolean running = true;
public RedisDelayQueue(StringRedisTemplate redisTemplate, ObjectMapper objectMapper) {
this.redisTemplate = redisTemplate;
this.objectMapper = objectMapper;
}
/**
* 添加延迟任务
* @param task 任务数据
* @param delaySeconds 延迟秒数
*/
public <T> void add(T task, long delaySeconds) {
try {
String taskJson = objectMapper.writeValueAsString(task);
double score = System.currentTimeMillis() / 1000.0 + delaySeconds;
redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY, taskJson, score);
log.debug("延迟任务已添加: task={}, executeAt={}",
taskJson, new Date((long)(score * 1000)));
} catch (Exception e) {
log.error("添加延迟任务失败", e);
throw new RuntimeException("添加延迟任务失败", e);
}
}
/**
* 启动扫描线程(在@PostConstruct中调用)
*/
@PostConstruct
public void startScanner() {
Thread scanThread = new Thread(this::scan, "delay-queue-scanner");
scanThread.setDaemon(true);
scanThread.start();
log.info("Redis延迟队列扫描线程已启动");
}
private void scan() {
while (running) {
try {
processExpiredTasks();
// 每秒扫描一次(精度1秒)
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("扫描延迟队列异常", e);
}
}
}
private void processExpiredTasks() {
long now = System.currentTimeMillis() / 1000;
// 使用Lua脚本保证取出和删除的原子性(防止并发重复处理)
String luaScript =
"local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, ARGV[2])\n" +
"if #tasks == 0 then return tasks end\n" +
"redis.call('ZREM', KEYS[1], unpack(tasks))\n" +
"return tasks";
DefaultRedisScript<List> script = new DefaultRedisScript<>(luaScript, List.class);
List<String> tasks = redisTemplate.execute(
script,
Collections.singletonList(DELAY_QUEUE_KEY),
String.valueOf(now),
String.valueOf(BATCH_SIZE)
);
if (tasks == null || tasks.isEmpty()) {
return;
}
log.info("扫描到{}个到期延迟任务", tasks.size());
for (String taskJson : tasks) {
try {
OrderTimeoutTask task = objectMapper.readValue(taskJson,
OrderTimeoutTask.class);
handleTask(task);
} catch (Exception e) {
log.error("处理延迟任务失败: task={}", taskJson, e);
// 处理失败的任务可以重新加回队列(加一个短延迟)
// 或者存入DB等人工处理
}
}
}
private void handleTask(OrderTimeoutTask task) {
// 调用业务逻辑
log.info("处理延迟任务: orderId={}", task.getOrderId());
// orderService.cancelOrderByTimeout(task.getOrderId());
}
@PreDestroy
public void stop() {
running = false;
}
}3.3 时间轮实现(Netty HashedWheelTimer)
/**
* Netty时间轮实现延迟任务
* 适用:单机、高精度(毫秒级)、高吞吐、不需要持久化
* 典型场景:连接心跳超时、批量任务调度
*/
@Component
@Slf4j
public class WheelTimerDelayService {
/**
* HashedWheelTimer参数说明:
* - tickDuration=100ms:时间轮精度100ms
* - wheelSize=512:512个槽,时间轮一圈=512*100ms=51.2秒
* - maxPendingTimeouts=1000000:最大挂起任务数100万
*/
private final HashedWheelTimer timer = new HashedWheelTimer(
r -> new Thread(r, "wheel-timer"),
100,
TimeUnit.MILLISECONDS,
512,
true,
1_000_000
);
/**
* 添加延迟任务
* @param taskId 任务ID(用于取消)
* @param delay 延迟时间
* @param unit 时间单位
* @param task 任务逻辑
* @return Timeout对象(可用于取消)
*/
public Timeout addTask(String taskId, long delay, TimeUnit unit, Runnable task) {
Timeout timeout = timer.newTimeout(
t -> {
if (t.isCancelled()) {
log.debug("任务已取消: taskId={}", taskId);
return;
}
try {
task.run();
} catch (Exception e) {
log.error("时间轮任务执行失败: taskId={}", taskId, e);
}
},
delay,
unit
);
log.debug("时间轮任务已添加: taskId={}, delay={}{}",
taskId, delay, unit.name().toLowerCase());
return timeout;
}
/**
* 示例:5秒后检查WebSocket连接是否仍然活跃
*/
public void scheduleHeartbeatCheck(String connectionId, WebSocketSession session) {
addTask(
"heartbeat:" + connectionId,
5,
TimeUnit.SECONDS,
() -> {
if (!session.isOpen()) {
log.info("连接已断开: connectionId={}", connectionId);
return;
}
// 检查最后心跳时间
long lastHeartbeat = getLastHeartbeatTime(connectionId);
if (System.currentTimeMillis() - lastHeartbeat > 30000) {
log.warn("连接心跳超时,主动断开: connectionId={}", connectionId);
closeConnection(session);
} else {
// 重新调度
scheduleHeartbeatCheck(connectionId, session);
}
}
);
}
private long getLastHeartbeatTime(String connectionId) {
return 0L; // 实际实现
}
private void closeConnection(WebSocketSession session) {
// 关闭连接的逻辑
}
@PreDestroy
public void shutdown() {
timer.stop();
}
}四、踩坑实录
坑1:RocketMQ 4.x的延迟级别不满足需求
订单超时设置是"30分钟",RocketMQ 4.x Level 14=10分钟,Level 15=20分钟,Level 16=30分钟,正好有30分钟。但后来需要增加"15分钟催付提醒",Level 13=9分钟,Level 14=10分钟,都不是15分钟。
解决方案:升级到RocketMQ 5.x,支持任意延迟时间。或者用两次延迟消息:先发一个5分钟延迟消息,消费时再发一个10分钟延迟消息,总计15分钟。丑陋但能用。
坑2:Redis ZSet延迟队列扫描时间不精确
Redis ZSet方案依赖定时扫描,扫描间隔1秒,理论上精度是1秒。但实际中,如果处理100个到期任务每个耗时20ms,总耗时2000ms,下次扫描就延迟了2秒,实际精度可能是2-3秒。
解决方案:缩短扫描间隔(500ms),限制每次批量处理数量(如50条),超时任务放到线程池异步处理,不阻塞扫描线程。
坑3:时间轮任务在服务重启后丢失
用HashedWheelTimer实现的订单超时,遇到K8s滚动发布,Pod重启,内存里的定时任务全部丢失。用户下单后30分钟内Pod恰好重启,订单永远不会超时取消,库存不释放,商品永远无法被其他用户购买。
解决方案:时间轮只适合可以容忍丢失的临时任务(如会话超时),核心业务(订单超时)必须用持久化的MQ或Redis方案。
坑4:RabbitMQ消息级TTL的延迟不准确问题
上一期讲过,队列头部有大量消息未消费时,消息级TTL过期的消息不会立即变成死信,要等排在前面的消息被消费。
实测:队列积压50万条,TTL=30分钟的消息,实际等待了4小时才变成死信。
解决方案:用队列级TTL,每个延迟级别单独一个队列,队列内不混放不同TTL的消息。
五、四种方案选型总结
| 方案 | 精度 | 可靠性 | 规模 | 适用场景 |
|---|---|---|---|---|
| RocketMQ原生 | 秒级(5.x任意) | 极高 | 亿级 | 核心业务(订单超时、支付提醒) |
| RabbitMQ TTL+死信 | 分钟级(固定级别) | 高 | 百万级 | 已有RabbitMQ,延迟精度要求不高 |
| Redis ZSet | 秒级 | 中(内存) | 千万级 | 中等规模,已有Redis基础设施 |
| 时间轮 | 毫秒级 | 低(内存) | 百万/秒 | 单机、临时任务、不需持久化 |
生产推荐:核心业务用RocketMQ,简单场景用Redis ZSet,时间轮只用于纯内存的临时调度。永远不要用轮询数据库的方案——那是技术债,早晚要还。
下一篇(第440期)讲消息积压的紧急处理,百万条消息堆积时如何快速扩容消费,生产事故现场的操作手册。
