第2308篇:AI系统的幂等性设计进阶——分布式AI任务的精确一次语义
第2308篇:AI系统的幂等性设计进阶——分布式AI任务的精确一次语义
适读人群:AI平台工程师、分布式系统架构师 | 阅读时长:约18分钟 | 核心价值:系统掌握分布式AI任务的幂等性设计方法,彻底解决重试导致的重复执行问题
去年双十一前夕,我们的AI内容生成服务出了一个很尴尬的问题:因为消息队列的重试机制,同一个商品描述生成任务被执行了三次。三次调用了三次GPT-4,产生了三份不同的描述,写入了三条数据库记录,发送了三封"您的商品描述已生成"通知邮件。客户打电话来问:你们系统是不是有bug,我的商品怎么有三条描述?
更要命的是,三次LLM调用产生了三份不同的结果——同一个商品,三种不同的卖点描述,我们不知道该用哪个。
这个问题的根源是AI任务天然不幂等:同样的输入,每次调用LLM可能返回不同的输出。
AI任务幂等性的独特挑战
传统后端服务的幂等性相对好处理:数据库操作用唯一键去重,HTTP请求用幂等令牌。但AI任务有三个特殊性:
结果非确定性:同样的Prompt每次调用LLM可能返回不同内容。即使用了低temperature,也不保证完全相同。
操作链路长:一个AI任务可能包括检索、调用LLM、后处理、持久化、通知等多个步骤,任何步骤失败都可能触发重试。
副作用多样:AI任务的副作用不只是数据库写入,还有对外API调用、文件生成、消息推送等,每种副作用的去重方式不同。
幂等键的设计
幂等键是整个设计的基础。设计原则:相同语义的请求必须产生相同的幂等键。
/**
* 幂等键生成策略接口
*/
public interface IdempotencyKeyStrategy {
String generate(AITaskRequest request);
}
/**
* 基于内容哈希的幂等键策略
* 适用于"相同内容的任务只执行一次"的场景
*/
@Component
public class ContentHashIdempotencyKeyStrategy implements IdempotencyKeyStrategy {
@Override
public String generate(AITaskRequest request) {
// 提取关键业务字段,排除元数据(如requestTime、requesterId)
String keyContent = buildKeyContent(request);
return request.taskType() + "_" + sha256(keyContent);
}
private String buildKeyContent(AITaskRequest request) {
// 排序Map的key,确保字段顺序不影响哈希
TreeMap<String, Object> sortedParams = new TreeMap<>(request.businessParams());
return sortedParams.toString();
}
private String sha256(String content) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(content.getBytes(StandardCharsets.UTF_8));
return HexFormat.of().formatHex(hash).substring(0, 16); // 取前16位
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
}
/**
* 基于业务ID的幂等键策略
* 适用于"每个业务对象只生成一次"的场景
*/
@Component
public class BusinessIdIdempotencyKeyStrategy implements IdempotencyKeyStrategy {
@Override
public String generate(AITaskRequest request) {
// 格式:taskType_tenantId_businessId
return String.join("_",
request.taskType(),
request.tenantId(),
request.businessId()
);
}
}幂等任务记录表设计
/**
* 幂等任务记录的持久化实体
*/
@Entity
@Table(name = "ai_idempotent_tasks",
indexes = {
@Index(name = "idx_idempotency_key", columnList = "idempotency_key", unique = true)
}
)
public class IdempotentTaskRecord {
@Id
@GeneratedValue
private Long id;
@Column(name = "idempotency_key", nullable = false, unique = true, length = 128)
private String idempotencyKey;
@Column(name = "task_type", nullable = false)
private String taskType;
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
private TaskStatus status;
@Column(name = "result_ref") // 结果存储的引用(如S3路径或数据库ID)
private String resultRef;
@Column(name = "result_payload", columnDefinition = "TEXT") // 小结果直接存
private String resultPayload;
@Column(name = "error_message", columnDefinition = "TEXT")
private String errorMessage;
@Column(name = "attempt_count")
private int attemptCount = 0;
@Column(name = "created_at")
private Instant createdAt;
@Column(name = "completed_at")
private Instant completedAt;
@Column(name = "expires_at") // 幂等记录过期时间(过期后允许重新执行)
private Instant expiresAt;
@Version // 乐观锁,防并发问题
private Long version;
public enum TaskStatus {
PENDING, // 任务已创建,等待执行
RUNNING, // 正在执行
COMPLETED, // 执行完成
FAILED // 执行失败(允许重试)
}
}幂等执行器的核心逻辑
@Service
public class IdempotentAITaskExecutor {
private final IdempotentTaskRepository taskRepository;
private final AITaskProcessor taskProcessor;
private final ResultStorage resultStorage;
/**
* 幂等执行AI任务
* 保证:相同幂等键的任务,只会被成功执行一次
*/
@Transactional
public AITaskResult execute(AITaskRequest request) {
String idempotencyKey = request.idempotencyKey();
// 步骤1:查询是否已有记录
Optional<IdempotentTaskRecord> existingRecord =
taskRepository.findByIdempotencyKey(idempotencyKey);
if (existingRecord.isPresent()) {
return handleExistingTask(existingRecord.get(), request);
}
// 步骤2:尝试创建新记录(INSERT,如果冲突说明并发创建)
IdempotentTaskRecord record = createNewRecord(request, idempotencyKey);
try {
taskRepository.save(record);
} catch (DataIntegrityViolationException e) {
// 并发场景:另一个线程刚好也在创建,等一下再查
return handleRaceCondition(idempotencyKey, request);
}
// 步骤3:执行任务
return doExecuteTask(record, request);
}
private AITaskResult handleExistingTask(IdempotentTaskRecord record,
AITaskRequest request) {
return switch (record.getStatus()) {
case COMPLETED -> {
// 已完成,直接返回已有结果
log.info("幂等命中,返回已有结果: key={}", record.getIdempotencyKey());
yield loadResult(record);
}
case RUNNING -> {
// 正在执行,等待或返回"处理中"状态
log.info("任务正在执行中: key={}", record.getIdempotencyKey());
yield AITaskResult.inProgress(record.getIdempotencyKey());
}
case FAILED -> {
// 之前失败过,允许重试
log.info("之前执行失败,允许重试: key={}, attempts={}",
record.getIdempotencyKey(), record.getAttemptCount());
yield retryTask(record, request);
}
case PENDING -> {
// 创建后还没开始执行(可能Worker挂了),尝试接管
yield retryTask(record, request);
}
};
}
private AITaskResult doExecuteTask(IdempotentTaskRecord record, AITaskRequest request) {
// 更新状态为RUNNING
record.setStatus(IdempotentTaskRecord.TaskStatus.RUNNING);
record.setAttemptCount(record.getAttemptCount() + 1);
taskRepository.save(record);
try {
// 执行实际的AI任务
AITaskResult result = taskProcessor.process(request);
// 持久化结果
String resultRef = resultStorage.store(record.getIdempotencyKey(), result);
// 标记完成
record.setStatus(IdempotentTaskRecord.TaskStatus.COMPLETED);
record.setResultRef(resultRef);
record.setResultPayload(result.isSmall() ? result.toJson() : null);
record.setCompletedAt(Instant.now());
taskRepository.save(record);
return result;
} catch (Exception e) {
// 标记失败
record.setStatus(IdempotentTaskRecord.TaskStatus.FAILED);
record.setErrorMessage(e.getMessage());
taskRepository.save(record);
throw new AITaskExecutionException("AI任务执行失败: " + e.getMessage(), e);
}
}
private AITaskResult retryTask(IdempotentTaskRecord record, AITaskRequest request) {
// 最大重试次数检查
if (record.getAttemptCount() >= 3) {
return AITaskResult.permanentlyFailed(
record.getIdempotencyKey(), "超过最大重试次数"
);
}
return doExecuteTask(record, request);
}
private AITaskResult handleRaceCondition(String idempotencyKey, AITaskRequest request) {
// 稍等后重新查询
try { Thread.sleep(100); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
return taskRepository.findByIdempotencyKey(idempotencyKey)
.map(r -> handleExistingTask(r, request))
.orElseThrow(() -> new IllegalStateException("幂等记录消失了,这不应该发生"));
}
}多步骤AI任务的断点续传
对于包含多个步骤的长任务(如:检索→LLM→后处理→通知),不同步骤有不同的幂等需求:
/**
* 多步骤AI任务的检查点机制
* 每个步骤完成后保存检查点,重试时从最后成功的步骤继续
*/
@Service
public class CheckpointedAITaskExecutor {
private final CheckpointRepository checkpointRepo;
public MultiStepTaskResult execute(String taskId, MultiStepAITask task) {
TaskCheckpoint checkpoint = checkpointRepo.findByTaskId(taskId)
.orElse(new TaskCheckpoint(taskId));
// Step 1: 检索相关上下文
RetrievalResult retrievalResult;
if (checkpoint.hasStep("retrieval")) {
retrievalResult = checkpoint.getStepResult("retrieval", RetrievalResult.class);
log.info("Step retrieval: 从检查点恢复");
} else {
retrievalResult = task.executeRetrieval();
checkpoint.saveStep("retrieval", retrievalResult);
checkpointRepo.save(checkpoint);
}
// Step 2: LLM调用
// 注意:LLM结果要保存,重试时不重新调用(节省成本,保证一致性)
LLMResult llmResult;
if (checkpoint.hasStep("llm_generation")) {
llmResult = checkpoint.getStepResult("llm_generation", LLMResult.class);
log.info("Step llm_generation: 从检查点恢复,节省了一次LLM调用");
} else {
llmResult = task.executeLLM(retrievalResult);
checkpoint.saveStep("llm_generation", llmResult);
checkpointRepo.save(checkpoint);
}
// Step 3: 后处理(如格式化、过滤)
PostProcessedResult postResult;
if (checkpoint.hasStep("post_processing")) {
postResult = checkpoint.getStepResult("post_processing", PostProcessedResult.class);
} else {
postResult = task.executePostProcessing(llmResult);
checkpoint.saveStep("post_processing", postResult);
checkpointRepo.save(checkpoint);
}
// Step 4: 发送通知(特别重要:通知只能发一次)
if (!checkpoint.hasStep("notification_sent")) {
task.sendNotification(postResult);
checkpoint.saveStep("notification_sent", true);
checkpointRepo.save(checkpoint);
}
// 清理检查点
checkpointRepo.delete(checkpoint);
return MultiStepTaskResult.of(retrievalResult, llmResult, postResult);
}
}Kafka消息消费的幂等性
在消息驱动的AI任务系统中,Kafka的至少一次语义意味着消息可能被重复消费:
@KafkaListener(topics = "ai-task-requests", groupId = "ai-task-processors")
public class AITaskKafkaConsumer {
private final IdempotentAITaskExecutor idempotentExecutor;
private final ProcessedMessageTracker messageTracker;
@KafkaHandler
public void handleAITaskRequest(AITaskMessage message,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
// 用partition+offset作为幂等键,Kafka消息级别的去重
String messageKey = partition + "_" + offset;
if (messageTracker.isProcessed(messageKey)) {
log.info("消息已处理,跳过: partition={}, offset={}", partition, offset);
return;
}
try {
AITaskRequest request = message.toRequest();
// 业务级别的幂等执行
idempotentExecutor.execute(request);
// 标记消息已处理
messageTracker.markProcessed(messageKey);
} catch (Exception e) {
log.error("处理AI任务消息失败", e);
// 不标记为已处理,允许重试
throw e;
}
}
}生产经验:精确一次不是银弹
实现了幂等性之后,我们发现了一些新问题:
幂等记录的清理:幂等记录不能无限增长,需要设置过期时间。但过期时间设多久?太短了重复请求会被漏掉,太长了存储占用大。我们的经验是:根据业务场景的"有效重试窗口"来定,大多数场景24小时-7天合适。
结果变更的处理:某些场景下用户明确要求"重新生成"——这时候需要主动失效幂等记录,允许重新执行。设计一个invalidate接口。
部分成功的处理:多步骤任务中,步骤1成功、步骤2失败,重试时步骤1不需要重新执行。检查点机制解决了这个问题,但要注意不同步骤的幂等粒度可能不同。
