Go WebSocket 实战——gorilla/websocket 实现实时消息推送完整方案
Go WebSocket 实战——gorilla/websocket 实现实时消息推送完整方案
适读人群:需要在 Go 服务中实现实时推送的工程师、了解 HTTP 想进阶 WebSocket 的开发者 | 阅读时长:约19分钟 | 核心价值:从握手到心跳到断线重连,构建一套生产可用的 WebSocket 实时推送方案
那个让产品经理每天追着我问的"消息延迟"需求
2023年初,产品经理找到我,说用户反映平台的"实时"通知一点都不实时,有时候延迟几分钟。
我去看了一下代码,好家伙:所谓的"实时通知",是前端每隔30秒轮询一次接口……
我当时就问产品:"你希望延迟多少?"
他说:"最好500毫秒以内。"
轮询肯定不行。SSE 可以考虑但功能有限。最终我选择了 WebSocket——双向通信,延迟低,连接复用好。
但我当时对 Go WebSocket 的理解很浅,只会 demo,不知道生产环境里要考虑哪些东西:连接管理、心跳、广播、断线重连、横向扩展……每一个都是坑。
这篇文章是我那套系统跑了半年后的总结,把所有坑都填上了。
WebSocket 基础和 gorilla/websocket
WebSocket 是基于 HTTP Upgrade 机制建立的双向通信协议。和 HTTP 不同,WebSocket 连接建立后,双方可以随时互相发消息,不需要等请求-响应。
Java 对比: Java 里用 Spring WebSocket 或 javax.websocket,Go 里最成熟的是 gorilla/websocket。
go get github.com/gorilla/websocket核心数据结构设计
package wsserver
import (
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
const (
// WebSocket 消息写超时
writeWait = 10 * time.Second
// 心跳 Pong 超时(超过这个时间没收到 Pong,认为客户端断了)
pongWait = 60 * time.Second
// 心跳 Ping 发送间隔(必须 < pongWait)
pingPeriod = (pongWait * 9) / 10
// 最大消息大小
maxMessageSize = 512 * 1024 // 512KB
)
// Client 代表一个 WebSocket 客户端连接
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte // 待发送消息队列
userID int64
roomID string // 订阅的房间(用于分组推送)
}
// Hub 连接管理中心(核心组件)
type Hub struct {
// 所有已连接的客户端(key: userID, value: client)
clients map[int64]*Client
// 按房间分组(key: roomID, value: clients 集合)
rooms map[string]map[int64]*Client
// 广播消息 channel
broadcast chan *Message
// 定向消息 channel(发给特定用户)
direct chan *DirectMessage
// 注册/注销 channel
register chan *Client
unregister chan *Client
mu sync.RWMutex
}
// Message 广播消息
type Message struct {
RoomID string `json:"room_id,omitempty"`
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
// DirectMessage 定向消息
type DirectMessage struct {
UserID int64 `json:"user_id"`
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
func NewHub() *Hub {
return &Hub{
clients: make(map[int64]*Client),
rooms: make(map[string]map[int64]*Client),
broadcast: make(chan *Message, 256),
direct: make(chan *DirectMessage, 256),
register: make(chan *Client),
unregister: make(chan *Client),
}
}Hub 主循环:统一管理所有连接
// Run 启动 Hub 主循环(在 goroutine 里运行)
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client.userID] = client
if client.roomID != "" {
if _, ok := h.rooms[client.roomID]; !ok {
h.rooms[client.roomID] = make(map[int64]*Client)
}
h.rooms[client.roomID][client.userID] = client
}
h.mu.Unlock()
log.Printf("客户端连接: userID=%d, roomID=%s", client.userID, client.roomID)
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client.userID]; ok {
delete(h.clients, client.userID)
if client.roomID != "" {
delete(h.rooms[client.roomID], client.userID)
}
close(client.send) // 关闭 send channel,通知 writePump 退出
}
h.mu.Unlock()
log.Printf("客户端断开: userID=%d", client.userID)
case msg := <-h.broadcast:
h.mu.RLock()
var targets map[int64]*Client
if msg.RoomID != "" {
targets = h.rooms[msg.RoomID]
} else {
targets = h.clients
}
data, _ := json.Marshal(msg)
for _, client := range targets {
select {
case client.send <- data:
default:
// 发送队列满了,说明客户端处理太慢,断开连接
log.Printf("客户端消息队列满,断开: userID=%d", client.userID)
go func(c *Client) { h.unregister <- c }(client)
}
}
h.mu.RUnlock()
case msg := <-h.direct:
h.mu.RLock()
if client, ok := h.clients[msg.UserID]; ok {
data, _ := json.Marshal(msg)
select {
case client.send <- data:
default:
go func(c *Client) { h.unregister <- c }(client)
}
}
h.mu.RUnlock()
}
}
}Client 读写 goroutine
// readPump 持续读取客户端消息
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
// 收到 Pong 时,重置读超时(延长心跳)
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
msgType, data, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("WebSocket 读取错误: userID=%d, err=%v", c.userID, err)
}
return
}
if msgType == websocket.TextMessage {
// 处理客户端发来的消息(比如订阅请求、聊天消息等)
c.handleClientMessage(data)
}
}
}
// writePump 持续向客户端写消息
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case msg, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// Hub 已关闭 send channel,发送关闭帧
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(msg)
// 把队列里积压的消息一次性写出(批量发送,减少系统调用)
n := len(c.send)
for i := 0; i < n; i++ {
w.Write([]byte("\n"))
w.Write(<-c.send)
}
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
// 定时发送 Ping 心跳
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.Printf("心跳发送失败: userID=%d, err=%v", c.userID, err)
return
}
}
}
}
func (c *Client) handleClientMessage(data []byte) {
// 解析客户端消息,根据类型处理
// 比如:订阅房间、发送聊天消息等
log.Printf("收到客户端消息: userID=%d, data=%s", c.userID, string(data))
}HTTP 升级为 WebSocket + 鉴权
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// 生产中应该严格校验 Origin,防止 CSRF 攻击
CheckOrigin: func(r *http.Request) bool {
origin := r.Header.Get("Origin")
// 只允许我们自己的域名
return origin == "https://yourapp.com" || origin == "http://localhost:3000"
},
}
func ServeWS(hub *Hub, w http.ResponseWriter, r *http.Request) {
// 从 query string 或 header 里取 token 做鉴权
token := r.URL.Query().Get("token")
userID, err := validateToken(token)
if err != nil {
http.Error(w, "未授权", http.StatusUnauthorized)
return
}
roomID := r.URL.Query().Get("room")
// 升级 HTTP 连接为 WebSocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket 升级失败: %v", err)
return
}
client := &Client{
hub: hub,
conn: conn,
send: make(chan []byte, 256),
userID: userID,
roomID: roomID,
}
// 注册到 Hub
hub.register <- client
// 两个 goroutine 分别处理读和写
go client.writePump()
go client.readPump()
}踩坑实录
坑1:没有设置写超时,一个慢客户端阻塞所有消息发送
现象: 某个客户端网络很差,写消息时阻塞。因为 goroutine 在等待写完成,积压的消息越来越多,内存持续增长。
原因: WebSocket 写操作如果没有设置超时,会永久等待网络恢复,占用 goroutine 和内存。
解法: 每次写之前必须设置 WriteDeadline:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))坑2:send channel 满了没有处理,goroutine 泄漏
现象: 某个客户端一直不读消息(页面假死),服务端还在一直向 send channel 发数据,channel 满了后 goroutine 阻塞,无法退出。
原因: 向满的 channel 发数据会阻塞,如果没有处理,writePump goroutine 就泄漏了。
解法: 向 send channel 发数据时使用 select + default 非阻塞模式,满了就断开这个客户端连接(如代码中的 Hub 主循环所示)。
坑3:多实例部署时,WebSocket 消息只推给了连接到同一台机器的客户端
现象: 服务扩到3台机器后,有时候发通知,只有部分用户收到,另一部分没收到。
原因: WebSocket 连接是有状态的,用户A连接到机器1,发给用户A的消息如果只通知到机器2,机器2上没有用户A的连接,消息就丢了。
解法: 引入 Redis Pub/Sub 做跨机器广播。每台机器订阅 Redis 频道,收到消息后推给本机上连接的用户:
// 发布消息时,发到 Redis
redisClient.Publish(ctx, "ws:broadcast", msgJSON)
// 每台机器订阅 Redis
pubsub := redisClient.Subscribe(ctx, "ws:broadcast")
for msg := range pubsub.Channel() {
// 推给本机上连接的用户
hub.broadcast <- parseMessage(msg.Payload)
}前端连接示例(参考)
// 前端 WebSocket 连接(带断线重连)
class WSClient {
constructor(url) {
this.url = url;
this.ws = null;
this.reconnectDelay = 1000;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket 连接成功');
this.reconnectDelay = 1000; // 重置退避时间
};
this.ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
this.handleMessage(msg);
};
this.ws.onclose = () => {
console.log(`连接断开,${this.reconnectDelay}ms 后重连`);
setTimeout(() => {
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
this.connect();
}, this.reconnectDelay);
};
}
}