Go 网络编程深度实战——TCP/UDP 原始套接字、非阻塞 IO、epoll
Go 网络编程深度实战——TCP/UDP 原始套接字、非阻塞 IO、epoll
适读人群:想深入理解 Go 网络模型、做高性能网络服务的工程师 | 阅读时长:约18分钟 | 核心价值:从 Go 标准库的 net 包到底层 netpoll 机制,彻底理解 Go 的网络 IO
一次面试让我意识到自己网络基础不够扎实
两年前我去面一个高级后端职位,面试官问:"Go 的 net.Conn 底层是阻塞还是非阻塞 IO?" 我回答说:"阻塞的,因为 conn.Read() 会阻塞直到有数据。"
面试官追问:"那 Go 能同时处理几万个并发连接,用阻塞 IO 不会每个连接都占用一个线程吗?"
我一下子卡住了。
回来我查了很多资料才搞清楚:Go 的网络 IO 在操作系统层面是非阻塞的,Go runtime 在上面用 netpoll(Linux 上是 epoll,Mac 上是 kqueue)做了一层调度,把系统级非阻塞 IO 包装成了看起来阻塞(实际是协程挂起)的 API。
这是 Go 高并发网络的核心秘密。
Go 网络 IO 的分层架构
理解 Go 网络编程,需要从三个层面来看:
应用代码层: conn.Read(), conn.Write() ← 看起来阻塞
↕
Go Runtime层:netpoll (epoll/kqueue) ← 实际非阻塞
↕
OS 系统调用: epoll_wait, read, write ← 非阻塞 syscall当你调用 conn.Read() 时:
- Go runtime 把当前 goroutine 挂起(不是阻塞线程!)
- 把这个 fd 注册到 epoll,等待可读事件
- 有数据时 epoll 通知 runtime,runtime 唤醒对应 goroutine
- goroutine 继续执行
Read
这就是为什么 Go 能用少量 OS 线程(M),调度几百万 goroutine 处理网络 IO。
TCP 服务端:完整实现
package tcpserver
import (
"bufio"
"context"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"
)
// TCPServer 是一个高性能 TCP 服务端
type TCPServer struct {
addr string
handler ConnHandler
listener net.Listener
activeConns sync.Map // 追踪活跃连接
connCount int64
ctx context.Context
cancel context.CancelFunc
}
// ConnHandler 处理一个连接
type ConnHandler func(ctx context.Context, conn net.Conn)
func NewTCPServer(addr string, handler ConnHandler) *TCPServer {
ctx, cancel := context.WithCancel(context.Background())
return &TCPServer{
addr: addr,
handler: handler,
ctx: ctx,
cancel: cancel,
}
}
func (s *TCPServer) Start() error {
ln, err := net.Listen("tcp", s.addr)
if err != nil {
return fmt.Errorf("监听 %s 失败: %w", s.addr, err)
}
s.listener = ln
fmt.Printf("TCP 服务启动,监听 %s\n", s.addr)
go s.acceptLoop()
return nil
}
func (s *TCPServer) acceptLoop() {
for {
conn, err := s.listener.Accept()
if err != nil {
select {
case <-s.ctx.Done():
return // 服务关闭
default:
fmt.Printf("Accept 错误: %v\n", err)
// 短暂等待后继续(避免 accept 失败时 CPU 空转)
time.Sleep(5 * time.Millisecond)
continue
}
}
// 每个连接用一个 goroutine 处理
atomic.AddInt64(&s.connCount, 1)
s.activeConns.Store(conn.RemoteAddr().String(), conn)
go func(c net.Conn) {
defer func() {
c.Close()
s.activeConns.Delete(c.RemoteAddr().String())
atomic.AddInt64(&s.connCount, -1)
}()
s.handler(s.ctx, c)
}(conn)
}
}
func (s *TCPServer) Stop() {
s.cancel()
s.listener.Close()
// 等待所有连接关闭(最多等 30 秒)
deadline := time.Now().Add(30 * time.Second)
for time.Now().Before(deadline) {
if atomic.LoadInt64(&s.connCount) == 0 {
break
}
time.Sleep(100 * time.Millisecond)
}
fmt.Println("TCP 服务已关闭")
}
func (s *TCPServer) ConnCount() int64 {
return atomic.LoadInt64(&s.connCount)
}
// Echo Handler:把收到的数据回显给客户端
func EchoHandler(ctx context.Context, conn net.Conn) {
// 设置读超时,避免连接一直挂着
reader := bufio.NewReader(conn)
for {
// 检查是否应该退出
select {
case <-ctx.Done():
return
default:
}
// 设置读超时
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
line, err := reader.ReadString('\n')
if err != nil {
if err != io.EOF {
// 超时或其他错误
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
// 读超时,发送 keepalive 或断开连接
continue
}
}
return
}
// 回显
conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
if _, err := fmt.Fprintf(conn, "ECHO: %s", line); err != nil {
return
}
}
}TCP 客户端:连接池实现
package tcpclient
import (
"fmt"
"net"
"sync"
"time"
)
// ConnPool TCP 连接池
type ConnPool struct {
addr string
maxSize int
minSize int
timeout time.Duration
mu sync.Mutex
idle []net.Conn
total int
}
func NewConnPool(addr string, minSize, maxSize int) *ConnPool {
p := &ConnPool{
addr: addr,
maxSize: maxSize,
minSize: minSize,
timeout: 5 * time.Second,
}
// 预热:建立最小数量的连接
for i := 0; i < minSize; i++ {
conn, err := p.newConn()
if err != nil {
break
}
p.idle = append(p.idle, conn)
p.total++
}
return p
}
func (p *ConnPool) newConn() (net.Conn, error) {
return net.DialTimeout("tcp", p.addr, p.timeout)
}
func (p *ConnPool) Get() (net.Conn, error) {
p.mu.Lock()
defer p.mu.Unlock()
// 从空闲池取
for len(p.idle) > 0 {
conn := p.idle[len(p.idle)-1]
p.idle = p.idle[:len(p.idle)-1]
// 检查连接是否还活着
conn.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
one := make([]byte, 1)
_, err := conn.Read(one)
conn.SetReadDeadline(time.Time{})
if err != nil {
// 连接已断开,废弃
conn.Close()
p.total--
continue
}
return conn, nil
}
// 空闲池为空,新建连接
if p.total >= p.maxSize {
return nil, fmt.Errorf("连接池已满(%d/%d)", p.total, p.maxSize)
}
conn, err := p.newConn()
if err != nil {
return nil, err
}
p.total++
return conn, nil
}
func (p *ConnPool) Put(conn net.Conn) {
p.mu.Lock()
defer p.mu.Unlock()
p.idle = append(p.idle, conn)
}
func (p *ConnPool) Close() {
p.mu.Lock()
defer p.mu.Unlock()
for _, conn := range p.idle {
conn.Close()
}
p.idle = nil
}UDP:实现简单的消息广播
package udp
import (
"fmt"
"net"
"sync"
)
// UDPBroadcaster 接收 UDP 消息并广播给所有注册的客户端
type UDPBroadcaster struct {
conn *net.UDPConn
clients sync.Map // addr -> struct{}
bufSize int
}
func NewUDPBroadcaster(listenAddr string) (*UDPBroadcaster, error) {
addr, err := net.ResolveUDPAddr("udp", listenAddr)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
// 设置接收缓冲区大小(避免高流量时丢包)
conn.SetReadBuffer(4 * 1024 * 1024) // 4MB
conn.SetWriteBuffer(4 * 1024 * 1024)
return &UDPBroadcaster{
conn: conn,
bufSize: 65535,
}, nil
}
func (b *UDPBroadcaster) Run() {
buf := make([]byte, b.bufSize)
for {
n, srcAddr, err := b.conn.ReadFromUDP(buf)
if err != nil {
fmt.Printf("UDP 读取错误: %v\n", err)
return
}
data := make([]byte, n)
copy(data, buf[:n])
// 注册新客户端
b.clients.Store(srcAddr.String(), srcAddr)
// 广播给所有客户端(排除发送者)
go b.broadcast(data, srcAddr)
}
}
func (b *UDPBroadcaster) broadcast(data []byte, exclude *net.UDPAddr) {
b.clients.Range(func(key, value interface{}) bool {
addr := value.(*net.UDPAddr)
if addr.String() == exclude.String() {
return true // 跳过发送者
}
if _, err := b.conn.WriteToUDP(data, addr); err != nil {
// 发送失败,可能客户端已断开
b.clients.Delete(key)
}
return true
})
}理解 Go 的 netpoll(不直接用 epoll,但要理解它)
Go 的 net 包已经帮你处理了 epoll,你不需要直接写 epoll 代码。但理解 Go 是怎么用 epoll 的,能帮你避免很多性能问题。
// 这个模式会创建大量 goroutine,但 Go 能高效处理
// 每个 goroutine 的栈从 2KB 开始(不是 Java 线程的 1MB)
for {
conn, _ := listener.Accept()
go handle(conn) // 不要担心 goroutine 数量,这是 Go 的正确用法
}
// 你需要担心的是:
// 1. 没有设置 deadline,导致 goroutine 泄漏
conn.SetDeadline(time.Now().Add(30 * time.Second))
// 2. 没有限制最大并发连接数
// 用 semaphore 控制
sem := make(chan struct{}, maxConns)
for {
conn, _ := listener.Accept()
sem <- struct{}{} // 获取令牌
go func() {
defer func() { <-sem }() // 释放令牌
handle(conn)
}()
}三个踩坑实录
坑一:没设 Deadline 导致 goroutine 泄漏
现象:服务运行几天后 goroutine 数量从几百涨到几万,内存持续增长。
原因:TCP 连接的对端断开时,不一定会发 FIN 包(比如网线直接拔掉),conn.Read() 会永远阻塞,对应的 goroutine 永远挂着。
解法:所有的 conn.Read() 和 conn.Write() 都要设置 deadline。对于长连接,每次读写前刷新 deadline:
// 每次读写前刷新 deadline
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
_, err := conn.Read(buf)坑二:大量小包导致网络吞吐下降
现象:发送大量小消息时,实际吞吐量远低于预期,网络利用率只有 10%。
原因:Nagle 算法把小包合并发送,但 Go 的 net.TCPConn 默认开启了 TCP_NODELAY,禁用了 Nagle 算法,导致每个小包都立即发出,每个包都有 40 字节的 IP+TCP header 开销。
解法:
- 在应用层自己做批量发送(把多个小消息合并成一个大包)
- 或者关闭 TCP_NODELAY(但这会增加延迟):
tcpConn.SetNoDelay(false) - 使用
bufio.Writer+ 定期 Flush
坑三:UDP 大量丢包
现象:UDP 服务在高流量时丢包率高达 50%。
原因:操作系统的 UDP 接收缓冲区(默认 208KB)被打满,新来的包被丢弃。
解法:
- 调大内核 UDP 缓冲区:
sysctl -w net.core.rmem_max=67108864 - 在 Go 代码里调大 socket buffer:
conn.SetReadBuffer(4 * 1024 * 1024) - 多开几个 goroutine 并发读取 UDP 包(单 goroutine 读取速度有上限)
Java 对比
Java 的网络编程经历了几个阶段:ServerSocket(同步阻塞)→ NIO Selector(多路复用)→ Netty(基于 NIO 的高性能框架)。
Go 把这些全部统一了:net.Listen + conn.Read/Write,底层自动用 epoll,应用代码写起来和同步阻塞一样简单。
在 Java 里用 Netty 写高性能网络服务,需要理解 EventLoop、ChannelPipeline、ByteBuf 这些 Netty 特有的概念。Go 里就是 goroutine + net 包,简单直接,心智负担小很多。
小结
- Go 的 net 包底层是 epoll/kqueue:goroutine 挂起 ≠ 线程阻塞,不要怕 goroutine 数量
- Deadline 必须设置:没有 deadline 的连接是 goroutine 泄漏的温床
- 用 bufio 优化小包:避免大量小包导致吞吐下降
- UDP 必须设大 buffer:默认缓冲区在高流量下必然丢包
- 连接池复用 TCP 连接:避免频繁建立连接的开销
