第1671篇:Agent工具调用的幂等性设计——防止重复执行与副作用控制
第1671篇:Agent工具调用的幂等性设计——防止重复执行与副作用控制
上周有个同事找我排查一个线上问题:他们的订单Agent在网络抖动之后给同一个用户发了三封确认邮件,扣了两次库存。这不是Bug,代码逻辑完全正确,问题出在设计层面——工具调用没有做幂等性保障。
这个问题比想象中普遍。很多团队在做Agent的时候,精力都放在Prompt工程、上下文管理、多轮对话上,结果忽略了最基础的工程问题:当Agent反复调用同一个工具,或者在重试场景下重复执行,系统会发生什么?
今天我们从零开始把这件事讲清楚。
为什么Agent工具调用特别容易出幂等问题
传统的HTTP接口,我们靠客户端控制重试,靠服务端做幂等检查,两边都有机会介入。但Agent场景有它特殊的地方。
第一,Agent的执行是非确定性的。LLM在推理过程中可能因为上下文截断、温度参数等原因,生成重复的工具调用请求。你以为Agent只会调用一次send_email,但它在某次推理里可能连续生成了两条工具调用指令。
第二,Agent框架的重试机制是自动的。当工具调用超时或返回错误,大多数Agent框架会自动重试,而这个重试对工具本身是透明的——工具不知道自己是在处理一个新请求还是一个重复请求。
第三,Agent状态恢复场景。当一个长任务的Agent崩溃重启后,它可能从某个检查点继续执行,这时候之前已经成功的工具调用可能会被再次触发。
来看一个最简单的例子,感受一下问题所在:
// 这是一个"危险"的工具实现
@Tool(name = "create_order", description = "创建订单")
public OrderResult createOrder(String userId, String productId, int quantity) {
Order order = new Order();
order.setUserId(userId);
order.setProductId(productId);
order.setQuantity(quantity);
order.setStatus("PENDING");
// 直接插入数据库,没有任何幂等保护
orderRepository.save(order);
// 扣减库存
inventoryService.deduct(productId, quantity);
// 发送确认邮件
emailService.sendConfirmation(userId, order.getId());
return OrderResult.success(order.getId());
}这段代码在正常流程下没问题,但只要Agent重试一次,就会创建两个订单、扣两次库存、发两封邮件。
幂等性的本质:相同输入,多次执行,结果一致
先把概念说清楚。幂等性(Idempotency)的数学定义是:f(f(x)) = f(x)。放到工程里说就是:对同一个操作请求执行一次和执行多次,系统最终状态是一样的。
注意这里有个细节:幂等不等于无副作用。幂等操作可以有副作用,但多次执行的副作用效果等同于一次执行。
HTTP方法的幂等性是个很好的类比:
- GET:幂等,查询不改变状态
- PUT:幂等,设置资源为某个值,重复设置结果相同
- DELETE:幂等,删除已经不存在的资源和删除存在的资源,最终都是"不存在"
- POST:非幂等,每次创建一个新资源
在Agent工具调用里,我们要把所有写操作都设计成幂等的,或者至少让Agent框架感知到哪些操作是非幂等的,从而避免重复执行。
方案一:幂等Token机制
这是业界最成熟的方案,支付宝、微信支付都用这个。核心思路是:调用方在每次请求时带上一个唯一的幂等Token,服务端根据这个Token来判断是否是重复请求。
// 幂等Token存储接口
public interface IdempotencyStore {
/**
* 尝试占用幂等Token
* @return true表示首次执行,false表示重复请求
*/
boolean tryAcquire(String token, Duration ttl);
/**
* 获取已有的执行结果
*/
Optional<Object> getResult(String token);
/**
* 保存执行结果
*/
void saveResult(String token, Object result);
}
// 基于Redis的实现
@Component
public class RedisIdempotencyStore implements IdempotencyStore {
private final RedisTemplate<String, Object> redisTemplate;
private static final String KEY_PREFIX = "idempotency:";
@Override
public boolean tryAcquire(String token, Duration ttl) {
String key = KEY_PREFIX + token + ":lock";
// 用SET NX保证原子性
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(key, "PROCESSING", ttl);
return Boolean.TRUE.equals(result);
}
@Override
public Optional<Object> getResult(String token) {
String key = KEY_PREFIX + token + ":result";
Object result = redisTemplate.opsForValue().get(key);
return Optional.ofNullable(result);
}
@Override
public void saveResult(String token, Object result) {
String key = KEY_PREFIX + token + ":result";
redisTemplate.opsForValue().set(key, result, Duration.ofHours(24));
}
}然后用AOP把幂等检查织入到所有工具调用中:
@Aspect
@Component
public class IdempotencyAspect {
@Autowired
private IdempotencyStore idempotencyStore;
@Around("@annotation(idempotent)")
public Object handleIdempotency(ProceedingJoinPoint pjp,
Idempotent idempotent) throws Throwable {
// 从请求上下文中获取幂等Token
String token = IdempotencyContext.getCurrentToken();
if (token == null || token.isEmpty()) {
// 没有Token,直接执行(适用于非关键路径)
return pjp.proceed();
}
// 先检查是否有已完成的结果
Optional<Object> cachedResult = idempotencyStore.getResult(token);
if (cachedResult.isPresent()) {
log.info("幂等命中,返回缓存结果, token={}", token);
return cachedResult.get();
}
// 尝试占用Token
boolean acquired = idempotencyStore.tryAcquire(token, Duration.ofMinutes(5));
if (!acquired) {
// Token被其他线程占用,说明正在处理中,等待一下再查结果
Thread.sleep(500);
Optional<Object> result = idempotencyStore.getResult(token);
return result.orElseThrow(() ->
new RetryableException("请求正在处理中,请稍后重试"));
}
// 执行实际逻辑
try {
Object result = pjp.proceed();
idempotencyStore.saveResult(token, result);
return result;
} catch (Exception e) {
// 执行失败,释放Token,允许重试
idempotencyStore.releaseToken(token);
throw e;
}
}
}在Agent框架层面,需要为每次工具调用生成并传递幂等Token:
@Service
public class AgentToolExecutor {
public ToolResult executeTool(String toolName, Map<String, Object> params,
String taskId, int retryCount) {
// 幂等Token = 任务ID + 工具名 + 参数哈希
// 这样同一个任务里相同参数的调用被视为同一个操作
String paramHash = DigestUtils.md5Hex(JSON.toJSONString(params));
String idempotencyToken = String.format("%s:%s:%s", taskId, toolName, paramHash);
// 设置到线程上下文
IdempotencyContext.setCurrentToken(idempotencyToken);
try {
return toolRegistry.invoke(toolName, params);
} finally {
IdempotencyContext.clear();
}
}
}方案二:数据库唯一约束兜底
幂等Token机制依赖Redis,有一定的中间件复杂度。对于一些简单场景,直接在数据库层面加唯一约束是更简单粗暴的方案。
// 订单表的建表SQL
CREATE TABLE orders (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
idempotency_key VARCHAR(64) NOT NULL UNIQUE, -- 核心:唯一键
user_id VARCHAR(32) NOT NULL,
product_id VARCHAR(32) NOT NULL,
quantity INT NOT NULL,
status VARCHAR(16) NOT NULL DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
// 在Java里处理唯一约束冲突
@Tool(name = "create_order", description = "创建订单")
public OrderResult createOrder(String userId, String productId,
int quantity, String idempotencyKey) {
try {
Order order = new Order();
order.setIdempotencyKey(idempotencyKey);
order.setUserId(userId);
order.setProductId(productId);
order.setQuantity(quantity);
orderRepository.save(order);
inventoryService.deduct(productId, quantity);
return OrderResult.success(order.getId());
} catch (DataIntegrityViolationException e) {
// 唯一键冲突,说明是重复请求,直接返回已有的订单
Order existing = orderRepository.findByIdempotencyKey(idempotencyKey)
.orElseThrow(() -> new RuntimeException("数据异常"));
log.info("重复订单请求,返回已有订单: {}", existing.getId());
return OrderResult.success(existing.getId());
}
}这个方案的优点是不需要额外的中间件,数据库保证了原子性。缺点是如果业务逻辑复杂(比如还要扣库存、发邮件),在主操作成功但后续操作失败的情况下,重试会因为唯一键冲突而直接返回"成功",但实际上库存没扣或邮件没发。
所以这个方案适合原子性强的单一操作,不适合多步骤的复合操作。
方案三:状态机驱动的幂等
对于复杂的多步操作,我更推荐用状态机来管理。每个操作有明确的状态转换,重复执行不会改变已到达的状态。
// 任务执行状态枚举
public enum TaskStatus {
PENDING, // 等待执行
RUNNING, // 执行中
COMPLETED, // 已完成
FAILED, // 执行失败
RETRYING // 等待重试
}
// 带状态机的工具调用记录
@Entity
@Table(name = "tool_execution_records")
public class ToolExecutionRecord {
@Id
private String executionId; // = taskId + toolName + paramHash
private TaskStatus status;
private String resultJson;
private int retryCount;
private LocalDateTime lastUpdatedAt;
// 状态转换方法
public boolean canTransitionTo(TaskStatus newStatus) {
return switch (this.status) {
case PENDING -> newStatus == TaskStatus.RUNNING;
case RUNNING -> newStatus == TaskStatus.COMPLETED
|| newStatus == TaskStatus.FAILED;
case FAILED -> newStatus == TaskStatus.RETRYING;
case RETRYING -> newStatus == TaskStatus.RUNNING;
case COMPLETED -> false; // 已完成的状态不可逆
};
}
}
@Service
public class StatefulToolExecutor {
@Transactional
public ToolResult executeWithStateMachine(String executionId,
Supplier<ToolResult> toolLogic) {
ToolExecutionRecord record = recordRepository
.findByIdWithLock(executionId) // 悲观锁
.orElseGet(() -> createNewRecord(executionId));
// 如果已经完成,直接返回结果
if (record.getStatus() == TaskStatus.COMPLETED) {
return JSON.parseObject(record.getResultJson(), ToolResult.class);
}
// 如果正在运行(可能是另一个线程在处理),等待
if (record.getStatus() == TaskStatus.RUNNING) {
throw new RetryableException("工具正在执行中,请稍后查询结果");
}
// 转换到RUNNING状态
if (!record.canTransitionTo(TaskStatus.RUNNING)) {
throw new IllegalStateException("状态转换非法: " + record.getStatus());
}
record.setStatus(TaskStatus.RUNNING);
recordRepository.save(record);
// 执行工具逻辑
try {
ToolResult result = toolLogic.get();
record.setStatus(TaskStatus.COMPLETED);
record.setResultJson(JSON.toJSONString(result));
recordRepository.save(record);
return result;
} catch (Exception e) {
record.setStatus(TaskStatus.FAILED);
record.setRetryCount(record.getRetryCount() + 1);
recordRepository.save(record);
throw e;
}
}
}用状态机的好处是整个执行过程是可追溯的,你可以查询任何一个工具调用的执行历史。这在生产环境排查问题的时候非常有用。
副作用控制:不是所有副作用都能幂等
聊完幂等,还要聊副作用控制。有些副作用从业务本身来说就不应该幂等——比如发短信通知用户。如果用户查询了3次订单状态,你不应该发3条短信告诉他。
这里有个分类思路,我在项目里总结出来的:
对于通知类操作,我一般用事件驱动而不是在工具里直接调用:
// 不推荐:工具直接发邮件(难以去重)
@Tool(name = "process_order")
public OrderResult processOrder(String orderId) {
orderService.process(orderId);
emailService.sendConfirmation(orderId); // 每次调用都发
return OrderResult.success();
}
// 推荐:工具只改状态,通过事件异步发送
@Tool(name = "process_order")
public OrderResult processOrder(String orderId) {
// 状态变更是幂等的:从PENDING变为PROCESSED
boolean changed = orderService.processIfPending(orderId);
if (changed) {
// 只有状态真正变更时才发布事件
eventPublisher.publish(new OrderProcessedEvent(orderId));
}
return OrderResult.success();
}
// 邮件发送由事件消费者处理,自带去重逻辑
@EventListener
public void onOrderProcessed(OrderProcessedEvent event) {
// 检查这个事件对应的邮件是否已发送
if (!emailTracker.isSent(event.getOrderId(), "CONFIRMATION")) {
emailService.sendConfirmation(event.getOrderId());
emailTracker.markSent(event.getOrderId(), "CONFIRMATION");
}
}Agent框架层面的全局幂等策略
前面说的都是单个工具的幂等处理。在Agent框架层面,还需要有全局的幂等策略,让整个Agent执行也具备幂等性。
@Service
public class IdempotentAgentRunner {
private final AgentExecutor agentExecutor;
private final AgentStateStore stateStore;
/**
* 幂等的Agent执行入口
* @param agentTaskId 任务ID,相同ID的请求视为同一任务
*/
public AgentResult runAgent(String agentTaskId, AgentRequest request) {
AgentState state = stateStore.findByTaskId(agentTaskId);
if (state == null) {
// 首次执行,创建任务状态
state = AgentState.newTask(agentTaskId, request);
stateStore.save(state);
}
switch (state.getStatus()) {
case COMPLETED -> {
// 任务已完成,直接返回结果
return state.getResult();
}
case RUNNING -> {
// 任务还在执行,返回进度信息
return AgentResult.inProgress(agentTaskId, state.getCurrentStep());
}
case FAILED -> {
if (state.canRetry()) {
// 任务失败但可以重试,从失败点继续
return resumeFromCheckpoint(state);
} else {
return AgentResult.failed(state.getError());
}
}
default -> {
// PENDING状态,开始执行
return executeAgent(state);
}
}
}
private AgentResult executeAgent(AgentState state) {
state.setStatus(AgentStatus.RUNNING);
stateStore.save(state);
try {
// 执行Agent,每一步都记录检查点
AgentResult result = agentExecutor.execute(state,
step -> stateStore.saveCheckpoint(state.getTaskId(), step));
state.setStatus(AgentStatus.COMPLETED);
state.setResult(result);
stateStore.save(state);
return result;
} catch (Exception e) {
state.setStatus(AgentStatus.FAILED);
state.setError(e.getMessage());
stateStore.save(state);
throw e;
}
}
}踩过的坑
讲几个我们团队真实踩过的坑。
坑1:幂等Token的粒度设计错误。
我们最开始用taskId + toolName作为幂等Token,结果同一个任务里对同一个工具用不同参数调用,都被当成重复请求处理了。后来加入了参数哈希,才解决这个问题。但参数哈希也有坑——参数里包含时间戳的话,每次调用的哈希值都不同,幂等完全失效。所以参数哈希时要排除时间相关字段。
坑2:Redis挂了导致幂等检查失败,业务全挂。
我们有个接口把幂等检查放在了请求链路的关键路径上,Redis一挂,所有请求都进不来。后来改成了降级策略:Redis不可用时,幂等检查直接跳过,允许执行,但同时告警并限流。
坑3:幂等结果缓存太久,导致"幂等结果过期后重复执行"。
幂等结果的缓存时间需要大于业务重试的时间窗口。我们有个场景,幂等Token缓存了1小时,但业务上有可能在2小时内重试,结果缓存过期后第二次执行还是重复了。
坑4:不同节点的时钟偏差。
分布式环境下,各节点时钟可能有偏差,如果用时间戳来生成幂等Token,需要考虑时钟偏差的影响。我们最后改用Snowflake ID + 业务参数哈希来生成Token。
测试幂等性的方法
最后说说测试。幂等性不是那种写完代码就能靠肉眼确认的特性,必须要有专门的测试。
@SpringBootTest
class OrderToolIdempotencyTest {
@Autowired
private OrderTool orderTool;
@Test
void testCreateOrderIdempotency() {
String idempotencyKey = UUID.randomUUID().toString();
// 第一次调用
OrderResult result1 = orderTool.createOrder("user123", "product456",
2, idempotencyKey);
// 第二次调用(模拟重试)
OrderResult result2 = orderTool.createOrder("user123", "product456",
2, idempotencyKey);
// 两次调用应该返回相同的订单ID
assertEquals(result1.getOrderId(), result2.getOrderId());
// 数据库里只有一条订单记录
List<Order> orders = orderRepository.findByUserId("user123");
assertEquals(1, orders.size());
// 库存只扣减了一次
int remainingInventory = inventoryRepository.findByProductId("product456")
.getStock();
assertEquals(initialStock - 2, remainingInventory);
}
@Test
void testConcurrentCreateOrderIdempotency() throws InterruptedException {
String idempotencyKey = UUID.randomUUID().toString();
int threadCount = 10;
CountDownLatch latch = new CountDownLatch(threadCount);
List<OrderResult> results = new CopyOnWriteArrayList<>();
// 10个线程同时发起相同的创建订单请求
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
try {
OrderResult result = orderTool.createOrder(
"user123", "product456", 2, idempotencyKey);
results.add(result);
} finally {
latch.countDown();
}
});
}
latch.await(10, TimeUnit.SECONDS);
// 所有线程返回的订单ID应该相同
Set<String> orderIds = results.stream()
.map(OrderResult::getOrderId)
.collect(Collectors.toSet());
assertEquals(1, orderIds.size());
}
}幂等性是一个"不做不出问题,出了问题很难查"的典型工程问题。在Agent系统里,由于LLM的非确定性和框架的自动重试,这个问题被放大了好几倍。
我的建议是:在设计任何写操作工具的时候,第一个问题不是"怎么实现这个功能",而是"这个工具被重复调用10次,系统会怎样"。想清楚这个问题,幂等方案自然就出来了。
