第2143篇:AI工作流的幂等性设计——重试、去重与状态一致性的工程实践
第2143篇:AI工作流的幂等性设计——重试、去重与状态一致性的工程实践
适读人群:构建AI工作流和自动化任务的后端工程师 | 阅读时长:约18分钟 | 核心价值:掌握AI任务的幂等性设计,解决重试导致的重复执行、状态不一致和数据污染问题
AI工作流有一个特殊挑战:任务耗时长(LLM调用可能几十秒),中间节点任何一步失败,都需要重试。但重试不加控制会带来严重问题。
我们之前有个自动化批处理任务:对10万条文档做AI分析,分批处理,每批100条。有一次因为网络波动,第300批处理到一半报错,重试时从头处理这一批。结果:这批的前50条被处理了两次,数据库里有重复记录,下游报表数据错了。
查错查了两天,问题根源是:AI任务没有做幂等性设计。
这篇文章讲如何在AI工作流里正确实现幂等性,让重试安全、可靠。
AI工作流的幂等性挑战
/**
* 为什么AI工作流比普通任务更难做幂等性?
*
* ===== 挑战一:AI输出的不确定性 =====
*
* 普通任务幂等性:同一输入,总是产生同一输出
* AI任务:同一输入,输出可能不同
*
* 这不影响幂等性的本质目标:
* 同一任务多次执行,副作用和最终状态一致
* ("任务只需要执行一次",而不是"每次产生一样的输出")
*
* ===== 挑战二:多步骤任务的部分完成 =====
*
* AI任务通常是多步骤的:
* 1. 读取数据
* 2. 调用LLM分析
* 3. 解析结果
* 4. 写入数据库
* 5. 更新状态
*
* 在任意步骤失败,重试需要知道从哪一步继续,
* 而不是从头来(避免步骤1-3被重复执行)
*
* ===== 挑战三:外部副作用 =====
*
* AI任务执行过程中可能有外部副作用:
* - 发送邮件/通知
* - 调用第三方API
* - 修改数据库记录
*
* 这些副作用必须只执行一次,即使任务被重试
*
* ===== 设计原则 =====
*
* 1. 任务ID唯一:同一业务操作,生成唯一taskId
* 2. 状态持久化:每个步骤的状态写数据库,不在内存
* 3. 检查点机制:完成一步就更新状态,重试从检查点继续
* 4. 外部调用去重:调用外部API前,先检查是否已调用
*/任务ID与状态追踪
/**
* 幂等任务基础设施
*
* 任何AI任务在执行前,先注册任务ID,
* 重试时检查任务是否已完成
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class IdempotentTaskRegistry {
private final TaskExecutionRepository executionRepo;
private final RedisTemplate<String, String> redisTemplate;
/**
* 获取或创建任务执行记录
*
* 如果taskId已存在且已完成,直接返回之前的结果
* 如果taskId已存在但进行中,返回当前状态
* 如果taskId不存在,创建新记录,准备执行
*/
public TaskExecutionContext getOrCreate(String taskId, String taskType) {
// 先查Redis(快速路径,已完成的任务缓存在这里)
String cachedStatus = redisTemplate.opsForValue()
.get("task:status:" + taskId);
if ("COMPLETED".equals(cachedStatus)) {
TaskExecution existing = executionRepo.findByTaskId(taskId)
.orElseThrow(() -> new IllegalStateException("Redis有记录但DB无记录"));
return TaskExecutionContext.fromExisting(existing, true);
}
// 查数据库
Optional<TaskExecution> existing = executionRepo.findByTaskId(taskId);
if (existing.isPresent()) {
TaskExecution execution = existing.get();
log.debug("任务已存在: taskId={}, status={}", taskId, execution.getStatus());
return TaskExecutionContext.fromExisting(execution, false);
}
// 创建新记录
TaskExecution newExecution = TaskExecution.builder()
.taskId(taskId)
.taskType(taskType)
.status(TaskStatus.PENDING)
.checkpointStep(0)
.retryCount(0)
.createdAt(LocalDateTime.now())
.build();
try {
executionRepo.save(newExecution);
} catch (DataIntegrityViolationException e) {
// 并发创建,另一个线程抢先创建了,读取它的记录
existing = executionRepo.findByTaskId(taskId);
if (existing.isPresent()) {
return TaskExecutionContext.fromExisting(existing.get(), false);
}
throw e;
}
return TaskExecutionContext.fromNew(newExecution);
}
/**
* 更新检查点(标记某一步完成)
*
* 重试时从最后一个检查点继续
*/
public void updateCheckpoint(String taskId, int completedStep, String stepData) {
executionRepo.updateCheckpoint(taskId, completedStep, stepData, LocalDateTime.now());
log.debug("检查点更新: taskId={}, step={}", taskId, completedStep);
}
/**
* 标记任务完成
*/
public void markCompleted(String taskId, String resultSummary) {
executionRepo.markCompleted(taskId, resultSummary, LocalDateTime.now());
// 写入Redis缓存,快速响应后续重复请求
redisTemplate.opsForValue().set(
"task:status:" + taskId, "COMPLETED", Duration.ofDays(7));
log.info("任务完成: taskId={}", taskId);
}
/**
* 标记任务失败
*/
public void markFailed(String taskId, String errorMessage) {
executionRepo.markFailed(taskId, errorMessage, LocalDateTime.now());
log.warn("任务失败: taskId={}, error={}", taskId, errorMessage);
}
@Builder
@Entity
@Table(name = "task_executions")
public static class TaskExecution {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(unique = true)
private String taskId;
private String taskType;
@Enumerated(EnumType.STRING)
private TaskStatus status;
private int checkpointStep;
private String checkpointData; // JSON,存储检查点的中间数据
private int retryCount;
private String resultSummary;
private String errorMessage;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
private LocalDateTime completedAt;
}
public record TaskExecutionContext(TaskExecution execution,
boolean isAlreadyCompleted,
boolean isNew) {
public static TaskExecutionContext fromExisting(TaskExecution e, boolean completed) {
return new TaskExecutionContext(e, completed, false);
}
public static TaskExecutionContext fromNew(TaskExecution e) {
return new TaskExecutionContext(e, false, true);
}
public int resumeFromStep() { return execution.checkpointStep(); }
}
public enum TaskStatus { PENDING, RUNNING, COMPLETED, FAILED }
}检查点机制实现
/**
* 带检查点的AI任务执行器
*
* 任务被拆成多个步骤,每步完成后保存检查点
* 失败重试时,从上次的检查点继续
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class CheckpointedAiTaskExecutor {
private final IdempotentTaskRegistry registry;
private final ChatLanguageModel llm;
private final ObjectMapper objectMapper;
/**
* 执行带检查点的文档分析任务
*
* 步骤:
* 0 → 1: 获取文档内容
* 1 → 2: LLM分析(提取关键信息)
* 2 → 3: 结构化结果解析
* 3 → 4: 写入数据库
* 4 → 5: 发送完成通知(可选)
*/
public DocumentAnalysisResult analyzeDocument(String documentId, String documentContent) {
// 生成确定性的taskId(基于业务数据,而不是随机ID)
// 确保同一文档的分析任务,无论重试多少次,taskId相同
String taskId = "doc-analysis-" + documentId;
IdempotentTaskRegistry.TaskExecutionContext ctx =
registry.getOrCreate(taskId, "DOCUMENT_ANALYSIS");
// 已经完成,直接返回之前的结果
if (ctx.isAlreadyCompleted()) {
log.info("任务已完成,返回缓存结果: taskId={}", taskId);
return loadCompletedResult(taskId);
}
int startStep = ctx.resumeFromStep();
log.info("开始执行(从步骤{}继续): taskId={}", startStep, taskId);
// 从检查点恢复中间数据
CheckpointData checkpoint = loadCheckpointData(ctx.execution());
try {
// 步骤1:内容预处理(如果之前没做)
String processedContent;
if (startStep < 1) {
processedContent = preprocessContent(documentContent);
checkpoint.processedContent = processedContent;
registry.updateCheckpoint(taskId, 1, serializeCheckpoint(checkpoint));
} else {
processedContent = checkpoint.processedContent;
}
// 步骤2:LLM分析(如果之前没做)
String llmAnalysis;
if (startStep < 2) {
llmAnalysis = callLlmForAnalysis(processedContent);
checkpoint.llmAnalysis = llmAnalysis;
registry.updateCheckpoint(taskId, 2, serializeCheckpoint(checkpoint));
} else {
llmAnalysis = checkpoint.llmAnalysis;
}
// 步骤3:解析结果(如果之前没做)
AnalysisData parsedData;
if (startStep < 3) {
parsedData = parseAnalysisResult(llmAnalysis);
checkpoint.parsedDataJson = objectMapper.writeValueAsString(parsedData);
registry.updateCheckpoint(taskId, 3, serializeCheckpoint(checkpoint));
} else {
parsedData = objectMapper.readValue(
checkpoint.parsedDataJson, AnalysisData.class);
}
// 步骤4:写入数据库(幂等写操作)
if (startStep < 4) {
upsertAnalysisResult(documentId, parsedData); // upsert,不是insert
registry.updateCheckpoint(taskId, 4, serializeCheckpoint(checkpoint));
}
// 任务完成
DocumentAnalysisResult result = new DocumentAnalysisResult(documentId, parsedData);
registry.markCompleted(taskId, "分析完成,关键词: " + parsedData.keywords());
return result;
} catch (Exception e) {
registry.markFailed(taskId, e.getMessage());
throw new RuntimeException("任务执行失败: " + taskId, e);
}
}
/**
* 调用LLM进行分析
*
* 这是最耗时的步骤,也是最容易失败的地方
* 通过检查点确保不会重复调用
*/
private String callLlmForAnalysis(String content) {
String prompt = """
请分析以下文档,提取:
1. 核心主题(1-3个关键词)
2. 主要内容摘要(100字以内)
3. 情感倾向(正面/中性/负面)
4. 重要实体(人名/地名/机构名)
文档内容:
%s
返回JSON格式。
""".formatted(content.substring(0, Math.min(2000, content.length())));
return llm.generate(prompt);
}
private String preprocessContent(String content) {
return content.trim().replaceAll("\\s+", " ");
}
private AnalysisData parseAnalysisResult(String llmOutput) throws Exception {
int start = llmOutput.indexOf('{'); int end = llmOutput.lastIndexOf('}');
String json = (start >= 0 && end > start) ? llmOutput.substring(start, end + 1) : "{}";
return objectMapper.readValue(json, AnalysisData.class);
}
/**
* 幂等写入(upsert)
*
* 多次执行效果一样,不会产生重复数据
*/
private void upsertAnalysisResult(String documentId, AnalysisData data) {
// 使用upsert而不是insert,确保幂等性
// 实现依赖数据库,MySQL: INSERT INTO ... ON DUPLICATE KEY UPDATE ...
// PostgreSQL: INSERT INTO ... ON CONFLICT(document_id) DO UPDATE SET ...
log.debug("幂等写入分析结果: documentId={}", documentId);
}
private CheckpointData loadCheckpointData(IdempotentTaskRegistry.TaskExecution execution) {
if (execution.checkpointData() == null) return new CheckpointData();
try {
return objectMapper.readValue(execution.checkpointData(), CheckpointData.class);
} catch (Exception e) {
return new CheckpointData();
}
}
private String serializeCheckpoint(CheckpointData data) {
try {
return objectMapper.writeValueAsString(data);
} catch (Exception e) {
return "{}";
}
}
private DocumentAnalysisResult loadCompletedResult(String taskId) {
// 从数据库加载之前完成的结果
return new DocumentAnalysisResult(taskId.replace("doc-analysis-", ""), null);
}
// 检查点中间数据(需要跨重试保存的数据)
static class CheckpointData {
public String processedContent;
public String llmAnalysis;
public String parsedDataJson;
}
public record AnalysisData(List<String> keywords, String summary,
String sentiment, List<String> entities) {}
public record DocumentAnalysisResult(String documentId, AnalysisData data) {}
}批量任务的幂等性
/**
* 批量AI任务的幂等性管理
*
* 处理大批量文档时,需要追踪每个文档的处理状态
* 允许中断后从断点继续,不重复处理已完成的文档
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class BatchAiTaskManager {
private final IdempotentTaskRegistry registry;
private final CheckpointedAiTaskExecutor executor;
private final BatchProgressRepository progressRepo;
/**
* 提交批量任务
*
* 生成批次ID,记录所有待处理的文档
*/
public String submitBatch(String batchName, List<String> documentIds) {
String batchId = "batch-" + batchName + "-" +
DigestUtils.md5Hex(String.join(",", documentIds)).substring(0, 8);
// 幂等提交:如果batchId已存在,不重复创建
if (progressRepo.existsByBatchId(batchId)) {
log.info("批次已存在: batchId={}", batchId);
return batchId;
}
// 记录批次信息
BatchProgress progress = BatchProgress.builder()
.batchId(batchId)
.totalCount(documentIds.size())
.processedCount(0)
.status(BatchStatus.PENDING)
.documentIds(String.join(",", documentIds))
.createdAt(LocalDateTime.now())
.build();
progressRepo.save(progress);
log.info("批次已创建: batchId={}, total={}", batchId, documentIds.size());
return batchId;
}
/**
* 执行批次(支持断点续传)
*
* 可以多次调用,每次只处理未完成的文档
*/
public BatchExecutionReport executeBatch(String batchId, String documentContent) {
BatchProgress progress = progressRepo.findByBatchId(batchId)
.orElseThrow(() -> new IllegalArgumentException("批次不存在: " + batchId));
List<String> allDocIds = Arrays.asList(progress.documentIds().split(","));
// 找出已完成的文档
Set<String> completedDocIds = findCompletedDocuments(batchId, allDocIds);
// 只处理未完成的
List<String> pendingDocIds = allDocIds.stream()
.filter(id -> !completedDocIds.contains(id))
.toList();
log.info("批次执行: batchId={}, total={}, pending={}",
batchId, allDocIds.size(), pendingDocIds.size());
int succeeded = 0, failed = 0;
List<String> failedDocIds = new ArrayList<>();
for (String docId : pendingDocIds) {
try {
// 每个文档的处理本身也是幂等的
executor.analyzeDocument(docId, documentContent);
succeeded++;
// 更新批次进度
progressRepo.incrementProcessed(batchId);
} catch (Exception e) {
failed++;
failedDocIds.add(docId);
log.error("文档处理失败: docId={}, error={}", docId, e.getMessage());
}
}
// 所有文档都处理完成(成功或失败)时,标记批次完成
if (succeeded + failed == pendingDocIds.size() && failedDocIds.isEmpty()) {
progressRepo.markCompleted(batchId, LocalDateTime.now());
}
return new BatchExecutionReport(batchId, allDocIds.size(),
completedDocIds.size() + succeeded, failed, failedDocIds);
}
/**
* 找出已完成的文档(通过检查任务注册表)
*/
private Set<String> findCompletedDocuments(String batchId, List<String> docIds) {
return docIds.stream()
.filter(docId -> {
String taskId = "doc-analysis-" + docId;
IdempotentTaskRegistry.TaskExecutionContext ctx =
registry.getOrCreate(taskId, "DOCUMENT_ANALYSIS");
return ctx.isAlreadyCompleted();
})
.collect(Collectors.toSet());
}
@Builder
@Entity
public static class BatchProgress {
@Id private String batchId;
private int totalCount;
private int processedCount;
@Enumerated(EnumType.STRING)
private BatchStatus status;
@Column(columnDefinition = "TEXT")
private String documentIds;
private LocalDateTime createdAt;
private LocalDateTime completedAt;
}
public record BatchExecutionReport(String batchId, int total, int succeeded,
int failed, List<String> failedDocIds) {
public boolean isFullySucceeded() { return failed == 0 && succeeded == total; }
}
enum BatchStatus { PENDING, RUNNING, COMPLETED, PARTIAL_COMPLETED }
}实践建议
任务ID的生成策略是幂等性的基础
任务ID必须是确定性的:同一业务操作,每次生成的ID一样。千万不要用UUID.randomUUID()——每次重试都生成新ID,幂等性就失效了。正确做法:把任务的业务标识组合起来生成ID。比如"对文档X的第一次分析":doc-analysis-{documentId}-v1;"给用户Y发第一次周报":weekly-report-{userId}-{weekNumber}。一旦ID生成策略确定,多次调用只会执行一次。
检查点粒度要平衡:太细浪费I/O,太粗重试代价大
每行代码都存检查点显然不合理;但只在任务最后存也没意义(中间失败了还是要从头来)。实践中,以每个"不可分割的外部调用"为粒度:一次LLM调用是一个检查点,一次数据库写入是一个检查点,一次第三方API调用是一个检查点。对应上面的例子,5个步骤对应5个检查点,每步骤大概1-2次I/O,开销可以接受,重试代价也可控。
幂等性和并发控制要一起考虑
如果同一个任务被并发地从两个线程/实例发起,两个线程都通过了"任务不存在"的检查,然后都开始执行,幂等性就破了。防并发的做法:用数据库的唯一约束保证任务记录的唯一性(我们代码里的DataIntegrityViolationException处理就是这个目的),或者用分布式锁(Redis setnx)保证只有一个实例在执行。两种方式各有适用场景,但都必须考虑进去,不能只设计幂等性不考虑并发。
