第1754篇:分布式事务在AI工作流中的处理——Saga模式与补偿机制
第1754篇:分布式事务在AI工作流中的处理——Saga模式与补偿机制
做AI系统之前,我以为分布式事务是个"传统后端"的问题,跟AI关系不大。
直到有一次,我们的AI文档处理工作流出了事故——用户支付了积分,文档上传成功,向量化也完成了,但最后一步入库时数据库挂了。结果用户积分扣了,但文档没有成功导入系统。用户发现问题来投诉,我们排查了好久才把这几步的状态对齐。
那次事故让我深刻认识到:AI工作流本质上是分布式事务问题。一个AI流水线往往要跨越多个服务——用户服务、文件存储、AI推理服务、向量数据库、业务数据库——任何一个环节失败,都可能造成数据不一致。
今天来讲Saga模式在AI工作流中的实战应用,这是目前公认最适合长事务场景的解决方案。
一、AI工作流为什么不能用传统分布式事务
先把背景说清楚,为什么不能用2PC(两阶段提交)。
传统2PC的核心问题是同步阻塞。在Prepare阶段,所有参与者都持有锁等待协调者决定。对于AI推理来说,一次推理可能耗时10-30秒,持锁等待30秒,整个系统都会被拖死。
而且2PC要求所有参与方都支持XA协议,向量数据库Milvus、AI推理服务根本不支持XA,硬用2PC根本行不通。
Saga模式的思路完全不同:
- 把一个长事务拆分为一系列本地事务(Saga步骤)
- 每个本地事务成功后立即提交,不等待其他步骤
- 如果某步骤失败,通过执行补偿事务来回滚已完成的步骤
- 整个过程是最终一致的,不是强一致的
这个思路非常适合AI工作流:每个AI处理步骤都是独立的,成功了就提交,失败了执行补偿。
二、Saga的两种实现方式
Saga有两种实现风格,选哪种对系统架构影响很大。
编排式(Orchestration):有一个中心化的协调器(Orchestrator)来指挥每个步骤的执行和补偿。协调器知道完整的流程,负责决定什么时候调用哪个服务。
事件驱动式(Choreography):没有中心协调器,每个服务监听事件,收到事件后执行自己的步骤,然后发布新事件通知下一个服务。服务之间通过事件自发协作。
对于AI工作流,我更倾向于编排式,原因有三:
- AI流水线往往是线性的,编排式的流程更清晰,排障也容易
- 补偿逻辑在协调器里集中管理,不用在各服务间追踪
- 流程变更时只改协调器,不用改各个服务
但如果你的AI工作流很复杂,各步骤之间有复杂的依赖关系,事件驱动式反而更灵活。
三、实战:AI文档处理的Saga实现
以"用户上传文档→扣积分→解析文档→向量化→入知识库"这个典型流程为例。
先定义Saga的状态和步骤:
@Data
@Builder
@Entity
@Table(name = "doc_process_saga")
public class DocProcessSaga {
@Id
private String sagaId;
private String userId;
private String documentId;
private String documentUrl;
@Enumerated(EnumType.STRING)
private SagaStatus status;
@Enumerated(EnumType.STRING)
private SagaStep currentStep;
private String failureReason;
private int retryCount;
@Type(JsonType.class)
@Column(columnDefinition = "json")
private Map<String, Object> stepResults; // 各步骤的输出,补偿时需要用到
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
public enum SagaStatus {
STARTED, RUNNING, COMPLETED, COMPENSATING, COMPENSATED, FAILED
}
public enum SagaStep {
DEDUCT_CREDITS, // 扣积分
PARSE_DOCUMENT, // 解析文档
VECTORIZE_CONTENT, // 向量化
SAVE_TO_KNOWLEDGE_BASE, // 入知识库
NOTIFY_USER, // 通知用户
// 补偿步骤
REFUND_CREDITS, // 退积分
DELETE_PARSED_DATA, // 删除解析结果
DELETE_VECTORS, // 删除向量
DELETE_KB_ENTRY // 删除知识库条目
}
}Saga编排器的核心实现:
@Service
@Slf4j
@Transactional
public class DocProcessSagaOrchestrator {
private final DocProcessSagaRepository sagaRepo;
private final CreditsService creditsService;
private final DocumentParserService parserService;
private final VectorizationService vectorService;
private final KnowledgeBaseService kbService;
private final NotificationService notificationService;
private final SagaEventPublisher eventPublisher;
/**
* 启动Saga
*/
public String startSaga(String userId, String documentId, String documentUrl) {
DocProcessSaga saga = DocProcessSaga.builder()
.sagaId(UUID.randomUUID().toString())
.userId(userId)
.documentId(documentId)
.documentUrl(documentUrl)
.status(DocProcessSaga.SagaStatus.STARTED)
.currentStep(DocProcessSaga.SagaStep.DEDUCT_CREDITS)
.stepResults(new HashMap<>())
.retryCount(0)
.createdAt(LocalDateTime.now())
.updatedAt(LocalDateTime.now())
.build();
sagaRepo.save(saga);
// 发起第一步
eventPublisher.publishStepEvent(saga.getSagaId(),
DocProcessSaga.SagaStep.DEDUCT_CREDITS);
log.info("Started doc process saga: {} for user: {}", saga.getSagaId(), userId);
return saga.getSagaId();
}
/**
* 执行下一步
*/
public void executeNextStep(String sagaId, DocProcessSaga.SagaStep completedStep,
Object stepResult) {
DocProcessSaga saga = sagaRepo.findById(sagaId)
.orElseThrow(() -> new SagaNotFoundException(sagaId));
// 保存当前步骤的结果(补偿时可能需要)
saga.getStepResults().put(completedStep.name(), stepResult);
saga.setUpdatedAt(LocalDateTime.now());
// 确定下一步
DocProcessSaga.SagaStep nextStep = getNextStep(completedStep);
if (nextStep == null) {
// 所有步骤完成
saga.setStatus(DocProcessSaga.SagaStatus.COMPLETED);
sagaRepo.save(saga);
log.info("Saga {} completed successfully", sagaId);
return;
}
saga.setCurrentStep(nextStep);
saga.setStatus(DocProcessSaga.SagaStatus.RUNNING);
sagaRepo.save(saga);
// 触发下一步
eventPublisher.publishStepEvent(sagaId, nextStep);
}
/**
* 处理步骤失败,触发补偿
*/
public void handleStepFailure(String sagaId, DocProcessSaga.SagaStep failedStep,
String reason) {
DocProcessSaga saga = sagaRepo.findById(sagaId)
.orElseThrow(() -> new SagaNotFoundException(sagaId));
log.error("Saga {} step {} failed: {}", sagaId, failedStep, reason);
saga.setStatus(DocProcessSaga.SagaStatus.COMPENSATING);
saga.setFailureReason(reason);
saga.setUpdatedAt(LocalDateTime.now());
sagaRepo.save(saga);
// 触发补偿
startCompensation(saga, failedStep);
}
private void startCompensation(DocProcessSaga saga,
DocProcessSaga.SagaStep failedStep) {
// 找出需要补偿的步骤(已成功执行的步骤,逆序补偿)
List<DocProcessSaga.SagaStep> completedSteps = getCompletedSteps(failedStep);
if (completedSteps.isEmpty()) {
saga.setStatus(DocProcessSaga.SagaStatus.COMPENSATED);
sagaRepo.save(saga);
return;
}
// 从最后完成的步骤开始逆序补偿
DocProcessSaga.SagaStep firstCompensationStep =
getCompensationStep(completedSteps.get(completedSteps.size() - 1));
eventPublisher.publishCompensationEvent(saga.getSagaId(), firstCompensationStep);
}
/**
* 执行补偿
*/
public void executeCompensation(String sagaId,
DocProcessSaga.SagaStep compensationStep) {
DocProcessSaga saga = sagaRepo.findById(sagaId)
.orElseThrow(() -> new SagaNotFoundException(sagaId));
log.info("Executing compensation step {} for saga {}", compensationStep, sagaId);
try {
switch (compensationStep) {
case REFUND_CREDITS -> {
Integer deductedCredits = (Integer) saga.getStepResults()
.get(DocProcessSaga.SagaStep.DEDUCT_CREDITS.name());
if (deductedCredits != null) {
creditsService.refundCredits(saga.getUserId(), deductedCredits,
"AI文档处理失败退款,Saga:" + sagaId);
}
}
case DELETE_PARSED_DATA -> {
String parsedDataId = (String) saga.getStepResults()
.get(DocProcessSaga.SagaStep.PARSE_DOCUMENT.name());
if (parsedDataId != null) {
parserService.deleteParsedData(parsedDataId);
}
}
case DELETE_VECTORS -> {
String vectorCollectionId = (String) saga.getStepResults()
.get(DocProcessSaga.SagaStep.VECTORIZE_CONTENT.name());
if (vectorCollectionId != null) {
vectorService.deleteVectors(vectorCollectionId, saga.getDocumentId());
}
}
case DELETE_KB_ENTRY -> {
String kbEntryId = (String) saga.getStepResults()
.get(DocProcessSaga.SagaStep.SAVE_TO_KNOWLEDGE_BASE.name());
if (kbEntryId != null) {
kbService.deleteEntry(kbEntryId);
}
}
}
// 补偿成功,执行下一个补偿步骤
DocProcessSaga.SagaStep nextCompensation = getNextCompensationStep(compensationStep);
if (nextCompensation != null) {
eventPublisher.publishCompensationEvent(sagaId, nextCompensation);
} else {
// 所有补偿完成
saga.setStatus(DocProcessSaga.SagaStatus.COMPENSATED);
saga.setUpdatedAt(LocalDateTime.now());
sagaRepo.save(saga);
// 通知用户处理失败
notificationService.notifyDocProcessFailed(
saga.getUserId(), saga.getDocumentId(), saga.getFailureReason());
log.info("Saga {} compensation completed", sagaId);
}
} catch (Exception e) {
log.error("Compensation step {} failed for saga {}: {}",
compensationStep, sagaId, e.getMessage());
// 补偿失败需要人工介入
saga.setStatus(DocProcessSaga.SagaStatus.FAILED);
saga.setFailureReason("Compensation failed: " + e.getMessage());
sagaRepo.save(saga);
// 告警通知运维
alertService.sendCompensationFailedAlert(sagaId, compensationStep, e);
}
}
private DocProcessSaga.SagaStep getNextStep(DocProcessSaga.SagaStep current) {
return switch (current) {
case DEDUCT_CREDITS -> DocProcessSaga.SagaStep.PARSE_DOCUMENT;
case PARSE_DOCUMENT -> DocProcessSaga.SagaStep.VECTORIZE_CONTENT;
case VECTORIZE_CONTENT -> DocProcessSaga.SagaStep.SAVE_TO_KNOWLEDGE_BASE;
case SAVE_TO_KNOWLEDGE_BASE -> DocProcessSaga.SagaStep.NOTIFY_USER;
case NOTIFY_USER -> null; // 最后一步
default -> null;
};
}
private DocProcessSaga.SagaStep getCompensationStep(DocProcessSaga.SagaStep forwardStep) {
return switch (forwardStep) {
case DEDUCT_CREDITS -> DocProcessSaga.SagaStep.REFUND_CREDITS;
case PARSE_DOCUMENT -> DocProcessSaga.SagaStep.DELETE_PARSED_DATA;
case VECTORIZE_CONTENT -> DocProcessSaga.SagaStep.DELETE_VECTORS;
case SAVE_TO_KNOWLEDGE_BASE -> DocProcessSaga.SagaStep.DELETE_KB_ENTRY;
default -> null;
};
}
}四、幂等性:Saga的隐性前提
Saga模式有个常被忽视的前提:所有步骤和补偿步骤必须是幂等的。
因为消息可能重复投递,网络故障时步骤可能被重复执行。如果不保证幂等,补偿本来是要退积分的,结果退了两次,更糟。
幂等实现的通用做法:
@Service
@Slf4j
public class CreditsService {
private final CreditsRepository creditsRepo;
private final CreditsTransactionRepository transactionRepo;
/**
* 扣积分 - 幂等实现
* @param sagaId 用于幂等key
*/
public int deductCredits(String userId, int amount, String sagaId) {
String idempotencyKey = "deduct:" + sagaId;
// 检查是否已经执行过
Optional<CreditsTransaction> existing =
transactionRepo.findByIdempotencyKey(idempotencyKey);
if (existing.isPresent()) {
log.info("Deduct credits already executed for sagaId: {}", sagaId);
return existing.get().getAmount();
}
// 执行扣积分
UserCredits credits = creditsRepo.findByUserId(userId)
.orElseThrow(() -> new InsufficientCreditsException(userId));
if (credits.getBalance() < amount) {
throw new InsufficientCreditsException(
"Insufficient credits: " + credits.getBalance() + " < " + amount);
}
credits.setBalance(credits.getBalance() - amount);
creditsRepo.save(credits);
// 记录事务,保证幂等
CreditsTransaction transaction = CreditsTransaction.builder()
.idempotencyKey(idempotencyKey)
.userId(userId)
.amount(amount)
.type(TransactionType.DEDUCT)
.sagaId(sagaId)
.createdAt(LocalDateTime.now())
.build();
transactionRepo.save(transaction);
return amount;
}
/**
* 退积分 - 幂等实现
*/
public void refundCredits(String userId, int amount, String reason) {
String idempotencyKey = "refund:" + reason; // reason里包含sagaId
if (transactionRepo.existsByIdempotencyKey(idempotencyKey)) {
log.info("Refund credits already executed: {}", idempotencyKey);
return;
}
UserCredits credits = creditsRepo.findByUserId(userId)
.orElseThrow();
credits.setBalance(credits.getBalance() + amount);
creditsRepo.save(credits);
CreditsTransaction transaction = CreditsTransaction.builder()
.idempotencyKey(idempotencyKey)
.userId(userId)
.amount(amount)
.type(TransactionType.REFUND)
.createdAt(LocalDateTime.now())
.build();
transactionRepo.save(transaction);
}
}五、超时与重试:处理AI推理的不确定性
AI推理的特殊之处在于耗时不确定——有时几秒,有时几十秒,有时直接超时。Saga需要专门处理这种情况。
@Component
@Slf4j
public class SagaStepExecutor {
private final DocProcessSagaOrchestrator orchestrator;
private final RetryTemplate retryTemplate;
@PostConstruct
public void initRetryTemplate() {
retryTemplate = RetryTemplate.builder()
.maxAttempts(3)
.exponentialBackoff(1000, 2.0, 30000) // 1s, 2s, 4s...最大30s
.retryOn(AIServiceTemporaryException.class)
.retryOn(VectorDBTemporaryException.class)
.notRetryOn(InvalidInputException.class)
.notRetryOn(InsufficientCreditsException.class)
.build();
}
@EventListener
public void onSagaStepEvent(SagaStepEvent event) {
String sagaId = event.getSagaId();
DocProcessSaga.SagaStep step = event.getStep();
try {
retryTemplate.execute(context -> {
if (context.getRetryCount() > 0) {
log.info("Retrying saga {} step {} (attempt {})",
sagaId, step, context.getRetryCount() + 1);
}
executeStep(sagaId, step);
return null;
}, context -> {
// 重试耗尽后的处理
Throwable lastException = context.getLastThrowable();
orchestrator.handleStepFailure(sagaId, step,
"Max retries exceeded: " + lastException.getMessage());
return null;
});
} catch (Exception e) {
orchestrator.handleStepFailure(sagaId, step, e.getMessage());
}
}
private void executeStep(String sagaId, DocProcessSaga.SagaStep step) {
DocProcessSaga saga = sagaRepo.findById(sagaId).orElseThrow();
switch (step) {
case DEDUCT_CREDITS -> {
int deducted = creditsService.deductCredits(
saga.getUserId(),
calculateRequiredCredits(saga.getDocumentUrl()),
sagaId
);
orchestrator.executeNextStep(sagaId, step, deducted);
}
case PARSE_DOCUMENT -> {
// AI解析文档,可能耗时较长
ParseResult result = documentParserService.parse(
saga.getDocumentUrl(),
ParseOptions.builder()
.extractTables(true)
.extractImages(false)
.timeout(Duration.ofMinutes(5))
.build()
);
orchestrator.executeNextStep(sagaId, step, result.getParsedDataId());
}
case VECTORIZE_CONTENT -> {
String parsedDataId = (String) saga.getStepResults()
.get(DocProcessSaga.SagaStep.PARSE_DOCUMENT.name());
VectorizeResult result = vectorService.vectorize(
parsedDataId, saga.getDocumentId(), saga.getUserId());
orchestrator.executeNextStep(sagaId, step, result.getCollectionId());
}
case SAVE_TO_KNOWLEDGE_BASE -> {
String vectorCollectionId = (String) saga.getStepResults()
.get(DocProcessSaga.SagaStep.VECTORIZE_CONTENT.name());
KBEntry entry = kbService.addDocument(
saga.getUserId(),
saga.getDocumentId(),
vectorCollectionId
);
orchestrator.executeNextStep(sagaId, step, entry.getId());
}
case NOTIFY_USER -> {
notificationService.notifyDocProcessCompleted(
saga.getUserId(), saga.getDocumentId());
orchestrator.executeNextStep(sagaId, step, "notified");
}
}
}
}六、悬挂事务:一个必须处理的边界情况
Saga有个经典的边界问题叫"悬挂事务"(Suspend Transaction)。
场景:协调器发出步骤A的命令,网络超时。协调器以为步骤A失败,触发补偿,发出"撤销A"的命令。但实际上步骤A的命令网络延迟了,后来才到达服务,执行成功了。结果:A执行了,但补偿"撤销A"也执行了,然后A的结果又被创建出来,留在那里没被清理。
解决方案是在每个服务里维护一个"防悬挂"记录表:
@Service
public class SagaTransactionGuard {
private final SagaTransactionGuardRepository guardRepo;
/**
* 在执行正向步骤前调用
* 如果已经存在补偿记录,说明这是一个悬挂事务,拒绝执行
*/
public boolean canExecuteForwardStep(String sagaId, String stepName) {
// 检查是否已经有对应的补偿记录
boolean compensationExists = guardRepo.existsBySagaIdAndStepNameAndType(
sagaId, stepName, TransactionType.COMPENSATION);
if (compensationExists) {
log.warn("Suspended transaction detected: sagaId={}, step={}",
sagaId, stepName);
return false;
}
// 记录正向步骤开始
guardRepo.save(SagaTransactionGuardRecord.builder()
.sagaId(sagaId)
.stepName(stepName)
.type(TransactionType.FORWARD)
.createdAt(LocalDateTime.now())
.build());
return true;
}
/**
* 在执行补偿步骤前调用
* 如果正向步骤还没有执行记录,说明是空补偿,直接返回成功
*/
public boolean shouldExecuteCompensation(String sagaId, String stepName) {
boolean forwardExecuted = guardRepo.existsBySagaIdAndStepNameAndType(
sagaId, stepName, TransactionType.FORWARD);
if (!forwardExecuted) {
log.info("Empty compensation: forward step not executed, sagaId={}, step={}",
sagaId, stepName);
return false;
}
// 记录补偿记录(防止后续悬挂的正向步骤执行)
guardRepo.save(SagaTransactionGuardRecord.builder()
.sagaId(sagaId)
.stepName(stepName)
.type(TransactionType.COMPENSATION)
.createdAt(LocalDateTime.now())
.build());
return true;
}
}七、Saga的监控与状态查询
Saga运行时的可观测性非常重要,特别是对长时间运行的AI工作流。
@RestController
@RequestMapping("/api/saga")
public class SagaMonitorController {
private final DocProcessSagaRepository sagaRepo;
@GetMapping("/{sagaId}/status")
public ResponseEntity<SagaStatusResponse> getSagaStatus(@PathVariable String sagaId) {
DocProcessSaga saga = sagaRepo.findById(sagaId)
.orElseThrow(() -> new NotFoundException("Saga not found: " + sagaId));
return ResponseEntity.ok(SagaStatusResponse.builder()
.sagaId(sagaId)
.status(saga.getStatus())
.currentStep(saga.getCurrentStep())
.completedSteps(getCompletedSteps(saga))
.failureReason(saga.getFailureReason())
.elapsedTime(calculateElapsedTime(saga))
.estimatedRemainingTime(estimateRemainingTime(saga))
.build());
}
// 查找需要人工介入的失败Saga
@GetMapping("/failed")
public ResponseEntity<Page<DocProcessSaga>> getFailedSagas(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size) {
Pageable pageable = PageRequest.of(page, size,
Sort.by("updatedAt").descending());
Page<DocProcessSaga> failedSagas = sagaRepo.findByStatus(
DocProcessSaga.SagaStatus.FAILED, pageable);
return ResponseEntity.ok(failedSagas);
}
// 手动触发补偿(用于人工介入)
@PostMapping("/{sagaId}/compensate")
@PreAuthorize("hasRole('ADMIN')")
public ResponseEntity<Void> manualCompensate(@PathVariable String sagaId) {
DocProcessSaga saga = sagaRepo.findById(sagaId).orElseThrow();
orchestrator.handleStepFailure(sagaId, saga.getCurrentStep(),
"Manual compensation triggered by admin");
return ResponseEntity.ok().build();
}
}八、踩坑总结
坑一:补偿事务的幂等性经常被遗忘
我见过很多人写Saga,正向步骤做了幂等,补偿步骤忘了做。结果补偿消息重复投递时,退了两次积分。记住:每一步,无论正向还是补偿,都必须是幂等的,没有例外。
坑二:补偿步骤里直接调用远程服务没做超时
AI推理服务已经挂了(这才触发了补偿),补偿的时候还要删向量数据,如果这个删除操作没设超时,补偿步骤就会一直卡在那里。给所有远程调用加上合理的超时,是基本要求。
坑三:Saga状态表没做索引优化
Saga表经常要按状态查询(查运行中的、查失败的),如果没有在status和updatedAt上建联合索引,随着数据量增长,监控查询会越来越慢。这个坑早发现早治。
坑四:把Saga设计得太细
有的同学把Saga拆得特别细,每个服务调用都是一个步骤,结果一个简单流程有十几个步骤,协调器的代码维护起来很痛苦,而且每个步骤都要写补偿,工作量翻倍。
合理的粒度:一个步骤对应一个有意义的业务操作,而不是一次技术调用。
坑五:没有考虑Saga运行超时
AI工作流有时会被阻塞——比如向量化服务的队列爆了,Saga就一直在RUNNING状态。需要有一个定时任务扫描超时的Saga:
@Scheduled(fixedDelay = 60_000)
public void checkSagaTimeouts() {
LocalDateTime timeout = LocalDateTime.now().minusHours(1);
List<DocProcessSaga> stuck = sagaRepo.findByStatusAndUpdatedAtBefore(
DocProcessSaga.SagaStatus.RUNNING, timeout);
for (DocProcessSaga saga : stuck) {
log.warn("Saga {} stuck in RUNNING state for over 1 hour", saga.getSagaId());
orchestrator.handleStepFailure(saga.getSagaId(), saga.getCurrentStep(),
"Saga timeout: stuck for over 1 hour");
}
}九、整体流程图
十、小结
Saga模式在AI工作流里的应用,核心思想和传统分布式事务没有本质区别,但有几个AI特有的细节需要额外关注:
一是AI步骤的耗时不确定性,需要设计专门的超时和重试策略;二是补偿逻辑的完备性,AI操作有时有副作用(比如已经向用户展示了中间结果),补偿时需要一并处理;三是状态可观测性,长时间运行的Saga必须有完善的监控,否则出了问题根本不知道。
这套方案在我们系统里跑了大半年,线上至今没有发生积分和文档状态不一致的问题。投入回报比相当高。
