Spring AI 的 Retry 机制深度解析——不是加个 @Retryable 就完了
Spring AI 的 Retry 机制深度解析——不是加个 @Retryable 就完了
有一段时间,我们的 AI 调用服务每天凌晨都会收到告警:成本异常飙升,Token 消耗超出预算两三倍。排查了很久,最后发现根因出乎意料——是重试策略写错了。
具体情况是这样的:我们调用某 LLM 的 API,正常情况下每分钟请求限制是 60 次。某个业务高峰期,请求量超了限速,API 返回了 429(Too Many Requests)。这本来是个正常的流量控制响应,但我们的重试策略直接配成了"遇到任何 HTTP 错误最多重试 5 次,间隔 500ms"。
结果:大量 429 响应触发了立即重试,重试本身又触发了更多 429,形成了雪崩式的重试风暴。在原本只需要等待一分钟让限速窗口重置的情况下,我们的系统发出了数倍于正常的请求,每个请求都产生了 input tokens 的消耗(即使最终因为 429 没有返回 output),成本直接翻了 3 倍多。
这个事故给我上了非常深刻的一课:AI 场景下的重试,和普通的 HTTP 服务重试有本质区别。
AI 调用重试的特殊性
普通 Web 服务的重试场景:网络抖动、服务暂时不可用、超时——这类场景下,快速重试是合理的,因为重试成本基本为零(或者极低)。
AI 调用的重试场景要复杂得多,错误类型完全不同:
错误类型分类
关键判断维度:
这个错误是暂时性的吗? 429 和 503 是暂时性的,400 和 401 是永久性的(针对当前请求)。
重试成本是多少? 如果是 Server Streaming,生成到一半断了,重试就要从头再消耗 input tokens。如果是 Unary call,重试就是重新消耗完整的 input tokens。
限速重试需要等多久? 不同的 LLM 提供商限速策略不同,有的是每分钟限速,有的是每天限速,直接决定了退避策略的等待时长。
上下文是否需要携带? 某些错误重试时需要保留会话上下文,不能直接重发,否则语义会断裂。
@Retryable 的局限性
Spring Retry 的 @Retryable 确实很方便,但在 AI 调用场景下有几个硬伤:
// 这是一个看起来没问题,实际有大坑的写法
@Service
public class NaiveAIService {
@Retryable(
value = {Exception.class}, // 所有异常都重试!这是第一个问题
maxAttempts = 5,
backoff = @Backoff(delay = 500) // 固定500ms,不区分错误类型!这是第二个问题
)
public String callAI(String prompt) {
return chatModel.call(prompt);
}
}问题一:不区分异常类型,所有异常都重试,包括 400 Bad Request(重试根本没用)和内容安全拒绝(重试还有合规风险)。
问题二:固定退避时间,429 限流需要等到限速窗口重置(可能是几十秒),固定 500ms 的重试只是在白白消耗配额。
问题三:没有 Token 成本感知,不知道重试会带来多少额外成本,也没有成本熔断机制。
问题四:无法处理流式场景,如果是 Streaming 调用,在中途失败时,@Retryable 无法区分"从未开始生成"和"生成了一半断了"。
退避策略设计
正确的退避策略应该是这样的:
代码:自定义 AI 重试策略的实现
核心重试策略
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffContext;
import org.springframework.retry.backoff.BackOffInterruptedException;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.BackOffContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
/**
* AI 专用重试策略,区分错误类型,避免盲目重试
*/
public class AIRetryPolicy implements RetryPolicy {
private final int maxRateLimitRetries;
private final int maxServiceErrorRetries;
public AIRetryPolicy(int maxRateLimitRetries, int maxServiceErrorRetries) {
this.maxRateLimitRetries = maxRateLimitRetries;
this.maxServiceErrorRetries = maxServiceErrorRetries;
}
@Override
public boolean canRetry(RetryContext context) {
Throwable lastThrowable = context.getLastThrowable();
if (lastThrowable == null) return true;
// 提取 HTTP 状态码
int statusCode = extractStatusCode(lastThrowable);
return switch (statusCode) {
case 429 -> {
// 限流:允许重试,但次数有限
int retryCount = context.getRetryCount();
yield retryCount < maxRateLimitRetries;
}
case 500, 502, 503, 504 -> {
// 服务端错误:允许重试
int retryCount = context.getRetryCount();
yield retryCount < maxServiceErrorRetries;
}
case 400, 401, 403, 404 -> false; // 客户端错误:不重试
default -> {
// 网络超时、连接异常:允许有限重试
if (lastThrowable instanceof java.net.SocketTimeoutException
|| lastThrowable instanceof java.io.IOException) {
yield context.getRetryCount() < 2;
}
yield false;
}
};
}
private int extractStatusCode(Throwable throwable) {
if (throwable instanceof HttpClientErrorException e) {
return e.getStatusCode().value();
}
if (throwable instanceof HttpServerErrorException e) {
return e.getStatusCode().value();
}
// Spring AI 的 API 异常通常包装成特定异常类型
if (throwable.getMessage() != null) {
if (throwable.getMessage().contains("429")) return 429;
if (throwable.getMessage().contains("503")) return 503;
}
return -1;
}
@Override
public RetryContext open(RetryContext parent) {
return new AIRetryContext(parent);
}
@Override
public void close(RetryContext context) {}
@Override
public void registerThrowable(RetryContext context, Throwable throwable) {
((AIRetryContext) context).registerThrowable(throwable);
}
// 内部 Context 类,可以携带额外信息
static class AIRetryContext extends org.springframework.retry.context.RetryContextSupport {
AIRetryContext(RetryContext parent) {
super(parent);
}
}
}智能退避策略
/**
* AI 场景下的智能退避策略
* - 429 错误:读取 Retry-After 头,或使用较长的初始等待时间
* - 其他错误:指数退避 + 随机抖动
*/
@Component
public class AIBackOffPolicy implements BackOffPolicy {
private static final Logger log = LoggerFactory.getLogger(AIBackOffPolicy.class);
// 限流重试的基础等待时间(秒)
private static final long RATE_LIMIT_BASE_WAIT_MS = 60_000L;
// 服务错误的基础等待时间
private static final long SERVICE_ERROR_BASE_WAIT_MS = 1_000L;
// 最大等待时间
private static final long MAX_WAIT_MS = 120_000L;
@Override
public BackOffContext start(RetryContext context) {
return new AIBackOffContext();
}
@Override
public void backOff(BackOffContext backOffContext) throws BackOffInterruptedException {
AIBackOffContext ctx = (AIBackOffContext) backOffContext;
RetryContext retryContext = ctx.getRetryContext();
Throwable lastThrowable = retryContext != null
? retryContext.getLastThrowable() : null;
long waitMs = calculateWaitTime(lastThrowable, ctx.getAttemptCount());
ctx.incrementAttempt();
log.info("AI调用重试等待 {}ms,当前重试次数: {},错误类型: {}",
waitMs, ctx.getAttemptCount(),
lastThrowable != null ? lastThrowable.getClass().getSimpleName() : "unknown");
try {
Thread.sleep(waitMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BackOffInterruptedException("Interrupted during AI retry backoff", e);
}
}
private long calculateWaitTime(Throwable throwable, int attempt) {
if (throwable == null) return SERVICE_ERROR_BASE_WAIT_MS;
// 提取 Retry-After 头(如果有的话)
Long retryAfterMs = extractRetryAfterMs(throwable);
if (retryAfterMs != null) {
log.info("使用 Retry-After 头中的等待时间: {}ms", retryAfterMs);
return Math.min(retryAfterMs + 1000, MAX_WAIT_MS); // 额外加 1s buffer
}
boolean isRateLimit = throwable.getMessage() != null
&& throwable.getMessage().contains("429");
long baseWait = isRateLimit ? RATE_LIMIT_BASE_WAIT_MS : SERVICE_ERROR_BASE_WAIT_MS;
long exponentialWait = (long) (baseWait * Math.pow(2, attempt - 1));
long jitter = ThreadLocalRandom.current().nextLong(0, exponentialWait / 4);
return Math.min(exponentialWait + jitter, MAX_WAIT_MS);
}
private Long extractRetryAfterMs(Throwable throwable) {
// 尝试从异常中提取 Retry-After 信息
if (throwable instanceof HttpClientErrorException e) {
String retryAfter = e.getResponseHeaders() != null
? e.getResponseHeaders().getFirst("Retry-After") : null;
if (retryAfter != null) {
try {
return Long.parseLong(retryAfter) * 1000L;
} catch (NumberFormatException ignored) {}
}
}
return null;
}
static class AIBackOffContext implements BackOffContext {
private int attemptCount = 1;
private RetryContext retryContext;
void incrementAttempt() { attemptCount++; }
int getAttemptCount() { return attemptCount; }
RetryContext getRetryContext() { return retryContext; }
void setRetryContext(RetryContext ctx) { this.retryContext = ctx; }
}
}重试模板配置与使用
@Configuration
public class RetryConfig {
@Bean
public RetryTemplate aiRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// 使用自定义 AI 重试策略
AIRetryPolicy retryPolicy = new AIRetryPolicy(
3, // 限流错误最多重试3次
2 // 服务错误最多重试2次
);
template.setRetryPolicy(retryPolicy);
// 使用自定义退避策略
template.setBackOffPolicy(new AIBackOffPolicy());
// 添加重试监听器,用于监控和日志
template.registerListener(new RetryListenerSupport() {
private static final Logger log = LoggerFactory.getLogger("AIRetryMonitor");
@Override
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback,
Throwable throwable) {
log.warn("AI调用失败,准备重试。重试次数: {}, 错误: {}",
context.getRetryCount(), throwable.getMessage());
// 上报到监控系统
// metricsService.recordRetry(throwable.getMessage());
}
@Override
public <T, E extends Throwable> void close(RetryContext context,
RetryCallback<T, E> callback,
Throwable throwable) {
if (throwable != null) {
log.error("AI调用最终失败,已重试 {} 次", context.getRetryCount());
}
}
});
return template;
}
}
@Service
public class AICallService {
@Autowired
private ChatModel chatModel;
@Autowired
private RetryTemplate aiRetryTemplate;
@Autowired
private MeterRegistry meterRegistry; // Micrometer 监控
public String callWithRetry(String prompt) {
Timer.Sample sample = Timer.start(meterRegistry);
Counter retryCounter = meterRegistry.counter("ai.retry.count");
try {
return aiRetryTemplate.execute(context -> {
if (context.getRetryCount() > 0) {
retryCounter.increment();
log.info("第 {} 次重试调用 AI", context.getRetryCount());
}
return chatModel.call(prompt);
});
} finally {
sample.stop(meterRegistry.timer("ai.call.duration"));
}
}
}重试日志和监控
@Component
public class AIRetryMonitor {
private static final Logger log = LoggerFactory.getLogger(AIRetryMonitor.class);
@Autowired
private MeterRegistry meterRegistry;
// 记录重试统计
private final Map<String, AtomicInteger> retryStats = new ConcurrentHashMap<>();
/**
* 重试事件记录,用于实时监控和告警
*/
public void recordRetry(String errorType, int retryCount) {
String key = "retry_" + errorType;
retryStats.computeIfAbsent(key, k -> new AtomicInteger(0)).incrementAndGet();
// Micrometer 指标
meterRegistry.counter("ai.retry",
"error_type", errorType,
"retry_count", String.valueOf(retryCount)).increment();
// 超过阈值告警
if (retryStats.get(key).get() > 100) {
log.error("AI调用重试频率异常! 错误类型: {}, 最近重试次数: {}",
errorType, retryStats.get(key).get());
// 触发告警通知(钉钉/企微/PagerDuty等)
}
}
/**
* 定时重置统计,避免历史数据干扰
*/
@Scheduled(fixedRate = 60000)
public void resetStats() {
retryStats.clear();
}
/**
* 重试成本估算
*/
public double estimateRetryCost(int inputTokens, int retryCount) {
// 每次重试都会重新消耗 input tokens(部分提供商会,部分不会)
double costPerMillionInputTokens = 15.0; // 以 Claude Opus 为例
return inputTokens * retryCount * costPerMillionInputTokens / 1_000_000.0;
}
}真实事故复盘:错误的重试策略让成本翻了 3 倍
回到开头说的那个事故,根本原因在于几个错误叠加:
错误一:把 429 当做普通错误重试
429 是限速,不是错误。正确做法是等待限速窗口重置后再重试,而不是立即重试。立即重试只会产生更多 429,一次也进不去,但每次尝试都在消耗计数器。
错误二:没有指数退避
固定 500ms 间隔在限速场景下完全没用。如果限速窗口是 60 秒,500ms 的等待之后再试,结果还是 429,5 次重试全都失败,但消耗了 5 倍的 input tokens。
错误三:没有成本熔断
当重试消耗的 Token 成本超过某个阈值时,应该停止重试并告警,而不是继续无限重试。
改进方案:
@Component
public class CostAwareAIService {
@Autowired
private RetryTemplate aiRetryTemplate;
@Autowired
private ChatModel chatModel;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final double MAX_RETRY_COST_PER_HOUR = 10.0; // $10/小时的重试成本上限
public String callWithCostAwareRetry(String prompt, String tenantId) {
// 检查是否已超出重试成本上限
if (isRetryCostExceeded(tenantId)) {
throw new AIServiceException("AI重试成本超限,已暂停自动重试,请稍后再试");
}
int[] inputTokenCount = {0};
return aiRetryTemplate.execute(context -> {
if (context.getRetryCount() > 0) {
// 估算并记录重试成本
double retryCost = estimateCost(inputTokenCount[0]);
recordRetryCost(tenantId, retryCost);
log.info("AI重试成本记录: tenantId={}, retryCost=${:.4f}", tenantId, retryCost);
}
// 实际调用
String result = chatModel.call(prompt);
// 记录 token 数(用于后续重试成本估算)
inputTokenCount[0] = estimateInputTokens(prompt);
return result;
});
}
private boolean isRetryCostExceeded(String tenantId) {
String key = "retry:cost:" + tenantId + ":" + LocalDateTime.now().getHour();
String costStr = redisTemplate.opsForValue().get(key);
if (costStr == null) return false;
return Double.parseDouble(costStr) > MAX_RETRY_COST_PER_HOUR;
}
private void recordRetryCost(String tenantId, double cost) {
String key = "retry:cost:" + tenantId + ":" + LocalDateTime.now().getHour();
// 原子加
redisTemplate.opsForValue().increment(key, (long)(cost * 10000));
redisTemplate.expire(key, 2, TimeUnit.HOURS);
}
private double estimateCost(int inputTokens) {
return inputTokens * 15.0 / 1_000_000.0;
}
private int estimateInputTokens(String prompt) {
// 粗略估算:中文约1.5字符/token,英文约4字符/token
return prompt.length() / 3;
}
}总结
AI 调用的重试策略设计,核心原则有三条:
区分错误类型,不同类型用不同策略。429 需要等待限速重置,不能快速重试;400 根本不用重试;503 适合指数退避。
退避时间要与限速窗口匹配。读取
Retry-After响应头,如果没有,限流重试至少等 60 秒起步。加入成本熔断。重试会产生额外的 Token 成本,当重试成本超过阈值时,停止重试并告警,让人工介入。
不要用 @Retryable 一把梭,它太通用,没有 AI 场景的成本意识。用 RetryTemplate + 自定义 RetryPolicy + BackOffPolicy 的组合,才能把重试做对。
