第1934篇:工具调用的超时与重试——生产环境下的弹性工具执行框架
第1934篇:工具调用的超时与重试——生产环境下的弹性工具执行框架
前阵子有个同学在星球里问我:他们的AI客服上线一个月了,白天一切正常,但每天凌晨两三点总会有一波用户反馈说AI没有反应。他们看了看日志,发现是工具调用超时——凌晨是某个数据库的维护窗口,查询响应会变慢。
这个问题很典型。大多数团队在开发阶段只考虑工具"能用",到生产环境才发现"稳定用"是另一个层次的问题。网络抖动、下游服务维护、资源争用……工具调用失败不是偶发事件,是必然会发生的事情。
弹性工具执行框架,就是为了让你的Agent在这种必然会出现的失败面前,能够优雅地活下去。
超时的本质问题
先说清楚一件事:超时设置的本质是资源保护,不是功能开关。
如果工具调用没有超时设置,会发生什么?一个慢查询可以把整个线程池耗尽。在Agent场景下,一个用户的工具调用卡住,可能让整个服务的所有并发请求都在等待,直到OOM。
// 错误示范:没有超时的工具调用
public String callExternalApi(String params) {
// 这个调用可能永远不返回
return httpClient.get("https://api.example.com/query?params=" + params);
}这在传统API开发里就是问题,在AI Agent里更致命,因为一次Agent执行可能触发十几次工具调用,每个都可能卡住。
超时设置需要分层考虑,不是设置一个全局超时就完了:
整个Agent执行超时(例如60秒)
└── 单次LLM调用超时(例如30秒)
└── 单次工具调用超时(例如10秒)
└── 工具内部的下游调用超时(例如5秒)每一层的超时都有意义,缺少任何一层都可能导致问题。
超时框架的设计
先看一个完整的超时控制实现:
@Component
public class TimedToolExecutor {
private final ScheduledExecutorService timeoutExecutor =
Executors.newScheduledThreadPool(5);
private final ExecutorService workerPool =
Executors.newFixedThreadPool(20);
/**
* 带超时的工具调用
* @param toolName 工具名称
* @param execution 工具执行逻辑
* @param timeoutMs 超时时间(毫秒)
*/
public <T> CompletableFuture<T> executeWithTimeout(
String toolName,
Callable<T> execution,
long timeoutMs) {
CompletableFuture<T> future = new CompletableFuture<>();
// 提交实际执行任务
Future<?> workFuture = workerPool.submit(() -> {
try {
T result = execution.call();
future.complete(result);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
// 设置超时取消任务
timeoutExecutor.schedule(() -> {
if (!future.isDone()) {
workFuture.cancel(true);
future.completeExceptionally(
new ToolTimeoutException(
String.format("工具%s执行超时(%dms)", toolName, timeoutMs)
)
);
}
}, timeoutMs, TimeUnit.MILLISECONDS);
return future;
}
/**
* 带超时和fallback的工具调用
*/
public <T> T executeWithTimeoutAndFallback(
String toolName,
Callable<T> execution,
long timeoutMs,
Supplier<T> fallback) {
try {
return executeWithTimeout(toolName, execution, timeoutMs)
.get(timeoutMs + 100, TimeUnit.MILLISECONDS);
} catch (TimeoutException | ToolTimeoutException e) {
log.warn("工具{}超时,使用fallback", toolName);
return fallback.get();
} catch (Exception e) {
log.error("工具{}执行异常,使用fallback", toolName, e);
return fallback.get();
}
}
}重试策略的类型与选择
不同的工具调用失败场景,需要不同的重试策略。把常见的几种策略都实现出来,按需选择:
public interface RetryStrategy {
/**
* 判断是否应该重试
* @param attempt 当前是第几次尝试(从1开始)
* @param exception 上次失败的异常
* @return 是否重试,如果是,下次重试前等待多少毫秒
*/
OptionalLong shouldRetry(int attempt, Exception exception);
}
// 固定间隔重试
public class FixedIntervalRetry implements RetryStrategy {
private final int maxAttempts;
private final long intervalMs;
private final Set<Class<? extends Exception>> retryableExceptions;
@Override
public OptionalLong shouldRetry(int attempt, Exception exception) {
if (attempt >= maxAttempts) {
return OptionalLong.empty();
}
if (!isRetryable(exception)) {
return OptionalLong.empty();
}
return OptionalLong.of(intervalMs);
}
private boolean isRetryable(Exception e) {
return retryableExceptions.stream()
.anyMatch(cls -> cls.isInstance(e));
}
}
// 指数退避重试
public class ExponentialBackoffRetry implements RetryStrategy {
private final int maxAttempts;
private final long initialDelayMs;
private final double multiplier;
private final long maxDelayMs;
@Override
public OptionalLong shouldRetry(int attempt, Exception exception) {
if (attempt >= maxAttempts) return OptionalLong.empty();
if (!isRetryable(exception)) return OptionalLong.empty();
long delay = (long) (initialDelayMs * Math.pow(multiplier, attempt - 1));
delay = Math.min(delay, maxDelayMs);
return OptionalLong.of(delay);
}
}
// 带抖动的指数退避(防止惊群效应)
public class JitteredExponentialBackoffRetry implements RetryStrategy {
private final int maxAttempts;
private final long baseDelayMs;
private final double multiplier;
private final long maxDelayMs;
private final Random random = new Random();
@Override
public OptionalLong shouldRetry(int attempt, Exception exception) {
if (attempt >= maxAttempts) return OptionalLong.empty();
if (!isRetryable(exception)) return OptionalLong.empty();
long baseDelay = (long) (baseDelayMs * Math.pow(multiplier, attempt - 1));
baseDelay = Math.min(baseDelay, maxDelayMs);
// 在[0.5*baseDelay, 1.5*baseDelay]范围内加入随机抖动
long jitter = (long) (baseDelay * 0.5 * random.nextDouble());
long delay = baseDelay / 2 + jitter + (long) (baseDelay * 0.5 * random.nextDouble());
return OptionalLong.of(delay);
}
}熔断器模式的引入
只有重试还不够,如果某个工具或下游服务已经彻底挂了,重试只会浪费资源。这时候需要熔断器:
@Component
public class CircuitBreaker {
public enum State {
CLOSED, // 正常,允许调用
OPEN, // 熔断,拒绝所有调用
HALF_OPEN // 试探,允许少量调用测试是否恢复
}
private volatile State state = State.CLOSED;
private final AtomicInteger failureCount = new AtomicInteger(0);
private final AtomicInteger successCount = new AtomicInteger(0);
private volatile long openTimestamp = 0;
// 配置参数
private final int failureThreshold; // 触发熔断的连续失败次数
private final long openDurationMs; // 熔断持续时间
private final int halfOpenSuccessNeeded; // 半开状态下需要几次成功才能恢复
public <T> T execute(String toolName, Callable<T> execution) throws Exception {
// 检查熔断状态
switch (state) {
case OPEN -> {
// 检查是否可以进入半开状态
if (System.currentTimeMillis() - openTimestamp > openDurationMs) {
transitionTo(State.HALF_OPEN);
log.info("熔断器{}进入半开状态,尝试恢复", toolName);
} else {
throw new CircuitBreakerOpenException(
String.format("工具%s熔断中,预计%ds后恢复",
toolName,
(openDurationMs - (System.currentTimeMillis() - openTimestamp)) / 1000)
);
}
}
case HALF_OPEN -> {
// 半开状态只允许有限的调用通过
log.info("熔断器{}半开状态,允许探测调用", toolName);
}
case CLOSED -> {
// 正常状态,直接执行
}
}
try {
T result = execution.call();
onSuccess(toolName);
return result;
} catch (Exception e) {
onFailure(toolName, e);
throw e;
}
}
private void onSuccess(String toolName) {
if (state == State.HALF_OPEN) {
int successes = successCount.incrementAndGet();
if (successes >= halfOpenSuccessNeeded) {
transitionTo(State.CLOSED);
log.info("熔断器{}恢复正常", toolName);
}
} else {
failureCount.set(0); // 成功后重置失败计数
}
}
private void onFailure(String toolName, Exception e) {
if (state == State.HALF_OPEN) {
// 半开状态下失败,重新熔断
transitionTo(State.OPEN);
log.warn("熔断器{}半开测试失败,重新熔断", toolName);
} else {
int failures = failureCount.incrementAndGet();
if (failures >= failureThreshold) {
transitionTo(State.OPEN);
log.error("工具{}连续失败{}次,触发熔断", toolName, failures);
}
}
}
private void transitionTo(State newState) {
this.state = newState;
if (newState == State.OPEN) {
this.openTimestamp = System.currentTimeMillis();
this.successCount.set(0);
} else if (newState == State.CLOSED) {
this.failureCount.set(0);
this.successCount.set(0);
}
}
}把所有组件组合成弹性工具执行框架
把超时、重试、熔断器整合到一个统一的执行框架里:
@Component
public class ResilientToolExecutor {
private final TimedToolExecutor timedExecutor;
private final Map<String, CircuitBreaker> circuitBreakers = new ConcurrentHashMap<>();
private final Map<String, RetryStrategy> retryStrategies = new HashMap<>();
private final ToolCallMonitor monitor;
// 初始化各工具的策略配置
@PostConstruct
public void init() {
// 快速内部查询:2秒超时,3次重试,短退避
registerTool("get_order_by_id",
ToolConfig.builder()
.timeoutMs(2000)
.retryStrategy(new ExponentialBackoffRetry(3, 200, 2.0, 2000))
.circuitBreaker(new CircuitBreaker(5, 30000, 2))
.build()
);
// 外部API调用:5秒超时,2次重试,带抖动
registerTool("external_logistics_api",
ToolConfig.builder()
.timeoutMs(5000)
.retryStrategy(new JitteredExponentialBackoffRetry(2, 1000, 2.0, 5000))
.circuitBreaker(new CircuitBreaker(3, 60000, 3))
.build()
);
// 数据库写操作:不重试(幂等性不保证)
registerTool("create_order",
ToolConfig.builder()
.timeoutMs(3000)
.retryStrategy(NoRetryStrategy.INSTANCE)
.circuitBreaker(new CircuitBreaker(5, 30000, 2))
.build()
);
}
public ToolResult execute(String toolName, Map<String, Object> params, AgentContext context) {
ToolConfig config = getConfig(toolName);
CircuitBreaker breaker = circuitBreakers.get(toolName);
RetryStrategy retryStrategy = config.getRetryStrategy();
long startTime = System.currentTimeMillis();
Exception lastException = null;
int attempt = 0;
while (true) {
attempt++;
try {
// 通过熔断器执行
ToolResult result = breaker.execute(toolName, () ->
timedExecutor.executeWithTimeout(
toolName,
() -> doExecute(toolName, params, context),
config.getTimeoutMs()
).get()
);
monitor.recordSuccess(toolName, System.currentTimeMillis() - startTime);
return result;
} catch (CircuitBreakerOpenException e) {
// 熔断器打开,直接返回失败,不重试
monitor.recordCircuitBreakerOpen(toolName);
return ToolResult.failure(
ToolResult.ErrorType.SERVICE_UNAVAILABLE,
"CIRCUIT_OPEN",
e.getMessage()
);
} catch (Exception e) {
lastException = e;
monitor.recordFailure(toolName, e, System.currentTimeMillis() - startTime);
// 查询重试策略
OptionalLong retryDelay = retryStrategy.shouldRetry(attempt, e);
if (retryDelay.isEmpty()) {
log.warn("工具{}第{}次尝试失败,不再重试:{}", toolName, attempt, e.getMessage());
break;
}
log.info("工具{}第{}次尝试失败,{}ms后重试", toolName, attempt, retryDelay.getAsLong());
try {
Thread.sleep(retryDelay.getAsLong());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
// 所有重试失败,返回最终错误
return ToolResult.failure(
classifyException(lastException),
"EXECUTION_FAILED",
String.format("工具%s执行失败(已尝试%d次):%s", toolName, attempt, lastException.getMessage())
);
}
private ToolResult.ErrorType classifyException(Exception e) {
if (e instanceof ToolTimeoutException) return ToolResult.ErrorType.TIMEOUT;
if (e instanceof RateLimitException) return ToolResult.ErrorType.RATE_LIMITED;
if (e instanceof IllegalArgumentException) return ToolResult.ErrorType.INVALID_PARAMS;
return ToolResult.ErrorType.UNKNOWN_ERROR;
}
}超时配置的科学方法
超时数值怎么设?不是拍脑袋,要基于数据:
// 通过历史数据计算合理的超时阈值
public class TimeoutCalculator {
private final MetricsRepository metricsRepo;
/**
* 基于P95响应时间设置超时
* 通常设置为P95的1.5到2倍
*/
public long calculateTimeout(String toolName) {
List<Long> recentLatencies = metricsRepo.getRecentLatencies(
toolName, Duration.ofHours(24)
);
if (recentLatencies.isEmpty()) {
return 5000L; // 没有历史数据,默认5秒
}
recentLatencies.sort(Long::compareTo);
int p95Index = (int) (recentLatencies.size() * 0.95);
long p95Latency = recentLatencies.get(p95Index);
// P95的2倍作为超时,但有上下限
long timeout = p95Latency * 2;
timeout = Math.max(timeout, 1000L); // 最少1秒
timeout = Math.min(timeout, 30000L); // 最多30秒
log.info("工具{}的P95延迟:{}ms,推荐超时:{}ms", toolName, p95Latency, timeout);
return timeout;
}
}这个方法在我们的生产环境里用了一段时间,效果不错。每周自动重新计算一次各工具的超时阈值,既不会设得太小导致误超时,也不会设得太大浪费资源。
Mermaid流程图:弹性执行框架的完整流程
写操作的特殊处理
前面的重试框架有一个重要的注意点:写操作通常不应该自动重试,因为你不确定第一次调用是否真的失败了(可能只是响应超时,但实际上执行成功了)。盲目重试写操作会导致重复创建订单、重复扣款等严重问题。
// 写操作需要幂等性支持才能重试
public class IdempotentWriteRetry implements RetryStrategy {
private final IdempotencyChecker idempotencyChecker;
@Override
public OptionalLong shouldRetry(int attempt, Exception exception) {
if (attempt >= 2) return OptionalLong.empty(); // 写操作最多重试一次
// 只有在确认操作未执行时才重试
// 超时的情况下需要先查询一次确认状态
if (exception instanceof ToolTimeoutException) {
// 超时后不立即重试,而是先查询操作是否已执行
return OptionalLong.of(2000L);
}
// 连接错误可以重试(操作肯定没执行)
if (exception instanceof ConnectionException) {
return OptionalLong.of(1000L);
}
return OptionalLong.empty();
}
}理想的做法是给写操作加上幂等键(idempotency key),这样即使重复调用也不会产生副作用。但这要求下游服务支持幂等键,需要和后端同学配合。
监控指标的设计
弹性框架的效果需要通过指标来验证:
// 关键指标
- tool_call_total{tool, status} // 总调用次数(成功/失败)
- tool_call_duration_p99{tool} // P99延迟
- tool_retry_count{tool} // 重试次数
- circuit_breaker_state{tool, state} // 熔断器状态
- circuit_breaker_open_total{tool} // 熔断触发次数在Grafana里配置这些指标的面板,当circuit_breaker_open_total在5分钟内上涨时触发告警,基本上能在用户大规模感知之前就发现问题。
弹性工具执行框架看起来工程量不小,但这些投入是值得的。一个在生产环境稳定运行的Agent,必须对工具调用的各种失败场景有完善的处理能力。超时、重试、熔断——这三个机制加在一起,能让你的Agent的可用性从"开发环境可用"提升到"生产环境可靠"。
