第1819篇:消息幂等性与精确一次语义在AI任务队列中的实现
第1819篇:消息幂等性与精确一次语义在AI任务队列中的实现
有一次我们的AI任务队列出了个问题,让我记忆深刻。
那是一个文档处理任务队列,用户上传文档,系统异步做OCR识别和LLM摘要提取,然后写入数据库。某天消费者服务重启了,重启后从最新的offset开始消费,结果把重启前没来得及ack的几百条消息丢了。用户的文档处理请求就这样石沉大海,用户投诉,我们排查了一个下午。
这个事故暴露了一个根本性的问题:我们的AI任务队列既没有幂等性保障,也没有精确一次语义。当时用的"至少一次"配置理论上应该重发,但重发又会导致用户看到重复的处理结果。两边都是坑。
这篇文章就是把消息幂等性和精确一次语义在AI任务队列里的实现彻底说清楚。
先把概念说明白
消息投递语义有三种:
- 至多一次(At Most Once):消息可能丢失,但不会重复。适合允许丢失的场景(如监控日志)
- 至少一次(At Least Once):消息不会丢失,但可能重复。最常见的默认语义
- 精确一次(Exactly Once):消息不丢不重。理想情况,实现最复杂
幂等性:相同的操作执行多次,结果与执行一次相同。
关系:在"至少一次"的消息系统里,通过业务层实现幂等性,可以达到"精确一次"的业务效果(即使底层消息会重复投递)。
注意这里的区分:Kafka Streams的"精确一次"是消费-处理-生产这个原子操作的精确一次;而AI任务队列的精确一次,包含了LLM调用、数据库写入等副作用操作,需要更细粒度的控制。
AI任务队列的特殊挑战
传统消息队列的幂等性相对简单——通常是数据库的唯一约束就能搞定。但AI任务队列复杂得多:
- LLM调用本身不幂等:同一个prompt调两次可能得到不同的结果(有temperature参数)
- LLM调用有成本:重复调用意味着重复花钱
- 处理时间长:LLM调用可能需要几十秒,期间消费者可能超时重启
- 副作用链复杂:一个任务可能涉及多个外部调用(LLM + 数据库 + 文件存储)
幂等性的核心设计
幂等键(Idempotency Key)是整个方案的基础:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AITask {
private String taskId; // 全局唯一任务ID(幂等键)
private String taskType; // 任务类型:SUMMARIZE/TRANSLATE/CLASSIFY
private String inputContent; // 输入内容
private String userId;
private int maxRetries;
private Map<String, String> params;
private long createdAt;
// 幂等键:基于输入内容的哈希,确保相同输入的幂等
public String getIdempotencyKey() {
String contentHash = Hashing.sha256()
.hashString(taskType + ":" + inputContent, StandardCharsets.UTF_8)
.toString();
return taskType + "_" + contentHash.substring(0, 16);
}
}
@Data
@Builder
@Entity
@Table(name = "ai_task_results")
public class AITaskResult {
@Id
private String taskId;
@Column(name = "idempotency_key", unique = true) // 唯一约束!
private String idempotencyKey;
private String status; // PROCESSING / COMPLETED / FAILED
private String result; // LLM返回结果
private String errorMessage;
private int retryCount;
private long processingStartAt;
private long completedAt;
@Version
private long version; // 乐观锁
}消息去重:多层防护
@Service
@Slf4j
public class IdempotentTaskProcessor {
private final AITaskResultRepository taskResultRepo;
private final LLMService llmService;
private final RedisTemplate<String, String> redisTemplate;
// Redis分布式锁用于防止并发处理同一任务
private static final String TASK_LOCK_PREFIX = "task:lock:";
private static final long LOCK_TTL_SECONDS = 300; // 5分钟锁超时
@Transactional
public AITaskResult processTask(AITask task) {
String idempotencyKey = task.getIdempotencyKey();
// === 第一层:数据库去重检查 ===
Optional<AITaskResult> existing = taskResultRepo.findByIdempotencyKey(idempotencyKey);
if (existing.isPresent()) {
AITaskResult result = existing.get();
if ("COMPLETED".equals(result.getStatus())) {
log.info("Task already completed, returning cached result. key: {}", idempotencyKey);
return result; // 直接返回历史结果,不重复处理
}
if ("PROCESSING".equals(result.getStatus())) {
// 正在处理中:检查是否是死锁(处理时间过长)
long processingDuration = System.currentTimeMillis() - result.getProcessingStartAt();
if (processingDuration < LOCK_TTL_SECONDS * 1000) {
log.info("Task is being processed by another instance. key: {}", idempotencyKey);
throw new TaskAlreadyProcessingException("任务正在处理中,请等待");
}
// 超时了,认为上次处理失败,允许重试
log.warn("Task processing timeout detected, allowing retry. key: {}", idempotencyKey);
}
}
// === 第二层:Redis分布式锁(防止并发处理) ===
String lockKey = TASK_LOCK_PREFIX + idempotencyKey;
String lockValue = UUID.randomUUID().toString();
Boolean locked = redisTemplate.opsForValue().setIfAbsent(
lockKey, lockValue, Duration.ofSeconds(LOCK_TTL_SECONDS));
if (!Boolean.TRUE.equals(locked)) {
throw new TaskAlreadyProcessingException("获取任务锁失败,可能已在处理中");
}
try {
return doProcessTask(task, idempotencyKey, lockValue);
} finally {
// 释放锁(仅释放自己持有的锁)
releaseLock(lockKey, lockValue);
}
}
private AITaskResult doProcessTask(AITask task, String idempotencyKey, String lockValue) {
// 创建或更新任务记录为PROCESSING状态
AITaskResult taskResult = taskResultRepo.findByIdempotencyKey(idempotencyKey)
.orElseGet(() -> AITaskResult.builder()
.taskId(task.getTaskId())
.idempotencyKey(idempotencyKey)
.build());
taskResult.setStatus("PROCESSING");
taskResult.setRetryCount(taskResult.getRetryCount() + 1);
taskResult.setProcessingStartAt(System.currentTimeMillis());
taskResultRepo.save(taskResult);
// === 实际处理:调用LLM ===
try {
String llmResult = llmService.process(task);
// 处理成功,更新状态
taskResult.setStatus("COMPLETED");
taskResult.setResult(llmResult);
taskResult.setCompletedAt(System.currentTimeMillis());
taskResultRepo.save(taskResult);
log.info("Task completed successfully. key: {}, duration: {}ms",
idempotencyKey, taskResult.getCompletedAt() - taskResult.getProcessingStartAt());
return taskResult;
} catch (Exception e) {
taskResult.setStatus("FAILED");
taskResult.setErrorMessage(e.getMessage());
taskResultRepo.save(taskResult);
log.error("Task processing failed. key: {}", idempotencyKey, e);
throw new RuntimeException("任务处理失败", e);
}
}
private void releaseLock(String lockKey, String expectedValue) {
// Lua脚本保证原子性:只有值匹配才删除(防止误删别人的锁)
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(lockKey),
expectedValue
);
}
}Kafka消费者的精确一次配置
消费者层面也需要正确配置:
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, AITask> aiTaskConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ai-task-processor");
// 关键:禁用自动提交!改为手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 每次拉取的消息数量
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
// 最大处理时间(LLM调用可能很长)
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000); // 10分钟
// 会话超时(心跳间隔的3倍以上)
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30_000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10_000);
return new DefaultKafkaConsumerFactory<>(props,
new StringDeserializer(),
new JsonDeserializer<>(AITask.class));
}
@Bean
public KafkaListenerContainerFactory<?> aiTaskContainerFactory(
ConsumerFactory<String, AITask> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, AITask> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 手动ack模式
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// 并发消费者数量(根据处理能力设置)
factory.setConcurrency(3);
// 错误处理
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate()),
new FixedBackOff(5000L, 3) // 间隔5秒,最多重试3次
));
return factory;
}
}消费者实现:手动ACK + 幂等处理
@Service
@Slf4j
public class AITaskConsumer {
private final IdempotentTaskProcessor processor;
private final KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(
topics = "ai-tasks",
groupId = "ai-task-processor",
containerFactory = "aiTaskContainerFactory"
)
public void consumeTask(
ConsumerRecord<String, AITask> record,
Acknowledgment ack) {
AITask task = record.value();
log.info("Received task: {}, partition: {}, offset: {}",
task.getTaskId(), record.partition(), record.offset());
try {
// 幂等处理
AITaskResult result = processor.processTask(task);
// 处理成功才ACK
ack.acknowledge();
log.info("Task ACKed: {}", task.getTaskId());
} catch (TaskAlreadyProcessingException e) {
// 任务正在被其他实例处理,不ack(会重新投递,但有幂等性保护)
log.warn("Task is processing elsewhere: {}, will retry", task.getTaskId());
// 故意不ACK,让Kafka重新投递
// 但加一个短暂延迟避免频繁重试
try { Thread.sleep(5000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
ack.acknowledge(); // 延迟后ack,等另一个实例完成
} catch (Exception e) {
log.error("Task processing failed: {}", task.getTaskId(), e);
// 判断是否需要重试
if (shouldRetry(task, e)) {
// 不ACK,让Kafka根据重试策略重新投递
// 实际上ErrorHandler会处理重试逻辑
throw e; // 抛出异常,触发ErrorHandler
} else {
// 不可恢复的错误(如数据格式错误),发到DLQ,然后ACK
sendToDeadLetterQueue(task, e);
ack.acknowledge();
}
}
}
private boolean shouldRetry(AITask task, Exception e) {
// 不可重试的错误类型
if (e instanceof IllegalArgumentException) return false;
if (e instanceof DataIntegrityViolationException) return false;
// 超过最大重试次数
if (task.getMaxRetries() <= 0) return false;
// 其他错误(网络超时、LLM API错误等)可以重试
return true;
}
private void sendToDeadLetterQueue(AITask task, Exception e) {
DeadLetterMessage dlm = DeadLetterMessage.builder()
.originalTask(task)
.errorMessage(e.getMessage())
.errorType(e.getClass().getSimpleName())
.failedAt(System.currentTimeMillis())
.build();
kafkaTemplate.send("ai-tasks-dlq", task.getTaskId(), dlm);
log.warn("Task sent to DLQ: {}", task.getTaskId());
}
}Kafka生产者的幂等配置
生产者端也需要配置幂等性,防止网络重试导致重复消息:
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> idempotentProducerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
// 启用生产者幂等性(防止网络重试导致重复消息)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 幂等性要求的配套配置
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // 幂等性最大值
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试直到成功
// 批量发送配置
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024); // 16KB批
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 最多等5ms凑批
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024L); // 32MB缓冲
return new DefaultKafkaProducerFactory<>(props,
new StringSerializer(),
new JsonSerializer<>());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> template =
new KafkaTemplate<>(idempotentProducerFactory());
// 设置默认topic
template.setDefaultTopic("ai-tasks");
return template;
}
}事务性消息:更强的保证
对于"必须与数据库操作原子" 的场景,需要用Kafka事务:
@Service
@Slf4j
public class TransactionalTaskService {
private final KafkaTemplate<String, Object> transactionalKafkaTemplate;
private final AITaskRepository taskRepository;
/**
* 原子性地:创建数据库记录 + 发送Kafka消息
* 两者要么同时成功,要么同时失败
*
* 注意:这里用的是本地事务 + Kafka事务的组合,
* 无法做到真正的分布式事务,只是尽力保证
*/
@Transactional // 数据库事务
public void createTaskAtomically(CreateTaskRequest request) {
// 1. 先写数据库(在DB事务中)
AITask task = AITask.builder()
.taskId(UUID.randomUUID().toString())
.taskType(request.getTaskType())
.inputContent(request.getContent())
.userId(request.getUserId())
.createdAt(System.currentTimeMillis())
.build();
taskRepository.save(task);
// 2. 发Kafka消息(在DB事务提交前)
// 使用事务性发送,确保消息投递与DB写入的顺序性
transactionalKafkaTemplate.executeInTransaction(operations -> {
operations.send("ai-tasks", task.getTaskId(), task);
return null;
});
// 3. DB事务提交
// 如果这里失败,DB回滚,但Kafka消息已经发出(可能重复)
// 这就是为什么消费端必须实现幂等性
}
/**
* 更健壮的方案:发件箱模式(Outbox Pattern)
* 先只写数据库(包含outbox表),
* 由专门的relay服务从outbox表读取并发到Kafka
* 保证DB和Kafka的最终一致性
*/
@Transactional
public void createTaskWithOutbox(CreateTaskRequest request) {
AITask task = AITask.builder()
.taskId(UUID.randomUUID().toString())
.taskType(request.getTaskType())
.inputContent(request.getContent())
.userId(request.getUserId())
.createdAt(System.currentTimeMillis())
.build();
taskRepository.save(task);
// 写入outbox表(同一DB事务)
OutboxEvent event = OutboxEvent.builder()
.eventId(UUID.randomUUID().toString())
.aggregateType("AITask")
.aggregateId(task.getTaskId())
.eventType("TaskCreated")
.payload(JsonUtils.toJson(task))
.createdAt(System.currentTimeMillis())
.status("PENDING")
.build();
outboxRepository.save(event);
// DB事务提交后,relay服务会读取outbox表并发到Kafka
// 这样确保了:要么两个操作都发生,要么都不发生
}
}发件箱模式的Relay服务
@Service
@Slf4j
public class OutboxRelayService {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
@Scheduled(fixedDelay = 1000) // 每秒轮询一次
@Transactional
public void relay() {
// 获取一批待发送的事件
List<OutboxEvent> pendingEvents = outboxRepository
.findByStatusOrderByCreatedAt("PENDING", PageRequest.of(0, 100));
if (pendingEvents.isEmpty()) return;
for (OutboxEvent event : pendingEvents) {
try {
// 发到Kafka
kafkaTemplate.send(
topicForEventType(event.getEventType()),
event.getAggregateId(),
event.getPayload()
).get(5, TimeUnit.SECONDS); // 等待确认
// 标记为已发送
event.setStatus("SENT");
event.setSentAt(System.currentTimeMillis());
outboxRepository.save(event);
} catch (Exception e) {
log.error("Failed to relay outbox event: {}", event.getEventId(), e);
event.setRetryCount(event.getRetryCount() + 1);
outboxRepository.save(event);
// 重试次数过多,标记为死信
if (event.getRetryCount() >= 10) {
event.setStatus("DEAD");
outboxRepository.save(event);
}
}
}
}
private String topicForEventType(String eventType) {
return switch (eventType) {
case "TaskCreated" -> "ai-tasks";
case "TaskCompleted" -> "ai-task-results";
default -> "ai-events";
};
}
}一个完整的流程图
这套方案的完整性在于:每个步骤都有对应的幂等/原子性保障,即使在任何一个步骤失败重试,整体结果也是正确的。
在我们的AI文档处理系统中,实施这套方案后,重复处理率从原来的约2%(每天约几百次)降到了0.01%以下(主要是极端并发场景),LLM重复调用带来的额外费用基本消除。更重要的是,用户看到的处理结果是一致的、可预期的,投诉清零。
