Go 大规模并发实战——10万并发连接的 Go 服务是怎么架构的
Go 大规模并发实战——10万并发连接的 Go 服务是怎么架构的
适读人群:需要处理高并发长连接的 Go 工程师 | 阅读时长:约19分钟 | 核心价值:从 0 到 10 万并发连接,Go 服务架构的每一个关键决策及背后的原因
我们是怎么被逼到10万并发这个量级的
去年下半年,我们为一家物流公司做了一个实时货运追踪系统。需求很简单:全国 8000 多辆货车,每辆车上有 GPS 设备,每隔3秒上报一次位置,同时有货主和调度员在 Web 端实时查看位置。
初期我以为这很简单:GPS 设备连 WebSocket 上报数据,前端订阅对应车辆的位置更新。8000辆车,加上可能同时在线的几千个查看端,总并发不过两万多,没什么压力。
但现实狠狠地教育了我。
首先,货主端用的是 H5 页面,同时在线人数峰值达到了 34000 多个(节假日前夕,大量人盯着自己的货在哪)。GPS 设备连接8000多个。调度大屏另外几百个。总并发峰值达到了 42000 多,而且都是长连接。
然后,客户告诉我们:他们要扩展到全国,到时候可能有 6 万辆车,还要对外开放 API 给第三方平台接入……算下来 10 万并发长连接是必须要支撑的。
我之前在 Java 里做过类似的事,用 Netty,那个复杂度是真的高。现在用 Go,我想试试能不能用更简单的方式做到。
10万并发连接意味着什么
先做一个基础的计算:
每个 TCP 连接在操作系统层面需要一个文件描述符(fd)。Linux 默认的 ulimit -n 是 1024,需要调整:
# /etc/security/limits.conf
* soft nofile 1048576
* hard nofile 1048576
# 或者在 systemd service 里
LimitNOFILE=1048576每个 WebSocket 连接在应用层需要维护连接状态,包括:
- 读写缓冲区(go/x/net/websocket 默认4KB + 4KB = 8KB)
- 连接元数据(设备ID、用户ID、订阅列表等)
- Go 里每个连接至少需要 2 个 goroutine(读和写)
10 万连接 × 8KB 缓冲区 = 800MB 内存 10 万连接 × 2 goroutine × ~2KB goroutine 栈 = ~400MB 内存 连接元数据约 ~200MB
光是连接本身就要 1.4GB 内存,还不算业务数据。这要求我们必须精细控制每一处内存分配。
踩坑实录
坑一:用了 gorilla/websocket 的 ReadMessage,内存暴涨
现象: 接了5000个 GPS 设备连接之后,进程内存从 300MB 涨到了 2.1GB,而且还在持续增长。
原因: gorilla/websocket 的 ReadMessage() 会为每条消息分配新的 []byte,8000个设备每3秒上报一次,每秒就是 ~2700 次 GC 对象分配,GC 来不及回收。
解法: 改用 ReadJSON + 固定大小的消息对象 + sync.Pool 复用:
var msgPool = sync.Pool{
New: func() interface{} {
return &LocationMessage{}
},
}
// 读取消息时从 pool 获取对象
msg := msgPool.Get().(*LocationMessage)
defer msgPool.Put(msg) // 用完归还
*msg = LocationMessage{} // 清零
if err := conn.ReadJSON(msg); err != nil {
return
}内存从 2.1GB 降到了 890MB。
坑二:Hub 的广播用大锁,锁竞争严重
现象: 当并发连接数超过 2 万时,CPU 使用率从 35% 涨到了 89%,P99 延迟从 12ms 涨到了 847ms。
原因: 我用了一个全局的 Hub 管理所有连接,广播时加了一把全局 RWMutex。2万个连接,广播一条消息需要遍历所有连接,遍历时要持有读锁,同时新连接注册要写锁,锁竞争极其严重。
解法: 分片 Hub,把连接分散到 N 个独立的 shard,每个 shard 有自己的锁:
const numShards = 64 // 根据 CPU 核数调整
type ShardedHub struct {
shards [numShards]*Shard
}
type Shard struct {
mu sync.RWMutex
clients map[string]*Client // key: deviceID or userID
}
func (h *ShardedHub) getShard(id string) *Shard {
// 用 FNV hash 确定 shard,同一个 id 总是路由到同一个 shard
hash := fnv.New32a()
hash.Write([]byte(id))
return h.shards[hash.Sum32()%numShards]
}分片后锁竞争降低了约 64 倍,P99 延迟降回了 18ms。
坑三:消息发送用无缓冲 channel,背压传导错误
现象: 偶尔有 goroutine 栈里看到大量 goroutine 卡在 ch <- msg 上,内存使用量异常。
原因: 每个客户端有一个 send chan Message,当客户端的网络慢或者短时阻塞时,发送 goroutine 卡在往 channel 写,而此时 Hub 的广播 goroutine 也在等待,背压传导到了错误的地方。
解法: 有缓冲 channel + 非阻塞发送 + 慢客户端丢弃策略:
const clientSendBuffer = 256
type Client struct {
send chan []byte
// ...
}
// 广播时非阻塞发送,丢弃慢客户端
func (s *Shard) broadcast(msg []byte) {
s.mu.RLock()
defer s.mu.RUnlock()
for id, client := range s.clients {
select {
case client.send <- msg:
// 发送成功
default:
// channel 满了,这个客户端太慢,标记为需要断开
// 不能在这里直接操作,避免死锁,用异步方式处理
go s.dropSlowClient(id, client)
}
}
}完整架构
核心 Hub 实现
package hub
import (
"hash/fnv"
"sync"
"sync/atomic"
)
const numShards = 64
// Hub 管理所有 WebSocket 连接
type Hub struct {
shards [numShards]*Shard
totalConns atomic.Int64
msgSent atomic.Int64
msgDropped atomic.Int64
}
type Shard struct {
mu sync.RWMutex
clients map[string]*Client
}
type Client struct {
ID string
Type ClientType // GPS设备 or 查看端
Tags []string // 订阅的标签(如货车ID)
send chan []byte
conn WebSocketConn
hub *Hub
once sync.Once
closeCh chan struct{}
}
type ClientType int
const (
ClientTypeDevice ClientType = iota
ClientTypeViewer
)
// NewHub 创建 Hub
func NewHub() *Hub {
h := &Hub{}
for i := 0; i < numShards; i++ {
h.shards[i] = &Shard{
clients: make(map[string]*Client, 256),
}
}
return h
}
func (h *Hub) getShard(id string) *Shard {
hash := fnv.New32a()
hash.Write([]byte(id))
return h.shards[int(hash.Sum32())%numShards]
}
// Register 注册客户端
func (h *Hub) Register(c *Client) {
shard := h.getShard(c.ID)
shard.mu.Lock()
shard.clients[c.ID] = c
shard.mu.Unlock()
h.totalConns.Add(1)
}
// Unregister 注销客户端
func (h *Hub) Unregister(c *Client) {
shard := h.getShard(c.ID)
shard.mu.Lock()
if _, ok := shard.clients[c.ID]; ok {
delete(shard.clients, c.ID)
close(c.send)
}
shard.mu.Unlock()
h.totalConns.Add(-1)
}
// BroadcastToTag 向订阅了某个 tag 的所有 viewer 广播
// 这是核心的广播路径,需要做到最低延迟
func (h *Hub) BroadcastToTag(tag string, msg []byte) {
// 遍历所有 shard,找到订阅了这个 tag 的客户端
// 在真实项目里,应该维护 tag -> clientIDs 的反向索引
// 这里简化展示核心逻辑
for _, shard := range h.shards {
shard.mu.RLock()
for _, client := range shard.clients {
if client.Type == ClientTypeViewer && client.hasTag(tag) {
select {
case client.send <- msg:
h.msgSent.Add(1)
default:
h.msgDropped.Add(1)
// 记录慢客户端,异步处理
}
}
}
shard.mu.RUnlock()
}
}
func (c *Client) hasTag(tag string) bool {
for _, t := range c.Tags {
if t == tag {
return true
}
}
return false
}
// Metrics 返回当前统计
func (h *Hub) Metrics() map[string]int64 {
return map[string]int64{
"total_connections": h.totalConns.Load(),
"messages_sent": h.msgSent.Load(),
"messages_dropped": h.msgDropped.Load(),
}
}系统级优化
除了代码层面,还需要一些系统级配置:
# TCP 参数优化
sysctl -w net.core.somaxconn=65535
sysctl -w net.ipv4.tcp_max_syn_backlog=65535
sysctl -w net.core.netdev_max_backlog=65535
sysctl -w net.ipv4.tcp_keepalive_time=600
sysctl -w net.ipv4.tcp_keepalive_intvl=60
sysctl -w net.ipv4.tcp_keepalive_probes=10Go 的 HTTP Server 配置:
server := &http.Server{
Addr: ":8080",
Handler: mux,
// 对于 WebSocket,不设置 ReadTimeout(因为连接是长期的)
// 但要设置空闲超时,踢掉僵尸连接
IdleTimeout: 10 * time.Minute,
// 升级成 WebSocket 之前的 HTTP 请求要有超时
ReadHeaderTimeout: 10 * time.Second,
}Go vs Java:Netty vs Go 原生 net/http
做同样的事情,我用 Netty 也做过。Netty 的 Reactor 模型很强大,但需要理解 EventLoop、Channel、Pipeline 这套概念,写完一个正确的 WebSocket 服务需要至少两天时间,还要处理各种 ChannelHandler 的线程安全问题。
Go 的 net/http + goroutine 每连接模型在架构上更简单:每个连接一对 goroutine,读写分离,goroutine 可以阻塞,Go runtime 自己处理 IO 多路复用。
代价是:Go 的模型在极端高并发(百万级连接)时内存开销比 Netty 大,因为每个 goroutine 有最小 2KB 的栈。但对于我们这种 10 万级别,Go 完全够用,而且代码量少一个数量级,维护起来省心很多。
我们的服务最终在 16核32G 的机器上跑到了 97000 个稳定并发连接,CPU 使用率 41%,P99 延迟 22ms。
