Saga分布式事务:用消息队列实现跨服务的最终一致性
Saga分布式事务:用消息队列实现跨服务的最终一致性
适读人群:在微服务架构中处理跨服务数据一致性问题的Java工程师 | 阅读时长:约17分钟
开篇故事
电商下单流程涉及多个微服务:订单服务、库存服务、积分服务、优惠券服务。每个服务有自己独立的数据库,需要保证这四个服务的操作要么全部成功,要么全部回滚。
最初我们用了XA两阶段提交。结果:高并发下TPS只有800(因为锁跨多个DB,等待时间极长),而且有一次库存服务在二阶段提交时宕机,陷入了分布式锁死,只能人工干预。
后来我们改用Saga模式,把分布式事务拆成多个独立的本地事务,通过消息队列串联,任何一步失败就通过补偿消息回滚前面的步骤。改造完成后,TPS从800提升到了8000,故障恢复从"人工干预"变成了"自动补偿"。
今天把Saga的设计思路和完整实现讲清楚。
一、Saga的核心思想
Saga模式将一个大的分布式事务拆成一系列小的本地事务,每个本地事务完成后发布一个消息,触发下一个本地事务。如果某个步骤失败,发布补偿消息,按反向顺序执行补偿操作(回滚)。
二、两种Saga实现模式
| 模式 | 说明 | 优点 | 缺点 |
|---|---|---|---|
| 编排式(Choreography) | 每个服务监听事件自己决定下一步 | 去中心化,低耦合 | 业务流程分散,难以整体把握 |
| 编制式(Orchestration) | 中央Saga编排器控制整个流程 | 流程清晰,易监控 | 编排器成为中心,引入额外复杂度 |
生产中,简单流程用编排式,复杂流程(多分支、多补偿)用编制式。
三、完整Java实现(编排式)
3.1 事件定义
/**
* Saga事件基类
*/
public abstract class SagaEvent {
private String sagaId; // Saga事务ID,贯穿整个流程
private String orderId; // 业务ID
private LocalDateTime timestamp;
private String status; // SUCCESS / FAILED / COMPENSATING
// getter/setter...
}
// 正向事件
public class OrderCreatedEvent extends SagaEvent {}
public class InventoryReservedEvent extends SagaEvent {}
public class PointsDeductedEvent extends SagaEvent {}
public class CouponUsedEvent extends SagaEvent {}
// 补偿事件
public class ReleaseInventoryEvent extends SagaEvent {}
public class RestorePointsEvent extends SagaEvent {}
public class ReleaseCouponEvent extends SagaEvent {}
public class CancelOrderEvent extends SagaEvent {}3.2 订单服务(Saga发起者)
/**
* 订单服务:Saga的发起者,创建订单并发布第一个Saga事件
* 同时监听最终结果(成功/补偿完成)
*/
@Service
@Slf4j
public class OrderSagaService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
private final SagaStateRepository sagaStateRepository;
/**
* 发起Saga:创建订单
*/
@Transactional
public String createOrder(CreateOrderRequest request) {
// 1. 创建订单(PENDING状态)
Order order = new Order();
order.setOrderId(generateOrderId());
order.setUserId(request.getUserId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.PENDING);
orderRepository.save(order);
// 2. 记录Saga状态(用于监控和故障恢复)
String sagaId = UUID.randomUUID().toString();
SagaState state = new SagaState();
state.setSagaId(sagaId);
state.setOrderId(order.getOrderId());
state.setCurrentStep("ORDER_CREATED");
state.setStatus("IN_PROGRESS");
state.setStartTime(LocalDateTime.now());
sagaStateRepository.save(state);
// 3. 发布OrderCreatedEvent,触发库存扣减
OrderCreatedEvent event = new OrderCreatedEvent();
event.setSagaId(sagaId);
event.setOrderId(order.getOrderId());
event.setUserId(request.getUserId());
event.setItems(request.getItems());
event.setStatus("SUCCESS");
kafkaTemplate.send("order-saga-events", order.getOrderId(),
JsonUtil.toJson(event));
log.info("Saga发起: sagaId={}, orderId={}", sagaId, order.getOrderId());
return order.getOrderId();
}
/**
* 监听Saga最终结果
*/
@KafkaListener(topics = "order-saga-result")
@Transactional
public void handleSagaResult(ConsumerRecord<String, String> record,
Acknowledgment ack) {
SagaEvent event = JsonUtil.fromJson(record.value(), SagaEvent.class);
String orderId = event.getOrderId();
if ("SUCCESS".equals(event.getStatus())) {
// 所有步骤都成功,确认订单
orderRepository.updateStatus(orderId, OrderStatus.CONFIRMED);
sagaStateRepository.updateStatus(event.getSagaId(), "COMPLETED");
log.info("Saga成功完成: orderId={}", orderId);
} else if ("COMPENSATED".equals(event.getStatus())) {
// 补偿完成,取消订单
orderRepository.updateStatus(orderId, OrderStatus.CANCELLED);
sagaStateRepository.updateStatus(event.getSagaId(), "COMPENSATED");
log.info("Saga补偿完成,订单取消: orderId={}", orderId);
}
ack.acknowledge();
}
}3.3 库存服务(Saga参与者)
/**
* 库存服务:Saga参与者
* 消费OrderCreated事件,扣减库存,成功后发布InventoryReserved
* 失败则发布ReleaseInventory补偿事件回滚
*/
@Service
@Slf4j
public class InventorySagaService {
private final InventoryRepository inventoryRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
/**
* 处理订单创建事件:扣减库存
*/
@KafkaListener(topics = "order-saga-events", groupId = "inventory-saga-group")
@Transactional
public void handleOrderCreated(ConsumerRecord<String, String> record,
Acknowledgment ack) {
SagaEvent event = JsonUtil.fromJson(record.value(), SagaEvent.class);
if (!(event instanceof OrderCreatedEvent)) {
ack.acknowledge();
return;
}
OrderCreatedEvent orderEvent = (OrderCreatedEvent) event;
String sagaId = orderEvent.getSagaId();
String orderId = orderEvent.getOrderId();
log.info("库存扣减开始: sagaId={}, orderId={}", sagaId, orderId);
try {
// 幂等检查
if (inventoryRepository.isReserved(sagaId)) {
log.info("库存已扣减(幂等跳过): sagaId={}", sagaId);
publishSuccess(sagaId, orderId, orderEvent);
ack.acknowledge();
return;
}
// 扣减库存
for (OrderItem item : orderEvent.getItems()) {
boolean success = inventoryRepository.reserve(
item.getSkuId(), item.getQuantity(), sagaId);
if (!success) {
throw new InsufficientInventoryException(
"库存不足: skuId=" + item.getSkuId());
}
}
// 成功:发布InventoryReserved事件,触发积分扣减
publishSuccess(sagaId, orderId, orderEvent);
ack.acknowledge();
log.info("库存扣减成功: sagaId={}", sagaId);
} catch (Exception e) {
log.error("库存扣减失败: sagaId={}, error={}", sagaId, e.getMessage());
// 失败:发布补偿事件(取消订单,无需恢复库存,因为扣减失败了)
CancelOrderEvent compensateEvent = new CancelOrderEvent();
compensateEvent.setSagaId(sagaId);
compensateEvent.setOrderId(orderId);
compensateEvent.setStatus("COMPENSATING");
compensateEvent.setReason(e.getMessage());
kafkaTemplate.send("order-saga-result", orderId,
JsonUtil.toJson(compensateEvent));
ack.acknowledge();
}
}
/**
* 处理补偿事件:恢复库存
*/
@KafkaListener(topics = "inventory-compensate", groupId = "inventory-saga-group")
@Transactional
public void handleReleaseInventory(ConsumerRecord<String, String> record,
Acknowledgment ack) {
ReleaseInventoryEvent event = JsonUtil.fromJson(
record.value(), ReleaseInventoryEvent.class);
log.info("恢复库存: sagaId={}", event.getSagaId());
// 恢复已扣减的库存
inventoryRepository.releaseReservation(event.getSagaId());
// 通知订单服务补偿完成
CancelOrderEvent cancelEvent = new CancelOrderEvent();
cancelEvent.setSagaId(event.getSagaId());
cancelEvent.setOrderId(event.getOrderId());
cancelEvent.setStatus("COMPENSATED");
kafkaTemplate.send("order-saga-result", event.getOrderId(),
JsonUtil.toJson(cancelEvent));
ack.acknowledge();
log.info("库存恢复完成: sagaId={}", event.getSagaId());
}
private void publishSuccess(String sagaId, String orderId,
OrderCreatedEvent sourceEvent) {
InventoryReservedEvent successEvent = new InventoryReservedEvent();
successEvent.setSagaId(sagaId);
successEvent.setOrderId(orderId);
successEvent.setStatus("SUCCESS");
successEvent.setUserId(sourceEvent.getUserId());
// 发送到积分服务的Topic
kafkaTemplate.send("inventory-saga-events", orderId,
JsonUtil.toJson(successEvent));
}
}四、踩坑实录
坑1:补偿失败如何处理(补偿的补偿)
积分补偿事件发出去了,但积分服务此时宕机,补偿消息没有被处理,订单处于"库存已回滚,但积分还没恢复"的中间状态。
解决方案:
- 补偿事件必须保证at-least-once投递(Kafka + 消费者幂等)
- Saga状态机记录每一步的状态,定时扫描"超时未完成的补偿"重新触发
- 超过最大重试次数的Saga进入人工处理队列
坑2:消息顺序导致的补偿时序问题
如果正向事件和补偿事件都在同一个Topic,可能出现:补偿事件先被消费,然后正向事件才到,导致状态混乱。
解决方案:正向事件和补偿事件用不同的Topic,或者在每个事件里包含Saga步骤序号,消费时检查序号防止乱序。
坑3:Saga状态不一致导致重复补偿
网络超时时,补偿消息可能被发送多次。如果补偿操作不幂等(比如库存恢复了两次),会导致数据错误。
解决方案:所有补偿操作必须幂等,通过sagaId去重:
if (inventoryRepository.isReleased(sagaId)) {
log.info("库存已恢复(幂等跳过): sagaId={}", sagaId);
return;
}坑4:长事务导致数据长时间不一致
Saga是最终一致的,在整个流程完成前(可能需要几秒到几十秒),数据处于不一致状态。如果用户在这期间查询订单状态或库存,可能看到中间状态。
解决方案:
- 订单对外显示"处理中"状态,屏蔽中间状态
- 设置合理的Saga超时时间(超时自动触发补偿)
- 对于隔离性要求高的操作,用预留(reserve)代替直接扣减,等Saga确认后再真正扣减
五、总结
Saga通过消息队列把分布式事务拆成多个本地事务,是微服务下最实用的数据一致性方案:
- 适用场景:跨多个微服务、需要最终一致性、对隔离性要求不高
- 不适用场景:需要强一致性(如金融转账的中间状态不可见)、补偿操作不可能实现(如已发出的短信无法撤回)
核心设计要点:
- 幂等性:每个步骤和补偿步骤都必须幂等
- 状态机:维护Saga的完整状态,支持故障恢复
- 超时机制:设置每步的超时时间,超时触发补偿
- 监控告警:Saga状态必须可视化监控,异常及时告警
下一篇(第451期)讲MQTT协议Java实践,IoT百万设备消息接入的架构设计,把消息队列的应用边界从传统业务系统扩展到物联网场景。
