第2121篇:LLM流式输出的工程实践——让AI回答"打字机效果"背后的技术细节
第2121篇:LLM流式输出的工程实践——让AI回答"打字机效果"背后的技术细节
适读人群:构建AI对话界面的全栈工程师 | 阅读时长:约20分钟 | 核心价值:掌握SSE流式输出的完整实现,处理背压、断线重连、错误恢复等生产级问题
"为什么我们的AI回答要等30秒才出来,而ChatGPT是一个字一个字地显示?"
这是产品经理在第一次演示内部AI产品时问我的问题。当时我们用的是最简单的同步调用方式:等LLM生成完整答案,再一次性返回给前端。对于短回答还好,一旦遇到需要生成长文档的场景,用户面对的就是白屏等待。
加上流式输出之后,用户体验发生了质变。不仅感知延迟从"首字节"降到了几百毫秒,用户还可以在AI"打字"的过程中就开始阅读,提前判断方向是否正确。
但流式输出的工程实现远比看起来复杂。这篇文章把我踩过的坑都写出来。
流式输出的基本原理
/**
* 为什么流式输出体验更好?
*
* ===== 非流式(批量)模式 =====
*
* 时间轴:
* 0ms ───→ 发送请求
* ... ───→ 等待(用户看白屏)
* 8000ms → 收到完整响应
* 8010ms → 显示给用户
*
* 问题:
* - 用户感知的延迟 = 完整生成时间
* - 长回答越久越痛苦
* - 无法提前中断(已经等这么久了)
*
* ===== 流式模式 =====
*
* 时间轴:
* 0ms ───→ 发送请求
* 300ms ───→ 收到第一个token,立即显示
* 350ms ───→ 更多token,继续显示...
* 8000ms → 流结束
*
* 好处:
* - 用户感知延迟 = 首token时间(300ms vs 8000ms)
* - 用户可以边读边等,认知更流畅
* - 方向不对可以提前中断
*
* ===== 实现技术对比 =====
*
* WebSocket:双向通信,适合需要客户端也发送实时消息的场景
* 复杂,需要维护连接状态
*
* SSE(Server-Sent Events):单向,服务器推送给客户端
* 简单,基于HTTP,天然支持断线重连
* 适合AI对话(客户端发消息→服务器持续推送)
*
* 长轮询:最落后的方案,延迟高,服务器压力大
*
* 结论:AI对话场景用SSE是最自然的选择
*/Spring Boot的SSE流式实现
/**
* AI流式响应控制器
*
* 使用Spring的SseEmitter实现Server-Sent Events
*
* SSE协议格式(标准HTTP文本):
* data: {"type":"token","content":"你"}\n\n
* data: {"type":"token","content":"好"}\n\n
* data: {"type":"done"}\n\n
*/
@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
@Slf4j
public class StreamingChatController {
private final StreamingChatService chatService;
/**
* 流式对话接口
*
* 注意:@CrossOrigin对SSE很重要,不然浏览器会拒绝接收
*/
@CrossOrigin
@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamChat(@RequestBody ChatRequest request,
HttpServletResponse response) {
// SSE必须禁用缓冲,否则nginx等反向代理会缓存数据
response.setHeader("X-Accel-Buffering", "no");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
// 超时时间设为5分钟(长对话可能需要更长)
SseEmitter emitter = new SseEmitter(5 * 60 * 1000L);
// 在单独的线程里执行流式生成
// 不能在HTTP线程里阻塞,否则会导致线程池耗尽
chatService.streamResponse(request, emitter);
return emitter;
}
/**
* GET方式的流式接口(适合简单场景和调试)
*/
@CrossOrigin
@GetMapping(value = "/stream/simple", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamSimple(@RequestParam String question) {
SseEmitter emitter = new SseEmitter(300_000L);
chatService.streamResponse(
ChatRequest.builder().userMessage(question).build(),
emitter
);
return emitter;
}
@Data
@Builder
public static class ChatRequest {
private String userMessage;
private String conversationId;
private String systemPrompt;
@Builder.Default
private Map<String, Object> metadata = new HashMap<>();
}
}核心流式服务实现
/**
* 流式对话服务
*
* 关键:LangChain4j的StreamingChatLanguageModel接口
* 提供了onNext(String token)回调
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class StreamingChatService {
private final StreamingChatLanguageModel streamingModel;
// 用于异步执行流式生成的线程池
// 不能用公共线程池,防止流式任务占满线程
private final Executor streamingExecutor = Executors.newFixedThreadPool(
50,
new ThreadFactoryBuilder().setNameFormat("sse-stream-%d").build()
);
public void streamResponse(
StreamingChatController.ChatRequest request,
SseEmitter emitter) {
String conversationId = Optional.ofNullable(request.getConversationId())
.orElse(UUID.randomUUID().toString());
// 发送对话ID给客户端(用于后续请求)
sendEvent(emitter, "start", Map.of("conversationId", conversationId));
streamingExecutor.execute(() -> {
try {
// 构建消息列表
List<ChatMessage> messages = buildMessages(request);
// 流式生成的结果收集(用于后续存储)
StringBuilder fullResponse = new StringBuilder();
AtomicInteger tokenCount = new AtomicInteger(0);
// 调用流式模型
streamingModel.generate(messages, new StreamingResponseHandler<AiMessage>() {
@Override
public void onNext(String token) {
// 每收到一个token,立即推送给客户端
fullResponse.append(token);
tokenCount.incrementAndGet();
sendEvent(emitter, "token", Map.of("content", token));
}
@Override
public void onComplete(Response<AiMessage> response) {
// 生成完成
log.debug("流式生成完成: conversationId={}, tokens={}",
conversationId, tokenCount.get());
// 发送完成事件(包含token使用量,用于计费)
sendEvent(emitter, "done", Map.of(
"totalTokens", tokenCount.get(),
"finishReason", response.finishReason().toString()
));
emitter.complete();
// 异步保存对话历史(不阻塞流式响应)
saveConversationAsync(conversationId, request, fullResponse.toString());
}
@Override
public void onError(Throwable error) {
log.error("流式生成错误: conversationId={}", conversationId, error);
// 发送错误事件,让客户端知道发生了什么
sendEvent(emitter, "error", Map.of(
"message", sanitizeErrorMessage(error.getMessage())
));
emitter.completeWithError(error);
}
});
} catch (Exception e) {
log.error("流式服务异常: {}", e.getMessage(), e);
sendEvent(emitter, "error", Map.of("message", "服务暂时不可用,请稍后重试"));
emitter.completeWithError(e);
}
});
}
/**
* 发送SSE事件
*
* 标准SSE事件格式:
* event: token\n
* data: {"content":"..."}\n
* \n
*/
private void sendEvent(SseEmitter emitter, String eventType, Object data) {
try {
String json = new ObjectMapper().writeValueAsString(data);
emitter.send(
SseEmitter.event()
.name(eventType)
.data(json, MediaType.APPLICATION_JSON)
);
} catch (IOException e) {
// 客户端断开连接时会抛出IOException
// 这是正常情况(用户关闭页面),不需要打印错误日志
if (!e.getMessage().contains("Broken pipe") &&
!e.getMessage().contains("Connection reset")) {
log.warn("SSE发送失败: eventType={}, error={}", eventType, e.getMessage());
}
}
}
private List<ChatMessage> buildMessages(StreamingChatController.ChatRequest request) {
List<ChatMessage> messages = new ArrayList<>();
if (request.getSystemPrompt() != null) {
messages.add(SystemMessage.from(request.getSystemPrompt()));
}
messages.add(UserMessage.from(request.getUserMessage()));
return messages;
}
private String sanitizeErrorMessage(String message) {
// 不向客户端暴露内部错误细节
if (message == null) return "未知错误";
if (message.contains("API key")) return "认证失败";
if (message.contains("timeout")) return "请求超时,请重试";
if (message.contains("rate limit")) return "请求太频繁,请稍后重试";
return "AI服务暂时不可用";
}
@Async
protected void saveConversationAsync(
String conversationId,
StreamingChatController.ChatRequest request,
String fullResponse) {
// 异步保存对话历史到数据库
// 实现略
}
}背压处理与流量控制
/**
* 背压处理
*
* 问题:LLM生成token的速度可能超过客户端消费速度
* 或者服务端生成太快,队列堆积
*
* 这在实际生产中不常发生(因为网络通常是瓶颈),
* 但在高并发场景需要考虑
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class BackpressureAwareStreamingService {
private final StreamingChatLanguageModel streamingModel;
// 每个连接的token队列(用于流量控制)
private final ConcurrentHashMap<String, BlockingQueue<StreamEvent>> connectionQueues
= new ConcurrentHashMap<>();
/**
* 带背压控制的流式输出
*
* 使用生产者-消费者模型:
* - LLM callback 作为生产者,把token放入队列
* - 推送线程作为消费者,从队列取token发给客户端
*
* 这样可以:
* 1. 控制推送速率(避免客户端被淹没)
* 2. 在连接断开时优雅停止生成
*/
public void streamWithBackpressure(
String conversationId,
List<ChatMessage> messages,
SseEmitter emitter) {
// 每个连接最多缓冲100个token
BlockingQueue<StreamEvent> queue = new LinkedBlockingQueue<>(100);
connectionQueues.put(conversationId, queue);
// 生产者:LLM生成线程
CompletableFuture.runAsync(() -> {
streamingModel.generate(messages, new StreamingResponseHandler<AiMessage>() {
@Override
public void onNext(String token) {
try {
// 如果队列满了,等待(实现背压)
// 超时说明客户端消费太慢或连接断了
boolean offered = queue.offer(
new StreamEvent("token", token),
2, TimeUnit.SECONDS
);
if (!offered) {
log.warn("背压触发,客户端消费太慢: conversationId={}", conversationId);
// 可以选择丢弃或者抛出异常中止生成
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void onComplete(Response<AiMessage> response) {
queue.offer(new StreamEvent("done", ""));
}
@Override
public void onError(Throwable error) {
queue.offer(new StreamEvent("error", error.getMessage()));
}
});
});
// 消费者:推送线程
CompletableFuture.runAsync(() -> {
try {
while (true) {
// 从队列取事件,超时说明生成卡住了
StreamEvent event = queue.poll(30, TimeUnit.SECONDS);
if (event == null) {
// 30秒没有新token,认为生成超时
sendSseEvent(emitter, "error", "生成超时");
emitter.complete();
break;
}
if ("done".equals(event.type()) || "error".equals(event.type())) {
sendSseEvent(emitter, event.type(), event.content());
emitter.complete();
break;
}
sendSseEvent(emitter, "token", event.content());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
connectionQueues.remove(conversationId);
}
});
// 客户端断开时,清理队列
emitter.onCompletion(() -> {
connectionQueues.remove(conversationId);
// 通知生产者停止(通过往队列放一个特殊事件)
BlockingQueue<StreamEvent> q = connectionQueues.get(conversationId);
if (q != null) q.offer(new StreamEvent("cancel", ""));
});
}
private void sendSseEvent(SseEmitter emitter, String type, String content) {
try {
Map<String, String> data = Map.of("type", type, "content", content);
emitter.send(SseEmitter.event()
.name(type)
.data(new ObjectMapper().writeValueAsString(data)));
} catch (IOException ignored) {}
}
record StreamEvent(String type, String content) {}
}断线重连与续传
/**
* 断线重连与流式续传
*
* 场景:用户网络抖动,SSE连接断开
* 如果没有续传,用户需要重新发问题,重新等待完整生成
*
* SSE协议内置了Last-Event-ID机制,但需要服务端支持
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ResumableStreamingService {
// 缓存进行中的流式响应(用于续传)
// Key: conversationId, Value: 已生成的内容
private final Cache<String, StreamingSession> activeSessions;
public ResumableStreamingService() {
this.activeSessions = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(Duration.ofMinutes(10)) // 10分钟内可以续传
.build();
}
/**
* 支持续传的流式接口
*
* @param lastEventId 客户端上次收到的最后一个事件ID(HTTP Header: Last-Event-ID)
* 如果是新连接,lastEventId为null
*/
public void streamWithResume(
String conversationId,
String lastEventId,
List<ChatMessage> messages,
SseEmitter emitter,
StreamingChatLanguageModel model) {
// 检查是否有可续传的会话
StreamingSession existingSession = activeSessions.getIfPresent(conversationId);
if (existingSession != null && lastEventId != null) {
// 有续传请求:先把已生成的内容重新推送给客户端
int lastTokenIndex = parseLastTokenIndex(lastEventId);
String alreadyGenerated = existingSession.getContentUpTo(lastTokenIndex);
log.info("流式续传: conversationId={}, resumeFrom={}", conversationId, lastTokenIndex);
// 一次性推送已缓存的内容(前面的部分)
sendEvent(emitter, "resume", Map.of(
"content", alreadyGenerated,
"resumeFrom", lastTokenIndex
));
// 如果生成还没结束,继续监听后续token
if (!existingSession.isCompleted()) {
existingSession.addEmitter(emitter);
return;
}
// 生成已完成,直接结束
sendEvent(emitter, "done", Map.of("totalContent", alreadyGenerated));
emitter.complete();
return;
}
// 新会话:创建StreamingSession来跟踪生成进度
StreamingSession session = new StreamingSession(conversationId);
activeSessions.put(conversationId, session);
session.addEmitter(emitter);
AtomicInteger tokenIndex = new AtomicInteger(0);
model.generate(messages, new StreamingResponseHandler<AiMessage>() {
@Override
public void onNext(String token) {
int index = tokenIndex.getAndIncrement();
// 缓存token
session.appendToken(token);
// 推送给所有连接的emitter(包括可能的续传连接)
String eventId = "token-" + index; // 用于Last-Event-ID
session.broadcastToEmitters(emitter -> {
try {
emitter.send(
SseEmitter.event()
.id(eventId) // 设置Event ID,浏览器会记住
.name("token")
.data(Map.of("content", token, "index", index))
);
} catch (IOException e) {
// 连接断开,从广播列表移除
session.removeEmitter(emitter);
}
});
}
@Override
public void onComplete(Response<AiMessage> response) {
session.markCompleted();
session.broadcastToEmitters(emitter -> {
try {
emitter.send(SseEmitter.event().name("done").data("{}"));
emitter.complete();
} catch (IOException ignored) {}
});
}
@Override
public void onError(Throwable error) {
session.markCompleted();
session.broadcastToEmitters(emitter -> {
try {
emitter.send(SseEmitter.event().name("error")
.data(Map.of("message", "生成失败")));
emitter.complete();
} catch (IOException ignored) {}
});
}
});
}
private void sendEvent(SseEmitter emitter, String type, Object data) {
try {
emitter.send(SseEmitter.event().name(type)
.data(new ObjectMapper().writeValueAsString(data)));
} catch (IOException ignored) {}
}
private int parseLastTokenIndex(String lastEventId) {
if (lastEventId == null || !lastEventId.startsWith("token-")) return 0;
try {
return Integer.parseInt(lastEventId.substring(6));
} catch (NumberFormatException e) {
return 0;
}
}
/**
* 流式会话状态
*/
public static class StreamingSession {
private final String conversationId;
private final StringBuilder content = new StringBuilder();
private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
private volatile boolean completed = false;
public StreamingSession(String conversationId) {
this.conversationId = conversationId;
}
public void appendToken(String token) {
content.append(token);
}
public String getContentUpTo(int tokenIndex) {
// 简化实现:返回完整内容
// 精确实现需要按token边界截断
return content.toString();
}
public void addEmitter(SseEmitter emitter) {
emitters.add(emitter);
emitter.onCompletion(() -> emitters.remove(emitter));
}
public void removeEmitter(SseEmitter emitter) {
emitters.remove(emitter);
}
public void broadcastToEmitters(Consumer<SseEmitter> action) {
emitters.forEach(action);
}
public void markCompleted() {
this.completed = true;
}
public boolean isCompleted() {
return completed;
}
}
}前端消费SSE的最佳实践
/**
* 前端SSE客户端实现要点
*
* 下面用注释描述前端JavaScript应该怎么写
* (实际是JS代码,用注释方式说明关键点)
*
* ===== 基础用法 =====
*
* // GET请求(最简单)
* const evtSource = new EventSource('/api/chat/stream?question=xxx');
* evtSource.addEventListener('token', (e) => {
* const data = JSON.parse(e.data);
* appendToDisplay(data.content); // 追加显示
* });
* evtSource.addEventListener('done', () => evtSource.close());
* evtSource.addEventListener('error', () => evtSource.close());
*
* ===== POST请求(需要fetch + ReadableStream)=====
*
* EventSource只支持GET,POST需要用fetch API手动处理流
*
* const response = await fetch('/api/chat/stream', {
* method: 'POST',
* headers: {'Content-Type': 'application/json'},
* body: JSON.stringify({userMessage: '...'}),
* });
*
* const reader = response.body.getReader();
* const decoder = new TextDecoder();
*
* while (true) {
* const {done, value} = await reader.read();
* if (done) break;
*
* const chunk = decoder.decode(value, {stream: true});
* // 解析SSE格式的文本
* parseSSEChunk(chunk).forEach(event => {
* if (event.type === 'token') appendToDisplay(event.data.content);
* if (event.type === 'done') finishDisplay();
* });
* }
*
* ===== 关键注意点 =====
*
* 1. Markdown渲染时机:
* 不要每个token都重新render整个Markdown
* 先显示原始文本,生成完成后再render
* 或者用增量render(更复杂但体验好)
*
* 2. 滚动行为:
* 自动滚动到底部,但要检测用户是否手动向上滚了
* 如果用户在阅读,不要强制滚到底
*
* 3. 中断生成:
* 前端关闭连接(close/abort),服务端需要检测并停止生成
*/
/**
* 服务端:检测客户端是否断开,并取消LLM生成
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class CancellableStreamingService {
private final StreamingChatLanguageModel streamingModel;
// 用于取消进行中的生成任务
private final Map<String, CompletableFuture<?>> activeTasks = new ConcurrentHashMap<>();
/**
* 可取消的流式生成
*/
public void streamWithCancellation(
String conversationId,
List<ChatMessage> messages,
SseEmitter emitter) {
// 注册取消回调
emitter.onCompletion(() -> {
CompletableFuture<?> task = activeTasks.remove(conversationId);
if (task != null) {
task.cancel(true);
log.debug("客户端断开,取消生成: conversationId={}", conversationId);
}
});
emitter.onTimeout(() -> {
activeTasks.remove(conversationId);
log.warn("SSE连接超时: conversationId={}", conversationId);
});
AtomicBoolean cancelled = new AtomicBoolean(false);
CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
streamingModel.generate(messages, new StreamingResponseHandler<AiMessage>() {
@Override
public void onNext(String token) {
if (cancelled.get() || Thread.currentThread().isInterrupted()) {
return; // 已取消,不再推送
}
try {
emitter.send(SseEmitter.event().name("token")
.data(Map.of("content", token)));
} catch (IOException e) {
// 推送失败 = 客户端断开
cancelled.set(true);
}
}
@Override
public void onComplete(Response<AiMessage> response) {
if (!cancelled.get()) {
try {
emitter.send(SseEmitter.event().name("done").data("{}"));
emitter.complete();
} catch (IOException ignored) {}
}
activeTasks.remove(conversationId);
}
@Override
public void onError(Throwable error) {
activeTasks.remove(conversationId);
if (!cancelled.get()) {
try {
emitter.send(SseEmitter.event().name("error")
.data(Map.of("message", "生成失败")));
emitter.complete();
} catch (IOException ignored) {}
}
}
});
});
activeTasks.put(conversationId, task);
}
/**
* 主动取消某个会话的生成(用于"停止生成"按钮)
*/
public void cancelGeneration(String conversationId) {
CompletableFuture<?> task = activeTasks.remove(conversationId);
if (task != null) {
task.cancel(true);
log.info("主动取消生成: conversationId={}", conversationId);
}
}
}Nginx配置与生产部署
/**
* 生产环境的常见坑
*
* ===== Nginx配置坑 =====
*
* 问题1:Nginx默认会缓冲响应,SSE数据被堆积
*
* 解决:在location配置里加:
* proxy_buffering off;
* proxy_cache off;
* proxy_read_timeout 600s; # 长超时,避免长对话被断开
*
* 问题2:HTTP/2下SSE工作正常,HTTP/1.1下可能有问题
* (HTTP/1.1连接数有限制,大量SSE连接会耗尽)
*
* 建议:SSE接口走HTTP/2
*
* ===== 连接数问题 =====
*
* 每个SSE连接保持打开状态,会占用一个线程(Tomcat默认)
* 100个并发用户 = 100个线程一直在"等待"
*
* 解决方案:
* 1. 使用WebFlux(Reactor)替代Servlet,每个连接占用极少资源
* 2. 或者增大Tomcat线程池(治标不治本)
*/
/**
* WebFlux版本的流式实现(推荐用于高并发)
*
* 优点:非阻塞IO,1000个并发连接只用少量线程
*/
@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
@Slf4j
public class ReactiveStreamingController {
private final StreamingChatLanguageModel streamingModel;
/**
* 使用Project Reactor的Flux实现流式响应
*
* 每个连接不占用专用线程,系统可以支撑更多并发
*/
@CrossOrigin
@PostMapping(value = "/stream/reactive", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamReactive(
@RequestBody StreamingChatController.ChatRequest request) {
return Flux.create(sink -> {
List<ChatMessage> messages = List.of(
UserMessage.from(request.getUserMessage())
);
streamingModel.generate(messages, new StreamingResponseHandler<AiMessage>() {
@Override
public void onNext(String token) {
if (!sink.isCancelled()) {
sink.next(token);
}
}
@Override
public void onComplete(Response<AiMessage> response) {
sink.complete();
}
@Override
public void onError(Throwable error) {
sink.error(error);
}
});
// 客户端断开时取消(背压传播)
sink.onCancel(() -> log.debug("客户端取消了流式请求"));
})
.map(token -> ServerSentEvent.<String>builder()
.event("token")
.data(token)
.build())
.onErrorResume(e -> Flux.just(
ServerSentEvent.<String>builder()
.event("error")
.data("{\"message\":\"生成失败\"}")
.build()
));
}
}实践建议
先解决Nginx缓冲问题,再想其他
我遇到过很多次:流式接口明明在本地测试好好的,部署到生产后又变成"等待完整响应"。几乎每次都是Nginx缓冲的问题。养成习惯:SSE/流式接口上线前,检查Nginx配置是否有proxy_buffering off,检查响应Header里是否有X-Accel-Buffering: no。这两行配置能解决80%的"流式不流"问题。
高并发场景优先考虑WebFlux,而不是调大Tomcat线程池
每个SSE连接如果占用一个线程,1000个并发用户就是1000个线程。Java线程不便宜,每个线程默认1MB栈,1000个线程就是1GB内存只用来"等待"。WebFlux基于Netty的非阻塞IO,同样的机器能支撑10倍以上的并发连接。如果你的AI产品用户量增长,迁移到WebFlux是值得的投资——代价是代码风格从命令式变成响应式,有一定学习曲线。
Token积累与UI渲染要分离
前端最常见的错误:每收到一个token就重新渲染整个Markdown。如果回答是1000个token,就会触发1000次重新渲染,低端设备上会明显卡顿。正确做法是:原始文本累积,用requestAnimationFrame批量更新DOM,完成后再做完整的Markdown渲染。或者直接用成熟的流式Markdown渲染库(如marked的流式版本)。
