Spring AI 流式输出实战:打造丝滑的对话体验
2026/7/26大约 7 分钟Spring AI流式输出SSEWebFlux对话体验Java
Spring AI 流式输出实战:打造丝滑的对话体验
一、那个让老板当场皱眉的演示
去年11月,我们团队做了一个内部智能客服的演示。
功能做得很扎实:意图识别、知识库检索、多轮对话都到位了。Demo开始,产品经理打开页面,输入第一个问题,按下回车。
然后,等。
等了5秒,页面还是空白。
又等了3秒,突然刷出来了一大段文字,密密麻麻的300字回答,一下子全出来了。
老板看了看,问了一句:"怎么等那么久,而且是一下子蹦出来的,能不能像ChatGPT那样一个字一个字地打出来?"
产品经理转头看了我一眼。
我知道——是时候搞流式输出了。
这篇文章就是这个经历的产物。从原理到实现,帮你把"等半天一下蹦出来"变成"流畅的打字机效果"。
二、为什么需要流式输出
2.1 传统调用 vs 流式调用
用户体验上,流式输出有两个核心优势:
- 感知延迟更低:用户看到第一个字的时间(TTFT)从5-10秒缩短到0.5-2秒
- 等待焦虑减少:看到内容在逐渐出现,用户知道系统没有卡死
2.2 技术上的两种实现方案对比
| 方案 | 协议 | 适合场景 | 优缺点 |
|---|---|---|---|
| SSE(Server-Sent Events) | HTTP/1.1 | 单向推送,如对话流 | 简单,无需升级协议;单向 |
| WebSocket | WS | 双向实时通信 | 功能强大,但实现复杂 |
对话场景用 SSE 就足够了,Spring AI 的流式输出也是基于 SSE 实现的。
三、Spring AI 流式输出实现
3.1 后端实现
Spring AI 1.0 的流式输出通过 .stream() 方法开启,返回 Flux<String>:
@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
public class StreamChatController {
private final ChatClient chatClient;
/**
* 流式对话接口
* 返回 text/event-stream 类型,支持SSE
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChat(
@RequestParam String message,
@RequestParam(defaultValue = "default") String sessionId) {
return chatClient.prompt()
.user(message)
.advisors(a -> a.param(AbstractChatMemoryAdvisor.CHAT_MEMORY_CONVERSATION_ID_KEY,
sessionId))
.stream()
.content()
// 将每个token包装成SSE事件
.map(token -> ServerSentEvent.<String>builder()
.data(token)
.event("token")
.build())
// 流结束时发送完成信号
.concatWith(Flux.just(ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build()))
// 错误处理
.onErrorResume(e -> {
log.error("Stream chat error", e);
return Flux.just(ServerSentEvent.<String>builder()
.event("error")
.data("服务暂时不可用,请稍后重试")
.build());
});
}
/**
* 非SSE版本的流式输出(适合不支持SSE的客户端)
* 直接返回 Flux<String>
*/
@PostMapping(value = "/stream-text",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChatText(@RequestBody ChatRequest request) {
return chatClient.prompt()
.system(request.systemPrompt())
.user(request.userMessage())
.stream()
.content();
}
}3.2 带进度控制的流式输出
有时候我们需要在流式输出中插入额外信息(比如"正在检索知识库..."的提示):
@Service
@RequiredArgsConstructor
public class EnhancedStreamChatService {
private final ChatClient chatClient;
private final VectorStore vectorStore;
/**
* 增强流式输出:先显示检索状态,再流式输出AI回答
*/
public Flux<StreamEvent> streamWithStatus(String question, String sessionId) {
return Flux.create(emitter -> {
// 第一步:发送检索开始状态
emitter.next(StreamEvent.status("正在检索知识库..."));
// 第二步:执行RAG检索(同步)
List<Document> docs = vectorStore.similaritySearch(
SearchRequest.query(question).withTopK(3));
emitter.next(StreamEvent.status(
String.format("找到 %d 条相关内容,正在生成回答...", docs.size())));
// 第三步:流式生成回答
String context = docs.stream()
.map(Document::getContent)
.collect(Collectors.joining("\n\n"));
chatClient.prompt()
.system("""
你是一个专业助手。请基于以下参考资料回答问题:
{context}
如果参考资料中没有相关信息,请直接说明。
""".replace("{context}", context))
.user(question)
.stream()
.content()
.subscribe(
token -> emitter.next(StreamEvent.token(token)),
error -> emitter.next(StreamEvent.error(error.getMessage())),
() -> {
emitter.next(StreamEvent.done());
emitter.complete();
}
);
});
}
public record StreamEvent(String type, String content) {
public static StreamEvent token(String content) {
return new StreamEvent("token", content);
}
public static StreamEvent status(String message) {
return new StreamEvent("status", message);
}
public static StreamEvent done() {
return new StreamEvent("done", "");
}
public static StreamEvent error(String message) {
return new StreamEvent("error", message);
}
}
}3.3 前端对接(JavaScript)
// 前端接收SSE流式输出的标准写法
function startStreamChat(message, sessionId) {
const url = `/api/chat/stream?message=${encodeURIComponent(message)}&sessionId=${sessionId}`;
const eventSource = new EventSource(url);
let responseDiv = document.getElementById('response');
responseDiv.innerHTML = ''; // 清空之前的内容
// 接收token事件(正常文字输出)
eventSource.addEventListener('token', (event) => {
responseDiv.innerHTML += event.data;
});
// 接收完成事件
eventSource.addEventListener('done', (event) => {
eventSource.close();
console.log('Stream completed');
});
// 接收状态事件(如"正在检索...")
eventSource.addEventListener('status', (event) => {
let statusDiv = document.getElementById('status');
statusDiv.textContent = event.data;
});
// 错误处理
eventSource.onerror = (error) => {
console.error('SSE error:', error);
eventSource.close();
responseDiv.innerHTML += '\n[连接中断,请刷新重试]';
};
}四、流式输出的工程化问题
4.1 流式输出的完整内容如何保存
流式输出期间,内容是一个个token流出来的。但对话结束后,我们往往需要把完整内容存到数据库(用于记录历史、分析等)。
/**
* 流式输出时收集完整内容并异步保存
*/
@Service
@RequiredArgsConstructor
public class StreamWithPersistenceService {
private final ChatClient chatClient;
private final ConversationRepository conversationRepo;
public Flux<String> streamAndSave(String sessionId, String userMessage) {
StringBuilder fullResponse = new StringBuilder();
return chatClient.prompt()
.user(userMessage)
.stream()
.content()
// 收集完整响应
.doOnNext(token -> fullResponse.append(token))
// 流结束时异步保存
.doOnComplete(() -> {
String completeResponse = fullResponse.toString();
// 异步保存,不阻塞流
CompletableFuture.runAsync(() ->
conversationRepo.save(ConversationRecord.builder()
.sessionId(sessionId)
.userMessage(userMessage)
.aiResponse(completeResponse)
.createdAt(LocalDateTime.now())
.build())
).exceptionally(e -> {
log.error("Failed to save conversation", e);
return null;
});
});
}
}4.2 超时与熔断
流式接口比普通接口更容易出现超时问题:
@Configuration
public class StreamChatConfig {
@Bean
public ChatClient streamOptimizedChatClient(ChatModel chatModel) {
return ChatClient.builder(chatModel)
.defaultOptions(OpenAiChatOptions.builder()
.model("gpt-4o")
.temperature(0.7)
// 流式接口适当增加超时,因为生成时间更长
.build())
.build();
}
}@Component
public class StreamTimeoutHandler {
/**
* 给流式输出添加超时控制
* 如果60秒内没有完成,自动中断
*/
public Flux<String> withTimeout(Flux<String> stream) {
return stream
// 单个token的最大等待时间:10秒
.timeout(Duration.ofSeconds(10))
// 整个流的最大时间:60秒
.take(Duration.ofSeconds(60))
.onErrorResume(TimeoutException.class, e ->
Flux.just("\n\n[响应超时,内容已截断]"));
}
}4.3 流式输出的背压处理
如果前端处理速度跟不上后端发送速度,会造成内存积压:
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChatWithBackpressure(
@RequestParam String message) {
return chatClient.prompt()
.user(message)
.stream()
.content()
// 使用背压策略:缓冲区满时丢弃最旧的数据
.onBackpressureBuffer(100, dropped ->
log.warn("Token dropped due to backpressure: {}", dropped))
.map(token -> ServerSentEvent.<String>builder()
.data(token)
.build());
}五、生产环境流式输出监控
/**
* 流式输出性能监控
* 追踪TTFT(Time To First Token)和TPS(Tokens Per Second)
*/
@Component
@RequiredArgsConstructor
public class StreamMetricsCollector {
private final MeterRegistry meterRegistry;
public Flux<String> withMetrics(Flux<String> stream, String model) {
AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
AtomicBoolean firstToken = new AtomicBoolean(true);
AtomicLong tokenCount = new AtomicLong(0);
return stream
.doOnNext(token -> {
tokenCount.incrementAndGet();
if (firstToken.compareAndSet(true, false)) {
// 记录TTFT(Time To First Token)
long ttft = System.currentTimeMillis() - startTime.get();
meterRegistry.timer("llm.stream.ttft",
"model", model)
.record(ttft, TimeUnit.MILLISECONDS);
}
})
.doOnComplete(() -> {
long totalTime = System.currentTimeMillis() - startTime.get();
long tokens = tokenCount.get();
double tps = tokens * 1000.0 / totalTime;
// 记录整体TPS
meterRegistry.gauge("llm.stream.tps",
Tags.of("model", model), tps);
log.info("Stream completed: model={}, tokens={}, totalMs={}, tps={}",
model, tokens, totalTime, String.format("%.1f", tps));
});
}
}六、流式输出技术选型全景
七、总结
流式输出是AI对话应用的标配体验,实现起来并不复杂,但有几个关键点:
- Spring AI的
.stream().content()是核心API,返回Flux<String> - 前端用SSE,标准HTML5技术,无需额外库
- 完整内容收集用
doOnNext+ StringBuilder,异步持久化 - 超时控制不能忘,流式接口容易卡住
- TTFT监控是核心指标,目标控制在1-2秒内
把这套做扎实,用户体验能提升一个档次。下一篇我们聊对话记忆管理——如何让AI"记住"之前聊过的内容。
