第2059篇:AI应用流式响应——SSE和WebSocket的完整实现
2026/4/30 · 大约 5 分钟
第2059篇:AI应用流式响应——SSE和WebSocket的完整实现
适读人群:需要实现AI流式输出的Java工程师 | 阅读时长:约19分钟 | 核心价值:掌握SSE和WebSocket两种流式响应方案,以及超时、客户端断连、中途取消等生产级细节
做AI应用的时候,有个用户体验问题绕不开:LLM生成回答通常需要5-30秒,如果等完了再返回,用户看到的是长时间白屏,体验很差。
流式输出解决这个问题——LLM生成多少就传多少,用户实时看到文字逐渐出现,就像有人在实时打字一样。
两种流式方案的选择
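SSE(Server-Sent Events)基于普通HTTP,只支持服务端到客户端的单向推送,浏览器原生的EventSource还自带断线重连;WebSocket是全双工长连接,客户端可以在生成过程中随时发送消息(例如打断)。对于大多数AI聊天场景,SSE就够了——用户提问后等AI回答,没有真正的双向实时通信需求。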
SSE实现
Spring AI的流式SSE
/**
 * Spring AI + SSE streaming responses.
 *
 * <p>Both endpoints emit one unnamed SSE event per generated token (with a sequential id),
 * followed by a single {@code done} event carrying {@code [DONE]}. If the model stream fails,
 * an {@code error} event is emitted instead and the stream ends.
 */
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/chat")
@Slf4j
public class StreamingChatController {
    /** System prompt used when the caller does not supply one. */
    private static final String DEFAULT_SYSTEM_PROMPT = "你是一个智能助手";

    private final ChatClient chatClient;

    /**
     * Standard SSE streaming endpoint; the browser consumes it with EventSource.
     *
     * @param message   the user's message
     * @param sessionId optional chat-memory conversation id; a random UUID is used when absent
     * @return token events, then a {@code done} event (or an {@code error} event on failure)
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamChat(
            @RequestParam String message,
            @RequestParam(required = false) String sessionId) {
        String sid = sessionId != null ? sessionId : UUID.randomUUID().toString();
        Flux<String> tokens = chatClient.prompt()
                .system(DEFAULT_SYSTEM_PROMPT)
                .user(message)
                .advisors(a -> a.param(CHAT_MEMORY_CONVERSATION_ID_KEY, sid))
                .stream()
                .content();
        return toSseStream(tokens);
    }

    /**
     * POST variant of the streaming endpoint, accepting more parameters in the request body.
     *
     * @param request system prompt, user message, optional session id and temperature
     * @return token events, then a {@code done} event (or an {@code error} event on failure)
     */
    @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamChatPost(
            @RequestBody ChatRequest request) {
        String requestedPrompt = request.getSystemPrompt();
        // A JSON body may explicitly send null/blank, overriding the field default.
        String systemPrompt = requestedPrompt == null || requestedPrompt.isBlank()
                ? DEFAULT_SYSTEM_PROMPT
                : requestedPrompt;
        // Same conversation-id handling as the GET endpoint, so sessionId is no longer ignored.
        String sid = request.getSessionId() != null
                ? request.getSessionId()
                : UUID.randomUUID().toString();
        Flux<String> tokens = chatClient.prompt()
                .system(systemPrompt)
                .user(request.getUserMessage())
                .options(ChatOptions.builder()
                        .temperature(request.getTemperature())
                        .build())
                .advisors(a -> a.param(CHAT_MEMORY_CONVERSATION_ID_KEY, sid))
                .stream()
                .content();
        return toSseStream(tokens);
    }

    /**
     * Wraps raw model tokens as SSE events.
     *
     * <p>Each token gets a sequential id (0, 1, 2, ...). A timestamp id is not unique when several
     * tokens arrive within the same millisecond. A {@code done} event is appended on normal
     * completion, and any upstream error is turned into a terminal {@code error} event.
     */
    private Flux<ServerSentEvent<String>> toSseStream(Flux<String> tokens) {
        return tokens
                .index((seq, token) -> ServerSentEvent.<String>builder()
                        .id(String.valueOf(seq))
                        .data(token)
                        .build())
                .concatWith(Flux.just(ServerSentEvent.<String>builder()
                        .event("done")
                        .data("[DONE]")
                        .build()))
                .onErrorResume(e -> {
                    log.error("流式响应错误", e);
                    return Flux.just(ServerSentEvent.<String>builder()
                            .event("error")
                            .data("发生错误:" + e.getMessage())
                            .build());
                });
    }

    /** Request body for {@link #streamChatPost(ChatRequest)}. */
    @Data
    public static class ChatRequest {
        private String systemPrompt = DEFAULT_SYSTEM_PROMPT;
        private String userMessage;
        private String sessionId;
        private double temperature = 0.7;
    }
}
LangChain4j的流式SSE
/**
 * LangChain4j + SSE streaming responses.
 */
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/lc4j/chat")
@Slf4j
public class LangChain4jStreamController {
    /**
     * Lower bound for the client-supplied timeout. A non-positive value would be handed to the
     * servlet container, where it typically means "no timeout".
     */
    private static final long MIN_TIMEOUT_MS = 1_000L;
    /** Upper bound for the client-supplied timeout, so a caller cannot hold a connection open indefinitely. */
    private static final long MAX_TIMEOUT_MS = 300_000L;

    private final StreamingChatLanguageModel streamingModel;

    /**
     * Streams the model's answer to {@code message} over SSE.
     *
     * <p>LangChain4j uses a callback-style streaming API, so the SseEmitter is driven manually.
     * The stream carries one unnamed event per token and a {@code done} event with {@code [DONE]}
     * at the end, or an {@code error} event if generation fails.
     *
     * @param message the user message
     * @param timeout emitter timeout in milliseconds, clamped to [1 s, 300 s]
     * @return the emitter that Spring MVC streams to the client
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamChat(
            @RequestParam String message,
            @RequestParam(required = false, defaultValue = "30000") long timeout) {
        SseEmitter emitter = new SseEmitter(Math.min(Math.max(timeout, MIN_TIMEOUT_MS), MAX_TIMEOUT_MS));
        // Becomes true once the emitter can no longer accept events (completed, timed out or failed).
        // The model keeps producing tokens after that, and sending them would throw IllegalStateException.
        AtomicBoolean closed = new AtomicBoolean(false);

        emitter.onTimeout(() -> {
            log.warn("SSE连接超时");
            if (closed.compareAndSet(false, true)) {
                emitter.complete();
            }
        });
        emitter.onError(e -> {
            closed.set(true);
            log.error("SSE发送错误", e);
        });
        emitter.onCompletion(() -> closed.set(true));

        // Run the model call off the Servlet request thread.
        CompletableFuture.runAsync(() -> {
            try {
                streamingModel.generate(
                        List.of(UserMessage.from(message)),
                        new StreamingResponseHandler<AiMessage>() {
                            @Override
                            public void onNext(String token) {
                                trySend(emitter, closed, SseEmitter.event().data(token));
                            }

                            @Override
                            public void onComplete(Response<AiMessage> response) {
                                // End marker first; complete only if it was actually delivered.
                                if (trySend(emitter, closed, SseEmitter.event().name("done").data("[DONE]"))
                                        && closed.compareAndSet(false, true)) {
                                    emitter.complete();
                                }
                            }

                            @Override
                            public void onError(Throwable error) {
                                log.error("LLM流式响应错误", error);
                                trySend(emitter, closed,
                                        SseEmitter.event().name("error").data("错误:" + error.getMessage()));
                                if (closed.compareAndSet(false, true)) {
                                    emitter.completeWithError(error);
                                }
                            }
                        }
                );
            } catch (Exception e) {
                log.error("启动流式响应失败", e);
                if (closed.compareAndSet(false, true)) {
                    emitter.completeWithError(e);
                }
            }
        });
        return emitter;
    }

    /**
     * Sends one event unless the emitter is already closed.
     *
     * @return true if the event was handed to the emitter. Returns false if it was skipped or
     *     sending failed; on failure the emitter is completed with that error and marked closed.
     */
    private static boolean trySend(SseEmitter emitter, AtomicBoolean closed, SseEmitter.SseEventBuilder event) {
        if (closed.get()) {
            return false;
        }
        try {
            emitter.send(event);
            return true;
        } catch (IOException | IllegalStateException e) {
            // IOException: the client has likely disconnected.
            // IllegalStateException: the emitter was completed concurrently (e.g. by the timeout callback).
            log.warn("发送token失败(客户端可能已断连)");
            if (closed.compareAndSet(false, true)) {
                emitter.completeWithError(e);
            }
            return false;
        }
    }
}
WebSocket实现
对于需要双向通信(用户中途打断、服务端主动推送)的场景:
/**
 * AI streaming responses over WebSocket.
 *
 * <p>Requests are parsed by {@code parseRequest} into a {@link WsRequest}; the bundled front end
 * sends JSON such as {@code {"action":"chat","content":"..."}}. Supported actions are
 * {@code chat}, {@code cancel} and {@code ping}.
 *
 * <p>A chat produces {@code start}, then a series of {@code token} messages, then either
 * {@code complete} (full text) or {@code error}. Starting a new chat, or sending {@code cancel},
 * stops the previous generation and sends {@code cancelled} if that generation was still running.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class AiWebSocketHandler extends TextWebSocketHandler {
    // NOTE(review): not used by this handler; kept because it is part of the injected constructor.
    private final ChatLanguageModel chatModel;
    private final StreamingChatLanguageModel streamingModel;
    /** Open sessions keyed by session id. */
    private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
    /**
     * Stop flag of the generation currently owned by each session.
     *
     * <p>A flag flips to true exactly once: when its generation finishes or when it is cancelled,
     * whichever happens first. This ensures exactly one of complete/error/cancelled is reported.
     */
    private final ConcurrentHashMap<String, AtomicBoolean> activeTasks = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        sessions.put(session.getId(), session);
        log.info("WebSocket连接建立: {}", session.getId());
        sendMessage(session, new WsMessage("connected", "连接成功", null));
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) {
        WsRequest request = parseRequest(textMessage.getPayload());
        String action = request == null ? null : request.action();
        if (action == null) {
            // A switch on a null String would throw NullPointerException.
            sendMessage(session, new WsMessage("error", "未知操作: null", null));
            return;
        }
        switch (action) {
            case "chat" -> handleChatRequest(session, request);
            case "cancel" -> cancelCurrentTask(session.getId());
            case "ping" -> sendMessage(session, new WsMessage("pong", null, null));
            default -> sendMessage(session, new WsMessage("error", "未知操作: " + action, null));
        }
    }

    /** Starts a streaming generation for {@code request}, cancelling the session's previous one first. */
    private void handleChatRequest(WebSocketSession session, WsRequest request) {
        String sessionId = session.getId();
        cancelCurrentTask(sessionId);
        AtomicBoolean stopped = new AtomicBoolean(false);
        activeTasks.put(sessionId, stopped);

        CompletableFuture.runAsync(() -> {
            if (stopped.get()) {
                return; // cancelled before the task got a thread
            }
            try {
                // Tell the client that streaming output is starting.
                sendMessage(session, new WsMessage("start", null, null));
                streamingModel.generate(
                        List.of(UserMessage.from(request.content())),
                        new StreamingResponseHandler<AiMessage>() {
                            @Override
                            public void onNext(String token) {
                                // After cancellation the model request keeps running; its tokens are dropped here.
                                if (stopped.get() || !session.isOpen()) return;
                                sendMessage(session, new WsMessage("token", token, null));
                            }

                            @Override
                            public void onComplete(Response<AiMessage> response) {
                                if (!finishTask(sessionId, stopped) || !session.isOpen()) return;
                                sendMessage(session, new WsMessage("complete",
                                        response.content().text(), null));
                            }

                            @Override
                            public void onError(Throwable error) {
                                log.error("WebSocket LLM错误", error);
                                if (finishTask(sessionId, stopped)) {
                                    sendMessage(session, new WsMessage("error", error.getMessage(), null));
                                }
                            }
                        }
                );
            } catch (Exception e) {
                if (finishTask(sessionId, stopped)) {
                    log.error("WebSocket处理错误", e);
                    sendMessage(session, new WsMessage("error", e.getMessage(), null));
                }
            }
        });
    }

    /**
     * Marks a generation as finished.
     *
     * @return true only if this call flipped the flag, i.e. the generation had not already been
     *     cancelled or finished
     */
    private boolean finishTask(String sessionId, AtomicBoolean stopped) {
        activeTasks.remove(sessionId, stopped);
        return stopped.compareAndSet(false, true);
    }

    /** Cancels the session's running generation, if any, and tells the client. */
    private void cancelCurrentTask(String sessionId) {
        AtomicBoolean stopped = activeTasks.remove(sessionId);
        if (stopped != null && stopped.compareAndSet(false, true)) {
            WebSocketSession session = sessions.get(sessionId);
            if (session != null && session.isOpen()) {
                sendMessage(session, new WsMessage("cancelled", "已取消", null));
            }
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        String sessionId = session.getId();
        sessions.remove(sessionId);
        cancelCurrentTask(sessionId); // stop dropping tokens into a closed connection
        log.info("WebSocket连接关闭: {}, 原因: {}", sessionId, status);
    }

    /**
     * Sends one message if the session is still open, logging (not throwing) on I/O failure.
     *
     * <p>Sends are serialized per session. Token messages (model callback thread) and control
     * messages (WebSocket handler thread) can otherwise race, and concurrent sends on one
     * session are not allowed.
     */
    private void sendMessage(WebSocketSession session, WsMessage message) {
        synchronized (session) {
            try {
                if (session.isOpen()) {
                    session.sendMessage(new TextMessage(toJson(message)));
                }
            } catch (IOException e) {
                log.warn("WebSocket发送消息失败: {}", e.getMessage());
            }
        }
    }

    /** Incoming client request: {@code action} selects the operation, {@code content} is the chat text. */
    public record WsRequest(String action, String content) {}

    /** Outgoing message: {@code type} identifies the message kind, {@code content} carries its payload. */
    public record WsMessage(String type, String content, Object metadata) {}
}
/**
 * Registers {@link AiWebSocketHandler} at {@code /ws/chat}.
 *
 * <p>{@code @EnableWebSocket} is required for {@link WebSocketConfigurer} callbacks to be applied.
 * {@code @RequiredArgsConstructor} supplies the constructor that injects the final handler field.
 */
@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {
    private final AiWebSocketHandler wsHandler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // "*" accepts any origin; restrict this to known origins in production.
        registry.addHandler(wsHandler, "/ws/chat")
                .setAllowedOrigins("*");
    }
}
前端集成示例
// SSE前端代码
class AiStreamClient {
// SSE方式
startSSEStream(message, sessionId, onToken, onDone, onError) {
const params = new URLSearchParams({ message, sessionId });
const eventSource = new EventSource(`/api/chat/stream?${params}`);
eventSource.onmessage = (event) => {
const data = event.data;
if (data === '[DONE]') {
eventSource.close();
onDone();
} else {
onToken(data);
}
};
eventSource.addEventListener('error', (event) => {
eventSource.close();
onError(event.data);
});
// 返回关闭函数
return () => eventSource.close();
}
// WebSocket方式(支持取消)
startWSStream(message, onToken, onComplete, onError) {
const ws = new WebSocket('ws://localhost:8080/ws/chat');
ws.onopen = () => {
ws.send(JSON.stringify({ action: 'chat', content: message }));
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
switch (msg.type) {
case 'token': onToken(msg.content); break;
case 'complete': onComplete(msg.content); break;
case 'error': onError(msg.content); break;
}
};
// 取消正在生成的回答
const cancel = () => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ action: 'cancel' }));
}
};
return { cancel, close: () => ws.close() };
}
}SSE和WebSocket各有适用场景,选SSE还是WebSocket的关键问题:用户需要随时打断或发送新消息吗? 如果需要,用WebSocket;如果是等待完整回答再交互,SSE更简单。
