第2090篇:LLM流式响应的工程实现——从SSE到前端渲染的完整链路
2026/4/30大约 8 分钟
第2090篇:LLM流式响应的工程实现——从SSE到前端渲染的完整链路
适读人群:正在构建流式AI对话体验的全栈工程师 | 阅读时长:约20分钟 | 核心价值:掌握LLM流式输出的后端实现、SSE推送、前端渲染以及异常处理的完整技术链路
GPT带火了一个交互模式:逐字输出。用户不需要等LLM生成完再看,而是看着文字一个个"打"出来。
这个体验背后涉及多个技术层:LLM的流式API → 后端流处理 → SSE推送 → 前端渲染。每一层都有坑。这篇文章把整条链路讲清楚。
为什么要流式输出
/**
* 流式 vs 非流式的体验差异
*
* 非流式:
* - 用户等待3-8秒(LLM生成完)
* - 文字突然全部出现
* - 首字节时间 = 完整响应时间
*
* 流式:
* - 用户0.5-1秒内看到第一个字
* - 文字逐步显示,感觉更快
* - 即使生成慢,用户也有进度感
*
* 实际上流式不会更快(总token数一样)
* 但用户感知的"速度"会快很多——这就是流式的价值
*/LangChain4j流式实现
/**
* LangChain4j的流式响应
* 两种方式:StreamingChatLanguageModel 和 @AiService流式接口
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class StreamingChatService {
// 流式模型(和普通ChatLanguageModel不同的接口)
private final StreamingChatLanguageModel streamingModel;
/**
* 方式一:使用Callback回调
* 适合需要精细控制的场景
*/
public void streamWithCallback(
String systemPrompt,
String userMessage,
Consumer<String> onToken, // 每个token的回调
Consumer<String> onComplete, // 完成时的回调
Consumer<Throwable> onError) { // 错误回调
streamingModel.generate(
List.of(
SystemMessage.from(systemPrompt),
UserMessage.from(userMessage)
),
new StreamingResponseHandler<AiMessage>() {
@Override
public void onNext(String token) {
try {
onToken.accept(token);
} catch (Exception e) {
log.error("Token处理失败: {}", e.getMessage());
}
}
@Override
public void onComplete(Response<AiMessage> response) {
String fullText = response.content().text();
int totalTokens = response.tokenUsage() != null
? response.tokenUsage().totalTokenCount() : -1;
log.debug("流式生成完成: tokens={}", totalTokens);
onComplete.accept(fullText);
}
@Override
public void onError(Throwable error) {
log.error("流式生成失败: {}", error.getMessage());
onError.accept(error);
}
}
);
}
/**
* 方式二:转换为Project Reactor的Flux
* 适合Spring WebFlux响应式栈
*/
public Flux<String> streamAsFlux(String systemPrompt, String userMessage) {
return Flux.create(sink -> {
streamingModel.generate(
List.of(
SystemMessage.from(systemPrompt),
UserMessage.from(userMessage)
),
new StreamingResponseHandler<AiMessage>() {
@Override
public void onNext(String token) {
if (!sink.isCancelled()) {
sink.next(token);
}
}
@Override
public void onComplete(Response<AiMessage> response) {
sink.complete();
}
@Override
public void onError(Throwable error) {
sink.error(error);
}
}
);
}, FluxSink.OverflowStrategy.BUFFER);
}
}@AiService的流式接口
/**
* 使用@AiService定义流式接口
* 更简洁,LangChain4j自动处理底层细节
*/
@AiService
public interface StreamingAssistant {
/**
* TokenStream是LangChain4j的流式返回类型
* 比Flux更简单,不需要额外配置
*/
@SystemMessage("""
你是一个技术助手,擅长解答Java和AI相关问题。
回答要清晰、有条理,必要时给出代码示例。
""")
TokenStream chat(@MemoryId String sessionId, @UserMessage String message);
}
/**
* TokenStream的使用方式
*/
@RestController
@RequiredArgsConstructor
@Slf4j
public class StreamingChatController {
private final StreamingAssistant assistant;
/**
* 流式接口:返回SSE流
* 前端通过EventSource接收
*/
@GetMapping(value = "/api/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamChat(
@RequestParam String sessionId,
@RequestParam String message) {
SseEmitter emitter = new SseEmitter(120_000L); // 2分钟超时
assistant.chat(sessionId, message)
.onNext(token -> {
try {
// 发送token
emitter.send(SseEmitter.event()
.data(token)
.id(String.valueOf(System.currentTimeMillis())));
} catch (IOException e) {
log.warn("SSE发送失败(客户端可能已断开): {}", e.getMessage());
emitter.completeWithError(e);
}
})
.onComplete(response -> {
try {
// 发送完成信号
emitter.send(SseEmitter.event()
.name("done")
.data("[DONE]"));
emitter.complete();
} catch (IOException e) {
emitter.complete();
}
})
.onError(throwable -> {
log.error("流式生成出错: {}", throwable.getMessage());
try {
emitter.send(SseEmitter.event()
.name("error")
.data("生成失败,请重试"));
emitter.complete();
} catch (IOException e) {
emitter.completeWithError(throwable);
}
})
.start();
// 客户端断开时,清理资源
emitter.onCompletion(() -> log.debug("SSE连接完成: sessionId={}", sessionId));
emitter.onTimeout(() -> {
log.warn("SSE超时: sessionId={}", sessionId);
emitter.complete();
});
return emitter;
}
}Spring AI的流式实现
/**
* Spring AI的流式支持
* 使用ChatClient的stream()方法
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class SpringAiStreamingService {
private final ChatClient chatClient;
/**
* Spring AI流式调用
* 返回Flux<String>,可以直接在Controller中使用
*/
public Flux<String> stream(String systemPrompt, String userMessage) {
return chatClient.prompt()
.system(systemPrompt)
.user(userMessage)
.stream()
.content();
}
/**
* 带元数据的流式响应(包含token用量等信息)
* ChatResponse比String更丰富,包含完整的响应元数据
*/
public Flux<ChatResponse> streamWithMetadata(String systemPrompt, String userMessage) {
return chatClient.prompt()
.system(systemPrompt)
.user(userMessage)
.stream()
.chatResponse();
}
/**
* WebFlux Controller中直接返回流
*/
@RestController
class SpringAiStreamController {
@GetMapping(value = "/api/spring-ai/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamResponse(
@RequestParam String question) {
return stream(
"你是一个有帮助的助手",
question
)
.map(token -> token) // 可以在这里做token级别的处理
.onErrorReturn("[ERROR] 生成失败,请重试");
}
}
}流式处理的关键挑战
挑战1:中间件超时
/**
* Nginx/网关超时问题处理
*
* 问题:LLM生成慢时,中间的Nginx/API网关可能因为超时断开连接
*
* 解决方案:定期发送心跳,保持连接活跃
*/
@Component
@Slf4j
public class HeartbeatSseEmitter extends SseEmitter {
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> heartbeatTask;
public HeartbeatSseEmitter(long timeout) {
super(timeout);
startHeartbeat();
}
private void startHeartbeat() {
heartbeatTask = scheduler.scheduleAtFixedRate(() -> {
try {
// 每15秒发送一个注释(冒号开头的SSE行,不触发事件,只保活)
send(SseEmitter.event().comment("heartbeat"));
} catch (IOException e) {
// 客户端断开了,取消心跳
stopHeartbeat();
}
}, 15, 15, TimeUnit.SECONDS);
onCompletion(this::stopHeartbeat);
onTimeout(this::stopHeartbeat);
onError(e -> stopHeartbeat());
}
private void stopHeartbeat() {
if (heartbeatTask != null) {
heartbeatTask.cancel(false);
}
scheduler.shutdown();
}
}挑战2:流式内容的安全过滤
/**
* 流式场景下的内容安全过滤
*
* 问题:逐token输出时,无法等完整响应再过滤
* 需要在流式传输过程中做实时过滤
*
* 解决方案:维护一个滑动窗口,检测跨token的敏感内容
*/
@Component
@Slf4j
public class StreamingSafetyFilter {
// 敏感词长度通常不超过10个字符
private static final int WINDOW_SIZE = 20;
private final List<Pattern> sensitivePatterns = List.of(
Pattern.compile("手机号[::][\\d\\-\\s]{8,}"),
Pattern.compile("身份证[::][\\d]{15,18}"),
Pattern.compile("银行卡[::][\\d\\-\\s]{15,}"),
Pattern.compile("密码[::].{3,20}")
);
private StringBuilder buffer = new StringBuilder();
private int sentLength = 0; // 已安全发送的长度
/**
* 处理流入的token
* 返回可以安全发送的内容(可能有延迟)
*/
public Optional<String> processToken(String token) {
buffer.append(token);
String bufferStr = buffer.toString();
int checkEnd = Math.max(0, bufferStr.length() - WINDOW_SIZE);
// 检查已确认安全的部分
String safePrefix = bufferStr.substring(0, checkEnd);
String pendingWindow = bufferStr.substring(checkEnd);
// 在pending窗口中检测敏感内容
boolean hasSensitiveContent = sensitivePatterns.stream()
.anyMatch(p -> p.matcher(pendingWindow).find());
if (hasSensitiveContent) {
log.warn("检测到流式内容中的敏感信息,已脱敏");
// 对敏感内容进行替换
String sanitized = sanitize(pendingWindow);
String result = safePrefix + sanitized;
// 返回处理后的内容
String toSend = result.substring(sentLength);
sentLength = result.length();
return toSend.isEmpty() ? Optional.empty() : Optional.of(toSend);
}
// 没有敏感内容,返回safePrefix中新增的部分
if (safePrefix.length() > sentLength) {
String toSend = safePrefix.substring(sentLength);
sentLength = safePrefix.length();
return Optional.of(toSend);
}
return Optional.empty();
}
/**
* 流式结束时,刷出剩余缓冲
*/
public String flush() {
String remaining = buffer.substring(sentLength);
String sanitized = sanitize(remaining);
sentLength = buffer.length();
return sanitized;
}
private String sanitize(String text) {
String result = text;
for (Pattern pattern : sensitivePatterns) {
result = pattern.matcher(result).replaceAll("[已脱敏]");
}
return result;
}
}挑战3:流式输出的重试和断点续传
/**
* 流式断点续传
*
* 场景:网络抖动导致SSE断开,用户刷新页面需要从上次中断处继续
*
* 实现:缓存已生成的内容,断线重连时从断点继续
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class StreamingSessionManager {
private final RedisTemplate<String, String> redisTemplate;
// 每个会话的缓冲内容
private final Map<String, StringBuilder> sessionBuffers = new ConcurrentHashMap<>();
/**
* 记录已生成的token(用于断点续传)
*/
public void appendToken(String sessionId, String requestId, String token) {
String bufferKey = "streaming:" + sessionId + ":" + requestId + ":buffer";
redisTemplate.opsForValue().append(bufferKey, token);
redisTemplate.expire(bufferKey, Duration.ofMinutes(30));
}
/**
* 获取已生成的内容(断点续传时使用)
*/
public String getGeneratedContent(String sessionId, String requestId) {
String bufferKey = "streaming:" + sessionId + ":" + requestId + ":buffer";
String content = redisTemplate.opsForValue().get(bufferKey);
return content != null ? content : "";
}
/**
* 断线重连接口
* 从指定位置继续发送
*/
@GetMapping(value = "/api/chat/resume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter resumeStreaming(
@RequestParam String sessionId,
@RequestParam String requestId,
@RequestParam(defaultValue = "0") int fromPosition) {
String previousContent = getGeneratedContent(sessionId, requestId);
SseEmitter emitter = new HeartbeatSseEmitter(120_000L);
// 先发送已生成的内容(从断点位置开始)
if (previousContent.length() > fromPosition) {
String missedContent = previousContent.substring(fromPosition);
try {
emitter.send(SseEmitter.event()
.name("resume")
.data(missedContent));
} catch (IOException e) {
emitter.completeWithError(e);
return emitter;
}
}
// 如果已经完成了,直接发done
String statusKey = "streaming:" + sessionId + ":" + requestId + ":status";
String status = redisTemplate.opsForValue().get(statusKey);
if ("completed".equals(status)) {
try {
emitter.send(SseEmitter.event().name("done").data("[DONE]"));
emitter.complete();
} catch (IOException e) {
emitter.completeWithError(e);
}
}
return emitter;
}
}前端接收流式响应(JavaScript参考)
/**
* 前端流式接收示例(供参考,配套Java后端使用)
* 使用Fetch API的ReadableStream,比EventSource更灵活
*/
async function streamChat(sessionId, message) {
const response = await fetch('/api/chat/stream?' + new URLSearchParams({
sessionId, message
}), {
signal: AbortSignal.timeout(120000) // 2分钟超时
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let displayText = '';
while (true) {
const { value, done } = await reader.read();
if (done) break;
// 解析SSE格式
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // 最后一行可能不完整,留在buffer中
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
// 流式完成
console.log('完整响应:', displayText);
return displayText;
}
displayText += data;
updateUI(displayText); // 更新页面显示
} else if (line.startsWith('event: error')) {
console.error('流式错误');
break;
}
}
}
return displayText;
}
function updateUI(text) {
document.getElementById('response').textContent = text;
// 自动滚动到底部
const el = document.getElementById('response');
el.scrollTop = el.scrollHeight;
}性能调优要点
不要每个token都flush
SSE默认每次send()都flush,对于高频token(LLM通常每秒30-50个token),这会导致大量系统调用。解决方案:批量发送,每50ms或每5-10个token批次发送一次:
/**
* Token批量发送器
* 减少SSE的flush频率,降低服务器压力
*/
@Component
@Slf4j
public class TokenBatchSender {
private static final int BATCH_SIZE = 5;
private static final long MAX_DELAY_MS = 50;
private final List<String> tokenBuffer = new ArrayList<>();
private long lastSendTime = System.currentTimeMillis();
private final SseEmitter emitter;
public TokenBatchSender(SseEmitter emitter) {
this.emitter = emitter;
}
public synchronized void addToken(String token) throws IOException {
tokenBuffer.add(token);
long now = System.currentTimeMillis();
boolean shouldFlush = tokenBuffer.size() >= BATCH_SIZE ||
(now - lastSendTime) >= MAX_DELAY_MS;
if (shouldFlush) {
flush();
}
}
public synchronized void flush() throws IOException {
if (!tokenBuffer.isEmpty()) {
String batch = String.join("", tokenBuffer);
emitter.send(SseEmitter.event().data(batch));
tokenBuffer.clear();
lastSendTime = System.currentTimeMillis();
}
}
}流式输出是AI产品体验的关键细节。后端工程师往往觉得"不就是改个API",但从LLM调用到用户看到流畅的打字效果,中间的超时处理、内容安全、断点续传、批量发送,每个环节都有可能成为体验的短板。
