RabbitMQ 死信队列与延迟队列——电商延时订单取消的完整实现方案
RabbitMQ 死信队列与延迟队列——电商延时订单取消的完整实现方案
适读人群:使用 RabbitMQ 的 Java 后端开发者,特别是电商、O2O 业务场景 | 阅读时长:约16分钟 | 核心价值:掌握 RabbitMQ 死信队列与延迟队列完整实现,解决延时任务核心痛点
订单超时取消这件"小事"
做过电商后端的人,都绕不开一个需求:用户下单后 30 分钟不付款,自动取消订单并释放库存。
听起来很简单,但真正做起来坑无数。
我的前同事老陈,刚接手公司电商项目时,发现前任程序员是这么实现的:一个定时任务,每分钟扫一次订单表,把超时未付款的订单捞出来取消。
"这不挺好的吗,简单直接。"老陈说。
我问他:"你们现在订单量多少?"
"高峰期一天100万单。"
"那单表有多少条待付款订单?"
老陈沉默了。他去查了一下,高峰期待付款订单约 30 万条,定时任务每分钟扫 30 万条,数据库 CPU 直接打满,整个系统在每分钟的整点前后都会卡顿 3-5 秒。
这是一个典型的用轮询代替事件驱动的错误设计。解决这类延时任务问题,RabbitMQ 的死信队列 + 延迟队列是最优雅的方案之一。
死信队列:理解核心机制
什么是死信(Dead Letter)
死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 的一个核心特性。当消息满足以下条件时,会变成"死信":
- 消息被消费者拒绝(nack/reject)且不重新入队
- 消息过期(TTL 到期)
- 队列达到最大长度限制
死信产生后,RabbitMQ 会将这条消息路由到配置好的死信交换机(Dead Letter Exchange,DLX),再由 DLX 路由到死信队列,等待进一步处理。
利用 TTL + 死信队列实现延迟
延迟队列实现原理:
- 创建一个普通队列(不消费),设置 TTL(消息最大存活时间),绑定死信交换机
- 消息发送到这个普通队列,不被消费,等待 TTL 到期
- TTL 到期后,消息变成死信,路由到死信交换机,再进入真正的处理队列
- 消费者消费处理队列中的"到期消息",执行延时任务
完整代码实现
RabbitMQ 配置
@Configuration
public class RabbitMQDelayConfig {
// 延迟队列的交换机名称
public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange";
// 真正处理消息的交换机(接收死信)
public static final String ORDER_DEAD_EXCHANGE = "order.dead.exchange";
// 延迟队列(消息在这里等待TTL到期,没有消费者)
public static final String ORDER_DELAY_QUEUE = "order.delay.queue";
// 死信队列(消息到期后流入这里,有消费者处理)
public static final String ORDER_DEAD_QUEUE = "order.dead.queue";
// Routing key
public static final String ORDER_DELAY_ROUTING_KEY = "order.delay";
public static final String ORDER_DEAD_ROUTING_KEY = "order.dead";
/**
* 延迟交换机(Direct 类型)
*/
@Bean
public DirectExchange orderDelayExchange() {
return new DirectExchange(ORDER_DELAY_EXCHANGE, true, false);
}
/**
* 死信交换机(Direct 类型)
*/
@Bean
public DirectExchange orderDeadExchange() {
return new DirectExchange(ORDER_DEAD_EXCHANGE, true, false);
}
/**
* 延迟队列(核心配置)
* 设置 TTL = 30分钟
* 设置死信交换机和路由 key
*/
@Bean
public Queue orderDelayQueue() {
Map<String, Object> args = new HashMap<>();
// 消息过期时间:30分钟 = 1800000毫秒
args.put("x-message-ttl", 1800000);
// 队列最大长度(可选,防止队列无限增长)
args.put("x-max-length", 500000);
// 死信交换机:消息过期后路由到哪个交换机
args.put("x-dead-letter-exchange", ORDER_DEAD_EXCHANGE);
// 死信路由 key
args.put("x-dead-letter-routing-key", ORDER_DEAD_ROUTING_KEY);
return new Queue(ORDER_DELAY_QUEUE, true, false, false, args);
}
/**
* 死信队列(有消费者)
*/
@Bean
public Queue orderDeadQueue() {
return new Queue(ORDER_DEAD_QUEUE, true, false, false);
}
/**
* 绑定:延迟队列 -> 延迟交换机
*/
@Bean
public Binding delayQueueBinding() {
return BindingBuilder
.bind(orderDelayQueue())
.to(orderDelayExchange())
.with(ORDER_DELAY_ROUTING_KEY);
}
/**
* 绑定:死信队列 -> 死信交换机
*/
@Bean
public Binding deadQueueBinding() {
return BindingBuilder
.bind(orderDeadQueue())
.to(orderDeadExchange())
.with(ORDER_DEAD_ROUTING_KEY);
}
}发送延迟消息(下单时调用)
@Service
public class OrderDelayMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 下单后发送延迟取消消息
* @param orderId 订单ID
* @param delayMillis 延迟时间(毫秒),不传则使用队列默认TTL
*/
public void sendOrderTimeoutMessage(String orderId, Long delayMillis) {
OrderTimeoutMessage message = new OrderTimeoutMessage();
message.setOrderId(orderId);
message.setSendTime(System.currentTimeMillis());
MessageProperties properties = new MessageProperties();
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
properties.setMessageId(UUID.randomUUID().toString()); // 消息唯一ID,用于幂等
// 如果需要不同的超时时间,可以在消息级别设置 TTL(会覆盖队列的 TTL)
// 注意:消息级别的 TTL 只能比队列 TTL 短,不能更长
if (delayMillis != null) {
properties.setExpiration(String.valueOf(delayMillis));
}
Message rabbitMessage = new Message(
JSON.toJSONBytes(message), properties);
rabbitTemplate.send(
RabbitMQDelayConfig.ORDER_DELAY_EXCHANGE,
RabbitMQDelayConfig.ORDER_DELAY_ROUTING_KEY,
rabbitMessage
);
log.info("已发送订单超时消息,orderId={}", orderId);
}
}@Data
public class OrderTimeoutMessage {
private String orderId;
private Long sendTime;
}消费者:处理超时订单
@Component
public class OrderTimeoutConsumer {
@Autowired
private OrderService orderService;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@RabbitListener(queues = RabbitMQDelayConfig.ORDER_DEAD_QUEUE,
containerFactory = "rabbitListenerContainerFactory")
public void handleOrderTimeout(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
try {
// 幂等性检查:防止消息重复消费
String idempotencyKey = "order:timeout:processed:" + messageId;
Boolean isNew = redisTemplate.opsForValue()
.setIfAbsent(idempotencyKey, "1", Duration.ofDays(3));
if (Boolean.FALSE.equals(isNew)) {
log.warn("消息已处理,跳过。messageId={}", messageId);
channel.basicAck(deliveryTag, false);
return;
}
// 解析消息
OrderTimeoutMessage timeoutMsg = JSON.parseObject(
message.getBody(), OrderTimeoutMessage.class);
String orderId = timeoutMsg.getOrderId();
log.info("收到订单超时消息,orderId={},发送时间={},延迟={}ms",
orderId,
timeoutMsg.getSendTime(),
System.currentTimeMillis() - timeoutMsg.getSendTime());
// 执行超时取消逻辑
orderService.cancelTimeoutOrder(orderId);
// 手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("处理订单超时消息失败,orderId={}", e);
// 根据异常类型决定是否重新入队
// 业务异常(如订单已取消):直接丢弃,不重入队
// 技术异常(如DB超时):重新入队,等待重试
if (isBusinessException(e)) {
channel.basicNack(deliveryTag, false, false); // 不重入队,变成死信
} else {
channel.basicNack(deliveryTag, false, true); // 重新入队
}
}
}
private boolean isBusinessException(Exception e) {
return e instanceof BusinessException;
}
}订单取消业务逻辑
@Service
@Transactional
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private InventoryService inventoryService;
/**
* 超时取消订单
* 注意:这里必须做幂等处理,因为消息可能被重复消费
*/
public void cancelTimeoutOrder(String orderId) {
Order order = orderMapper.selectByOrderId(orderId);
if (order == null) {
log.warn("订单不存在,orderId={}", orderId);
return;
}
// 只有待付款状态的订单才能被超时取消
if (order.getStatus() != OrderStatus.PENDING_PAYMENT) {
log.info("订单状态不符,跳过取消。orderId={},当前状态={}",
orderId, order.getStatus());
return;
}
// 乐观锁更新状态,防止并发修改
int affected = orderMapper.cancelOrder(orderId, OrderStatus.PENDING_PAYMENT);
if (affected == 0) {
log.warn("订单状态已变更,取消失败(可能已付款)。orderId={}", orderId);
return;
}
// 释放库存
inventoryService.releaseStock(order.getSkuId(), order.getQuantity());
log.info("订单超时取消成功,orderId={}", orderId);
}
}延迟插件方案:更灵活的实现
上面的 TTL + DLX 方案有一个缺点:队列级别的 TTL 是固定的,如果你需要不同订单有不同的超时时间(比如秒杀订单15分钟,普通订单30分钟),消息级别的 TTL 虽然支持,但会有一个问题——队头消息不过期,后面的消息即使到期也不会被投递(因为 RabbitMQ 只检查队头消息的过期情况)。
这时候可以使用 rabbitmq-delayed-message-exchange 插件:
# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange@Configuration
public class DelayedPluginConfig {
public static final String DELAYED_EXCHANGE = "delayed.exchange";
public static final String DELAYED_QUEUE = "delayed.queue";
public static final String DELAYED_ROUTING_KEY = "delayed.key";
/**
* 使用延迟插件的交换机
*/
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE, true);
}
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue())
.to(delayedExchange())
.with(DELAYED_ROUTING_KEY)
.noargs();
}
}
// 发送延迟消息
public void sendWithDelay(String orderId, int delaySeconds) {
rabbitTemplate.convertAndSend(
DelayedPluginConfig.DELAYED_EXCHANGE,
DelayedPluginConfig.DELAYED_ROUTING_KEY,
orderId,
message -> {
// 设置延迟时间(毫秒)
message.getMessageProperties().setDelay(delaySeconds * 1000);
return message;
}
);
}三大踩坑实录
坑一:消息积压导致超时时间不准
现象: 30分钟超时的订单,实际上 35-40 分钟才被取消,有时候甚至更长。
原因: TTL + DLX 方案中,消息只有到达队头时才会被检查是否过期。如果死信队列里有大量消息积压,后来的消息即使早过期了,也要排队等前面的消息被消费完。
解法:
- 增加死信队列的消费者并发数,保证处理速度
- 改用延迟插件(每条消息独立计时,不受队头影响)
- 业务层做容忍:超时取消允许 5 分钟的误差
坑二:消费者重启后消息重复消费
现象: 消费者发版重启后,发现有些订单被重复取消(已经取消的订单又收到了取消消息)。
原因: 消费者在处理消息中途重启,消息没来得及 ACK,RabbitMQ 将消息重新投递。业务逻辑没有幂等保护,导致重复取消。
解法: 消费者必须做幂等,使用消息 ID + Redis SET NX 或数据库唯一索引来防止重复处理。代码示例已在上文中体现。
坑三:死信队列的死信没有兜底
现象: 消费者处理超时订单时抛出了异常,消息进入了死信(basicNack 不重入队),但我们没有为"死信队列的死信"设置处理机制,消息就消失了,对应的订单永远没被取消。
原因: 在 RabbitMQ 中,死信队列的消息也可能再次产生死信(消费失败 + 不重入队),如果没有为死信队列配置它自己的死信交换机,消息就彻底丢失了。
解法: 为业务死信队列也配置死信交换机,并监控这个"二次死信队列",触发人工告警:
// 为死信队列也配置死信交换机(二级死信)
Map<String, Object> deadArgs = new HashMap<>();
deadArgs.put("x-dead-letter-exchange", "order.dead2.exchange");
deadArgs.put("x-dead-letter-routing-key", "order.dead2");
Queue orderDeadQueue = new Queue(ORDER_DEAD_QUEUE, true, false, false, deadArgs);与定时任务方案对比
回到文章开头老陈的问题,我给他做了个对比:
| 维度 | 定时任务扫表 | RabbitMQ 延迟队列 |
|---|---|---|
| 数据库压力 | 高(全表扫描) | 无(事件驱动) |
| 延迟精度 | 取决于扫描频率(分钟级) | 秒级 |
| 水平扩展 | 需要分布式锁 | 天然支持多消费者 |
| 实现复杂度 | 简单 | 中等 |
| 适合量级 | < 10 万单/天 | 100 万单+/天 |
老陈最终在公司推行了 RabbitMQ 延迟队列方案,数据库 CPU 从高峰期 85% 降到了 45%,系统卡顿问题彻底消失了。
