AI 应用的消息幂等设计——同样的问题发了两次,不要回答两次
AI 应用的消息幂等设计——同样的问题发了两次,不要回答两次
上周有个用户来反馈说,他问了我们的 AI 助手一个问题,提交的时候网络卡了一下,结果同一个问题被提交了两次,AI 回答了两遍,还是两个不一样的答案。他问:"这到底哪个是对的?"
这个问题我听了直皱眉——不是因为用户刁难,而是这个情况说明我们的幂等设计做得不够好。
幂等性问题在 AI 应用里比普通 Web 应用更复杂,有几个 AI 特有的维度:
一、AI 的非确定性:同一个 prompt,两次调用几乎肯定得到不一样的答案(除非 temperature=0)。所以"幂等"在 AI 场景下不是"返回完全一样的内容",而是"对于同一个用户的同一次操作意图,只调用一次 AI"。
二、成本问题:每次 AI 调用都要花钱,重复调用不只是体验问题,还是成本问题。
三、一致性问题:如果同一个对话消息被处理了两次,会话历史里就有两条,后续对话的上下文就乱了。
今天这篇文章,从幂等场景分析到 Redis 实现,把 AI 请求幂等控制讲透。
AI 应用的幂等场景
先把需要处理幂等的场景梳理清楚:
场景一:网络重试
最常见的场景。用户点击"发送",请求到达服务器,服务器开始处理,但响应在回来的路上超时了。前端没收到响应,自动或手动重试,发出了第二个相同的请求。
此时服务器可能已经在处理(甚至已经处理完)第一个请求了。
场景二:前端重复提交
用户网络慢,点了一次"发送"没反应,又点了一次。虽然是两次点击,但用户的意图是一次——发送这条消息。
场景三:消息队列 at-least-once
如果你的 AI 请求是通过消息队列(Kafka、RocketMQ)异步处理的,at-least-once 语义意味着同一条消息可能被消费多次。
对于"发邮件"这类操作,重复消费很好处理——检查邮件是否已发即可。但对于"调用 AI"这类操作,你需要在消费前先判断:这条消息对应的 AI 调用是否已经完成了。
场景四:服务重启或超时
AI 调用通常需要 5-30 秒,在这个时间窗口内,如果 AI Gateway 重启了,正在处理的请求丢失,客户端重试时,新实例不知道这个请求之前已经处理过一半了。
幂等 Key 的设计
幂等 Key 的核心是:唯一标识一次用户操作意图。
错误的设计
// 错误:用请求内容的 Hash 做幂等 Key
String idempotencyKey = DigestUtils.md5Hex(prompt);这个设计的问题是:同一个用户先后问了两次相同的问题(比如第一次没看清楚答案,想再问一次),这两次是不同的操作意图,但 Key 相同,第二次会被拦截,用户得到的是第一次的缓存结果。
正确的设计
幂等 Key 应该包含三个维度:
- 用户标识:谁发的请求
- 会话标识:在哪个会话里
- 操作标识:这次操作的唯一性(由客户端生成)
// 正确:客户端生成唯一 ID,服务端以此为幂等 Key
String idempotencyKey = clientGeneratedRequestId; // UUID 或 ULID
// 幂等 Key 的命名空间设计
String redisKey = "idempotent:ai:" + userId + ":" + sessionId + ":" + requestId;客户端生成 ID 的规范:
// 前端:每次用户点击"发送"时生成新的 requestId
function sendMessage(content) {
const requestId = generateULID(); // 或者 crypto.randomUUID()
// requestId 随请求一起发送
fetch('/api/chat', {
method: 'POST',
headers: {
'X-Request-Id': requestId,
'Content-Type': 'application/json'
},
body: JSON.stringify({ content, sessionId })
});
}注意:requestId 应该在用户每次点击"发送"时生成,不是每次重试时生成。如果网络超时后重试,要复用同一个 requestId。
代码:基于 Redis 的 AI 请求幂等控制
幂等状态定义
public enum IdempotentStatus {
PROCESSING, // 处理中(第一次请求已到达,正在处理)
COMPLETED, // 已完成(有缓存结果)
FAILED // 已失败(可以重新提交)
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class IdempotentRecord {
private String requestId;
private IdempotentStatus status;
private String result; // 完成时的结果
private String errorMessage; // 失败时的错误信息
private long createdAt;
private long completedAt;
private int inputTokens;
private int outputTokens;
}幂等控制核心实现
@Component
public class AIIdempotencyController {
private static final Logger log = LoggerFactory.getLogger(AIIdempotencyController.class);
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 幂等记录的 TTL(24小时,给足够长的时间处理重试)
private static final Duration IDEMPOTENT_TTL = Duration.ofHours(24);
// 处理超时时间(如果超过这个时间还在 PROCESSING 状态,认为处理失败了)
private static final Duration PROCESSING_TIMEOUT = Duration.ofMinutes(3);
private static final String KEY_PREFIX = "idempotent:ai:";
/**
* 尝试占用幂等 Key(原子操作)
* 返回:
* - null:成功占用,可以开始处理
* - IdempotentRecord(PROCESSING状态):另一个实例正在处理,调用方等待
* - IdempotentRecord(COMPLETED状态):已有结果,直接返回
*/
public IdempotentRecord tryAcquire(String requestId, String userId, String sessionId) {
String key = buildKey(requestId, userId, sessionId);
// 使用 SET NX EX 原子地检查并设置幂等标记
IdempotentRecord processingRecord = IdempotentRecord.builder()
.requestId(requestId)
.status(IdempotentStatus.PROCESSING)
.createdAt(System.currentTimeMillis())
.build();
// Lua 脚本:检查 key 是否存在,不存在则设置为 PROCESSING
String luaScript = """
local existing = redis.call('GET', KEYS[1])
if existing then
return existing
end
local value = ARGV[1]
redis.call('SET', KEYS[1], value, 'EX', ARGV[2])
return nil
""";
String existingJson = (String) redisTemplate.execute(
new DefaultRedisScript<>(luaScript, String.class),
List.of(key),
serialize(processingRecord),
String.valueOf(IDEMPOTENT_TTL.getSeconds())
);
if (existingJson == null) {
// 成功占用,返回 null 表示"你来处理"
return null;
}
// Key 已存在,反序列化返回现有状态
IdempotentRecord existing = deserialize(existingJson);
// 检查 PROCESSING 状态是否超时(防止处理节点崩溃后永久卡死)
if (existing.getStatus() == IdempotentStatus.PROCESSING) {
long processingDurationMs = System.currentTimeMillis() - existing.getCreatedAt();
if (processingDurationMs > PROCESSING_TIMEOUT.toMillis()) {
log.warn("幂等Key处理超时,允许重新处理: key={}, duration={}ms",
key, processingDurationMs);
// 删除超时的 PROCESSING 状态,允许重新占用
redisTemplate.delete(key);
return null; // 让调用方重新处理
}
}
return existing;
}
/**
* 标记处理完成,存储结果
*/
public void markCompleted(String requestId, String userId, String sessionId,
String result, int inputTokens, int outputTokens) {
String key = buildKey(requestId, userId, sessionId);
IdempotentRecord completedRecord = IdempotentRecord.builder()
.requestId(requestId)
.status(IdempotentStatus.COMPLETED)
.result(result)
.createdAt(getCreatedAt(key))
.completedAt(System.currentTimeMillis())
.inputTokens(inputTokens)
.outputTokens(outputTokens)
.build();
redisTemplate.opsForValue().set(key, serialize(completedRecord), IDEMPOTENT_TTL);
log.info("AI请求幂等记录完成: requestId={}, tokens={}", requestId,
inputTokens + outputTokens);
}
/**
* 标记处理失败
*/
public void markFailed(String requestId, String userId, String sessionId,
String errorMessage) {
String key = buildKey(requestId, userId, sessionId);
IdempotentRecord failedRecord = IdempotentRecord.builder()
.requestId(requestId)
.status(IdempotentStatus.FAILED)
.errorMessage(errorMessage)
.createdAt(getCreatedAt(key))
.completedAt(System.currentTimeMillis())
.build();
// 失败状态保留较短时间,允许用户重新提交
redisTemplate.opsForValue().set(key, serialize(failedRecord), Duration.ofHours(1));
}
private String buildKey(String requestId, String userId, String sessionId) {
return KEY_PREFIX + userId + ":" + sessionId + ":" + requestId;
}
private long getCreatedAt(String key) {
String existing = (String) redisTemplate.opsForValue().get(key);
if (existing != null) {
IdempotentRecord record = deserialize(existing);
return record.getCreatedAt();
}
return System.currentTimeMillis();
}
private String serialize(IdempotentRecord record) {
// 使用 Jackson 序列化
try {
return new com.fasterxml.jackson.databind.ObjectMapper()
.writeValueAsString(record);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private IdempotentRecord deserialize(String json) {
try {
return new com.fasterxml.jackson.databind.ObjectMapper()
.readValue(json, IdempotentRecord.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}集成到 AI 服务层
@Service
public class IdempotentAIService {
private static final Logger log = LoggerFactory.getLogger(IdempotentAIService.class);
@Autowired
private ChatModel chatModel;
@Autowired
private AIIdempotencyController idempotencyController;
/**
* 带幂等控制的 AI 调用
*/
public AIResponse callWithIdempotency(String requestId, String userId,
String sessionId, String prompt) {
// 1. 尝试获取幂等锁
IdempotentRecord existing = idempotencyController.tryAcquire(
requestId, userId, sessionId);
// 2. 根据现有状态决策
if (existing != null) {
return switch (existing.getStatus()) {
case COMPLETED -> {
log.info("幂等命中,返回缓存结果: requestId={}", requestId);
yield AIResponse.builder()
.content(existing.getResult())
.fromCache(true)
.requestId(requestId)
.build();
}
case PROCESSING -> {
// 另一个实例正在处理,告知客户端等待
log.info("请求正在处理中: requestId={}", requestId);
yield AIResponse.builder()
.status("PROCESSING")
.requestId(requestId)
.message("请求正在处理中,请稍后查询结果")
.build();
}
case FAILED -> {
// 失败状态,允许重新处理
log.info("重新处理失败的请求: requestId={}", requestId);
yield processNewRequest(requestId, userId, sessionId, prompt);
}
};
}
// 3. 新请求,正常处理
return processNewRequest(requestId, userId, sessionId, prompt);
}
private AIResponse processNewRequest(String requestId, String userId,
String sessionId, String prompt) {
try {
// 实际调用 AI
ChatResponse chatResponse = chatModel.call(new Prompt(prompt));
String content = chatResponse.getResult().getOutput().getContent();
// 估算 token 数(实际项目里应该从响应元数据里取)
int inputTokens = prompt.length() / 3;
int outputTokens = content.length() / 3;
// 标记完成
idempotencyController.markCompleted(
requestId, userId, sessionId, content, inputTokens, outputTokens);
return AIResponse.builder()
.content(content)
.fromCache(false)
.requestId(requestId)
.build();
} catch (Exception e) {
log.error("AI调用失败: requestId={}, error={}", requestId, e.getMessage());
idempotencyController.markFailed(requestId, userId, sessionId, e.getMessage());
throw e;
}
}
}Controller 层集成
@RestController
@RequestMapping("/api/chat")
public class ChatController {
@Autowired
private IdempotentAIService aiService;
@PostMapping
public ResponseEntity<AIResponse> chat(
@RequestHeader("X-Request-Id") String requestId,
@RequestHeader("X-User-Id") String userId,
@RequestBody ChatRequest request) {
// requestId 必传,客户端负责生成
if (requestId == null || requestId.isBlank()) {
return ResponseEntity.badRequest()
.body(AIResponse.builder()
.message("X-Request-Id 头部不能为空")
.build());
}
AIResponse response = aiService.callWithIdempotency(
requestId,
userId,
request.getSessionId(),
request.getContent()
);
// 如果是处理中状态,返回 202 Accepted
if ("PROCESSING".equals(response.getStatus())) {
return ResponseEntity.accepted().body(response);
}
// 幂等命中,返回 200 但带上缓存标记
HttpHeaders headers = new HttpHeaders();
if (response.isFromCache()) {
headers.set("X-Idempotent-Replayed", "true");
}
return ResponseEntity.ok().headers(headers).body(response);
}
/**
* 查询请求处理状态(用于 PROCESSING 场景)
*/
@GetMapping("/status/{requestId}")
public ResponseEntity<AIResponse> getStatus(
@PathVariable String requestId,
@RequestHeader("X-User-Id") String userId,
@RequestParam String sessionId) {
// 简化:实际应该根据 requestId 查 Redis
return ResponseEntity.ok(AIResponse.builder()
.requestId(requestId)
.status("COMPLETED")
.build());
}
}消息队列场景的幂等处理
@Component
public class AITaskConsumer {
private static final Logger log = LoggerFactory.getLogger(AITaskConsumer.class);
@Autowired
private IdempotentAIService aiService;
@KafkaListener(topics = "ai-tasks", groupId = "ai-worker")
public void consume(ConsumerRecord<String, String> record) {
AITaskMessage task = deserialize(record.value());
try {
// 使用消息的业务 ID 作为幂等 Key
// 不用 Kafka 的 offset,因为重试时 offset 不同但业务 ID 相同
AIResponse response = aiService.callWithIdempotency(
task.getBusinessRequestId(), // 业务层生成的唯一 ID
task.getUserId(),
task.getSessionId(),
task.getPrompt()
);
if (response.isFromCache()) {
log.info("MQ消息幂等命中,跳过AI调用: businessId={}",
task.getBusinessRequestId());
}
// 处理结果(推送到用户、更新数据库等)
handleResult(task, response);
} catch (Exception e) {
log.error("AI任务处理失败: businessId={}", task.getBusinessRequestId(), e);
// 不 ack,让 Kafka 重试;幂等控制会保证重试时不重复调用 AI
throw e;
}
}
private AITaskMessage deserialize(String json) {
// 反序列化逻辑
return new AITaskMessage();
}
private void handleResult(AITaskMessage task, AIResponse response) {
// 处理 AI 响应结果
}
}幂等控制流程图
总结
AI 应用的幂等设计有三个核心要点:
幂等 Key 由客户端生成,包含用户+会话+操作三个维度,用 UUID/ULID,网络重试时复用同一个 Key。
PROCESSING 状态要有超时兜底,防止处理节点崩溃后请求永久卡死。超时后允许重新处理。
FAILED 状态允许重新提交,用户主动重试时应该能重新触发 AI 调用,不能因为上次失败就永久拦截。
幂等控制不只是工程健壮性问题,在 AI 场景下更是成本控制手段——每次被幂等拦截的重复请求,都是省下来的一笔 Token 费用。
