RocketMQ顺序消息:全局顺序vs分区顺序的性能代价分析
RocketMQ顺序消息:全局顺序vs分区顺序的性能代价分析
适读人群:需要保证消息消费顺序(订单状态流转、账务流水)的Java工程师 | 阅读时长:约16分钟
开篇故事
订单状态流转是顺序消息最经典的场景:一个订单的状态变化必须按顺序处理——创建->支付->发货->收货,如果乱序处理,后果很严重。比如"收货"消息先于"发货"消息被处理,系统就会认为订单已完成但实际上还没发货,仓库永远不会发出这个包裹。
我们有一段时间没有使用顺序消息,用的是普通并发消息。正常情况下,同一个订单的消息创建时间有先后,消费也是按时间顺序来的,看起来没问题。直到一次大促,消息队列积压严重,系统扩容了消费者实例,多个消费者并发消费同一个订单的不同状态消息,"发货"消息比"支付"消息先被消费,导致系统抛出"订单未支付,不能发货"的异常,消息进入重试,最终积压了数万条无法消费的消息。
从那以后,我把所有订单状态流转的消息都改成了顺序消息。今天把顺序消息的原理和代价全部讲清楚。
一、为什么消息会乱序
RocketMQ中,一个Topic有多个MessageQueue(类似Kafka的Partition),普通消息的Producer按轮询策略把消息发到不同Queue,Consumer并发消费各Queue,天然无序。
要实现顺序,必须:同一业务实体(如同一订单)的消息发到同一个Queue,且该Queue串行消费(同一时刻只有一个线程处理)。
二、全局顺序 vs 分区顺序
2.1 全局顺序消息
定义:整个Topic的所有消息严格按发送顺序消费。
实现:Topic只有一个MessageQueue,一个Producer一个Consumer,完全串行。
代价:
- 无并发,吞吐量极低(通常只有普通消息的1/10到1/20)
- 任一环节故障(Broker、Consumer),整个系统暂停
- 生产中极少使用,仅用于流水号生成等极少数场景
2.2 分区顺序消息(生产主流)
定义:同一个分区键(如orderId)的消息严格有序,不同分区键之间可以并发。
实现:Producer根据orderId的Hash决定发到哪个Queue,相同orderId的消息永远发到同一个Queue;Consumer对每个Queue串行消费。
代价:
- 并发度受限于Queue数量(通常4-16个Queue)
- 某个Queue的消费者出问题,该Queue的消息暂停,不影响其他Queue
- 吞吐量约为普通消息的60-80%
三、完整Java实现
3.1 顺序消息Producer
/**
* RocketMQ顺序消息Producer
* 关键:使用MessageQueueSelector选择Queue
*/
@Service
@Slf4j
public class OrderEventOrderlyProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送顺序消息
* 相同orderId的消息会发到同一个MessageQueue
*
* @param orderId 订单ID(分区键)
* @param event 事件内容
* @param eventType 事件类型(决定消息的TAG)
*/
public void sendOrderEvent(String orderId, OrderEvent event, String eventType) {
// 构建消息
Message<OrderEvent> message = MessageBuilder
.withPayload(event)
.setHeader(RocketMQHeaders.KEYS, orderId)
.setHeader(RocketMQHeaders.TAGS, eventType)
.build();
// syncSendOrderly:第三个参数hashKey决定路由到哪个Queue
// 相同hashKey(orderId)的消息路由到同一个Queue
SendResult result = rocketMQTemplate.syncSendOrderly(
"order-events-orderly", // Topic
message,
orderId // hashKey:同一orderId → 同一Queue
);
log.info("顺序消息发送成功: orderId={}, eventType={}, queueId={}, msgId={}",
orderId, eventType,
result.getMessageQueue().getQueueId(),
result.getMsgId());
}
/**
* 批量发送同一订单的多个状态变更消息
* 注意:即使批量发送,也必须保证在同一个Queue
*/
public void sendOrderStatusFlow(String orderId) {
// 发送订单创建消息
sendOrderEvent(orderId, buildCreatedEvent(orderId), "CREATED");
// 注意:这三条消息都会发到同一个Queue
// 但发送是同步串行的,顺序由发送顺序保证
sendOrderEvent(orderId, buildPaidEvent(orderId), "PAID");
sendOrderEvent(orderId, buildShippedEvent(orderId), "SHIPPED");
}
private OrderEvent buildCreatedEvent(String orderId) {
return new OrderEvent(orderId, "CREATED", LocalDateTime.now());
}
private OrderEvent buildPaidEvent(String orderId) {
return new OrderEvent(orderId, "PAID", LocalDateTime.now());
}
private OrderEvent buildShippedEvent(String orderId) {
return new OrderEvent(orderId, "SHIPPED", LocalDateTime.now());
}
}3.2 顺序消息Consumer
/**
* RocketMQ顺序消息消费者
* 关键:实现RocketMQListener,且consumeMode = ConsumeMode.ORDERLY
*/
@Component
@RocketMQMessageListener(
topic = "order-events-orderly",
consumerGroup = "order-orderly-consumer-group",
consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式(关键)
messageModel = MessageModel.CLUSTERING,
maxReconsumeTimes = 3 // 最大重试次数(顺序消息重试会阻塞后续消息!)
)
@Slf4j
public class OrderEventOrderlyConsumer implements RocketMQListener<OrderEvent> {
private final OrderStateMachine orderStateMachine;
public OrderEventOrderlyConsumer(OrderStateMachine orderStateMachine) {
this.orderStateMachine = orderStateMachine;
}
@Override
public void onMessage(OrderEvent event) {
String orderId = event.getOrderId();
String eventType = event.getEventType();
log.info("顺序消费: orderId={}, eventType={}", orderId, eventType);
try {
// 状态机处理,保证状态流转的合法性
orderStateMachine.transition(orderId, eventType);
log.info("状态流转成功: orderId={}, {} -> {}",
orderId, eventType, orderStateMachine.currentState(orderId));
} catch (IllegalStateTransitionException e) {
// 状态流转不合法(说明乱序了,或者重复消费)
log.error("非法状态流转: orderId={}, eventType={}, currentState={}",
orderId, eventType, orderStateMachine.currentState(orderId));
// 顺序消息处理失败会导致后续消息阻塞!
// 如果无法处理,应该考虑跳过并告警,而不是无限重试
throw e; // 抛出异常后会重试(最多maxReconsumeTimes次)
}
}
}3.3 顺序消费的重试问题处理
/**
* 原生顺序消费者(更细粒度控制重试和跳过逻辑)
*/
@Component
@Slf4j
public class OrderOrderlyConsumerWithRetryControl {
/**
* 使用原生RocketMQ API,更精细控制顺序消费的重试行为
*
* 顺序消息重试的关键点:
* 返回SUCCESS:消费成功,处理下一条
* 返回SUSPEND_CURRENT_QUEUE_A_MOMENT:消费失败,稍后重试(会阻塞当前Queue!)
*/
@Bean
public DefaultMQPushConsumer orderOrderlyConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-orderly-group");
consumer.setNamesrvAddr("rocketmq-namesrv:9876");
consumer.subscribe("order-events-orderly", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true); // 自动提交offset
for (MessageExt msg : msgs) {
String orderId = msg.getKeys();
int reconsumeTimes = msg.getReconsumeTimes();
try {
OrderEvent event = JsonUtil.fromJson(
new String(msg.getBody()), OrderEvent.class);
processOrderEvent(event);
} catch (Exception e) {
log.error("顺序消息处理失败: orderId={}, reconsumeTimes={}",
orderId, reconsumeTimes, e);
// 超过最大重试次数,跳过该消息(避免无限阻塞)
if (reconsumeTimes >= 3) {
log.error("消息超过最大重试次数,跳过并告警: orderId={}", orderId);
sendAlarm(msg);
// 返回SUCCESS跳过该消息,继续处理后续消息
return ConsumeOrderlyStatus.SUCCESS;
}
// 稍后重试(会暂停当前Queue的消费约1秒)
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
return consumer;
}
private void processOrderEvent(OrderEvent event) {
// 实际业务处理
}
private void sendAlarm(MessageExt msg) {
// 告警逻辑
log.error("死信告警: msgId={}, keys={}", msg.getMsgId(), msg.getKeys());
}
}四、踩坑实录
坑1:顺序消息消费失败导致整个Queue阻塞
这是顺序消息最大的坑。普通消息消费失败,只是这条消息重试,其他消息不受影响。顺序消息消费失败,当前Queue的后续所有消息都会暂停等待,直到失败消息重试成功(或超过最大重试次数)。
我们有一次因为业务代码Bug,某类订单消息处理抛出NPE,整个Queue阻塞了20分钟,该Queue上积压了数万条消息。
解决方案:
- 顺序消息的消费逻辑必须足够健壮,不能轻易抛出未处理的异常
- 设置合理的
maxReconsumeTimes(建议3-5次),超过后跳过并告警 - 对每种异常做分类处理:可重试异常(网络超时)重试,永久性异常(数据错误)直接跳过+告警
坑2:Hash分布不均导致某些Queue过热
用orderId % queueCount做Hash路由,如果orderId是时间戳前缀(如2024091200001)的递增ID,大量新订单会路由到相同的Queue(因为时间前缀相同),造成某些Queue过热,其他Queue空闲。
解决方案:用String.hashCode() % queueCount代替简单取模,或者用一致性Hash确保分布更均匀。
坑3:顺序消息的Broker Slave不提供消费
顺序消息只能从Broker Master消费(即使配置了Slave),因为Master需要锁住Queue才能保证顺序。Master宕机时,顺序消息消费会暂停,直到新Master选举完成。
影响:顺序消息场景下,Broker Master宕机的影响比普通消息大得多。需要确保Broker的高可用配置(Master+Slave+自动主从切换)。
坑4:Consumer重启导致顺序消费暂时乱序
Consumer实例重启时,会释放持有的Queue锁,其他Consumer实例可能获得这些Queue,但新Consumer还没有从上次的offset开始消费,可能从较早的offset重新消费(如果之前的offset没来得及提交),这期间产生了重复消费,但重复的消息打破了顺序(相同状态被处理两次)。
解决方案:消费逻辑必须是幂等的,重复处理同一条消息不能产生错误效果(比如状态机检查当前状态是否已经是目标状态,是则直接跳过)。
五、性能代价量化对比
在我们的生产测试数据中(单Master,16个Queue):
| 消息类型 | TPS | P99延迟 | CPU消耗 | 说明 |
|---|---|---|---|---|
| 普通并发消息 | 18万 | 5ms | 30% | 基准 |
| 分区顺序消息(16Q) | 12万 | 8ms | 35% | TPS降33% |
| 全局顺序消息(1Q) | 1万 | 12ms | 15% | TPS降94%! |
结论:分区顺序消息的代价是可以接受的(33%),全局顺序消息代价极大(94%),生产中几乎不用全局顺序消息。
顺序消息的使用原则:只在真正需要顺序保证的业务实体粒度使用分区顺序,不要为了方便就用全局顺序。
下一篇(第443期)讲消费者背压处理,动态调整并发度与fetch.max.bytes的协调,应对下游系统处理能力不均衡的场景。
