RocketMQ事务消息原理:Half消息、回查机制与两阶段提交
RocketMQ事务消息原理:Half消息、回查机制与两阶段提交
适读人群:需要解决"消息发送与本地事务原子性"问题的Java工程师 | 阅读时长:约17分钟
开篇故事
电商系统中有一个经典难题:用户下单后,订单服务要同时完成两件事:
- 在数据库里创建订单记录(本地事务)
- 向消息队列发送"订单创建成功"消息,触发库存扣减、积分增加等下游操作
问题来了:这两件事如何保证原子性?
先发消息再写DB:消息发出去了,DB写失败,订单不存在,但库存和积分已经被扣了。
先写DB再发消息:DB写成功了,消息发送失败(网络超时),库存和积分没扣,数据不一致。
2019年的时候,我们的老系统用了一个"土方法":写DB成功后,用定时任务扫描未发送的消息补偿。这个方案有明显的延迟(最多30秒),而且定时任务扫描全表对数据库压力很大,每天都有几十条消息因为定时任务挂掉而永远没发出去,靠人工对账补数据。
后来换成RocketMQ事务消息,彻底解决了这个问题。今天把这套机制讲透。
一、问题本质:消息发送与本地事务的原子性
这个问题的本质是分布式事务,但我们不需要引入重量级的两阶段提交协议(2PC),因为消息队列只是一个通知机制,我们实际上需要的是:
- 要么:本地事务成功 AND 消息被消费者看见
- 要么:本地事务失败 AND 消息不可见(回滚)
RocketMQ用一个精巧的Half消息(半事务消息)机制解决了这个问题,在不引入分布式协调器的情况下,实现了生产者本地事务与消息投递的原子性。
二、Half消息与两阶段提交原理
2.1 Half消息是什么
Half消息(也叫Prepare消息)是一种特殊状态的消息:它已经发到Broker并持久化了,但消费者看不见它,无法消费。只有当生产者确认提交(Commit)后,消息才会变为可见状态。
RocketMQ在Broker内部有一个特殊的Topic:RMQ_SYS_TRANS_HALF_TOPIC,所有Half消息都临时存储在这里,消费者订阅的是用户定义的业务Topic,所以看不到Half消息。
2.2 完整的两阶段流程
2.3 回查机制的细节
回查是整个事务消息机制的"兜底"环节。当Producer在规定时间内没有发送Commit或Rollback时,Broker会主动回查Producer的事务状态。
回查参数:
# Broker端配置
# 回查间隔(第一次回查时间,默认60秒)
transactionCheckInterval=60000
# 最大回查次数(超过后强制回滚)
transactionCheckMax=15
# 事务超时时间(超过后开始回查)
transactionTimeout=6000回查的关键是:Producer端必须实现checkLocalTransaction方法,通过查询本地数据库来判断事务是否成功。这要求本地事务必须有查询状态的能力(比如订单表有状态字段)。
三、完整Java代码实现
3.1 RocketMQ事务消息生产者
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* RocketMQ事务消息生产者配置
* 解决:订单创建与消息发送的原子性
*/
@Configuration
public class RocketMQTransactionProducerConfig {
@Bean("transactionProducer")
public TransactionMQProducer transactionProducer(
TransactionListener transactionListener) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("order-transaction-group");
producer.setNamesrvAddr("rocketmq-namesrv:9876");
// 事务回查线程池(专用线程,不要用公共线程池)
ExecutorService checkThreadPool = Executors.newFixedThreadPool(
5,
r -> {
Thread t = new Thread(r, "tx-check-thread");
t.setDaemon(true);
return t;
}
);
producer.setExecutorService(checkThreadPool);
producer.setTransactionListener(transactionListener);
// 发送超时时间
producer.setSendMsgTimeout(10000);
// 重试次数
producer.setRetryTimesWhenSendFailed(3);
producer.start();
return producer;
}
}
/**
* 事务监听器实现
* 核心逻辑:
* 1. executeLocalTransaction:执行本地事务(INSERT订单)
* 2. checkLocalTransaction:回查时检查本地事务状态
*/
@Component
@Slf4j
public class OrderTransactionListener implements TransactionListener {
private final OrderRepository orderRepository;
private final TransactionLogRepository txLogRepository;
public OrderTransactionListener(OrderRepository orderRepository,
TransactionLogRepository txLogRepository) {
this.orderRepository = orderRepository;
this.txLogRepository = txLogRepository;
}
/**
* 执行本地事务
* Half消息发送成功后,Broker回调此方法执行本地事务
*
* @param msg Half消息
* @param arg 业务参数(sendMessageInTransaction时传入)
* @return 事务状态
*/
@Override
@Transactional(rollbackFor = Exception.class)
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transactionId = msg.getTransactionId();
try {
// 从消息属性中获取业务参数
CreateOrderRequest request = (CreateOrderRequest) arg;
// 1. 创建订单
Order order = new Order();
order.setOrderNo(request.getOrderNo());
order.setUserId(request.getUserId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
order.setTransactionId(transactionId); // 关键:记录事务ID,用于回查
orderRepository.save(order);
// 2. 记录事务日志(用于回查)
TransactionLog txLog = new TransactionLog();
txLog.setTransactionId(transactionId);
txLog.setStatus(TransactionStatus.COMMITTED);
txLog.setCreateTime(LocalDateTime.now());
txLogRepository.save(txLog);
log.info("本地事务执行成功: transactionId={}, orderNo={}",
transactionId, request.getOrderNo());
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("本地事务执行失败: transactionId={}", transactionId, e);
// 异常自动回滚(@Transactional),返回回滚状态
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 回查本地事务状态
* 当Broker没收到Commit/Rollback时,定期回调此方法
*
* @param msg Half消息(包含transactionId)
* @return 事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getTransactionId();
log.info("收到事务回查: transactionId={}", transactionId);
try {
// 方案1:查询事务日志表(推荐,最可靠)
TransactionLog txLog = txLogRepository.findByTransactionId(transactionId);
if (txLog == null) {
// 没有记录:本地事务未执行或已回滚
// 可能是executeLocalTransaction还没开始,给一次机会
log.warn("事务日志不存在,返回UNKNOW继续等待: {}", transactionId);
return LocalTransactionState.UNKNOW;
}
switch (txLog.getStatus()) {
case COMMITTED:
log.info("事务已提交: {}", transactionId);
return LocalTransactionState.COMMIT_MESSAGE;
case ROLLED_BACK:
log.info("事务已回滚: {}", transactionId);
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
// 执行中,继续等待
return LocalTransactionState.UNKNOW;
}
} catch (Exception e) {
log.error("回查事务状态异常: transactionId={}", transactionId, e);
// 查询异常时返回UNKNOW,等下次回查
// 注意:不要返回ROLLBACK,因为可能只是临时异常
return LocalTransactionState.UNKNOW;
}
}
}3.2 业务服务:发送事务消息
/**
* 订单服务:使用事务消息保证原子性
*/
@Service
@Slf4j
public class OrderService {
private final TransactionMQProducer transactionProducer;
private final ObjectMapper objectMapper;
public OrderService(@Qualifier("transactionProducer")
TransactionMQProducer transactionProducer,
ObjectMapper objectMapper) {
this.transactionProducer = transactionProducer;
this.objectMapper = objectMapper;
}
/**
* 创建订单(带事务消息保障)
*
* 流程:
* 1. 发送Half消息到Broker
* 2. Broker回调executeLocalTransaction(写DB)
* 3. DB成功 -> Commit消息 -> 下游消费
* 4. DB失败 -> Rollback消息 -> 消息不可见
*/
public CreateOrderResponse createOrder(CreateOrderRequest request) {
// 构建消息
String msgBody;
try {
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderNo(request.getOrderNo());
event.setUserId(request.getUserId());
event.setItems(request.getItems());
event.setAmount(request.getAmount());
event.setCreateTime(LocalDateTime.now());
msgBody = objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new RuntimeException("消息序列化失败", e);
}
Message message = new Message(
"order-created-topic", // Topic
"order-event", // Tag
request.getOrderNo(), // Key(业务唯一键,方便查询)
msgBody.getBytes(StandardCharsets.UTF_8)
);
// 设置消息属性(可以在Consumer端获取)
message.putUserProperty("userId", String.valueOf(request.getUserId()));
message.putUserProperty("orderNo", request.getOrderNo());
try {
// 发送事务消息
// arg参数会传给executeLocalTransaction
TransactionSendResult result = transactionProducer.sendMessageInTransaction(
message, request
);
log.info("事务消息发送结果: orderNo={}, msgId={}, localState={}, sendStatus={}",
request.getOrderNo(),
result.getMsgId(),
result.getLocalTransactionState(),
result.getSendStatus()
);
if (result.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("Half消息发送失败: " + result.getSendStatus());
}
// 根据本地事务状态返回
if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
return CreateOrderResponse.success(request.getOrderNo());
} else {
return CreateOrderResponse.fail("订单创建失败,本地事务回滚");
}
} catch (MQClientException e) {
log.error("发送事务消息异常: orderNo={}", request.getOrderNo(), e);
throw new RuntimeException("消息发送异常", e);
}
}
}3.3 消费者端实现
/**
* 订单创建事件消费者
* 注意:必须实现幂等,因为网络重试可能导致重复消费
*/
@Component
@RocketMQMessageListener(
topic = "order-created-topic",
consumerGroup = "inventory-consumer-group",
selectorExpression = "order-event",
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING
)
@Slf4j
public class OrderCreatedEventConsumer implements RocketMQListener<OrderCreatedEvent> {
private final InventoryService inventoryService;
private final ConsumeRecordRepository consumeRecordRepository;
@Override
@Transactional(rollbackFor = Exception.class)
public void onMessage(OrderCreatedEvent event) {
String orderNo = event.getOrderNo();
log.info("收到订单创建事件: orderNo={}", orderNo);
// 幂等检查:防止重复消费
if (consumeRecordRepository.existsByOrderNo(orderNo)) {
log.info("订单已处理,跳过: orderNo={}", orderNo);
return;
}
// 扣减库存
for (OrderItem item : event.getItems()) {
inventoryService.deduct(item.getSkuId(), item.getQuantity());
}
// 记录消费记录
ConsumeRecord record = new ConsumeRecord();
record.setOrderNo(orderNo);
record.setConsumeTime(LocalDateTime.now());
consumeRecordRepository.save(record);
log.info("库存扣减完成: orderNo={}", orderNo);
}
}四、踩坑实录
坑1:Half消息发送失败的处理
Half消息发送失败(Broker不可达、网络超时)时,本地事务不会执行。业务层需要处理这种情况,不能忽略异常。
// 错误写法:忽略异常
try {
transactionProducer.sendMessageInTransaction(message, request);
} catch (Exception e) {
log.error("发送失败", e);
// 什么都不做,等结果丢失
}
// 正确写法:失败后走降级逻辑
try {
TransactionSendResult result = transactionProducer.sendMessageInTransaction(
message, request);
// ...
} catch (MQClientException e) {
// Half消息发送失败,事务消息不可用,降级到同步写DB + 补偿任务
log.error("事务消息发送失败,降级处理: orderNo={}", request.getOrderNo(), e);
return createOrderFallback(request);
}坑2:checkLocalTransaction查询逻辑不可靠
有个同事实现checkLocalTransaction时直接查订单表:orderRepository.existsByOrderNo(orderNo)。看起来没问题,但有个隐患:如果executeLocalTransaction还没开始执行(刚发完Half消息Broker就来回查了),查到的结果是"不存在",错误地返回了ROLLBACK,导致消息被删除,但后续executeLocalTransaction执行成功,订单创建了但消息没发出去。
正确做法:用专门的事务日志表,executeLocalTransaction里显式记录事务日志状态,checkLocalTransaction查事务日志表而不是业务表。回查前先检查Half消息的创建时间,如果距离创建时间不足30秒,返回UNKNOW等待。
坑3:事务消息和普通消息混用同一个Producer
事务Producer只能发送事务消息,不能发送普通消息。如果在同一个Producer上发普通消息,会抛出UnsupportedOperationException。
需要分别创建TransactionMQProducer(用于事务消息)和DefaultMQProducer(用于普通消息)。
坑4:回查次数耗尽后消息被强制回滚
默认最大回查次数transactionCheckMax=15,超过后Broker强制将Half消息删除(相当于Rollback)。如果本地事务处于UNKNOW状态超过15次回查,消息永久丢失。
场景:数据库压力极大时,checkLocalTransaction查询超时,一直返回UNKNOW,等到第15次回查时消息被删除,但本地事务其实成功了。
解决方案:
- checkLocalTransaction里加超时保护,超时时先返回UNKNOW,不要让整个方法长时间阻塞
- 适当增大
transactionCheckMax和transactionCheckInterval - 使用事务日志表+定时补偿任务双保险
坑5:消息体大小超限
RocketMQ默认单条消息最大4MB(maxMessageSize),事务消息也不例外。如果订单消息体太大(包含了大量商品详情、用户信息),会发送失败。
解决方案:消息体只包含关键ID(orderId、userId),下游消费时再查DB获取完整信息,这也是消息设计的最佳实践——消息是"通知",不是"数据快照"。
五、总结与延伸
RocketMQ事务消息通过Half消息 + 本地事务 + 回查机制,用一种巧妙的方式解决了"消息发送与本地事务原子性"问题。
核心流程记住三句话:
- 先发Half消息:Broker存着但消费者看不见
- 再执行本地事务:成功Commit,失败Rollback
- 网络故障靠回查:Producer实现checkLocalTransaction,查本地DB状态
这套机制的可靠性前提是:本地事务状态必须可查。所以一定要维护事务日志表,不能依赖业务表的间接查询。
和Kafka事务消息(第441期)的对比:RocketMQ事务消息是解决"生产端本地事务与消息的原子性",Kafka Exactly-Once是解决"消息在Kafka内部精确一次流转",两者解决的问题不同,不能互相替代。
下一篇(第435期)讲消息幂等性的三种方案,从业务唯一键、数据库唯一索引到Redis去重,看看实际项目中最常用哪种。
