Go SSE 服务端推送实战——实现流式 AI 响应的完整技术方案
Go SSE 服务端推送实战——实现流式 AI 响应的完整技术方案
适读人群:需要实现 AI 对话流式输出的 Go 工程师 | 阅读时长:约16分钟 | 核心价值:从零实现生产级 SSE 服务,支持 AI 流式响应、断线重连、并发管理
做 AI 对话功能时,流式输出让我踩了很多坑
去年我们团队接了一个 AI 助手项目,要对接 OpenAI API 实现对话功能。最开始用了最简单的方案:等 AI 生成完整回复后一次性返回。
用户体验很差——有时候 AI 要思考十几秒才能生成完整响应,用户面对一个空白屏幕等着,跳出率很高。
产品经理说"能不能像 ChatGPT 那样,字一个一个蹦出来?"
我说可以,然后开始研究 SSE(Server-Sent Events)。
以为很简单,结果踩了好几个坑:Nginx 默认缓冲了响应,导致所有字符批量延迟输出;客户端断开后服务端没有感知,goroutine 泄漏;并发连接数没有限制,某次压测把服务打挂了……
今天把这些经验全部整理出来。
SSE vs WebSocket,选哪个?
很多人不清楚 SSE 和 WebSocket 的区别,我给一个简单判断标准:
| 特性 | SSE | WebSocket |
|---|---|---|
| 方向 | 单向(服务端 → 客户端) | 双向 |
| 协议 | HTTP | ws:// |
| 断线重连 | 浏览器自动处理 | 需要自己实现 |
| 适用场景 | AI 流式输出、通知推送 | 在线聊天、游戏 |
| 实现复杂度 | 简单 | 复杂 |
AI 对话的流式输出是单向的(服务端推给客户端),用 SSE 完全够用,比 WebSocket 简单很多。
SSE 协议格式
SSE 只是在 HTTP 响应里约定了一个特殊的格式:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
data: 第一条消息\n\n
data: 第二条消息\n\n
id: 3
event: custom-event
data: {"key": "value"}\n\n规则:
- 每个字段一行,格式为
field: value - 一条消息由多行组成,消息之间用两个换行符(
\n\n)分隔 data:消息内容id:消息 ID(用于断线重连时告诉服务端从哪里继续)event:自定义事件类型(默认是message)retry:建议客户端重连等待时间(毫秒)
完整 SSE 服务端实现
package sse
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
)
// Event SSE 事件
type Event struct {
ID string
Event string
Data string
Retry int // 毫秒,建议重连时间
}
func (e Event) Format() string {
var s string
if e.Retry > 0 {
s += fmt.Sprintf("retry: %d\n", e.Retry)
}
if e.ID != "" {
s += fmt.Sprintf("id: %s\n", e.ID)
}
if e.Event != "" {
s += fmt.Sprintf("event: %s\n", e.Event)
}
if e.Data != "" {
s += fmt.Sprintf("data: %s\n", e.Data)
}
s += "\n" // 消息结束标志
return s
}
// AIStreamHandler 处理 AI 流式响应的 SSE handler
type AIStreamHandler struct {
aiClient AIClient // AI 服务客户端接口
maxConns int64 // 最大并发连接数
activeConns int64 // 当前活跃连接数
}
type AIClient interface {
Stream(ctx context.Context, prompt string) (<-chan string, error)
}
func NewAIStreamHandler(client AIClient, maxConns int64) *AIStreamHandler {
return &AIStreamHandler{
aiClient: client,
maxConns: maxConns,
}
}
// ChatRequest 聊天请求体
type ChatRequest struct {
Prompt string `json:"prompt"`
UserID string `json:"user_id"`
}
func (h *AIStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 检查并发连接数限制
current := atomic.AddInt64(&h.activeConns, 1)
defer atomic.AddInt64(&h.activeConns, -1)
if current > h.maxConns {
http.Error(w, "服务繁忙,请稍后重试", http.StatusTooManyRequests)
return
}
// 解析请求
var req ChatRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "请求格式错误", http.StatusBadRequest)
return
}
if req.Prompt == "" {
http.Error(w, "prompt 不能为空", http.StatusBadRequest)
return
}
// 设置 SSE 响应头
// 关键:必须设置这三个 Header
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// 允许跨域(按需配置)
w.Header().Set("Access-Control-Allow-Origin", "*")
// 告诉 Nginx/代理服务器不要缓冲
w.Header().Set("X-Accel-Buffering", "no")
// 获取 Flusher(用于强制刷新缓冲区,立即发送数据)
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "服务器不支持 SSE", http.StatusInternalServerError)
return
}
// 检测客户端断开
ctx := r.Context()
// 发送初始事件(告诉客户端建立成功)
fmt.Fprintf(w, Event{
Event: "connected",
Data: `{"status":"connected"}`,
}.Format())
flusher.Flush()
// 调用 AI 服务,获取流式响应
tokenCh, err := h.aiClient.Stream(ctx, req.Prompt)
if err != nil {
sendError(w, flusher, "AI 服务调用失败: "+err.Error())
return
}
// 流式发送 AI 响应
var fullContent string
tokenCount := 0
for {
select {
case <-ctx.Done():
// 客户端断开了,停止生成
fmt.Printf("客户端 %s 断开连接,已生成 %d 个 token\n",
req.UserID, tokenCount)
return
case token, ok := <-tokenCh:
if !ok {
// AI 响应完成
sendDone(w, flusher, fullContent)
return
}
fullContent += token
tokenCount++
// 发送 token
event := Event{
ID: fmt.Sprintf("%d", tokenCount),
Event: "token",
Data: escapeJSON(token),
}
fmt.Fprint(w, event.Format())
flusher.Flush() // 立即发送!这是流式的关键
}
}
}
func sendError(w http.ResponseWriter, flusher http.Flusher, msg string) {
data, _ := json.Marshal(map[string]string{"error": msg})
fmt.Fprint(w, Event{Event: "error", Data: string(data)}.Format())
flusher.Flush()
}
func sendDone(w http.ResponseWriter, flusher http.Flusher, fullContent string) {
data, _ := json.Marshal(map[string]string{"status": "done", "content": fullContent})
fmt.Fprint(w, Event{Event: "done", Data: string(data)}.Format())
flusher.Flush()
}
func escapeJSON(s string) string {
data, _ := json.Marshal(s)
return string(data)
}
// SSEBroadcaster 广播型 SSE(推送给所有连接的客户端)
type SSEBroadcaster struct {
mu sync.RWMutex
clients map[string]chan Event // clientID -> event channel
}
func NewSSEBroadcaster() *SSEBroadcaster {
return &SSEBroadcaster{
clients: make(map[string]chan Event),
}
}
func (b *SSEBroadcaster) Subscribe(w http.ResponseWriter, r *http.Request, clientID string) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "不支持 SSE", http.StatusInternalServerError)
return
}
// 为这个客户端创建 channel
ch := make(chan Event, 100)
b.mu.Lock()
b.clients[clientID] = ch
b.mu.Unlock()
defer func() {
b.mu.Lock()
delete(b.clients, clientID)
b.mu.Unlock()
close(ch)
}()
ctx := r.Context()
for {
select {
case <-ctx.Done():
return // 客户端断开
case event, ok := <-ch:
if !ok {
return
}
fmt.Fprint(w, event.Format())
flusher.Flush()
case <-time.After(30 * time.Second):
// 发送心跳,保持连接活跃
fmt.Fprint(w, ": heartbeat\n\n")
flusher.Flush()
}
}
}
func (b *SSEBroadcaster) Broadcast(event Event) {
b.mu.RLock()
defer b.mu.RUnlock()
for _, ch := range b.clients {
select {
case ch <- event:
default:
// 客户端消费太慢,跳过(或者可以断开连接)
}
}
}
func (b *SSEBroadcaster) ClientCount() int {
b.mu.RLock()
defer b.mu.RUnlock()
return len(b.clients)
}Nginx 配置(关键!)
server {
location /api/chat/stream {
proxy_pass http://backend;
proxy_http_version 1.1;
# 关闭 Nginx 的响应缓冲,否则 SSE 数据会被批量延迟发送
proxy_buffering off;
proxy_cache off;
# 这两个 Header 告诉 Nginx 不要缓冲
proxy_set_header X-Accel-Buffering "no";
# SSE 连接可能持续很长时间
proxy_read_timeout 600s;
proxy_send_timeout 600s;
# 保持连接
proxy_set_header Connection '';
chunked_transfer_encoding on;
}
}前端使用示例
// 方式一:原生 EventSource(自动断线重连)
const source = new EventSource('/api/chat/stream?prompt=xxx');
source.addEventListener('token', (e) => {
document.getElementById('output').textContent += JSON.parse(e.data);
});
source.addEventListener('done', (e) => {
console.log('AI 响应完成');
source.close();
});
source.addEventListener('error', (e) => {
console.error('SSE 错误', e);
source.close();
});
// 方式二:fetch + ReadableStream(支持 POST 请求)
async function streamChat(prompt) {
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留最后一个未完成的行
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
console.log('收到:', JSON.parse(data));
}
}
}
}三个踩坑实录
坑一:Nginx 缓冲导致字符批量延迟输出
现象:Go 服务里每个 token 都调用了 flusher.Flush(),但客户端收到的还是批量的字符,延迟明显。
原因:Nginx 默认开启了 proxy_buffering,会把后端的响应积攒到一定量再一次性发给客户端。
解法:在 Nginx 配置里设置 proxy_buffering off,或者在响应 Header 里加 X-Accel-Buffering: no(Nginx 识别这个 Header 后会自动关闭该请求的缓冲)。
坑二:客户端断开后服务端 goroutine 泄漏
现象:压测时发现 goroutine 数量持续增长,每个中途断开的客户端都会留下一个卡住的 goroutine。
原因:AI 客户端的 Stream 函数里,当客户端断开后,生产端(AI API 调用)还在不断往 channel 里写 token,而消费端(SSE handler)因为 ctx.Done() 已经退出了。但如果生产端没有监听 ctx,channel 会满了,生产 goroutine 阻塞。
解法:在 AI 客户端的实现里,必须监听 ctx 取消信号,一旦 ctx 取消就停止生产:
func (c *AIClientImpl) Stream(ctx context.Context, prompt string) (<-chan string, error) {
ch := make(chan string, 10)
go func() {
defer close(ch)
// 模拟 AI API 调用
for _, token := range generateTokens(prompt) {
select {
case <-ctx.Done():
return // ctx 取消,立即退出
case ch <- token:
}
}
}()
return ch, nil
}坑三:心跳没做,移动端长时间没有消息导致连接被断开
现象:用户打开 AI 对话页,有时候等了很久没有回复(AI 在思考),连接直接断了,显示"请求超时"。
原因:某些移动网络的中间件(NAT、CDN、运营商设备)会断开一段时间没有数据的连接。
解法:定期发送 SSE 心跳注释行(以冒号开头的行是注释,客户端忽略但连接保持活跃):
// 每 15 秒发送一次心跳
case <-time.After(15 * time.Second):
fmt.Fprint(w, ": heartbeat\n\n")
flusher.Flush()Java 对比
Java 里实现 SSE 有 Spring WebFlux 的 ServerSentEvent<T> 类,或者 Spring MVC 的 SseEmitter,都是成熟方案。Spring WebFlux 的响应式编程模型和 SSE 的流式特性非常契合。
Go 里手动处理 SSE 字节流,代码量更多,但也更透明——你清楚地知道每一个字节是怎么发出去的。在调试 Nginx 缓冲问题、心跳逻辑时,这种透明度很有帮助。
小结
- 必须调
flusher.Flush():不 Flush 数据不会立即发出,流式输出就是个摆设 - Nginx 必须关闭缓冲:
proxy_buffering off或X-Accel-Buffering: no - 监听
ctx.Done()防止 goroutine 泄漏:客户端断开要立即停止生产 - 发心跳保持连接:每 15-30 秒发一次
: heartbeat\n\n - 设置最大并发连接数:防止大量长连接把服务打挂
