AI 应用的 SSE 生产实践——Server-Sent Events 的那些坑
AI 应用的 SSE 生产实践——Server-Sent Events 的那些坑
大概一年前,我把一个 AI 对话系统从 WebSocket 换成了 SSE。当时的想法很简单:SSE 是单向的(服务端推客户端),对话场景里确实不需要客户端推数据,用 SSE 比 WebSocket 更轻量,而且浏览器原生支持,不用额外的库。
第一版上线很顺利,测试环境跑得很好。但生产环境出现问题的时候,我才发现 SSE 在生产级场景里有一堆细节问题——断线重连逻辑、心跳保活、Nginx 反向代理的超时、多实例部署下的连接管理……每一个单独拿出来都不复杂,但组合在一起,坑踩了不少。
这篇文章把这一年踩过的坑全部整理出来。
SSE 的基本原理
Server-Sent Events 是 HTML5 标准的一部分。原理很简单:客户端发一个普通的 HTTP 请求,服务端不关闭连接,持续往这个连接里写数据,每条数据用 data: 前缀标识,两条数据之间用空行分隔。
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
data: 第一条消息
data: 第二条消息
data: {"type":"token","content":"你好"}
data: [DONE]客户端用 EventSource API 订阅:
const source = new EventSource('/api/stream/chat');
source.onmessage = (event) => {
console.log('收到:', event.data);
if (event.data === '[DONE]') {
source.close();
}
};
source.onerror = (error) => {
console.error('连接错误:', error);
};听起来很简单对吗?但细节魔鬼多。
坑一:Nginx 超时截断
这是我在生产环境遇到的第一个坑,也是最典型的。
问题现象:用户发起一个比较长的 AI 生成请求(比如生成一份 3000 字的报告),等待了一段时间之后,界面突然卡住,之后没有任何输出了,也没有报错提示。
根本原因:Nginx 默认配置了 proxy_read_timeout 60s,也就是说,如果上游服务 60 秒内没有向 Nginx 写入任何数据,Nginx 就会主动关闭连接。
但对于 SSE,连接应该是长期保持的,中间可能有很长时间没有新数据(比如 LLM 在思考的时候)。
解决方案一:修改 Nginx 配置
location /api/stream/ {
proxy_pass http://backend;
# 关闭缓冲,确保数据实时推送
proxy_buffering off;
proxy_cache off;
# 增加超时时间
proxy_read_timeout 3600s;
proxy_connect_timeout 60s;
proxy_send_timeout 300s;
# SSE 必需的响应头
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding on;
# 禁止 gzip 压缩(会影响流式传输)
gzip off;
}解决方案二:服务端发送心跳
Nginx 超时的条件是"上游没有写入数据",所以即使没有 LLM 输出,也定期发一个心跳消息,就能保持连接:
// 每 20 秒发送一个心跳注释
// SSE 中以 : 开头的行是注释,客户端会忽略
data: :heartbeat\n\n坑二:浏览器的自动重连行为
浏览器的 EventSource API 有一个"贴心"的特性:当连接断开时,它会自动重连。默认重试间隔是 3 秒。
这本来是好事,但会引发一个奇怪的问题:用户的 AI 请求还没完成,连接中途断了,浏览器自动重连之后,服务端会把同一个请求重新执行一遍,于是用户看到内容被重复输出了。
解决方案:利用 SSE 的 id 字段实现续传
SSE 协议支持为每条消息设置 id,断线重连时浏览器会在请求头中携带 Last-Event-ID,告诉服务端"我上次收到的最后一条消息是 id=xxx,请从这里继续"。
// SSE 消息格式
id: 42\n
data: 这是第 42 个 token\n
\n服务端实现续传:
@RestController
@RequestMapping("/api/sse")
@Slf4j
public class SseController {
private final SseContinuationService continuationService;
private final ChatClient chatClient;
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chat(
@RequestParam String question,
@RequestParam String sessionId,
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
// 如果客户端携带了 Last-Event-ID,说明是断线重连
if (lastEventId != null) {
log.info("断线重连,sessionId: {},lastEventId: {}", sessionId, lastEventId);
int lastIndex = Integer.parseInt(lastEventId);
// 从缓存中找到未发送的消息,从断点续传
List<String> cachedTokens = continuationService.getTokensAfter(sessionId, lastIndex);
if (!cachedTokens.isEmpty()) {
log.info("从断点续传,共 {} 个 token", cachedTokens.size());
return Flux.fromIterable(cachedTokens)
.index()
.map(indexed -> ServerSentEvent.<String>builder()
.id(String.valueOf(lastIndex + indexed.getT1() + 1))
.data(indexed.getT2())
.build());
}
}
// 新请求:开始 LLM 调用
AtomicInteger tokenIndex = new AtomicInteger(0);
return chatClient.prompt()
.user(question)
.stream()
.content()
.doOnNext(token -> {
// 把每个 token 缓存,供断线重连时使用
continuationService.cacheToken(sessionId, tokenIndex.get(), token);
})
.map(token -> ServerSentEvent.<String>builder()
.id(String.valueOf(tokenIndex.getAndIncrement()))
.event("token")
.data(token)
.build())
.concatWith(Flux.just(
ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build()
))
.doOnComplete(() -> {
// 完成后延迟清理缓存(给可能的最后一次重连留时间)
continuationService.scheduleCleanup(sessionId, Duration.ofMinutes(5));
});
}
}Token 缓存服务
@Service
@Slf4j
public class SseContinuationService {
// 用 Redis 存储 token 序列,支持多实例部署
private final RedisTemplate<String, String> redisTemplate;
private static final String KEY_PREFIX = "sse:tokens:";
private static final Duration TOKEN_TTL = Duration.ofMinutes(10);
public void cacheToken(String sessionId, int index, String token) {
String key = KEY_PREFIX + sessionId;
// 用 Redis List 存储有序的 token 序列
redisTemplate.opsForList().set(key, index, token);
// 设置 TTL,防止内存泄漏
redisTemplate.expire(key, TOKEN_TTL);
}
public List<String> getTokensAfter(String sessionId, int lastIndex) {
String key = KEY_PREFIX + sessionId;
Long size = redisTemplate.opsForList().size(key);
if (size == null || size <= lastIndex + 1) {
return Collections.emptyList();
}
// 获取 lastIndex 之后的所有 token
List<String> tokens = redisTemplate.opsForList().range(key, lastIndex + 1, -1);
return tokens != null ? tokens : Collections.emptyList();
}
public void scheduleCleanup(String sessionId, Duration delay) {
String key = KEY_PREFIX + sessionId;
redisTemplate.expire(key, delay);
}
}坑三:心跳保活的实现
Nginx 超时问题的根本解法之一就是心跳。但心跳怎么发,也有讲究。
错误做法:用定时器单独发心跳,和正常的 SSE 流合并时容易出问题。
正确做法:把心跳集成到 Flux 中,使用 Flux.interval + mergeWith:
@Service
@Slf4j
public class HeartbeatAwareSseService {
private final ChatClient chatClient;
/**
* 带心跳的 SSE 流
* 当 LLM 没有输出时,定期发送心跳注释,保持连接活跃
*/
public Flux<ServerSentEvent<String>> streamWithHeartbeat(String question,
Duration heartbeatInterval) {
// LLM 输出流
Flux<ServerSentEvent<String>> contentFlux = chatClient.prompt()
.user(question)
.stream()
.content()
.map(token -> ServerSentEvent.<String>builder()
.event("token")
.data(token)
.build());
// 心跳流:每隔 heartbeatInterval 发送一个心跳
Flux<ServerSentEvent<String>> heartbeatFlux = Flux.interval(heartbeatInterval)
.map(tick -> ServerSentEvent.<String>builder()
.comment("heartbeat") // SSE 注释,客户端会忽略
.build());
// 合并:LLM 输出优先,心跳在没有输出时补位
return Flux.merge(contentFlux, heartbeatFlux)
.takeUntil(event -> {
// 当收到 [DONE] 事件时,停止心跳
return "done".equals(event.event());
})
.concatWith(Flux.just(
ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build()
));
}
}客户端处理心跳:
const source = new EventSource('/api/sse/chat?question=...');
source.addEventListener('token', (event) => {
appendToChat(event.data);
});
source.addEventListener('done', (event) => {
source.close();
markChatComplete();
});
// 心跳是 comment,浏览器 EventSource 默认忽略,不需要特殊处理
// 如果想明确处理,可以监听 message 事件(注释不会触发 message)坑四:多实例部署下的连接亲和性问题
这是我踩过的最棘手的一个坑。
问题场景:系统部署了 3 个实例,前面是负载均衡器(Nginx + round-robin)。用户发起一个 SSE 请求,连到了实例 A。请求处理过程中,负载均衡器的健康检查导致实例 A 短暂不可用,于是下一个请求被路由到了实例 B。
SSE 连接断了之后,浏览器重连,这次连到了实例 B。但实例 B 上没有这个 session 的上下文(LLM 调用状态、已发送的 token 等),无法续传。
解决方案一:会话亲和性(Session Stickiness)
在 Nginx 层配置 IP hash 或 sticky 模块,让同一个客户端的请求始终路由到同一个实例:
upstream ai_backend {
ip_hash; # 基于客户端 IP 做哈希路由
server backend1:8080;
server backend2:8080;
server backend3:8080;
}这种方式简单,但会导致负载不均匀。
解决方案二:状态外置 + 无状态实例
把 SSE 连接状态(已发送的 token、进度等)存储到 Redis,服务实例变成无状态的,任意实例都可以处理续传请求:
@Service
@Slf4j
public class StatefulSseService {
private final ChatClient chatClient;
private final RedisTemplate<String, Object> redisTemplate;
/**
* 会话状态键
*/
private String sessionKey(String sessionId) {
return "sse:session:" + sessionId;
}
/**
* 开始一个新的 SSE 会话
*/
public Flux<ServerSentEvent<String>> startSession(String sessionId, String question) {
// 初始化会话状态
Map<String, Object> sessionState = new HashMap<>();
sessionState.put("question", question);
sessionState.put("status", "running");
sessionState.put("tokenCount", 0);
redisTemplate.opsForHash().putAll(sessionKey(sessionId), sessionState);
redisTemplate.expire(sessionKey(sessionId), Duration.ofHours(1));
return doStream(sessionId, question, 0);
}
/**
* 从断点续传
*/
public Flux<ServerSentEvent<String>> resumeSession(String sessionId, int lastTokenIndex) {
// 检查会话是否存在
Map<Object, Object> sessionState = redisTemplate.opsForHash()
.entries(sessionKey(sessionId));
if (sessionState.isEmpty()) {
return Flux.error(new SessionNotFoundException("会话不存在或已过期: " + sessionId));
}
String status = (String) sessionState.get("status");
if ("completed".equals(status)) {
// 会话已完成,从缓存中返回剩余 token
return getTokensFromCache(sessionId, lastTokenIndex);
} else if ("running".equals(status)) {
// 会话还在运行,但连接断了
// 需要找到当前这个会话在哪个实例上,或者重新开始(复杂场景)
log.warn("会话 {} 状态为 running 但连接断开,尝试获取缓存 token", sessionId);
return getTokensFromCache(sessionId, lastTokenIndex);
}
return Flux.error(new SessionException("会话状态异常: " + status));
}
private Flux<ServerSentEvent<String>> doStream(String sessionId,
String question,
int startIndex) {
AtomicInteger tokenIndex = new AtomicInteger(startIndex);
String tokenListKey = "sse:tokens:" + sessionId;
return chatClient.prompt()
.user(question)
.stream()
.content()
.doOnNext(token -> {
int idx = tokenIndex.get();
// 把 token 追加到 Redis List
redisTemplate.opsForList().rightPush(tokenListKey, token);
redisTemplate.expire(tokenListKey, Duration.ofHours(1));
// 更新 tokenCount
redisTemplate.opsForHash().put(sessionKey(sessionId), "tokenCount", idx + 1);
})
.map(token -> ServerSentEvent.<String>builder()
.id(String.valueOf(tokenIndex.getAndIncrement()))
.event("token")
.data(token)
.build())
.doOnComplete(() -> {
redisTemplate.opsForHash().put(sessionKey(sessionId), "status", "completed");
log.info("SSE 会话 {} 完成,共 {} 个 token",
sessionId, tokenIndex.get());
});
}
private Flux<ServerSentEvent<String>> getTokensFromCache(String sessionId, int lastIndex) {
String tokenListKey = "sse:tokens:" + sessionId;
Long size = redisTemplate.opsForList().size(tokenListKey);
if (size == null || size <= lastIndex + 1) {
return Flux.empty();
}
List<Object> tokens = redisTemplate.opsForList().range(tokenListKey, lastIndex + 1, -1);
if (tokens == null) return Flux.empty();
AtomicInteger idx = new AtomicInteger(lastIndex + 1);
return Flux.fromIterable(tokens)
.map(token -> ServerSentEvent.<String>builder()
.id(String.valueOf(idx.getAndIncrement()))
.event("token")
.data((String) token)
.build());
}
}坑五:HTTP/2 的 SSE 行为差异
HTTP/2 对 SSE 的处理和 HTTP/1.1 有一些差异,主要体现在:
HTTP/1.1 + SSE:每个 SSE 连接占用一个 TCP 连接,浏览器对同一域名有连接数限制(通常是 6)。如果同时打开多个 Tab 都用 SSE,可能耗尽连接数。
HTTP/2 + SSE:HTTP/2 的多路复用让多个 SSE 流可以共享同一个 TCP 连接,解决了连接数限制问题。但需要确保服务端支持 HTTP/2。
Spring Boot 开启 HTTP/2:
server:
http2:
enabled: true
ssl:
# HTTP/2 需要 TLS(至少在大多数浏览器要求下)
enabled: true
key-store: classpath:keystore.p12
key-store-password: password
key-store-type: PKCS12完整的生产级 SSE Controller
@RestController
@RequestMapping("/api/sse")
@Slf4j
public class ProductionSseController {
private final StatefulSseService sseService;
private final HeartbeatAwareSseService heartbeatService;
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chat(
@RequestParam String question,
@RequestParam(defaultValue = "") String sessionId,
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId,
ServerHttpResponse response) {
// 设置必要的响应头
response.getHeaders().set("Cache-Control", "no-cache");
response.getHeaders().set("X-Accel-Buffering", "no"); // 禁止 Nginx 缓冲
response.getHeaders().set("Access-Control-Allow-Origin", "*");
// 生成 session ID(如果没有)
final String finalSessionId = sessionId.isBlank() ?
UUID.randomUUID().toString() : sessionId;
Flux<ServerSentEvent<String>> contentStream;
if (lastEventId != null && !lastEventId.isBlank()) {
// 断线重连:续传
log.info("SSE 续传请求,sessionId: {},lastEventId: {}",
finalSessionId, lastEventId);
int lastIndex = parseLastEventId(lastEventId);
contentStream = sseService.resumeSession(finalSessionId, lastIndex);
} else {
// 新请求
log.info("SSE 新请求,sessionId: {},问题: {}", finalSessionId,
question.length() > 50 ? question.substring(0, 50) + "..." : question);
contentStream = sseService.startSession(finalSessionId, question);
}
// 添加心跳
return heartbeatService.addHeartbeat(contentStream, Duration.ofSeconds(20))
.doOnSubscribe(sub -> log.debug("SSE 客户端连接,sessionId: {}", finalSessionId))
.doOnCancel(() -> log.info("SSE 客户端断开,sessionId: {}", finalSessionId))
.doOnComplete(() -> log.info("SSE 流完成,sessionId: {}", finalSessionId))
.onErrorResume(SessionNotFoundException.class, e -> {
log.warn("会话不存在,返回错误事件");
return Flux.just(ServerSentEvent.<String>builder()
.event("error")
.data("{\"code\":\"SESSION_NOT_FOUND\",\"message\":\"会话已过期,请重新开始\"}")
.build());
})
.onErrorResume(e -> {
log.error("SSE 流异常", e);
return Flux.just(ServerSentEvent.<String>builder()
.event("error")
.data("{\"code\":\"SERVER_ERROR\",\"message\":\"服务暂时不可用\"}")
.build());
});
}
private int parseLastEventId(String lastEventId) {
try {
return Integer.parseInt(lastEventId);
} catch (NumberFormatException e) {
log.warn("无法解析 lastEventId: {}", lastEventId);
return -1;
}
}
}SSE 连接管理的 Mermaid 图
总结
SSE 在 AI 应用中确实比 WebSocket 简单,但"简单"不等于"不需要考虑细节"。生产级的 SSE 实现需要处理:
- Nginx 超时:调整
proxy_read_timeout,或者加心跳 - 断线重连重复执行:利用
Last-Event-ID实现续传,把 token 缓存到 Redis - 多实例部署:状态外置(Redis),让任意实例都能处理续传
- 心跳保活:将心跳集成到 Flux,与内容流合并
- 连接数限制(HTTP/1.1):考虑迁移到 HTTP/2
这些问题单独处理都不难,但每一个不处理好,生产环境都会出问题。
