Streaming 输出的工程实践——看起来简单,实现起来有一堆坑
Streaming 输出的工程实践——看起来简单,实现起来有一堆坑
适读人群:需要在生产系统里实现流式输出的后端/全栈工程师 | 阅读时长:约 14 分钟 | 核心价值:完整的生产级 Streaming 实现方案,覆盖各个环节的真实坑
第一次做 Streaming 的时候,我觉得这玩意儿很简单——不就是用 SSE 推数据吗?我在本地测了半小时,一切正常。然后上线,然后收到用户投诉:「文字一直在转,转了好几分钟没出来。」
查下来,是 Nginx 默认开了响应缓冲(proxy_buffering),把我所有的流式数据都缓存起来等到结束才推。本地开发没有 Nginx,所以没发现。
那是我第一个教训。后来还有很多坑,今天一起写出来。
Streaming 的整体架构
生产环境里的 Streaming 涉及这几个层:
用户浏览器
↕ SSE / WebSocket
前端 JS
↕ HTTP
后端服务(你的应用)
↕ HTTP Streaming
LLM API(OpenAI / Claude / etc.)每一层都有可能出问题。从 LLM API 开始,从后往前讲。
LLM API 层:流式请求的基本写法
Python(OpenAI SDK):
import asyncio
from openai import AsyncOpenAI
from typing import AsyncGenerator
client = AsyncOpenAI()
async def stream_llm_response(
messages: list,
model: str = "gpt-4o",
max_tokens: int = 2048
) -> AsyncGenerator[str, None]:
"""
生成LLM流式输出的异步生成器
每次yield一个token字符串
"""
try:
stream = await client.chat.completions.create(
model=model,
messages=messages,
max_tokens=max_tokens,
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content is not None:
yield chunk.choices[0].delta.content
# 检测结束原因
if chunk.choices[0].finish_reason == 'length':
# 因为token限制截断,记录日志
import logging
logging.warning(f"Stream截断: model={model}, reason=length")
except Exception as e:
# 不要直接raise,要给调用方一个错误信号
yield f"\n[错误: {str(e)}]"
raisePython(Anthropic SDK):
import anthropic
from typing import AsyncGenerator
client = anthropic.AsyncAnthropic()
async def stream_claude_response(
system_prompt: str,
messages: list,
max_tokens: int = 2048
) -> AsyncGenerator[str, None]:
async with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=max_tokens,
system=system_prompt,
messages=messages
) as stream:
async for text in stream.text_stream:
yield text
# 流结束后可以获取完整信息
final_message = await stream.get_final_message()
# 可以记录usage信息
usage = final_message.usage
log_token_usage(usage.input_tokens, usage.output_tokens)后端服务层:SSE 实现
SSE(Server-Sent Events)是目前最适合 LLM 流式输出的协议——单向推送、HTTP 兼容、自动重连支持。
FastAPI 实现:
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import json
import asyncio
import time
import uuid
app = FastAPI()
async def generate_sse_stream(
messages: list,
request_id: str,
request: Request
) -> AsyncGenerator[str, None]:
"""
生成SSE格式的流式数据
SSE格式:data: {json}\n\n
"""
start_time = time.time()
token_buffer = []
total_tokens = 0
try:
# 发送开始事件
yield f"data: {json.dumps({'type': 'start', 'request_id': request_id})}\n\n"
async for token in stream_llm_response(messages):
# 检查客户端是否已断开
if await request.is_disconnected():
break
token_buffer.append(token)
total_tokens += 1
# 立即发送每个token
event_data = {
'type': 'token',
'content': token,
'request_id': request_id
}
yield f"data: {json.dumps(event_data, ensure_ascii=False)}\n\n"
# 发送结束事件
end_event = {
'type': 'done',
'request_id': request_id,
'total_tokens': total_tokens,
'latency_ms': int((time.time() - start_time) * 1000)
}
yield f"data: {json.dumps(end_event)}\n\n"
except asyncio.CancelledError:
# 客户端断开导致的取消,正常情况
pass
except Exception as e:
error_event = {
'type': 'error',
'error': str(e),
'request_id': request_id
}
yield f"data: {json.dumps(error_event)}\n\n"
@app.post("/chat/stream")
async def chat_stream(request: Request):
body = await request.json()
messages = body.get('messages', [])
request_id = str(uuid.uuid4())
return StreamingResponse(
generate_sse_stream(messages, request_id, request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 关键:告诉Nginx不要缓冲
"Access-Control-Allow-Origin": "*"
}
)关键 Header 说明:
Cache-Control: no-cache:禁止任何缓存Connection: keep-alive:保持长连接X-Accel-Buffering: no:这个最容易忘,告诉 Nginx 关闭代理缓冲。这就是我上线第一天踩的坑
Nginx 配置
即使加了 X-Accel-Buffering: no Header,Nginx 配置里也要做对应设置:
location /chat/stream {
proxy_pass http://backend;
# 关闭代理缓冲,让数据立即推送
proxy_buffering off;
proxy_cache off;
# 增加超时时间(LLM响应可能很长)
proxy_read_timeout 300s;
proxy_send_timeout 300s;
# 关闭gzip压缩(对流式数据无效,反而可能造成缓冲)
gzip off;
# SSE必要的Header
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding on;
}如果用的是 Nginx 作为反向代理,proxy_buffering off 是必须的,否则你的 SSE 会被完整缓存后再发出去,就是「等了好几分钟一次性出来」的问题。
前端实现:断线重连
前端用 EventSource API 接收 SSE,但原生 EventSource 有一个问题:它不支持 POST 请求,只能 GET。而我们的请求体(messages)通常比较大,GET 不合适。
用 @microsoft/fetch-event-source 库解决这个问题:
import { fetchEventSource } from '@microsoft/fetch-event-source';
class StreamingChat {
constructor(apiUrl) {
this.apiUrl = apiUrl;
this.controller = null;
this.retryCount = 0;
this.maxRetries = 3;
}
async sendMessage(messages, onToken, onDone, onError) {
// 如果有正在进行的请求,先取消
if (this.controller) {
this.controller.abort();
}
this.controller = new AbortController();
let accumulatedText = '';
try {
await fetchEventSource(this.apiUrl + '/chat/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.getAuthToken()}`
},
body: JSON.stringify({ messages }),
signal: this.controller.signal,
onopen: async (response) => {
if (response.ok) {
this.retryCount = 0;
return;
}
// 非200状态码
throw new Error(`HTTP error: ${response.status}`);
},
onmessage: (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'token':
accumulatedText += data.content;
onToken(data.content, accumulatedText);
break;
case 'done':
onDone(accumulatedText, data);
break;
case 'error':
onError(new Error(data.error));
break;
}
},
onerror: (err) => {
// 断线重连逻辑
if (this.retryCount < this.maxRetries) {
this.retryCount++;
const delay = Math.pow(2, this.retryCount) * 1000; // 指数退避
console.log(`连接断开,${delay}ms后重试(第${this.retryCount}次)`);
return delay; // 返回重试延迟(fetchEventSource支持)
}
onError(err);
throw err; // 超过最大重试次数,停止重试
},
openWhenHidden: true // 页面隐藏时也保持连接
});
} catch (err) {
if (err.name === 'AbortError') {
// 用户主动取消,不是错误
return;
}
onError(err);
}
}
cancel() {
if (this.controller) {
this.controller.abort();
this.controller = null;
}
}
getAuthToken() {
return localStorage.getItem('auth_token') || '';
}
}
// 使用示例
const chat = new StreamingChat('https://api.yourapp.com');
chat.sendMessage(
[{ role: 'user', content: '你好' }],
(token, fullText) => {
// 每收到一个token更新UI
document.getElementById('response').textContent = fullText;
},
(fullText, metadata) => {
console.log('完成,总token数:', metadata.total_tokens);
},
(error) => {
console.error('错误:', error);
}
);断线重连的后端支持
光有前端重连还不够,断线重连需要后端支持从中断点继续。这需要流式输出支持 Last-Event-ID:
@app.post("/chat/stream")
async def chat_stream(request: Request):
body = await request.json()
messages = body.get('messages', [])
# 获取断点续传的位置
last_event_id = request.headers.get('Last-Event-ID')
request_id = body.get('request_id') or str(uuid.uuid4())
# 如果有Last-Event-ID,从缓存里找到已生成的内容
cached_content = None
if last_event_id:
cached_content = await get_cached_stream(last_event_id)
return StreamingResponse(
generate_sse_stream_with_resume(
messages, request_id, request, cached_content
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
}
)
async def generate_sse_stream_with_resume(
messages, request_id, request, cached_content=None
):
"""支持断点续传的流式生成"""
# 如果有缓存内容,先快速重放已有的token
if cached_content:
for i, token in enumerate(cached_content['tokens']):
yield f"id: {request_id}-{i}\n"
yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
# 注意:如果已经完成了,不需要重新调LLM
if cached_content['is_complete']:
yield f"data: {json.dumps({'type': 'done'})}\n\n"
return
# 继续生成新的token...
token_index = len(cached_content['tokens']) if cached_content else 0
async for token in stream_llm_response(messages):
event_id = f"{request_id}-{token_index}"
yield f"id: {event_id}\n"
yield f"data: {json.dumps({'type': 'token', 'content': token}, ensure_ascii=False)}\n\n"
token_index += 1
# 缓存token(用于断线重连)
await cache_stream_token(request_id, token)内存管理:不要无限积累
一个容易忽视的坑:如果你在后端把所有流式 token 都积累在内存里(为了做缓存或统计),长对话会吃掉大量内存。
正确做法:
class StreamCache:
"""流式输出的有界缓存"""
def __init__(self, redis_client, max_tokens_per_stream=4096):
self.redis = redis_client
self.max_tokens = max_tokens_per_stream
async def cache_token(self, request_id: str, token: str):
key = f"stream:{request_id}:tokens"
# 用Redis List存储,自动控制大小
pipe = self.redis.pipeline()
pipe.rpush(key, token)
pipe.ltrim(key, -self.max_tokens, -1) # 只保留最新的max_tokens个
pipe.expire(key, 3600) # 1小时过期
await pipe.execute()
async def get_cached_stream(self, request_id: str) -> dict:
key = f"stream:{request_id}:tokens"
tokens = await self.redis.lrange(key, 0, -1)
is_complete_key = f"stream:{request_id}:complete"
is_complete = await self.redis.exists(is_complete_key)
return {
'tokens': [t.decode() for t in tokens],
'is_complete': bool(is_complete)
}
async def mark_complete(self, request_id: str):
key = f"stream:{request_id}:complete"
await self.redis.setex(key, 3600, '1')错误处理:LLM API 超时的处理
LLM API 有时候会在流式传输中途超时或断开,这种情况要给用户明确的反馈:
import asyncio
async def stream_with_timeout(messages: list, timeout_seconds: int = 60):
"""带超时控制的流式输出"""
try:
async with asyncio.timeout(timeout_seconds):
async for token in stream_llm_response(messages):
yield token
except asyncio.TimeoutError:
yield "\n\n[响应超时,请重试]"
except Exception as e:
yield f"\n\n[发生错误: {type(e).__name__}]"一张坑的清单
做了这么多次 Streaming,把踩过的坑总结一下:
| 坑 | 现象 | 解决方法 |
|---|---|---|
| Nginx proxy_buffering | 全部内容一次性出现 | nginx 配置 proxy_buffering off |
| max_tokens 太小 | 输出被截断 | 根据任务估算合理的 max_tokens |
| 前端 EventSource 不支持 POST | 无法发送复杂请求体 | 用 fetch-event-source 库 |
| 断线没有重连 | 网络抖动时对话中断 | 实现指数退避重连 |
| 内存泄漏 | 服务内存持续上涨 | 用有界缓存,不无限积累 |
| 中文被截断乱码 | 汉字显示为乱码 | SSE 数据用 JSON 包裹而非裸文本 |
| 生产超时 | 长回复时连接被网关断开 | 设置足够的 proxy_read_timeout |
