Function Call与数据库事务:工具执行失败时的回滚策略
Function Call与数据库事务:工具执行失败时的回滚策略
适读人群:构建写操作类LLM工具的Java后端工程师 | 阅读时长:约17分钟
开篇故事
做了一个AI订单助手,能帮用户做一些简单的操作:取消订单、修改收货地址、申请退款等。一开始做得挺顺,但上线后遇到了一个棘手的问题。
用户说"帮我取消订单并申请退款"。Agent调用了cancel_order成功了,然后调用apply_refund,结果退款申请接口超时失败了。现在状态不一致:订单取消了,但退款没申请。
这在传统CRUD开发里很简单——用数据库事务,一个失败全部回滚。但在工具调用场景里,cancel_order工具执行后事务就提交了,apply_refund又是另一个工具调用,两个调用之间没有数据库事务覆盖。
这个问题,用分布式Saga模式才能正确解决。
一、工具调用的事务挑战
二、Saga模式:工具级别的补偿事务
Saga模式的核心思想:每个步骤有一个"正向操作",如果后续步骤失败,执行"补偿操作"撤销已完成的步骤。
三、完整代码实现
3.1 Saga协调器
@Component
public class SagaCoordinator {
@Autowired
private SagaLogRepository sagaLogRepository;
@Autowired
private ObjectMapper objectMapper;
/**
* 在Saga上下文中执行工具,支持补偿回滚
*/
public SagaResult executeInSaga(String sagaId, List<SagaStep> steps) {
List<CompletedStep> completedSteps = new ArrayList<>();
for (SagaStep step : steps) {
log.info("[Saga] {} executing step: {}", sagaId, step.toolName());
try {
// 执行工具
String result = step.tool().call(step.arguments());
// 记录已完成的步骤(用于后续补偿)
CompletedStep completed = new CompletedStep(
step.toolName(), step.arguments(), result,
step.compensationTool(), step.compensationArguments(result)
);
completedSteps.add(completed);
// 持久化Saga日志(防止机器宕机后无法恢复)
sagaLogRepository.save(new SagaLog(sagaId, step.toolName(),
"COMPLETED", result));
} catch (Exception e) {
log.error("[Saga] {} step {} failed: {}", sagaId, step.toolName(), e.getMessage());
// 执行补偿,逆序回滚
List<CompensationError> compensationErrors =
compensateCompletedSteps(sagaId, completedSteps);
return SagaResult.failure(sagaId, step.toolName(), e.getMessage(),
compensationErrors);
}
}
return SagaResult.success(sagaId);
}
/**
* 逆序执行补偿操作
*/
private List<CompensationError> compensateCompletedSteps(String sagaId,
List<CompletedStep> completedSteps) {
List<CompensationError> errors = new ArrayList<>();
// 逆序执行补偿
ListIterator<CompletedStep> it =
completedSteps.listIterator(completedSteps.size());
while (it.hasPrevious()) {
CompletedStep step = it.previous();
if (step.compensationTool() == null) {
log.info("[Saga] No compensation for step: {}", step.toolName());
continue;
}
log.info("[Saga] {} compensating step: {}", sagaId, step.toolName());
try {
step.compensationTool().call(step.compensationArguments());
sagaLogRepository.save(new SagaLog(sagaId, step.toolName(),
"COMPENSATED", null));
log.info("[Saga] {} step {} compensated successfully",
sagaId, step.toolName());
} catch (Exception e) {
log.error("[Saga] Compensation failed for step {}: {}",
step.toolName(), e.getMessage());
errors.add(new CompensationError(step.toolName(), e.getMessage()));
// 补偿失败要告警,需要人工介入
alertService.sendCriticalAlert(
"Saga compensation failed: " + sagaId + "/" + step.toolName());
}
}
return errors;
}
}3.2 带Saga的订单操作工具集
@Configuration
public class OrderToolsWithSaga {
@Autowired
private OrderService orderService;
@Autowired
private RefundService refundService;
@Autowired
private SagaCoordinator sagaCoordinator;
@Bean
@Description("取消订单并申请退款。这是一个原子操作:如果退款申请失败,取消操作会自动撤销。" +
"用户确认取消后才调用此工具。")
public Function<CancelWithRefundRequest, CancelWithRefundResponse> cancelOrderWithRefund() {
return request -> {
String sagaId = "saga-" + UUID.randomUUID();
// 定义步骤:操作 + 补偿
List<SagaStep> steps = List.of(
// 步骤1:取消订单
new SagaStep(
"cancel_order",
// 执行工具
args -> {
CancelOrderResult result = orderService.cancel(
request.orderId(), request.cancelReason());
return objectMapper.writeValueAsString(result);
},
// 参数
objectMapper.writeValueAsString(Map.of(
"orderId", request.orderId(),
"reason", request.cancelReason()
)),
// 补偿工具:如果后续步骤失败,恢复订单
args -> {
orderService.restore(request.orderId());
return "{\"restored\": true}";
},
// 补偿参数(由执行结果生成)
stepResult -> objectMapper.writeValueAsString(Map.of(
"orderId", request.orderId()
))
),
// 步骤2:申请退款
new SagaStep(
"apply_refund",
args -> {
RefundResult result = refundService.apply(
request.orderId(), request.refundAmount());
return objectMapper.writeValueAsString(result);
},
objectMapper.writeValueAsString(Map.of(
"orderId", request.orderId(),
"amount", request.refundAmount()
)),
// 退款申请的补偿:取消退款申请
args -> {
refundService.cancel(request.orderId());
return "{\"cancelled\": true}";
},
stepResult -> {
try {
JsonNode result = objectMapper.readTree(stepResult);
return objectMapper.writeValueAsString(Map.of(
"refundId", result.get("refundId").asText()
));
} catch (Exception e) {
return "{}";
}
}
)
);
SagaResult sagaResult = sagaCoordinator.executeInSaga(sagaId, steps);
if (sagaResult.isSuccess()) {
return new CancelWithRefundResponse(true,
"订单已取消,退款申请已提交,预计3-5个工作日到账", sagaId);
} else {
// 如果补偿成功,订单状态已恢复
if (sagaResult.compensationErrors().isEmpty()) {
return new CancelWithRefundResponse(false,
"取消失败:" + sagaResult.failureReason() + ",订单状态已恢复", sagaId);
} else {
// 补偿也失败了,需要人工处理
return new CancelWithRefundResponse(false,
"操作失败且无法自动恢复,请联系客服处理,Saga ID:" + sagaId, sagaId);
}
}
};
}
}3.3 Saga日志持久化(防止宕机后无法恢复)
@Entity
@Table(name = "saga_log")
public class SagaLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String sagaId;
private String stepName;
private String status; // STARTED, COMPLETED, COMPENSATED, COMPENSATION_FAILED
private String result;
@Column(updatable = false)
private LocalDateTime createdAt;
@PrePersist
void onCreate() { this.createdAt = LocalDateTime.now(); }
}
// 宕机恢复:应用启动时检查未完成的Saga
@Component
public class SagaRecovery implements ApplicationRunner {
@Autowired
private SagaLogRepository sagaLogRepository;
@Autowired
private SagaCoordinator sagaCoordinator;
@Override
public void run(ApplicationArguments args) {
// 查找所有STARTED但没有COMPLETED或COMPENSATED结果的Saga
List<String> incompleteSagas = sagaLogRepository.findIncompleteSagas();
for (String sagaId : incompleteSagas) {
log.warn("[Saga] Found incomplete saga: {}, attempting recovery", sagaId);
// 发送告警,人工检查
alertService.sendAlert("Incomplete saga found after restart: " + sagaId);
}
}
}四、踩坑实录
坑1:补偿操作也可能失败
Saga最棘手的情况:补偿操作本身失败了,这时数据处于"部分回滚"状态。
策略:补偿失败时立即告警,并把Saga状态标记为NEEDS_MANUAL_INTERVENTION。同时在告警里包含足够的信息供运维人工处理。
坑2:读操作不需要补偿
并不是所有工具都需要Saga。只有有副作用的写操作才需要补偿机制。查询类工具不需要Saga:
// 不需要Saga的工具
@Bean
public Function<QueryRequest, QueryResponse> queryOrder() { ... } // 只读
// 需要Saga的工具(有写操作)
@Bean
public Function<CancelRequest, CancelResponse> cancelOrder() { ... } // 写操作坑3:外部系统调用的补偿更难
如果工具调用的是外部系统(支付宝退款、物流取消等),补偿操作需要调用外部系统的"撤销"接口。外部接口不一定提供撤销能力,或者撤销有时间限制。
设计时要预先调研每个外部操作的可逆性,不可逆的操作要在执行前加确认步骤。
坑4:Saga的幂等性
Saga步骤和补偿步骤都可能因为重试而执行多次,必须保证幂等:
// 取消订单前先检查状态
public CancelOrderResult cancel(String orderId, String reason) {
Order order = orderRepository.findById(orderId).orElseThrow();
// 幂等:如果已经是取消状态,直接返回成功
if (order.getStatus() == OrderStatus.CANCELLED) {
return new CancelOrderResult(orderId, "ALREADY_CANCELLED");
}
order.setStatus(OrderStatus.CANCELLED);
order.setCancelReason(reason);
order.setCancelTime(LocalDateTime.now());
orderRepository.save(order);
return new CancelOrderResult(orderId, "CANCELLED");
}五、总结与延伸
Function Call与事务的核心结论:
| 场景 | 策略 |
|---|---|
| 单个工具,单个DB操作 | 工具内部使用@Transactional |
| 单个工具,跨多个DB操作 | 工具内部使用分布式事务或Saga |
| 多个工具,顺序执行有依赖 | 外部Saga协调器 |
| 多个工具,可以并行 | 各自独立事务,结果聚合 |
Saga模式不是万能的,它带来了额外的复杂性。对于简单场景,在工具内部做好事务管理就够了。只有当多个工具之间有原子性需求时,才需要Saga。
下一篇是第500期特别纪念篇,聊聊我这三年从Java后端到AI工程师的经历。
