分布式事务选型:2PC、SAGA、TCC和本地消息表的适用边界
分布式事务选型:2PC、SAGA、TCC和本地消息表的适用边界
适读人群:Java后端开发、微服务架构师、对分布式系统感兴趣的工程师 | 阅读时长:约27分钟
开篇故事
2021年双11前,我们做微服务拆分,把一个单体的下单流程拆成了订单服务、库存服务、积分服务三个微服务。
上线后第三天,客服开始收到用户投诉:下单成功了,但库存没扣,导致同一件商品被多人下单。
排查发现:订单服务的createOrder成功了,但调用库存服务的decreaseStock RPC失败了(网络抖动,超时),而订单服务的事务已经提交。
问题很清晰:跨服务的操作无法用数据库事务保证原子性。我们面临的是分布式事务问题。
当时团队里吵了整整一周:有人说用Seata的AT模式,有人说用TCC,有人说本地消息表就够了,甚至有人提议用2PC。
最后我把每种方案的适用场景、实现成本、性能开销全部列出来,团队才达成了共识。今天把这个选型过程完整地呈现出来。
一、分布式事务的本质问题
分布式事务要解决的是:多个服务/数据库的操作如何保证原子性。
CAP理论告诉我们:分布式系统无法同时满足一致性(C)、可用性(A)和分区容错性(P)。
实际选型时,我们通常在三个维度权衡:
强一致性
(2PC/XA)
/ \
/ \
高可用 ← CAP权衡 → 高性能
(本地消息表) (TCC/SAGA)二、底层原理:四种方案的协议流程
2.1 2PC(两阶段提交)
2PC是数据库领域最经典的分布式事务协议,由XA规范定义。
2PC时序图
事务管理器TM 参与者RM1 参与者RM2
| | |
|---Prepare(xa)----->| |
|---Prepare(xa)-----------------> |
| | |
|<---Vote: YES-------| |
|<---Vote: YES-------------------| |
| | |
| (所有YES,决定提交) | |
| | |
|---Commit---------->| |
|---Commit-----------------------> |
| | |
|<---ACK-------------| |
|<---ACK-------------------------| |
| | |
完成 | |2PC的问题:
- 同步阻塞:Prepare阶段,所有参与者都持有锁,等待TM的决定
- 单点故障:TM崩溃后,参与者一直持有锁,无法判断是提交还是回滚
- 数据不一致:TM发出Commit后,第一个RM提交了,但TM崩溃,第二个RM还没收到Commit
2.2 SAGA模式
SAGA将长事务分解为一系列本地事务,每个本地事务都有对应的补偿事务。
SAGA执行流程(正常情况):
T1(订单) → T2(库存) → T3(积分) → 全部成功
SAGA补偿流程(T3失败):
T1 → T2 → T3(失败)
↓
C2(退库存) → C1(取消订单) → 最终状态回滚
补偿操作必须是幂等的、可重试的。两种SAGA实现方式:
- 编排式(Orchestration):中心化的Saga协调器驱动各步骤
- 协同式(Choreography):各服务通过事件互相触发,无中心协调器
2.3 TCC模式
TCC(Try-Confirm-Cancel)是2PC的业务层实现:
TCC时序图:
协调器 订单服务Try 库存服务Try 积分服务Try
| | | |
|---Try()------------>| | |
|---Try()--------------------------> | |
|---Try()----------------------------------------------> |
| | | |
| (Try全部成功) | | |
| | | |
|---Confirm()-------->| | |
|---Confirm()-------------------------->| |
|---Confirm()---------------------------------------------->|
| | | |
Try阶段:检查并预留资源(冻结库存,预扣积分)
Confirm阶段:确认执行(实际扣除)
Cancel阶段:释放预留资源(解冻)TCC的核心约束:
- Try操作必须能幂等
- Confirm/Cancel必须能幂等且不失败
- 要处理空回滚(Cancel先于Try执行的情况)
2.4 本地消息表
本地消息表流程:
订单服务(主库):
BEGIN;
INSERT INTO orders(...); -- 业务操作
INSERT INTO outbox_msg(topic, payload, status) -- 同一个事务
VALUES('stock.decrease', '{"orderId":123}', 'PENDING');
COMMIT; -- 原子性保证:要么都成功,要么都失败
消息发送线程(轮询):
SELECT * FROM outbox_msg WHERE status = 'PENDING' LIMIT 100;
发送消息到MQ;
UPDATE outbox_msg SET status = 'SENT' WHERE id = ?;
库存服务(消费者):
消费MQ消息;
BEGIN;
decreaseStock(...);
记录消费记录(防重复消费);
COMMIT;三、完整解决方案与代码
3.1 本地消息表实现(推荐方案)
这是我们最终采用的方案,实现简单,可靠性高。
// 1. 消息表DDLCREATE TABLE outbox_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
topic VARCHAR(128) NOT NULL COMMENT '消息主题',
payload TEXT NOT NULL COMMENT '消息内容JSON',
status TINYINT NOT NULL DEFAULT 0 COMMENT '0:待发送 1:已发送 2:发送失败',
retry_count INT NOT NULL DEFAULT 0 COMMENT '重试次数',
next_retry DATETIME COMMENT '下次重试时间',
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
INDEX idx_status_retry (status, next_retry)
) ENGINE=InnoDB;// 2. 业务代码:在同一个事务内写业务数据和消息
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private OutboxMessageMapper outboxMapper;
@Autowired
private ObjectMapper objectMapper;
/**
* 下单:业务操作和发消息在同一个本地事务内
*/
@Transactional(rollbackFor = Exception.class)
public Order createOrder(CreateOrderRequest request) throws Exception {
// 1. 创建订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.PENDING);
orderMapper.insert(order);
// 2. 在同一个事务内写入outbox消息(原子性保证)
StockDecreaseEvent event = new StockDecreaseEvent();
event.setOrderId(order.getId());
event.setProductId(request.getProductId());
event.setQuantity(request.getQuantity());
OutboxMessage message = new OutboxMessage();
message.setTopic("stock.decrease");
message.setPayload(objectMapper.writeValueAsString(event));
message.setStatus(0); // 待发送
message.setCreatedAt(LocalDateTime.now());
outboxMapper.insert(message);
return order;
}
}
// 3. 消息投递线程(定时轮询)
@Component
public class OutboxMessagePublisher {
@Autowired
private OutboxMessageMapper outboxMapper;
@Autowired
private RocketMQTemplate mqTemplate;
@Scheduled(fixedDelay = 1000) // 每秒检查一次
@Transactional(rollbackFor = Exception.class)
public void publishPendingMessages() {
// 查询待发送的消息(带分布式锁,防止多实例重复发送)
List<OutboxMessage> messages = outboxMapper.selectPending(
LocalDateTime.now(), 100 // 每批最多100条
);
for (OutboxMessage msg : messages) {
try {
// 发送到MQ(同步发送,确保消息到达broker)
mqTemplate.syncSend(msg.getTopic(), msg.getPayload());
// 标记为已发送
outboxMapper.markSent(msg.getId());
} catch (Exception e) {
// 发送失败,更新重试信息
int retryCount = msg.getRetryCount() + 1;
LocalDateTime nextRetry = LocalDateTime.now()
.plusSeconds(fibonacci(retryCount) * 10L); // 指数退避
outboxMapper.markFailed(msg.getId(), retryCount, nextRetry);
}
}
}
// 斐波那契退避:10, 10, 20, 30, 50, 80... 秒
private int fibonacci(int n) {
if (n <= 1) return 1;
int a = 1, b = 1;
for (int i = 2; i <= n; i++) {
int c = a + b;
a = b;
b = c;
}
return Math.min(b, 100); // 最大间隔100倍基数
}
}
// 4. 消费端:幂等处理
@Component
@RocketMQMessageListener(topic = "stock.decrease", consumerGroup = "stock-service")
public class StockDecreaseConsumer implements RocketMQListener<String> {
@Autowired
private StockService stockService;
@Autowired
private ConsumedMessageMapper consumedMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public void onMessage(String payload) {
StockDecreaseEvent event = parseEvent(payload);
// 幂等检查:已消费过的消息直接跳过
if (consumedMapper.exists(event.getOrderId(), "stock.decrease")) {
return;
}
// 执行业务操作
stockService.decrease(event.getProductId(), event.getQuantity());
// 记录消费记录(和业务操作在同一个事务内)
consumedMapper.insert(event.getOrderId(), "stock.decrease");
}
}3.2 TCC实现(适合强一致性场景)
// TCC接口定义
public interface StockTccService {
/**
* Try:冻结库存(不真正扣减)
* @return 冻结是否成功
*/
@TwoPhaseBusinessAction(name = "freezeStock", commitMethod = "confirm", rollbackMethod = "cancel")
boolean tryFreeze(@BusinessActionContext BusinessActionContext ctx,
@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "quantity") int quantity);
/**
* Confirm:实际扣减库存(冻结转扣减)
*/
boolean confirm(BusinessActionContext ctx);
/**
* Cancel:解冻库存(释放预留)
*/
boolean cancel(BusinessActionContext ctx);
}@Service
public class StockTccServiceImpl implements StockTccService {
@Autowired
private StockMapper stockMapper;
@Override
@Transactional
public boolean tryFreeze(BusinessActionContext ctx, Long productId, int quantity) {
// Try阶段:检查库存是否充足,冻结指定数量
int affected = stockMapper.freeze(productId, quantity);
if (affected == 0) {
throw new RuntimeException("库存不足,冻结失败: productId=" + productId);
}
return true;
}
@Override
@Transactional
public boolean confirm(BusinessActionContext ctx) {
Long productId = Long.parseLong(ctx.getActionContext("productId").toString());
int quantity = Integer.parseInt(ctx.getActionContext("quantity").toString());
// Confirm阶段:冻结转扣减(幂等处理)
stockMapper.confirmFreeze(productId, quantity);
return true;
}
@Override
@Transactional
public boolean cancel(BusinessActionContext ctx) {
Long productId = Long.parseLong(ctx.getActionContext("productId").toString());
int quantity = Integer.parseInt(ctx.getActionContext("quantity").toString());
// Cancel阶段:解除冻结(幂等处理,防止空回滚)
// 注意:如果Try未执行(空回滚场景),这里要能安全退出
stockMapper.cancelFreeze(productId, quantity);
return true;
}
}对应SQL:
-- 库存表增加冻结字段
ALTER TABLE stock ADD COLUMN frozen_quantity INT NOT NULL DEFAULT 0;
-- Try:冻结库存
UPDATE stock
SET frozen_quantity = frozen_quantity + :quantity
WHERE product_id = :productId
AND available_quantity >= :quantity;
-- available_quantity = total - frozen - sold
-- Confirm:冻结转扣减
UPDATE stock
SET frozen_quantity = frozen_quantity - :quantity,
sold_quantity = sold_quantity + :quantity
WHERE product_id = :productId
AND frozen_quantity >= :quantity;
-- Cancel:解除冻结
UPDATE stock
SET frozen_quantity = GREATEST(0, frozen_quantity - :quantity)
WHERE product_id = :productId;
-- GREATEST(0, ...) 防止空回滚导致负数四、踩坑实录
坑1:本地消息表消息积压,轮询查询成为瓶颈
症状:高峰期每秒产生5000条订单,outbox消息表积压到了10万+条,轮询查询越来越慢。
-- 问题SQL:全表扫描
SELECT * FROM outbox_message WHERE status = 0 LIMIT 100;
-- EXPLAIN发现没走索引(status选择性低)
-- type=ALL,扫描10万行,耗时2.3秒报错:
[ERROR] Slow query: 2.341 seconds
SELECT * FROM outbox_message WHERE status = 0 LIMIT 100解决方案:
-- 优化索引(按状态+创建时间排序,先处理老消息)
ALTER TABLE outbox_message
DROP INDEX idx_status_retry,
ADD INDEX idx_status_created (status, created_at);
-- 优化查询SQL
SELECT * FROM outbox_message
WHERE status = 0
AND created_at < DATE_SUB(NOW(), INTERVAL 1 SECOND)
ORDER BY created_at ASC
LIMIT 100 FOR UPDATE SKIP LOCKED; -- MySQL 8.0+,跳过被其他线程锁住的行// 多线程并发投递(每个线程处理不同的消息)
@Scheduled(fixedDelay = 500)
public void publishAsync() {
// 使用线程池并发处理
ExecutorService executor = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
executor.submit(this::publishOneBatch);
}
}坑2:TCC的空回滚问题
场景:Try请求因网络超时没到达库存服务,框架触发了Cancel。
协调器 → Try(库存服务) --超时--> 框架触发Cancel(库存服务)
↓
Cancel收到,但Try从未执行过!
此时执行 frozen_quantity -= quantity
→ frozen_quantity变成负数!报错/数据异常:
库存表: product_id=100, frozen_quantity=-5
-- 逻辑上不可能的数据解决方案:
@Override
@Transactional
public boolean cancel(BusinessActionContext ctx) {
Long productId = Long.parseLong(ctx.getActionContext("productId").toString());
String xid = ctx.getXid();
// 检查是否有对应的Try记录
TccRecord record = tccRecordMapper.selectByXid(xid);
if (record == null) {
// 空回滚:Try未执行,记录一个空回滚标记,直接返回成功
tccRecordMapper.insertEmptyCancel(xid, productId);
return true; // 返回true,让框架认为Cancel成功
}
// 正常回滚:释放Try冻结的资源
stockMapper.cancelFreeze(productId,
Integer.parseInt(ctx.getActionContext("quantity").toString()));
return true;
}
@Override
@Transactional
public boolean tryFreeze(BusinessActionContext ctx, Long productId, int quantity) {
String xid = ctx.getXid();
// 检查是否已经有空回滚标记(悬挂问题:Cancel先于Try执行)
TccRecord cancelRecord = tccRecordMapper.selectByXid(xid);
if (cancelRecord != null && cancelRecord.isEmptyCancel()) {
// 悬挂:Cancel已经执行,Try不应该再执行
return false;
}
// 记录Try执行
tccRecordMapper.insertTry(xid, productId, quantity);
// 执行冻结
int affected = stockMapper.freeze(productId, quantity);
return affected > 0;
}坑3:SAGA补偿操作执行失败,最终状态不一致
场景:SAGA的补偿操作(Cancel/Compensate)失败了怎么办?
T1(订单) → T2(库存) → T3(积分,失败)
↓
C2(退库存) ← 补偿操作也失败了!报错:
WARN SagaCoordinator - Compensation C2 failed after 3 retries,
transaction stuck in compensating state解决方案:
- 补偿操作必须设计为无限重试(不能失败,必须保证最终执行)
- 补偿操作本身必须幂等(多次执行结果一致)
- 人工干预机制:如果补偿一直失败,触发告警并人工介入
// 补偿操作的幂等设计
@Transactional
public void compensateStockDecrease(String sagaId, Long productId, int quantity) {
// 幂等检查:是否已经补偿过
if (sagaCompensationMapper.exists(sagaId, "STOCK_COMPENSATE")) {
log.warn("Compensation already done for sagaId={}", sagaId);
return;
}
// 执行补偿(退还库存)
stockMapper.increase(productId, quantity);
// 记录补偿执行
sagaCompensationMapper.insert(sagaId, "STOCK_COMPENSATE");
}
// 补偿告警:超过10次重试,发出人工介入告警
@Scheduled(fixedDelay = 60_000)
public void checkStuckCompensations() {
List<SagaInstance> stuck = sagaMapper.selectStuckCompensating(10);
if (!stuck.isEmpty()) {
alertService.sendUrgent("SAGA补偿卡住,需要人工介入", stuck);
}
}五、总结与延伸
分布式事务的选型没有最好的方案,只有最合适的方案。
选型参考表:
| 方案 | 一致性 | 性能 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|
| 2PC/XA | 强一致 | 低(同步阻塞) | 中 | 单机多数据库,非高并发 |
| TCC | 强一致(业务层) | 高 | 高 | 金融转账、资金操作 |
| SAGA | 最终一致 | 高 | 中高 | 长流程、多步骤业务 |
| 本地消息表 | 最终一致 | 高 | 低 | 大多数微服务场景 |
一条核心原则:
能用本地事务解决的,绝对不引入分布式事务。能用最终一致的,绝对不用强一致。分布式事务是不得不用时的选择,不是设计系统的第一步。
