AI 调用的超时处理——不是设个 timeout 就完了
AI 调用的超时处理——不是设个 timeout 就完了
有一次我在用公司内部的 AI 助手写一份方案,点了"生成",然后盯着屏幕等。
等了 30 秒,什么都没有。
又等了 30 秒,还是什么都没有。
我以为卡住了,刷新页面,刚刚的请求没了,重新生成,又等了一分钟,这次终于出来了。
后来我去看服务端日志,发现第一次请求其实是成功的——AI 模型花了 47 秒生成了结果,但前端没有任何展示,因为我等不住刷新了。
去查代码,发现前端的 fetch 请求设了 30 秒超时,AI 调用超过 30 秒被前端直接断掉,用户什么都看不到。服务端还在傻乎乎地继续跑,Token 照样消耗,只是结果没人收。
这件事让我意识到:AI 调用的超时,是一个被严重低估的工程问题。
为什么 AI 超时比普通 HTTP 超时复杂
普通 HTTP 接口,响应时间通常在毫秒到几秒之间,超时设个 10 秒基本够用,超时了就报错重试,逻辑简单。
AI 调用有几个特殊性:
1. 响应时间变化范围极大
同样的接口,一句简单的问候可能 1 秒返回,一篇 2000 字的文章可能需要 40 秒甚至更长。你没法用一个固定超时来覆盖所有场景:设短了,长任务全部超时;设长了,真正卡死的请求要等很久才能被发现。
2. 流式响应的超时语义完全不同
非流式响应:等待服务器返回完整结果,超时是针对整个响应的。
流式响应(SSE / Server-Sent Events):服务器边生成边发,客户端边接收边展示。这里的超时有两个维度:
- 首 Token 超时:从发出请求到收到第一个 Token 的时间
- Token 间隔超时:相邻两个 Token 之间的最大等待时间(如果这段时间没收到新 Token,说明流可能中断了)
- 总时长超时:整个流式响应的总时间上限
这三个超时要分别设置,混在一起会出问题。
3. 超时之后用户体验的处理
超时了怎么告诉用户?这个问题在普通接口里很简单:返回错误。
但 AI 场景更复杂:
- 非流式:直接报错,让用户重试
- 流式:已经展示了一部分内容,超时时怎么处理?强制截断内容?还是展示"生成被中断"?
- 如果是复杂任务(比如分析一份报告),超时后应该保存中间结果,还是直接丢弃?
4. 超时和重试的配合
超时之后自动重试是对的吗?对于幂等的只读请求(比如"给这段文字做个摘要"),重试没问题。但对于会产生副作用的操作(比如"根据这条指令执行操作"),重试可能导致重复执行。
超时策略的分层设计
我们现在的超时策略分三层:
第一层:首 Token 超时
从请求发出到收到第一个 Token 的时间。这个时间正常不超过 5-8 秒(GPT-4o),如果超过 15 秒还没有首 Token,大概率是服务端有问题,可以考虑直接放弃或切换模型。
第二层:Token 流动心跳超时
在流式响应过程中,相邻 Token 之间的最大间隔。正常生成时,Token 是持续流动的,极少有超过 3 秒的空白。如果超过 10 秒没有新 Token,说明流可能已经断了(网络问题或服务端内部错误),需要处理。
第三层:总时长上限
整个请求从开始到结束的最大允许时间。这个值根据业务设置,一般 120-300 秒。超过这个时间强制终止,避免无限等待。
前端 SSE 心跳保活
流式响应依赖 SSE 连接保持,但在某些网络环境(尤其是通过代理的情况)下,长时间没有数据传输的 TCP 连接会被中间设备断开。
解决方案:服务端定期发送心跳事件(一个特殊的 SSE 事件,内容为空或者是进度信息),即使 AI 还在"思考"没有产出 Token,也保持连接活跃。
完整代码实现
服务端:流式响应 + 超时控制
@RestController
@RequestMapping("/api/ai")
@Slf4j
public class AiStreamController {
@Autowired
private UnifiedModelService modelService;
// 首 Token 超时
private static final long FIRST_TOKEN_TIMEOUT_MS = 15_000;
// Token 流动心跳间隔(服务端主动发送,保活连接)
private static final long HEARTBEAT_INTERVAL_MS = 3_000;
// 总时长上限
private static final long TOTAL_TIMEOUT_MS = 180_000;
@GetMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamChat(@RequestParam String message,
@RequestParam String conversationId,
@RequestHeader("X-User-Id") String userId) {
SseEmitter emitter = new SseEmitter(TOTAL_TIMEOUT_MS);
// 注册完成/超时/错误回调
emitter.onCompletion(() -> log.debug("SSE completed for conversation {}", conversationId));
emitter.onTimeout(() -> {
log.warn("SSE total timeout for conversation {}, user {}", conversationId, userId);
try {
emitter.send(SseEmitter.event()
.name("error")
.data("{\"code\":\"TOTAL_TIMEOUT\",\"message\":\"生成超时,请重试\"}"));
} catch (IOException e) {
// 发送失败忽略
}
emitter.complete();
});
emitter.onError(e -> {
log.error("SSE error for conversation {}: {}", conversationId, e.getMessage());
emitter.complete();
});
// 异步执行 AI 调用
CompletableFuture.runAsync(() -> {
executeStreamWithTimeout(emitter, message, conversationId, userId);
});
return emitter;
}
private void executeStreamWithTimeout(SseEmitter emitter, String message,
String conversationId, String userId) {
AtomicLong lastTokenTime = new AtomicLong(System.currentTimeMillis());
AtomicBoolean firstTokenReceived = new AtomicBoolean(false);
AtomicBoolean completed = new AtomicBoolean(false);
// 启动心跳线程(保持 SSE 连接,同时检测超时)
ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> heartbeatTask = heartbeatExecutor.scheduleAtFixedRate(() -> {
if (completed.get()) {
return;
}
long now = System.currentTimeMillis();
long sinceLastToken = now - lastTokenTime.get();
// 检查首 Token 超时
if (!firstTokenReceived.get() && sinceLastToken > FIRST_TOKEN_TIMEOUT_MS) {
log.warn("First token timeout for conversation {}", conversationId);
try {
emitter.send(SseEmitter.event()
.name("error")
.data("{\"code\":\"FIRST_TOKEN_TIMEOUT\",\"message\":\"模型响应超时,正在切换备用模型...\"}"));
// TODO: 触发切换到备用模型的逻辑
} catch (IOException e) {
log.debug("Heartbeat send failed: {}", e.getMessage());
}
completed.set(true);
emitter.complete();
return;
}
// 检查 Token 流动超时(首 Token 之后)
if (firstTokenReceived.get() && sinceLastToken > 10_000) {
log.warn("Token stream stalled for conversation {}, {}ms since last token",
conversationId, sinceLastToken);
try {
emitter.send(SseEmitter.event()
.name("error")
.data("{\"code\":\"STREAM_STALLED\",\"message\":\"内容生成中断,请重试\"}"));
} catch (IOException e) {
log.debug("Error send failed: {}", e.getMessage());
}
completed.set(true);
emitter.complete();
return;
}
// 发送心跳,保持连接
if (!firstTokenReceived.get()) {
try {
emitter.send(SseEmitter.event()
.name("heartbeat")
.data("{\"type\":\"thinking\"}"));
} catch (IOException e) {
completed.set(true);
}
}
}, HEARTBEAT_INTERVAL_MS, HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
try {
// 执行流式 AI 调用
UnifiedChatRequest request = buildRequest(message, conversationId);
modelService.chatStream(request)
.doOnNext(token -> {
firstTokenReceived.set(true);
lastTokenTime.set(System.currentTimeMillis());
try {
emitter.send(SseEmitter.event()
.name("token")
.data("{\"content\":" + escapeJson(token) + "}"));
} catch (IOException e) {
log.debug("Token send failed: {}", e.getMessage());
completed.set(true);
}
})
.doOnComplete(() -> {
completed.set(true);
try {
emitter.send(SseEmitter.event()
.name("done")
.data("{\"status\":\"completed\"}"));
emitter.complete();
} catch (IOException e) {
log.debug("Done event send failed: {}", e.getMessage());
}
})
.doOnError(e -> {
completed.set(true);
log.error("Stream error for conversation {}: {}", conversationId, e.getMessage());
try {
emitter.send(SseEmitter.event()
.name("error")
.data("{\"code\":\"AI_ERROR\",\"message\":\"生成失败,请重试\"}"));
} catch (IOException ex) {
log.debug("Error event send failed", ex);
}
emitter.completeWithError(e);
})
.block(); // 阻塞直到流完成
} catch (Exception e) {
completed.set(true);
log.error("Stream execution error for conversation {}: {}", conversationId, e.getMessage());
emitter.completeWithError(e);
} finally {
heartbeatTask.cancel(false);
heartbeatExecutor.shutdown();
}
}
private String escapeJson(String text) {
return "\"" + text
.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
.replace("\r", "\\r")
+ "\"";
}
}前端:SSE 客户端 + 超时处理
class AiStreamClient {
constructor(options = {}) {
this.firstTokenTimeout = options.firstTokenTimeout || 15000;
this.streamIdleTimeout = options.streamIdleTimeout || 10000;
this.totalTimeout = options.totalTimeout || 180000;
}
async streamChat(message, conversationId, handlers) {
const { onToken, onError, onComplete, onThinking } = handlers;
let firstTokenReceived = false;
let lastTokenTime = Date.now();
let firstTokenTimer = null;
let idleTimer = null;
let totalTimer = null;
let eventSource = null;
const cleanup = () => {
clearTimeout(firstTokenTimer);
clearTimeout(idleTimer);
clearTimeout(totalTimer);
if (eventSource) {
eventSource.close();
eventSource = null;
}
};
const resetIdleTimer = () => {
clearTimeout(idleTimer);
idleTimer = setTimeout(() => {
cleanup();
onError({ code: 'STREAM_STALLED', message: '内容生成中断,请重试' });
}, this.streamIdleTimeout);
};
try {
const url = `/api/ai/chat/stream?message=${encodeURIComponent(message)}&conversationId=${conversationId}`;
eventSource = new EventSource(url);
// 首 Token 超时计时
firstTokenTimer = setTimeout(() => {
if (!firstTokenReceived) {
cleanup();
onError({ code: 'FIRST_TOKEN_TIMEOUT', message: '响应超时,请重试' });
}
}, this.firstTokenTimeout);
// 总时长超时
totalTimer = setTimeout(() => {
cleanup();
onError({ code: 'TOTAL_TIMEOUT', message: '生成超时,请重试' });
}, this.totalTimeout);
// 处理心跳事件(服务端在 AI "思考"时发送)
eventSource.addEventListener('heartbeat', (event) => {
const data = JSON.parse(event.data);
if (data.type === 'thinking' && onThinking) {
onThinking();
}
});
// 处理 Token 事件
eventSource.addEventListener('token', (event) => {
if (!firstTokenReceived) {
firstTokenReceived = true;
clearTimeout(firstTokenTimer);
}
lastTokenTime = Date.now();
resetIdleTimer(); // 每收到一个 Token,重置空闲计时
const data = JSON.parse(event.data);
onToken(data.content);
});
// 处理完成事件
eventSource.addEventListener('done', () => {
cleanup();
onComplete();
});
// 处理错误事件(服务端主动发送的业务错误)
eventSource.addEventListener('error', (event) => {
if (event.data) {
try {
const error = JSON.parse(event.data);
cleanup();
onError(error);
return;
} catch (e) {
// 不是 JSON,走默认处理
}
}
// SSE 连接断开
cleanup();
onError({ code: 'CONNECTION_ERROR', message: '连接断开,请重试' });
});
} catch (error) {
cleanup();
onError({ code: 'NETWORK_ERROR', message: '网络错误,请重试' });
}
// 返回取消函数
return () => {
cleanup();
};
}
}
// 使用示例
const client = new AiStreamClient({
firstTokenTimeout: 15000,
streamIdleTimeout: 10000,
totalTimeout: 180000
});
let fullContent = '';
let isGenerating = false;
const cancelFn = await client.streamChat(
userMessage,
conversationId,
{
onThinking: () => {
showThinkingIndicator(); // 展示"AI 思考中..."动画
},
onToken: (token) => {
hideThinkingIndicator();
fullContent += token;
updateDisplayContent(fullContent); // 实时更新显示
},
onError: (error) => {
isGenerating = false;
if (error.code === 'FIRST_TOKEN_TIMEOUT') {
showMessage('响应超时,正在重试...');
// 可以在这里触发重试逻辑
} else {
showError(error.message);
}
},
onComplete: () => {
isGenerating = false;
saveConversation(conversationId, fullContent);
}
}
);
// 用户点击"停止生成"按钮时
stopButton.onclick = () => {
cancelFn();
showMessage('已停止生成');
};超时处理决策树
几个实践细节
非流式调用的超时
非流式调用的超时相对简单,但也要按请求复杂度区分。我们的策略:
- 简单问答(Prompt 短,预期输出短):30 秒
- 内容生成(预期输出长):90 秒
- 复杂分析任务:180 秒
不要所有接口都用同一个超时,那样要么太短影响长任务,要么太长让卡死的请求拖很久。
超时后的资源释放
客户端超时断开连接后,服务端的 AI 调用不会自动停止。我们需要在连接断开时取消后台的 AI 请求(使用 CompletableFuture.cancel 或 Reactor 的 dispose),否则浪费的 Token 还是要计费。
用户端的超时提示
超时提示很影响用户体验。不要直接说"超时",用户不懂什么是超时。我们的文案:
- 首 Token 超时:「AI 正在思考,比平时慢一点,请稍候...」(配合重试逻辑)
- 流中断:「内容生成中断,已为您保存已生成的部分」
- 总时长超时:「这次生成时间较长,请重新尝试」
超时指标监控
超时率是一个重要的 AI 健康指标。我们监控:
- 首 Token 超时率(正常应低于 1%)
- 流中断率(正常应低于 0.5%)
- 各 P99/P95 延迟
这些指标一旦出现异常,往往是模型服务不稳定的早期信号。
最后
那个 30 秒超时截断 AI 结果的问题,修复起来很简单:把前端超时从 30 秒改到 180 秒,加上流式心跳保活,加上首 Token 超时检测。一个下午搞定。
但背后的道理想清楚花了不少时间。AI 调用的超时不是一个数字的问题,而是一套策略的问题:什么时候放弃、怎么告诉用户、放弃后怎么处理已有的中间结果、怎么避免浪费服务端资源。这些都想清楚了,实现起来才不会出各种奇怪的问题。
