RabbitMQ死信队列实战:消息过期、拒绝、队列满三种死信场景
RabbitMQ死信队列实战:消息过期、拒绝、队列满三种死信场景
适读人群:使用RabbitMQ处理异常消息、需要实现延迟重试和兜底处理的Java工程师 | 阅读时长:约15分钟
开篇故事
2021年,我们的短信通知服务接了一个新需求:用户注册后发送欢迎短信,如果短信平台临时故障(返回失败),需要重试三次,每次间隔5分钟,三次都失败后人工介入。
当时用的是RabbitMQ,我查了一圈资料,发现这个需求可以完美用死信队列来实现:正常队列设置TTL(消息5分钟过期),过期后进入死信队列,死信队列的消费者再次尝试发送。
但实现过程中踩了不少坑:TTL+死信的组合有顺序要求,队列满触发死信和预期不同,死信队列也可能积压……花了两天才彻底搞清楚。今天把这些坑全部讲清楚。
一、死信的三种来源
RabbitMQ的死信(Dead Letter)是指正常队列里"无法正常处理"的消息,有三种情况会产生死信:
- 消息TTL过期:消息在队列中存活时间超过TTL(Queue级别或Message级别)
- 消息被拒绝:消费者调用
basicNack或basicReject,且requeue=false - 队列达到最大长度:队列消息数超过
x-max-length,或消息总字节数超过x-max-length-bytes,队首消息被驱逐为死信
二、三种死信场景的完整原理
2.1 场景一:TTL过期死信(实现延迟消息)
队列级TTL:队列里所有消息的统一过期时间,由x-message-ttl参数控制。
消息级TTL:每条消息单独设置expiration属性,单位毫秒。
注意:队列级TTL和消息级TTL同时存在时,取两者中的较小值。
利用TTL+死信实现"延迟消息"是RabbitMQ的经典模式:
- 发送消息到有TTL的正常队列,不绑定消费者(消息一定会过期)
- 过期后进入死信队列,死信队列的消费者处理"延迟到期"的业务
2.2 场景二:消息被拒绝死信
消费者主动拒绝消息(basicNack(deliveryTag, false, false)),消息进入死信队列。这是最常用的场景:消息格式错误、业务逻辑异常、不可重试的错误。
关键参数区别:
basicNack(tag, multiple, requeue=true):消息重新入原队列(可能无限重试!)basicNack(tag, multiple, requeue=false):消息进入死信队列basicReject(tag, requeue=false):拒绝单条消息,进入死信队列
2.3 场景三:队列满死信
# 队列长度限制
x-max-length: 10000 # 最多10000条消息
x-max-length-bytes: 10MB # 最多10MB
# 溢出行为(默认drop-head:丢弃队首最老的消息,使之成为死信)
x-overflow: drop-head # 队首消息变成死信,发到DLX
# 或
x-overflow: reject-publish # 拒绝新消息(Publisher Confirm会收到Nack)三、完整Java实现
3.1 死信队列配置
/**
* RabbitMQ死信队列完整配置
* 实现:SMS通知三次重试 + 最终死信人工处理
*/
@Configuration
public class RabbitMQDeadLetterConfig {
// ========== Exchange名称 ==========
public static final String NORMAL_EXCHANGE = "sms.normal.exchange";
public static final String DLX_EXCHANGE = "sms.dlx.exchange";
public static final String RETRY_EXCHANGE = "sms.retry.exchange";
// ========== Queue名称 ==========
public static final String NORMAL_QUEUE = "sms.normal.queue";
public static final String DLX_QUEUE = "sms.dlx.queue"; // 最终死信队列
public static final String RETRY_QUEUE_5S = "sms.retry.5s.queue"; // 5秒后重试
public static final String RETRY_QUEUE_30S = "sms.retry.30s.queue";// 30秒后重试
// ========== Exchange声明 ==========
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE, true, false);
}
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(DLX_EXCHANGE, true, false);
}
@Bean
public DirectExchange retryExchange() {
return new DirectExchange(RETRY_EXCHANGE, true, false);
}
// ========== 正常业务队列 ==========
@Bean
public Queue normalQueue() {
return QueueBuilder.durable(NORMAL_QUEUE)
// 死信转发配置:死信发到dlxExchange
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
// 死信路由键(不配则使用原消息路由键)
.withArgument("x-dead-letter-routing-key", "sms.dead")
// 队列最大长度:防止积压
.withArgument("x-max-length", 50000)
.build();
}
// ========== 延迟重试队列(TTL=5秒,无消费者,过期后转到normalExchange重新消费)==========
@Bean
public Queue retryQueue5s() {
return QueueBuilder.durable(RETRY_QUEUE_5S)
.withArgument("x-dead-letter-exchange", NORMAL_EXCHANGE) // 过期后回到正常队列
.withArgument("x-dead-letter-routing-key", "sms.send")
.withArgument("x-message-ttl", 5000) // 5秒TTL
.withArgument("x-max-length", 10000)
.build();
}
@Bean
public Queue retryQueue30s() {
return QueueBuilder.durable(RETRY_QUEUE_30S)
.withArgument("x-dead-letter-exchange", NORMAL_EXCHANGE)
.withArgument("x-dead-letter-routing-key", "sms.send")
.withArgument("x-message-ttl", 30000) // 30秒TTL
.withArgument("x-max-length", 10000)
.build();
}
// ========== 最终死信队列(三次重试都失败后到这里)==========
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(DLX_QUEUE).build();
}
// ========== 绑定关系 ==========
@Bean
public Binding normalBinding() {
return BindingBuilder.bind(normalQueue())
.to(normalExchange())
.with("sms.send");
}
@Bean
public Binding retry5sBinding() {
return BindingBuilder.bind(retryQueue5s())
.to(retryExchange())
.with("sms.retry.5s");
}
@Bean
public Binding retry30sBinding() {
return BindingBuilder.bind(retryQueue30s())
.to(retryExchange())
.with("sms.retry.30s");
}
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with("sms.dead");
}
}3.2 短信发送服务(带重试逻辑)
/**
* 短信发送消费者
* 失败时根据重试次数路由到不同延迟队列
*/
@Component
@Slf4j
public class SmsConsumer {
private static final int MAX_RETRY = 3;
private static final String RETRY_COUNT_HEADER = "x-retry-count";
private final RabbitTemplate rabbitTemplate;
private final SmsGateway smsGateway;
private final AlarmService alarmService;
public SmsConsumer(RabbitTemplate rabbitTemplate,
SmsGateway smsGateway,
AlarmService alarmService) {
this.rabbitTemplate = rabbitTemplate;
this.smsGateway = smsGateway;
this.alarmService = alarmService;
}
/**
* 正常短信队列消费者
*/
@RabbitListener(
queues = RabbitMQDeadLetterConfig.NORMAL_QUEUE,
ackMode = "MANUAL"
)
public void consumeSms(SmsMessage smsMsg,
Message message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
// 获取重试次数
Integer retryCount = (Integer) message.getMessageProperties()
.getHeaders().getOrDefault(RETRY_COUNT_HEADER, 0);
log.info("处理短信: mobile={}, retryCount={}", smsMsg.getMobile(), retryCount);
try {
// 调用短信网关
boolean success = smsGateway.send(smsMsg.getMobile(), smsMsg.getContent());
if (success) {
channel.basicAck(deliveryTag, false);
log.info("短信发送成功: mobile={}", smsMsg.getMobile());
} else {
// 发送失败,进行重试路由
handleRetry(channel, message, smsMsg, deliveryTag, retryCount);
}
} catch (Exception e) {
log.error("短信发送异常: mobile={}, error={}", smsMsg.getMobile(), e.getMessage());
handleRetry(channel, message, smsMsg, deliveryTag, retryCount);
}
}
/**
* 重试路由逻辑
* 第1次失败 -> 5秒后重试
* 第2次失败 -> 30秒后重试
* 第3次失败 -> 进入死信队列,人工处理
*/
private void handleRetry(Channel channel, Message message,
SmsMessage smsMsg, long deliveryTag,
int retryCount) throws IOException {
if (retryCount >= MAX_RETRY) {
// 超过最大重试次数,进入死信队列
log.warn("短信重试次数已耗尽: mobile={}, retryCount={}",
smsMsg.getMobile(), retryCount);
// requeue=false,消息会进入正常队列配置的DLX(sms.dlx.exchange)
channel.basicNack(deliveryTag, false, false);
return;
}
// 重新发送到对应的延迟队列
MessageProperties props = new MessageProperties();
props.getHeaders().put(RETRY_COUNT_HEADER, retryCount + 1);
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message retryMessage = new Message(message.getBody(), props);
String retryRoutingKey = retryCount == 0 ? "sms.retry.5s" : "sms.retry.30s";
rabbitTemplate.send(
RabbitMQDeadLetterConfig.RETRY_EXCHANGE,
retryRoutingKey,
retryMessage
);
// 原消息ACK(避免重复消费)
channel.basicAck(deliveryTag, false);
log.info("短信已加入重试队列: mobile={}, nextRetry={}, retryCount={}",
smsMsg.getMobile(), retryRoutingKey, retryCount + 1);
}
/**
* 死信队列消费者(最终兜底)
*/
@RabbitListener(queues = RabbitMQDeadLetterConfig.DLX_QUEUE)
public void consumeDeadLetter(SmsMessage smsMsg, Message message) {
log.error("短信最终失败,进入死信队列: mobile={}", smsMsg.getMobile());
// 1. 发送告警
alarmService.sendAlert(
"SMS_DEAD_LETTER",
"短信发送失败三次: mobile=" + smsMsg.getMobile()
);
// 2. 记录到数据库,方便人工查询
recordDeadLetter(smsMsg, message);
// 3. 可选:发送邮件/钉钉通知运营人员
}
private void recordDeadLetter(SmsMessage smsMsg, Message message) {
// 记录到数据库的逻辑
log.info("死信记录入库: mobile={}, msgId={}",
smsMsg.getMobile(),
message.getMessageProperties().getMessageId());
}
}3.3 发送短信的入口
/**
* 短信发送入口
*/
@Service
@Slf4j
public class SmsService {
private final RabbitTemplate rabbitTemplate;
public SmsService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendSms(String mobile, String content, String msgId) {
SmsMessage smsMsg = new SmsMessage();
smsMsg.setMobile(mobile);
smsMsg.setContent(content);
smsMsg.setMsgId(msgId);
smsMsg.setCreateTime(LocalDateTime.now());
MessageProperties props = new MessageProperties();
props.setMessageId(msgId);
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
props.getHeaders().put("x-retry-count", 0);
try {
String body = new ObjectMapper().writeValueAsString(smsMsg);
Message message = new Message(body.getBytes(StandardCharsets.UTF_8), props);
rabbitTemplate.send(
RabbitMQDeadLetterConfig.NORMAL_EXCHANGE,
"sms.send",
message
);
log.info("短信消息入队: mobile={}, msgId={}", mobile, msgId);
} catch (Exception e) {
log.error("短信消息入队失败: mobile={}", mobile, e);
throw new RuntimeException("消息入队失败", e);
}
}
}四、踩坑实录
坑1:队列TTL和消息TTL的优先级搞错
队列TTL(x-message-ttl)对所有消息统一过期,从消息进入队列时开始计时。消息级TTL(expiration属性)也从进入队列时开始计时。
两者取最小值——但有一个重要区别:队列级TTL过期的消息立即变成死信;消息级TTL过期后只有在消息到达队首时才变成死信。
这意味着:用消息级TTL实现延迟消息时,如果队列里有其他消息堵在前面,你的消息即使已经"过期",也不会立即进入死信队列,要等前面的消息全部消费完它才轮到它。
结论:用死信队列实现延迟消息,必须用队列级TTL,且该队列只存放特定延迟时长的消息,不能混放不同TTL的消息。
坑2:死信队列的消息路由键会改变
消息进入死信队列时,RabbitMQ会在消息Header里添加x-death属性,记录死信原因、原队列名、路由键等信息。同时,如果正常队列配置了x-dead-letter-routing-key,消息路由键会被替换。
这导致一个常见错误:没有配置x-dead-letter-routing-key,期望原路由键路由到死信队列,但原路由键在死信Exchange上没有绑定,消息被丢弃!
检查清单:
x-dead-letter-exchange:必须配置,指向哪个Exchangex-dead-letter-routing-key:如果不配,使用原路由键;如果配了,替换原路由键- 死信Exchange上必须有对应路由键的绑定
坑3:basicNack和basicReject的requeue参数
channel.basicNack(deliveryTag, false, true):消息重新入队,会被立即再次消费,如果处理一直失败会无限循环消费!
我们曾经因为这个无限循环,导致消费者CPU 100%,消息处理速度极慢,整个队列积压。
正确写法:永久性错误用requeue=false,临时性错误(如网络超时)才用requeue=true,但要有最大重试次数限制(通过Header记录重试次数)。
坑4:死信队列也会积压
死信队列如果没有消费者(只用于告警),消息会无限积压,最终导致内存或磁盘告警,拖垮整个RabbitMQ集群。
解决方案:死信队列也要配置消费者(哪怕只是记录日志),或者配置x-message-ttl设置死信的过期时间,或者配置x-max-length限制死信队列长度。
坑5:队列已存在时修改参数失败
如果正常队列已经存在(不带死信配置),再声明同名队列并加上x-dead-letter-exchange参数,会抛出channel error异常,因为队列参数不匹配。
解决方案:修改队列参数必须先删除旧队列再重新创建,生产环境中这需要停机维护时间窗口,或者使用新的队列名。
五、总结与延伸
死信队列的核心价值:把"不正常"的消息从主流程中剥离,统一在死信队列中处理,避免异常消息阻塞正常消息的消费。
三种死信场景的使用建议:
- TTL过期死信:实现延迟消息、延迟重试,是RabbitMQ最经典的用法之一
- 拒绝死信:处理永久性错误(格式错误、业务异常),配合重试次数限制
- 队列满死信:配合
x-max-length做背压保护,防止积压拖垮系统
生产环境建议每个业务队列都配置死信队列,死信队列接一个告警消费者,至少保证能及时发现异常消息。
下一篇(第438期)讲Kafka性能调优,batch.size、linger.ms、压缩算法这三个参数到底怎么配才能在吞吐量和延迟之间找到最优点。
