Spring AI Streaming Output in Practice: A Deep Dive into SSE + Reactor Reactive Programming
2026/4/30 · About 3 minutes
Audience: Java engineers who want to build a ChatGPT-style typewriter effect
What this article covers: streaming output principles + SSE implementation + WebSocket comparison + front-end/back-end integration
Why is streaming output table stakes for AI applications?
Waiting 5 seconds and then seeing the complete answer, versus seeing the first character after 0.5 seconds and watching the rest appear: that is the value of streaming output.
The streaming output technology stack
SSE implementation (recommended approach)
Back-end implementation
```java
@Slf4j
@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
public class StreamChatController {

    private final ChatClient chatClient;

    /**
     * SSE streaming chat endpoint.
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamChat(@RequestParam String message,
                                                    @RequestParam String sessionId) {
        return chatClient.prompt()
                .user(message)
                .stream()
                .content()
                .map(token -> ServerSentEvent.<String>builder()
                        .id(UUID.randomUUID().toString())
                        .event("message")
                        .data(token)
                        .build())
                .concatWith(
                        // Emit a completion signal so the client knows to close
                        Flux.just(ServerSentEvent.<String>builder()
                                .event("done")
                                .data("[DONE]")
                                .build()))
                .onErrorMap(e -> {
                    log.error("Streaming output failed", e);
                    return new RuntimeException("AI service temporarily unavailable");
                });
    }

    /**
     * Streaming output with conversation memory.
     */
    @GetMapping(value = "/stream/memory", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamChatWithMemory(
            @RequestParam String message,
            @RequestParam String sessionId,
            @AuthenticationPrincipal UserDetails user) {
        String fullSessionId = user.getUsername() + ":" + sessionId;
        // createMemory: look up or create the ChatMemory for this session (implementation not shown)
        ChatMemory memory = createMemory(fullSessionId);
        return chatClient.prompt()
                .advisors(new MessageChatMemoryAdvisor(memory))
                .user(message)
                .stream()
                .content()
                .map(token -> ServerSentEvent.<String>builder()
                        .event("message")
                        .data(token)
                        .build());
    }
}
```

Configuration tuning: supporting many concurrent streaming connections
```java
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {

    @Override
    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        // Enlarge the codec buffer for SSE payloads (1 MB)
        configurer.defaultCodecs().maxInMemorySize(1024 * 1024);
    }
}
```
```yaml
# application.yml
server:
  netty:
    connection-timeout: 60s
    idle-timeout: 300s
spring:
  webflux:
    streaming-media-types:
      - text/event-stream
```

Front-end JavaScript implementation
```javascript
// Receive SSE with EventSource
function streamChat(message, sessionId) {
  const outputDiv = document.getElementById('chat-output');
  outputDiv.innerHTML = '';
  const url = `/api/chat/stream?message=${encodeURIComponent(message)}&sessionId=${sessionId}`;
  const eventSource = new EventSource(url);

  eventSource.addEventListener('message', (event) => {
    if (event.data !== '[DONE]') {
      outputDiv.innerHTML += event.data;
      // Keep the view scrolled to the latest token
      outputDiv.scrollTop = outputDiv.scrollHeight;
    }
  });

  eventSource.addEventListener('done', () => {
    eventSource.close();
    console.log('Streaming complete');
  });

  eventSource.onerror = (error) => {
    console.error('SSE connection error:', error);
    eventSource.close();
  };
}

// Use the Fetch API instead (supports POST; assumes a POST variant of the endpoint)
async function streamChatWithPost(message, sessionId) {
  const outputDiv = document.getElementById('chat-output');
  const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message, sessionId })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // Parse the SSE wire format. A chunk may end mid-line,
    // so keep the trailing partial line in the buffer.
    const lines = buffer.split('\n');
    buffer = lines.pop();
    for (const line of lines) {
      if (line.startsWith('data:')) {
        const data = line.slice(5).trim();
        if (data !== '[DONE]') {
          outputDiv.innerHTML += data;
        }
      }
    }
  }
}
```

Handling backpressure in streaming output
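The buffer-and-drop policy used by the endpoint below can be pictured without Reactor: a bounded queue that, once full, hands overflowing items to a callback instead of growing without limit. A framework-free sketch of that semantics (class and method names are illustrative, not part of Reactor):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Illustrative bounded buffer mirroring "buffer up to capacity, drop new items
// on overflow and notify a callback", as the streaming endpoint below does.
class DropOnOverflowBuffer<T> {
    private final Deque<T> queue = new ArrayDeque<>();
    private final int capacity;
    private final Consumer<T> onDropped;

    DropOnOverflowBuffer(int capacity, Consumer<T> onDropped) {
        this.capacity = capacity;
        this.onDropped = onDropped;
    }

    void offer(T item) {
        if (queue.size() >= capacity) {
            onDropped.accept(item);   // overflow: drop the newest item and notify
        } else {
            queue.addLast(item);
        }
    }

    T poll() {
        return queue.pollFirst();
    }

    int size() {
        return queue.size();
    }
}
```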
```java
@GetMapping(value = "/stream/controlled", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamWithBackpressure(@RequestParam String message) {
    return chatClient.prompt()
            .user(message)
            .stream()
            .content()
            // Throttle emissions slightly so slow clients can keep up
            .delayElements(Duration.ofMillis(10))
            // Abort streams that run too long
            .timeout(Duration.ofSeconds(60))
            // Backpressure: buffer up to 100 tokens, drop the newest on overflow
            .onBackpressureBuffer(100,
                    dropped -> log.warn("Backpressure: dropped token: {}", dropped),
                    BufferOverflowStrategy.DROP_LATEST)
            .map(token -> ServerSentEvent.<String>builder()
                    .event("message")
                    .data(token)
                    .build());
}
```

Choosing between SSE and WebSocket
| Dimension | SSE | WebSocket |
|---|---|---|
| Direction | One-way (server → client) | Two-way |
| Protocol | HTTP/HTTPS | ws/wss |
| Complexity | Simple | More involved |
| Typical use | AI response streams, notification push | Real-time chat, games |
| Browser support | Excellent | Excellent |
| Firewall traversal | Easy | May be blocked |
For AI applications, SSE is the recommendation: an AI answer is a one-way push, and SSE is simpler and rides on plain HTTP.
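Because SSE is just line-oriented text over HTTP, a client can be written with nothing but the JDK's `HttpClient`. A minimal sketch, assuming a local endpoint shaped like the controller above (URL and `[DONE]` sentinel are this article's conventions, not part of the protocol):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;

public class SseClientSketch {

    // Extract the payload of a single SSE line; empty for non-data lines.
    // Per the SSE format, a single optional space after "data:" is not payload.
    static Optional<String> dataPayload(String line) {
        return line.startsWith("data:")
                ? Optional.of(line.substring(5).stripLeading())
                : Optional.empty();
    }

    public static void main(String[] args) throws Exception {
        // Assumed local endpoint matching the controller above
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:8080/api/chat/stream?message=hi&sessionId=s1"))
            .header("Accept", "text/event-stream")
            .build();

        HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofLines())
            .body()
            .forEach(line -> dataPayload(line)
                .filter(data -> !"[DONE]".equals(data))
                .ifPresent(System.out::print));
    }
}
```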
Monitoring streaming output

```java
// Measure time-to-first-token (TTFT) for streaming endpoints
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class StreamMetricsAspect {

    private final MeterRegistry meterRegistry;

    @Around("@annotation(StreamEndpoint)")
    public Object measureTTFT(ProceedingJoinPoint joinPoint) throws Throwable {
        long startTime = System.currentTimeMillis();
        AtomicBoolean firstToken = new AtomicBoolean(false);
        Object result = joinPoint.proceed();
        if (result instanceof Flux) {
            return ((Flux<?>) result).doOnNext(token -> {
                // Record the latency of the first emitted element only
                if (firstToken.compareAndSet(false, true)) {
                    long ttft = System.currentTimeMillis() - startTime;
                    meterRegistry.timer("ai.stream.ttft").record(ttft, TimeUnit.MILLISECONDS);
                    log.info("TTFT: {}ms", ttft);
                }
            });
        }
        return result;
    }
}
```
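The `@StreamEndpoint` referenced by the pointcut is a custom marker annotation, not something Spring provides, so it has to be declared in your own code. A minimal definition:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marker picked up by the @Around("@annotation(StreamEndpoint)") pointcut.
// It must be RUNTIME-retained, otherwise the AspectJ matcher cannot see it.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface StreamEndpoint {
}
```

Annotate each streaming controller method with `@StreamEndpoint` to have the aspect record its TTFT.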