Streaming AI Responses in Python: Connecting SSE, WebSocket, and the Frontend
Audience: full-stack engineers building AI chat products | Reading time: ~17 minutes | Core value: a complete pipeline from streaming LLM output to a frontend typewriter effect, covering both FastAPI SSE and WebSocket architectures
Xiao Shi is a full-stack engineer with six years of experience. Last year he built an AI chat interface for his company. It worked, but with one UX problem: after the user hit send, the page spun for 3-5 seconds and then dumped the entire answer at once. He knew ChatGPT had a typewriter effect, but not how to build one.
He assumed it was a frontend issue, until his frontend colleague said: "Your backend hands me one lump-sum response — how am I supposed to stream that?" Only then did he realize it was a backend architecture problem.
He came to me: "Lao Zhang, I know I need streaming output, but I don't see how the whole pipeline fits together — LLM streaming, backend streaming, frontend receiving. How do the three segments connect?"
That is a good and very representative question. The key to streaming chat is not any single stage but streaming the entire pipeline: the LLM streams tokens → the backend forwards them as they arrive → the frontend renders them incrementally. If any one stage degenerates into "buffer, then send everything at once", the typewriter effect is gone.
Today we walk through the whole pipeline with two approaches: SSE (simple) and WebSocket (bidirectional).
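You can see this failure mode without any LLM at all. The toy pipeline below is purely illustrative (none of these names appear in the article's code): it compares a relay stage that forwards chunks the moment they arrive against one that buffers everything first.

```python
import asyncio

async def llm_stream():
    # Stage 1: the model emits tokens one at a time
    for token in ["Hello", ", ", "world", "!"]:
        await asyncio.sleep(0)
        yield token

async def forward_streaming(source):
    # Stage 2, done right: forward each chunk as soon as it arrives
    async for token in source:
        yield token

async def forward_buffered(source):
    # Stage 2, broken: collect everything, then emit one lump
    parts = [token async for token in source]
    yield "".join(parts)

async def collect(pipeline):
    # Stage 3 stand-in for the frontend: record every chunk it receives
    return [chunk async for chunk in pipeline]

streamed = asyncio.run(collect(forward_streaming(llm_stream())))
buffered = asyncio.run(collect(forward_buffered(llm_stream())))
print(len(streamed), len(buffered))  # 4 paint opportunities vs. one big flush
```

The content delivered is identical; what differs is how many chances the frontend gets to repaint, and that is the entire typewriter effect.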
1. Comparing the Two Approaches
| Approach | Characteristics | Best for |
|---|---|---|
| SSE (Server-Sent Events) | Server→client one-way, plain HTTP, simple and reliable | AI chat, content generation |
| WebSocket | Bidirectional, low latency, connections must be managed | Real-time collaboration, cancelling generation from the client |
For most AI chat scenarios SSE is the better fit: it is simple to implement, widely compatible, and there are no WebSocket connections to manage.
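The "simple" in SSE's favor is literal: an event is just a few lines of text. Here is a minimal sketch of the framing this article uses throughout — `format_sse_event` mirrors the backend helper defined in the next section, while `parse_sse` is an illustrative counterpart for reading the wire format, not part of the article's code.

```python
import json

def format_sse_event(data: dict, event_type: str = "message") -> str:
    # SSE framing: an "event:" line, a "data:" line, then a blank line
    return f"event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"

def parse_sse(raw: str) -> list[tuple[str, dict]]:
    # Split on the blank-line delimiter, then read the two fields of each event
    events = []
    for block in raw.split("\n\n"):
        event_type, payload = "message", None
        for line in block.split("\n"):
            if line.startswith("event: "):
                event_type = line[len("event: "):]
            elif line.startswith("data: "):
                payload = json.loads(line[len("data: "):])
        if payload is not None:
            events.append((event_type, payload))
    return events

wire = format_sse_event({"type": "text", "content": "Hi"}) + format_sse_event({"type": "done"}, "done")
print(parse_sse(wire))  # [('message', {'type': 'text', 'content': 'Hi'}), ('done', {'type': 'done'})]
```

Because it is only text over a regular HTTP response, SSE survives proxies, works with `fetch`, and needs no special client library.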
2. Complete FastAPI + SSE Implementation
2.1 Backend SSE Service
```python
import asyncio
import json
import time
import uuid
from typing import AsyncGenerator, Optional

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from openai import AsyncOpenAI

app = FastAPI()

# Allow cross-origin requests from the frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

aclient = AsyncOpenAI()


class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None
    model: str = "gpt-3.5-turbo"
    system_prompt: str = "You are a professional AI assistant"


def format_sse_event(data: dict, event_type: str = "message") -> str:
    """
    Format an SSE event.
    SSE framing: each message starts with 'data: ' and ends with a blank line.
    The optional 'event' field distinguishes message types.
    """
    json_data = json.dumps(data, ensure_ascii=False)
    return f"event: {event_type}\ndata: {json_data}\n\n"


async def generate_ai_stream(
    request: ChatRequest,
    request_id: str
) -> AsyncGenerator[str, None]:
    """
    Async generator: call OpenAI with streaming and forward chunks to the frontend.
    """
    start_time = time.time()
    total_tokens = 0
    try:
        # Send a start signal
        yield format_sse_event({
            "type": "start",
            "request_id": request_id,
            "timestamp": start_time
        }, event_type="start")

        # Call the OpenAI streaming API
        stream = await aclient.chat.completions.create(
            model=request.model,
            messages=[
                {"role": "system", "content": request.system_prompt},
                {"role": "user", "content": request.message}
            ],
            stream=True,
            temperature=0.7,
            max_tokens=2048
        )

        # Forward the streamed content
        async for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                total_tokens += 1
                yield format_sse_event({
                    "type": "text",
                    "content": delta.content,
                    "request_id": request_id
                }, event_type="message")
                # No sleep here: push data out as fast as it arrives
            # Check for end of stream
            if chunk.choices[0].finish_reason is not None:
                break

        # Send a done signal (with stats)
        elapsed = time.time() - start_time
        yield format_sse_event({
            "type": "done",
            "request_id": request_id,
            "stats": {
                "tokens_generated": total_tokens,
                "elapsed_seconds": round(elapsed, 2),
                "tokens_per_second": round(total_tokens / elapsed, 1) if elapsed > 0 else 0
            }
        }, event_type="done")
    except asyncio.CancelledError:
        # Client disconnected
        yield format_sse_event({
            "type": "cancelled",
            "request_id": request_id
        }, event_type="error")
    except Exception as e:
        # Any other error
        yield format_sse_event({
            "type": "error",
            "message": str(e),
            "request_id": request_id
        }, event_type="error")


@app.post("/api/chat/stream")
async def chat_stream(chat_request: ChatRequest, request: Request):
    """Streaming chat endpoint"""
    request_id = str(uuid.uuid4())

    async def event_generator():
        async for event in generate_ai_stream(chat_request, request_id):
            # Check whether the client is still connected
            if await request.is_disconnected():
                break
            yield event

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Key: disables Nginx buffering
            "Content-Type": "text/event-stream; charset=utf-8",
            "Access-Control-Allow-Origin": "*",
        }
    )


# Keep a non-streaming endpoint too (for scenarios that don't need streaming)
@app.post("/api/chat")
async def chat(chat_request: ChatRequest):
    response = await aclient.chat.completions.create(
        model=chat_request.model,
        messages=[
            {"role": "system", "content": chat_request.system_prompt},
            {"role": "user", "content": chat_request.message}
        ],
        temperature=0.7
    )
    return {"response": response.choices[0].message.content}
```
2.2 Complete Frontend HTML Example
```html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>AI Chat</title>
<style>
  body { font-family: "PingFang SC", "Microsoft YaHei", sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; }
  #chat-box { border: 1px solid #ddd; border-radius: 8px; padding: 20px; min-height: 400px; margin-bottom: 20px; overflow-y: auto; }
  .message { margin: 10px 0; padding: 10px 16px; border-radius: 12px; max-width: 80%; }
  .user { background: #4527A0; color: white; margin-left: auto; }
  .ai { background: #F0E6FF; color: #333; }
  #input-area { display: flex; gap: 10px; }
  #message-input { flex: 1; padding: 12px; border: 1px solid #ddd; border-radius: 8px; font-size: 15px; }
  #send-btn { padding: 12px 24px; background: #4527A0; color: white; border: none; border-radius: 8px; cursor: pointer; font-size: 15px; }
  #send-btn:disabled { background: #aaa; cursor: not-allowed; }
  .cursor { display: inline-block; animation: blink 0.8s step-end infinite; }
  @keyframes blink { 0%, 100% { opacity: 1; } 50% { opacity: 0; } }
  #stats { color: #999; font-size: 12px; margin-top: 8px; }
</style>
</head>
<body>
<h2>Lao Zhang Talks AI · Streaming Chat Demo</h2>
<div id="chat-box"></div>
<div id="input-area">
  <input id="message-input" type="text" placeholder="Type a message..." />
  <button id="send-btn" onclick="sendMessage()">Send</button>
</div>
<div id="stats"></div>
<script>
const API_URL = "http://localhost:8000/api/chat/stream";

function addMessage(role, content) {
    const box = document.getElementById("chat-box");
    const div = document.createElement("div");
    div.className = `message ${role}`;
    div.id = role === "ai" ? "current-ai-message" : "";
    div.textContent = content;
    box.appendChild(div);
    box.scrollTop = box.scrollHeight;
    return div;
}

async function sendMessage() {
    const input = document.getElementById("message-input");
    const sendBtn = document.getElementById("send-btn");
    const message = input.value.trim();
    if (!message) return;

    // Clear the input and disable the button
    input.value = "";
    sendBtn.disabled = true;
    sendBtn.textContent = "Generating...";

    // Add the user message
    addMessage("user", message);

    // Create a placeholder for the AI answer
    const aiDiv = addMessage("ai", "");
    const cursor = document.createElement("span");
    cursor.className = "cursor";
    cursor.textContent = "▌";
    aiDiv.appendChild(cursor);

    let fullText = "";
    const startTime = Date.now();

    try {
        // Consume the SSE stream with fetch + ReadableStream
        const response = await fetch(API_URL, {
            method: "POST",
            headers: {"Content-Type": "application/json"},
            body: JSON.stringify({
                message: message,
                model: "gpt-3.5-turbo"
            })
        });

        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = "";

        while (true) {
            const { done, value } = await reader.read();
            if (done) break;

            // stream: true keeps multi-byte characters split across reads intact
            buffer += decoder.decode(value, { stream: true });
            const lines = buffer.split("\n");
            buffer = lines.pop();  // keep any incomplete trailing line for the next read

            for (const line of lines) {
                if (line.startsWith("data: ")) {
                    try {
                        const data = JSON.parse(line.slice(6));
                        if (data.type === "text") {
                            fullText += data.content;
                            // Update the display (drop the cursor, set the text, re-append the cursor)
                            aiDiv.textContent = fullText;
                            aiDiv.appendChild(cursor);
                            // Scroll to the bottom
                            document.getElementById("chat-box").scrollTop =
                                document.getElementById("chat-box").scrollHeight;
                        }
                        if (data.type === "done") {
                            // Remove the cursor
                            cursor.remove();
                            const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
                            document.getElementById("stats").textContent =
                                `Generated ${data.stats?.tokens_generated || "?"} tokens in ${elapsed}s (${data.stats?.tokens_per_second || "?"} tokens/s)`;
                        }
                    } catch (e) {
                        // Skip lines that fail to parse
                    }
                }
            }
        }
    } catch (error) {
        cursor.remove();
        aiDiv.textContent = fullText || "Request failed, please retry";
        console.error("Stream error:", error);
    } finally {
        sendBtn.disabled = false;
        sendBtn.textContent = "Send";
        // Refocus the input
        document.getElementById("message-input").focus();
    }
}

// Send on Enter
document.getElementById("message-input").addEventListener("keypress", e => {
    if (e.key === "Enter") sendMessage();
});
</script>
</body>
</html>
```
3. WebSocket Approach (Cancelling Generation from the Client)
```python
from fastapi import WebSocket, WebSocketDisconnect
import asyncio


@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """WebSocket streaming chat -- supports client-initiated cancellation"""
    await websocket.accept()
    generation_task = None
    try:
        while True:
            # Receive a message from the client
            data = await websocket.receive_json()
            message_type = data.get("type", "message")

            if message_type == "cancel":
                # Client asked to cancel the current generation
                if generation_task and not generation_task.done():
                    generation_task.cancel()
                    await websocket.send_json({"type": "cancelled"})
                continue

            # Normal message: start streaming generation
            user_message = data.get("message", "")
            request_id = data.get("request_id", str(uuid.uuid4()))

            async def stream_and_send():
                try:
                    stream = await aclient.chat.completions.create(
                        model="gpt-3.5-turbo",
                        messages=[{"role": "user", "content": user_message}],
                        stream=True
                    )
                    async for chunk in stream:
                        if chunk.choices[0].delta.content:
                            await websocket.send_json({
                                "type": "text",
                                "content": chunk.choices[0].delta.content,
                                "request_id": request_id
                            })
                    await websocket.send_json({"type": "done", "request_id": request_id})
                except asyncio.CancelledError:
                    await websocket.send_json({"type": "cancelled", "request_id": request_id})
                except Exception as e:
                    await websocket.send_json({"type": "error", "message": str(e)})

            # Cancel any previous generation task
            if generation_task and not generation_task.done():
                generation_task.cancel()
            generation_task = asyncio.create_task(stream_and_send())
    except WebSocketDisconnect:
        if generation_task and not generation_task.done():
            generation_task.cancel()
```
4. Pitfall 1: SSE Doesn't Stream in Production
Symptom: SSE works great locally, typewriter effect and all. After deploying to production (Nginx + Gunicorn), the frontend goes back to "everything at once".
Cause: both Nginx and Gunicorn have response buffers that batch small packets together before sending.
Fix:
```nginx
# nginx.conf
location /api/chat/stream {
    proxy_pass http://backend;
    proxy_buffering off;        # Key: disable proxy buffering
    proxy_cache off;
    proxy_read_timeout 300s;
    proxy_connect_timeout 30s;
    gzip off;                   # Disable compression (gzip buffers before compressing)

    # Headers required for SSE
    proxy_set_header Connection "";
    proxy_http_version 1.1;
}
```
```python
# gunicorn.conf.py
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 300
keepalive = 5

# Launch command:
# gunicorn app:app -c gunicorn.conf.py --bind 0.0.0.0:8000
```
5. Pitfall 2: The Server Keeps Generating After the Client Disconnects
Symptom: after the user closes the browser tab or refreshes, the server-side LLM call keeps running, wasting tokens and compute.
Cause: once the HTTP SSE connection drops, the generation task keeps running unless the backend detects the disconnect.
Fix:
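The principle behind the fix is a per-chunk disconnect check. In miniature (everything here is illustrative; a simple counter stands in for `await request.is_disconnected()`):

```python
import asyncio

async def fake_llm_stream(n):
    # Stand-in for the OpenAI stream: yields n chunks
    for i in range(n):
        await asyncio.sleep(0)
        yield f"chunk-{i}"

async def forward(disconnect_after):
    # Mimics event_generator(): ask "is the client still there?" before each send
    sent = []
    checks = 0
    async for chunk in fake_llm_stream(100):
        checks += 1
        client_gone = checks > disconnect_after  # stand-in for the real disconnect check
        if client_gone:
            break  # stop paying for tokens nobody will see
        sent.append(chunk)
    return sent

sent = asyncio.run(forward(disconnect_after=5))
print(len(sent))  # 5 -- forwarding stopped at the disconnect, not at chunk 100
```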
```python
@app.post("/api/chat/stream")
async def chat_stream(chat_request: ChatRequest, request: Request):
    async def event_generator():
        async for event in generate_ai_stream(chat_request, str(uuid.uuid4())):
            # Check the connection state before every send
            if await request.is_disconnected():
                print("Client disconnected, stopping generation")
                break
            yield event
    return StreamingResponse(event_generator(), media_type="text/event-stream", ...)
```
6. Pitfall 3: Concurrent SSE Requests Mix Up Events
Symptom: with several requests in flight at once, the frontend receives SSE events belonging to different requests, and the rendered content gets scrambled.
Cause: if the frontend doesn't manage its SSE connections properly, multiple connections may be writing into the same DOM element at the same time.
Fix:
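The `request_id` field the backend already attaches to every event enables the same defense in any language: keep only chunks whose id matches the active request and drop stale ones. A minimal, language-agnostic sketch (the ids and contents below are made up):

```python
# Each SSE event carries the request_id set by the backend
events = [
    {"request_id": "req-a", "content": "Hel"},
    {"request_id": "req-b", "content": "Bon"},   # user fired a new request
    {"request_id": "req-a", "content": "lo"},    # late chunk from the old stream
    {"request_id": "req-b", "content": "jour"},
]
active = "req-b"  # id of the most recent request
text = "".join(e["content"] for e in events if e["request_id"] == active)
print(text)  # Bonjour
```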
```javascript
// Manage the SSE connection correctly on the frontend
let currentReader = null;  // global variable tracking the in-flight request

async function sendMessage() {
    // Cancel the previous request first
    if (currentReader) {
        await currentReader.cancel();
        currentReader = null;
    }
    const response = await fetch(API_URL, {...});
    currentReader = response.body.getReader();
    // Clear the AI message area before each generation
    document.getElementById("current-ai-message").textContent = "";
    // Read the stream...
    try {
        while (true) {
            const { done, value } = await currentReader.read();
            if (done) break;
            // Process the data...
        }
    } finally {
        currentReader = null;
    }
}
```
Streaming output looks like a mere UI effect, but behind it sit async programming, the HTTP protocol, Nginx configuration, and frontend event handling. Get the whole pipeline flowing and the AI interaction improves qualitatively — the experience shifts from "waiting" to "being accompanied". That is the value of streaming output.
