RocketMQ 事务消息实战——分布式事务的消息最终一致性方案
RocketMQ 事务消息实战——分布式事务的消息最终一致性方案
适读人群:使用 RocketMQ 解决分布式事务问题的 Java 后端开发者 | 阅读时长:约17分钟 | 核心价值:彻底掌握 RocketMQ 事务消息的原理和完整实现,解决分布式事务的最终一致性
从一次转账失败说起
我负责过一个钱包服务,用户充值后,需要同时做两件事:写充值记录、给用户账户加余额。这两个操作在同一个数据库,用一个本地事务搞定,没什么问题。
但后来业务升级,需要把充值记录写到单独的"账单服务",账单服务有自己的数据库。这下麻烦了——加余额和写账单变成了跨服务的操作,本地事务覆盖不了。
第一个方案:先加余额,再调账单服务接口写账单。如果写账单失败,余额已经加了,怎么补?手动回滚余额?回滚也可能失败。
第二个方案:先发 MQ 消息,消费者处理时加余额 + 写账单。如果发 MQ 成功,但进程崩了,数据库事务没提交,MQ 消息已经发出去了,下游处理了,但余额根本没加。
这两个方案都有漏洞。RocketMQ 的事务消息,是专门为这类场景设计的。
事务消息核心原理
RocketMQ 事务消息的核心思路是两阶段提交(Half Message + Commit/Rollback):
第一阶段:发送半消息(Half Message)
- Producer 发送一个半消息到 Broker
- 半消息对消费者不可见
- Broker 返回 SEND_OK 给 Producer
第二阶段:本地事务 + Commit/Rollback
- Producer 执行本地事务(加余额)
- 本地事务成功:Producer 发送 COMMIT 给 Broker,消息变为可见,消费者可以消费
- 本地事务失败:Producer 发送 ROLLBACK,Broker 删除半消息
- 网络问题导致 Commit/Rollback 丢失:Broker 定期回查 Producer,Producer 根据本地事务状态返回 COMMIT 或 ROLLBACK
事务回查(Check)
- Broker 对长时间没有状态的半消息,主动回查 Producer
- Producer 查询本地事务执行结果,返回 COMMIT / ROLLBACK / UNKNOW
完整代码实现
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>充值服务:发送事务消息
@Service
public class RechargeService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private RechargeMapper rechargeMapper;
/**
* 用户充值
* 1. 发半消息到 MQ
* 2. 在事务监听器中执行本地事务(写充值记录)
* 3. 本地事务成功 -> MQ 消息可见 -> 账单服务消费
*/
public void recharge(RechargeRequest request) {
// 构建消息
RechargeEvent event = new RechargeEvent();
event.setRechargeId(request.getRechargeId());
event.setUserId(request.getUserId());
event.setAmount(request.getAmount());
event.setTimestamp(System.currentTimeMillis());
Message<String> message = MessageBuilder
.withPayload(JSON.toJSONString(event))
.setHeader("rechargeId", request.getRechargeId()) // 用于事务回查
.build();
// 发送事务消息
// 第一个参数:Topic,第二个参数:消息,第三个参数:额外参数(传给 TransactionListener)
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"RECHARGE_TOPIC:RECHARGE_TAG", // topic:tag
message,
request // 传给 executeLocalTransaction 的参数
);
log.info("事务消息发送结果: {}, rechargeId={}",
result.getLocalTransactionState(), request.getRechargeId());
if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
throw new BusinessException("充值失败,请重试");
}
}
}事务监听器:执行本地事务 + 事务回查
@Component
@RocketMQTransactionListener
public class RechargeTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private RechargeMapper rechargeMapper;
@Autowired
private AccountBalanceMapper accountBalanceMapper;
/**
* 执行本地事务
* 半消息发送成功后,这个方法被调用
*/
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
RechargeRequest request = (RechargeRequest) arg;
try {
// 幂等检查:防止重复处理
RechargeRecord existing = rechargeMapper.selectByRechargeId(request.getRechargeId());
if (existing != null) {
log.warn("充值记录已存在,直接提交。rechargeId={}", request.getRechargeId());
return RocketMQLocalTransactionState.COMMIT;
}
// 本地事务:写充值记录 + 加余额(同一个 DB 事务)
RechargeRecord record = new RechargeRecord();
record.setRechargeId(request.getRechargeId());
record.setUserId(request.getUserId());
record.setAmount(request.getAmount());
record.setStatus(RechargeStatus.SUCCESS);
record.setCreatedAt(LocalDateTime.now());
rechargeMapper.insert(record);
// 加余额
accountBalanceMapper.addBalance(request.getUserId(), request.getAmount());
// 本地事务成功,提交消息(消费者可以消费了)
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事务执行失败,回滚消息。rechargeId={}", request.getRechargeId(), e);
// 本地事务失败,回滚消息(消费者不会收到消息)
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 事务回查
* 当 Broker 没有收到 COMMIT/ROLLBACK 时(比如 Producer 崩溃),Broker 回调这个方法
* 这里根据本地事务结果告诉 Broker 是 COMMIT 还是 ROLLBACK
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String rechargeId = (String) msg.getHeaders().get("rechargeId");
if (rechargeId == null) {
log.error("回查时 rechargeId 为空,无法确认事务状态,返回 UNKNOW");
return RocketMQLocalTransactionState.UNKNOW;
}
RechargeRecord record = rechargeMapper.selectByRechargeId(rechargeId);
if (record == null) {
// 本地记录不存在,可能本地事务没执行(或者执行失败并回滚了)
// 检查时间:如果超过一定时间还没记录,说明本地事务确实失败了
log.warn("充值记录不存在,回滚消息。rechargeId={}", rechargeId);
return RocketMQLocalTransactionState.ROLLBACK;
}
if (record.getStatus() == RechargeStatus.SUCCESS) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}账单服务消费者:消费事务消息
@Component
@RocketMQMessageListener(
topic = "RECHARGE_TOPIC",
selectorExpression = "RECHARGE_TAG",
consumerGroup = "bill-service-consumer-group"
)
public class RechargeMessageConsumer implements RocketMQListener<String> {
@Autowired
private BillMapper billMapper;
@Override
public void onMessage(String messageBody) {
RechargeEvent event = JSON.parseObject(messageBody, RechargeEvent.class);
// 幂等检查(消息可能重复消费)
Bill existingBill = billMapper.selectByRechargeId(event.getRechargeId());
if (existingBill != null) {
log.warn("账单已存在,跳过重复处理。rechargeId={}", event.getRechargeId());
return;
}
// 写账单
Bill bill = new Bill();
bill.setUserId(event.getUserId());
bill.setRechargeId(event.getRechargeId());
bill.setAmount(event.getAmount());
bill.setBillType(BillType.RECHARGE);
bill.setCreatedAt(LocalDateTime.now());
billMapper.insert(bill);
log.info("账单写入成功。rechargeId={}, userId={}",
event.getRechargeId(), event.getUserId());
}
}三大踩坑实录
坑一:事务回查超时,消息被自动回滚
现象: 本地事务明明执行成功了,但消费者始终没收到消息。查 RocketMQ 控制台,发现这些消息状态是 ROLLBACK。
原因: 本地事务执行时间较长(超过 60 秒),Broker 回查超时后自动判定为 ROLLBACK,删除了半消息。
解法:
- 控制本地事务执行时间,超过 10 秒的事务要告警
- 调整 RocketMQ 的事务超时时间(
transactionTimeoutSecond) - 如果本地事务确实需要长时间,在回查接口里正确返回真实状态,不要让 Broker 靠超时判断
坑二:回查接口返回 UNKNOW 导致无限回查
现象: 某次 DB 故障期间,回查接口因为数据库不可用而返回了 UNKNOW。DB 恢复后,理论上应该恢复正常,但这批消息永远处于 UNKNOW 状态,不断被回查,浪费 Broker 资源。
原因: RocketMQ 默认对 UNKNOW 状态的消息无限回查,没有次数限制。
解法: 在回查接口记录回查次数,超过一定次数(比如 15 次)后,根据时间推断事务状态并强制返回 ROLLBACK:
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String rechargeId = msg.getHeaders().get("rechargeId", String.class);
RechargeRecord record = rechargeMapper.selectByRechargeId(rechargeId);
if (record == null) {
// 获取消息发送时间
long msgTimestamp = (Long) msg.getHeaders().get(RocketMQHeaders.BORN_TIMESTAMP);
long elapsed = System.currentTimeMillis() - msgTimestamp;
// 超过 5 分钟还没记录,判定为事务失败,回滚
if (elapsed > 5 * 60 * 1000) {
log.warn("事务超时,强制回滚。rechargeId={}", rechargeId);
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.UNKNOW;
}
return record.getStatus() == RechargeStatus.SUCCESS
? RocketMQLocalTransactionState.COMMIT
: RocketMQLocalTransactionState.ROLLBACK;
}坑三:消费者处理失败导致余额加了但账单没写
现象: 账单服务消费者抛出了 RuntimeException,消息被 RocketMQ 重试了多次,但账单服务那台机器 OOM 重启,消息最终进入死信队列,账单没写入。用户余额加了,但账单记录缺失。
原因: 事务消息保证的是"加余额"和"MQ 消息投递"的原子性,不保证消费者一定处理成功。消费者处理失败是需要业务层面处理的问题。
解法:
- 消费者必须做好幂等,写账单用
INSERT IGNORE或唯一索引 - 配置死信队列告警,有消息进死信队列立刻人工介入
- 定期对账:扫描成功的充值记录,检查账单服务是否有对应账单
对比其他方案
| 方案 | 一致性保证 | 性能 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|
| Seata AT | 强一致性 | 低 | 低 | 低并发强一致 |
| TCC | 强一致性 | 中 | 高 | 高并发强一致 |
| RocketMQ 事务消息 | 最终一致性 | 高 | 中 | 解耦、高并发最终一致 |
| 本地消息表 | 最终一致性 | 高 | 中 | 同上 |
事务消息适合的核心场景:跨服务操作,下游允许延迟(数秒内),不需要强一致性。
