第1926篇:事件驱动AI的幂等性保障——防止重复处理带来的副作用
第1926篇:事件驱动AI的幂等性保障——防止重复处理带来的副作用
幂等性是分布式系统的老话题,但在AI场景里,它带来的副作用比传统场景严重得多。
传统服务里,重复处理一个订单支付,可能导致重复扣款——这是严重Bug,但通常能通过对账发现并回滚。
AI服务里,重复处理一个"给用户推荐内容"的任务,可能导致:
- 用户同一条内容被推荐两次(体验很差)
- AI生成了两份略有不同的结果,被合并成了奇怪的输出
- 训练数据里多了一条重复样本,污染了模型
更麻烦的是,AI的输出是非确定性的——同样的输入,两次推理的结果可能不完全一样(因为temperature>0)。这意味着,AI任务的重复处理往往不是"结果一样所以没影响",而是"结果不一样带来了意外副作用"。
今天这篇,聊聊如何在事件驱动的AI系统里做幂等性保障。
重复处理的来源
先梳理一下重复消费/重复处理的来源,不同来源需要不同的处理方式:
这些重复来源,有些是"设计内的重复"(比如at-least-once的Kafka),有些是"Bug引起的重复"(上游系统Bug)。不管哪种,我们都需要幂等性来保护系统。
幂等性的两种级别
在设计AI系统的幂等性时,需要区分两种不同的要求:
第一种:结果幂等(Outcome Idempotent)
多次处理同一个任务,最终状态只改变一次。比如"给文章打质量分",无论这个任务被处理多少次,数据库里这篇文章只有一个质量分,不会有多个重复记录。
第二种:输出幂等(Output Idempotent)
多次处理同一个任务,每次得到的输出完全一致。这在非确定性AI中很难做到(temperature>0时),需要特殊处理。
大多数场景我们追求"结果幂等"就够了,"输出幂等"只在特定场景(比如需要确定性输出的合同生成)才需要。
幂等性Key的设计
幂等性的核心是:给每个任务一个唯一的Key,处理前检查这个Key是否已经处理过。
Key的设计很关键,要能唯一标识一个"业务操作":
@Component
public class IdempotencyKeyGenerator {
/**
* 为不同类型的AI任务生成幂等Key
*/
public String generate(AITaskType taskType, Map<String, String> params) {
return switch (taskType) {
// 文章质量评分:文章ID + 版本号
// 同一篇文章的不同版本是不同的任务
case ARTICLE_QUALITY_SCORE ->
"qs:" + params.get("articleId") + ":" + params.get("version");
// 关键词提取:文章ID + 版本号 + 语言
case KEYWORD_EXTRACTION ->
"kw:" + params.get("articleId") + ":" +
params.get("version") + ":" + params.get("language");
// 用户内容推荐:用户ID + 推荐场景 + 时间窗口
// 同一个用户在同一个时间窗口内的推荐任务,只处理一次
case USER_RECOMMENDATION ->
"rec:" + params.get("userId") + ":" +
params.get("scenario") + ":" + getCurrentTimeWindow();
// 消息Kafka偏移量作为Key(完全基于消息本身)
case GENERIC ->
params.getOrDefault("messageId", UUID.randomUUID().toString());
default -> throw new IllegalArgumentException("未知任务类型: " + taskType);
};
}
private String getCurrentTimeWindow() {
// 以小时为窗口,同一小时内同一用户的推荐只做一次
long hourWindow = System.currentTimeMillis() / (1000 * 3600);
return String.valueOf(hourWindow);
}
}Key设计的原则:
- 包含足够的业务信息,不同语义的操作Key不能相同
- 不要用随机UUID(每次生成不同),要基于业务数据生成
- 对于有时效性的任务(如推荐),在Key里加时间窗口
幂等性检查器:Redis实现
@Component
public class IdempotencyChecker {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String PREFIX = "ai:idempotency:";
private static final String STATUS_PROCESSING = "PROCESSING";
private static final String STATUS_COMPLETED = "COMPLETED";
private static final String STATUS_FAILED = "FAILED";
/**
* 尝试标记任务为"处理中"
* 返回true表示可以处理(之前没有处理过)
* 返回false表示已经在处理或已完成,应该跳过
*/
public boolean tryMarkProcessing(String idempotencyKey, Duration ttl) {
String redisKey = PREFIX + idempotencyKey;
// 用SET NX(不存在才设置)保证原子性
Boolean success = redisTemplate.opsForValue().setIfAbsent(
redisKey,
buildStatusJson(STATUS_PROCESSING, null),
ttl
);
return Boolean.TRUE.equals(success);
}
/**
* 标记任务完成,并保存结果
*/
public void markCompleted(String idempotencyKey, Object result, Duration ttl) {
String redisKey = PREFIX + idempotencyKey;
redisTemplate.opsForValue().set(
redisKey,
buildStatusJson(STATUS_COMPLETED, result),
ttl
);
}
/**
* 标记任务失败
*/
public void markFailed(String idempotencyKey, String errorMessage, Duration ttl) {
String redisKey = PREFIX + idempotencyKey;
redisTemplate.opsForValue().set(
redisKey,
buildStatusJson(STATUS_FAILED, errorMessage),
ttl
);
}
/**
* 查询任务状态
*/
public IdempotencyStatus getStatus(String idempotencyKey) {
String redisKey = PREFIX + idempotencyKey;
String value = redisTemplate.opsForValue().get(redisKey);
if (value == null) {
return IdempotencyStatus.NOT_FOUND;
}
return parseStatus(value);
}
/**
* 获取已完成任务的结果(用于返回重复请求)
*/
public <T> Optional<T> getCompletedResult(String idempotencyKey, Class<T> resultType) {
IdempotencyStatus status = getStatus(idempotencyKey);
if (status.getStatus().equals(STATUS_COMPLETED)) {
return Optional.ofNullable(status.getResult(resultType));
}
return Optional.empty();
}
private String buildStatusJson(String status, Object result) {
Map<String, Object> data = new HashMap<>();
data.put("status", status);
data.put("timestamp", System.currentTimeMillis());
if (result != null) {
data.put("result", result);
}
try {
return new ObjectMapper().writeValueAsString(data);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}在Consumer中集成幂等性检查
把幂等性检查织入到Consumer的处理流程中:
@Component
public class IdempotentAIConsumer {
@Autowired
private IdempotencyChecker idempotencyChecker;
@Autowired
private IdempotencyKeyGenerator keyGenerator;
@Autowired
private QualityScoreService qualityScoreService;
@KafkaListener(topics = "ai.task.quality-score", groupId = "qs-consumer")
public void consume(ConsumerRecord<String, String> record,
Acknowledgment ack) {
AITaskMessage message = deserialize(record.value());
// 生成幂等Key
String idempotencyKey = keyGenerator.generate(
AITaskType.ARTICLE_QUALITY_SCORE,
Map.of(
"articleId", message.getPayloadField("articleId"),
"version", message.getPayloadField("version")
)
);
// 检查是否已经处理过
IdempotencyStatus existingStatus = idempotencyChecker.getStatus(idempotencyKey);
if (existingStatus.isCompleted()) {
// 已经处理完成,直接用缓存结果
log.info("任务已处理,使用缓存结果: key={}", idempotencyKey);
QualityScoreResult cachedResult = existingStatus.getResult(QualityScoreResult.class);
// 可以选择直接用缓存结果,或者忽略
ack.acknowledge();
return;
}
if (existingStatus.isProcessing()) {
// 正在处理中,说明有另一个实例在处理这个任务
// 这种情况下怎么办?有两种策略:
// 1. 等待一段时间后重试(可能那个实例崩了)
// 2. 直接跳过(接受另一个实例的结果)
// 根据业务选择,这里选择策略2
log.warn("任务正在被其他实例处理中,跳过: key={}", idempotencyKey);
ack.acknowledge();
return;
}
// 任务未处理过,尝试抢占处理权
boolean acquired = idempotencyChecker.tryMarkProcessing(
idempotencyKey,
Duration.ofMinutes(5) // PROCESSING状态5分钟后过期,防止持有者崩溃后永远占用
);
if (!acquired) {
// 抢占失败(并发情况下另一个Consumer刚抢到了)
log.info("任务抢占失败,跳过: key={}", idempotencyKey);
ack.acknowledge();
return;
}
// 成功抢占,执行AI任务
try {
QualityScoreResult result = qualityScoreService.score(
message.getPayloadField("articleText")
);
// 标记完成,保存结果
idempotencyChecker.markCompleted(
idempotencyKey,
result,
Duration.ofHours(24)
);
// 保存到业务数据库
saveResult(message.getTaskId(), result);
ack.acknowledge();
log.info("质量评分完成: taskId={}, key={}", message.getTaskId(), idempotencyKey);
} catch (Exception e) {
// 处理失败,更新状态以便重试
idempotencyChecker.markFailed(
idempotencyKey,
e.getMessage(),
Duration.ofMinutes(30) // 失败状态30分钟后可以重试
);
// 不提交offset,让消息被重新处理
throw new RuntimeException("AI任务处理失败,触发重试", e);
}
}
}关键问题:PROCESSING状态的超时处理
上面代码里有一个关键设计:tryMarkProcessing时设置了5分钟的TTL。
这是为了处理"持有者崩溃"的情况:如果Consumer A成功标记了PROCESSING,然后崩溃了,那个Key会一直是PROCESSING状态,其他Consumer都不敢处理。设置TTL让这个状态在5分钟后自动失效,其他Consumer就可以重新抢占。
但这带来另一个问题:如果正常处理需要超过5分钟怎么办?解决方案是心跳刷新:
@Component
public class ProcessingHeartbeatKeeper {
@Autowired
private StringRedisTemplate redisTemplate;
private final Map<String, ScheduledFuture<?>> heartbeats = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
/**
* 开始对某个Key发送心跳(每隔1分钟刷新TTL)
*/
public void startHeartbeat(String idempotencyKey, Duration ttl) {
String redisKey = "ai:idempotency:" + idempotencyKey;
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
() -> {
// 每1分钟刷新TTL,防止PROCESSING状态过期
redisTemplate.expire(redisKey, ttl);
log.debug("心跳刷新: key={}", idempotencyKey);
},
ttl.toMinutes() / 2, // 在TTL一半时开始刷新
ttl.toMinutes() / 2,
TimeUnit.MINUTES
);
heartbeats.put(idempotencyKey, future);
}
/**
* 停止心跳
*/
public void stopHeartbeat(String idempotencyKey) {
ScheduledFuture<?> future = heartbeats.remove(idempotencyKey);
if (future != null) {
future.cancel(false);
}
}
}AI输出的幂等性:确定性输出的处理
对于需要确定性输出的AI任务,有一个专门的技术手段:锁定随机种子。
大多数大模型API不支持设置随机种子(因为底层采样是在GPU上完成的,不暴露seed参数)。但如果你用的是本地部署的模型(vLLM、Ollama等),通常支持:
@Component
public class DeterministicInferenceClient {
@Autowired
private VllmClient vllmClient;
/**
* 确定性推理:相同输入保证相同输出
* 通过锁定随机种子实现
*/
public String deterministicGenerate(String prompt, String idempotencyKey) {
// 用幂等Key的哈希值作为随机种子
// 保证同一个幂等Key对应的推理总是用同一个seed
int seed = Math.abs(idempotencyKey.hashCode());
VllmRequest request = VllmRequest.builder()
.prompt(prompt)
.temperature(0.0f) // temperature=0,完全贪心解码
.seed(seed) // 锁定随机种子
.maxTokens(512)
.build();
return vllmClient.generate(request).getText();
}
}当temperature=0时,输出是完全确定的(贪心解码)。这适合不需要创意性的任务,比如信息提取、分类、翻译。
对于需要多样性的创意类任务,接受"结果幂等"而非"输出幂等"——同一个任务只执行一次,第一次的结果被缓存和复用。
处理"部分成功"的AI任务
有些AI任务是由多个步骤组成的,可能出现"部分步骤成功"的情况。这时幂等性的处理更复杂:
@Component
public class PartialIdempotencyHandler {
@Autowired
private IdempotencyChecker checker;
/**
* 处理多步骤AI任务的幂等性
* 每个步骤独立记录幂等状态
*/
public MultiStepResult executeWithIdempotency(String taskId,
List<AIStep> steps) {
MultiStepResult result = new MultiStepResult(taskId);
for (AIStep step : steps) {
String stepKey = taskId + ":" + step.getStepId();
// 每个步骤独立检查幂等
Optional<Object> cached = checker.getCompletedResult(
stepKey, Object.class
);
if (cached.isPresent()) {
// 这步已经完成了,用缓存结果
result.addStepResult(step.getStepId(), cached.get());
log.info("步骤已完成,使用缓存: taskId={}, stepId={}",
taskId, step.getStepId());
continue;
}
// 执行这个步骤
try {
checker.tryMarkProcessing(stepKey, Duration.ofMinutes(10));
Object stepResult = step.execute();
checker.markCompleted(stepKey, stepResult, Duration.ofHours(24));
result.addStepResult(step.getStepId(), stepResult);
} catch (Exception e) {
checker.markFailed(stepKey, e.getMessage(), Duration.ofMinutes(30));
result.addStepError(step.getStepId(), e.getMessage());
// 根据步骤配置决定是否继续
if (step.isRequired()) {
throw new StepExecutionException("必需步骤失败", e);
}
}
}
return result;
}
}这样的设计里,如果任务在第3步失败了,重试时第1步和第2步的结果会直接从缓存取,不会重新执行。只有第3步会重新执行。
数据库层的幂等性保障
Redis幂等是内存级别的,Redis重启后状态会丢失(如果没有持久化)。对于高可靠性要求,需要数据库层面的幂等保障:
@Service
public class AITaskResultService {
@Autowired
private AITaskResultRepository repository;
/**
* 幂等地保存AI任务结果
* 使用数据库唯一约束保证幂等
*/
@Transactional
public AITaskResult saveResultIdempotently(String taskId,
String taskType,
String resultJson) {
// 先检查是否已存在
Optional<AITaskResult> existing = repository.findByTaskIdAndTaskType(
taskId, taskType
);
if (existing.isPresent()) {
log.info("任务结果已存在,返回已有结果: taskId={}, taskType={}", taskId, taskType);
return existing.get();
}
// 尝试插入(依赖数据库唯一约束)
try {
AITaskResult result = AITaskResult.builder()
.taskId(taskId)
.taskType(taskType)
.resultJson(resultJson)
.createdAt(Instant.now())
.build();
return repository.save(result);
} catch (DataIntegrityViolationException e) {
// 唯一约束冲突:并发情况下另一个线程刚刚插入了
// 读出它的结果返回
return repository.findByTaskIdAndTaskType(taskId, taskType)
.orElseThrow(() -> new IllegalStateException("幂等保存异常"));
}
}
}对应的数据库DDL:
CREATE TABLE ai_task_result (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
task_id VARCHAR(64) NOT NULL,
task_type VARCHAR(32) NOT NULL,
result_json TEXT,
created_at DATETIME NOT NULL,
UNIQUE KEY uk_task_id_task_type (task_id, task_type)
);唯一约束是数据库层面的最终保障。即使Redis挂了、代码里的幂等检查失效了,数据库的唯一约束也能防止重复插入。
监控:幂等性拦截率
上了幂等性检查之后,需要监控拦截了多少重复处理:
@Component
public class IdempotencyMetrics {
private final Counter totalChecks;
private final Counter duplicatesIntercepted;
private final Counter processingCollisions; // PROCESSING状态冲突
public IdempotencyMetrics(MeterRegistry registry) {
totalChecks = Counter.builder("ai.idempotency.checks.total")
.description("幂等性检查总次数")
.register(registry);
duplicatesIntercepted = Counter.builder("ai.idempotency.duplicates")
.description("拦截的重复请求数")
.register(registry);
processingCollisions = Counter.builder("ai.idempotency.collisions")
.description("PROCESSING状态并发冲突数")
.register(registry);
}
// 关键指标:重复率 = duplicatesIntercepted / totalChecks
// 正常情况下重复率应该很低(<5%)
// 如果重复率突然升高,说明上游有异常(比如重试风暴)
}一个容易忽视的问题:消息顺序与幂等性
在多分区Kafka场景里,同一个任务的两条消息可能被不同分区消费,处理顺序不保证。这给幂等性带来了额外挑战:
消息A: taskId=001, type=CREATE_TASK, publishedAt=10:00:00
消息B: taskId=001, type=CANCEL_TASK, publishedAt=10:00:05如果B比A先处理(乱序),我们先执行了CANCEL,然后又执行了CREATE,逻辑就乱了。
解决方案:在幂等性检查时加入版本号或时间戳,拒绝处理"过时"的消息:
public boolean shouldProcess(String taskId, String messageType, long publishedAt) {
String lastEventKey = "ai:task:last-event:" + taskId;
String lastEventJson = redisTemplate.opsForValue().get(lastEventKey);
if (lastEventJson != null) {
LastEvent lastEvent = parseLastEvent(lastEventJson);
if (publishedAt < lastEvent.getPublishedAt()) {
// 这是一条过时的消息,跳过
log.warn("跳过过时消息: taskId={}, type={}, publishedAt={} < lastEventAt={}",
taskId, messageType, publishedAt, lastEvent.getPublishedAt());
return false;
}
}
// 更新最后处理的事件时间戳
redisTemplate.opsForValue().set(
lastEventKey,
buildLastEventJson(messageType, publishedAt),
Duration.ofHours(24)
);
return true;
}总结
AI系统的幂等性设计,核心是这几个层次:
- 消息层:为每条消息生成唯一ID,Consumer处理前检查是否已处理
- 任务层:基于业务语义生成幂等Key,支持同一任务重复触发时的去重
- 结果层:数据库唯一约束,作为最终的兜底保障
- 输出层(可选):对于需要确定性的任务,通过temperature=0和固定seed实现
不要指望"消息不会重复",在分布式系统里这个假设迟早会被打破。幂等性设计不是锦上添花,是系统可靠性的基础。
