gRPC 流式通信实战——单向流、双向流、心跳保活在 Go 中的实现
gRPC 流式通信实战——单向流、双向流、心跳保活在 Go 中的实现
适读人群:已掌握 gRPC 基础的 Go 工程师 | 阅读时长:约20分钟 | 核心价值:彻底搞懂 gRPC 四种通信模式,在实时推送、文件传输、聊天等场景中做出正确选择
那次让我彻夜不眠的实时推送需求
2023年初,我们团队接了一个监控大盘的需求:要求服务器把实时指标数据推送给前端,延迟不超过500ms,数据量很大,每秒几百条记录。
当时技术选型讨论了很久。前端同学说用 WebSocket,后端同学说用 SSE(Server-Sent Events),我想试试 gRPC 流式。
最后我说:"gRPC 流式来搞,不行再换。"
然后就是那个让我连续熬了两个夜晚的需求。不是因为 gRPC 流式难,而是我把几个概念搞混了:什么时候用服务端流、什么时候用双向流、心跳怎么做、流断了怎么重连。
这篇文章是我那段经历的总结。把四种通信模式讲清楚,把流式常见的坑列出来,让你不用熬两个夜。
gRPC 四种通信模式
gRPC 有四种 RPC 类型,搞清楚它们是理解流式的基础:
| 类型 | 请求 | 响应 | 典型场景 |
|---|---|---|---|
| Unary RPC | 单个消息 | 单个消息 | 普通增删改查 |
| Server Streaming | 单个消息 | 消息流 | 实时推送、文件下载 |
| Client Streaming | 消息流 | 单个消息 | 文件上传、批量写入 |
| Bidirectional Streaming | 消息流 | 消息流 | 聊天、协同编辑 |
Java 工程师可以这样理解:
- Unary = 普通 HTTP 请求/响应
- Server Streaming = SSE(Server-Sent Events)
- Client Streaming = 分块上传
- Bidirectional = WebSocket
Proto 文件定义四种模式
syntax = "proto3";
option go_package = "stream-demo/pb/monitor;monitor";
package monitor;
// 指标数据
message MetricData {
string metric_name = 1;
double value = 2;
int64 timestamp = 3;
string host = 4;
}
// 查询请求
message QueryRequest {
string host = 1;
int32 duration = 2; // 推送持续秒数
}
// 批量上报请求(单条)
message ReportRequest {
MetricData data = 1;
}
// 批量上报响应
message ReportResponse {
int32 received = 1;
string message = 2;
}
// 心跳消息
message Ping {
int64 timestamp = 1;
}
message Pong {
int64 timestamp = 1;
}
service MonitorService {
// 1. Unary:查询单条最新指标
rpc GetLatestMetric(QueryRequest) returns (MetricData);
// 2. Server Streaming:服务端持续推送指标
rpc StreamMetrics(QueryRequest) returns (stream MetricData);
// 3. Client Streaming:客户端批量上报
rpc ReportMetrics(stream ReportRequest) returns (ReportResponse);
// 4. Bidirectional:双向流,做心跳保活
rpc Heartbeat(stream Ping) returns (stream Pong);
}服务端实现
package main
import (
"fmt"
"io"
"log"
"net"
"time"
"google.golang.org/grpc"
pb "stream-demo/pb/monitor"
)
type MonitorServer struct {
pb.UnimplementedMonitorServiceServer
}
// 1. Unary RPC
func (s *MonitorServer) GetLatestMetric(ctx context.Context, req *pb.QueryRequest) (*pb.MetricData, error) {
return &pb.MetricData{
MetricName: "cpu_usage",
Value: 65.5,
Timestamp: time.Now().Unix(),
Host: req.Host,
}, nil
}
// 2. Server Streaming:持续推送指标数据
// stream 参数是服务端向客户端发送消息的"管道"
func (s *MonitorServer) StreamMetrics(req *pb.QueryRequest, stream pb.MonitorService_StreamMetricsServer) error {
log.Printf("开始推送指标,host=%s,持续 %d 秒", req.Host, req.Duration)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
deadline := time.Now().Add(time.Duration(req.Duration) * time.Second)
for {
select {
case <-stream.Context().Done():
// 客户端断开或超时,停止推送
log.Printf("客户端断开: %v", stream.Context().Err())
return nil
case t := <-ticker.C:
if time.Now().After(deadline) {
log.Println("推送时间到,结束流")
return nil
}
// 模拟指标数据
data := &pb.MetricData{
MetricName: "cpu_usage",
Value: 40.0 + float64(t.Second()%30),
Timestamp: t.Unix(),
Host: req.Host,
}
// 向客户端发送一条数据
if err := stream.Send(data); err != nil {
log.Printf("发送失败,流可能已关闭: %v", err)
return err
}
}
}
}
// 3. Client Streaming:接收客户端批量上报
// stream 参数是从客户端接收消息的"管道"
func (s *MonitorServer) ReportMetrics(stream pb.MonitorService_ReportMetricsServer) error {
var count int32
for {
req, err := stream.Recv()
if err == io.EOF {
// 客户端发完了,返回汇总响应
log.Printf("接收完成,共 %d 条", count)
return stream.SendAndClose(&pb.ReportResponse{
Received: count,
Message: fmt.Sprintf("成功接收 %d 条数据", count),
})
}
if err != nil {
log.Printf("接收出错: %v", err)
return err
}
count++
log.Printf("接收第 %d 条: %s=%.2f", count, req.Data.MetricName, req.Data.Value)
}
}
// 4. Bidirectional Streaming:双向心跳
func (s *MonitorServer) Heartbeat(stream pb.MonitorService_HeartbeatServer) error {
log.Println("心跳连接建立")
for {
ping, err := stream.Recv()
if err == io.EOF {
log.Println("心跳流正常关闭")
return nil
}
if err != nil {
log.Printf("心跳接收出错: %v", err)
return err
}
// 立即回复 Pong
pong := &pb.Pong{Timestamp: time.Now().UnixMilli()}
if err := stream.Send(pong); err != nil {
log.Printf("Pong 发送失败: %v", err)
return err
}
latency := time.Now().UnixMilli() - ping.Timestamp
log.Printf("心跳延迟: %dms", latency)
}
}
func main() {
lis, _ := net.Listen("tcp", ":50052")
s := grpc.NewServer()
pb.RegisterMonitorServiceServer(s, &MonitorServer{})
log.Println("流式监控服务启动,监听 :50052")
s.Serve(lis)
}客户端实现(四种模式)
package main
import (
"context"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "stream-demo/pb/monitor"
)
func main() {
conn, err := grpc.Dial("localhost:50052",
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pb.NewMonitorServiceClient(conn)
demoServerStreaming(client)
demoClientStreaming(client)
demoBidirectional(client)
}
// 演示服务端流:接收服务端持续推送
func demoServerStreaming(client pb.MonitorServiceClient) {
log.Println("=== 服务端流示例 ===")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := client.StreamMetrics(ctx, &pb.QueryRequest{
Host: "server-01",
Duration: 5, // 推送5秒
})
if err != nil {
log.Fatalf("建立流失败: %v", err)
}
// 循环接收,直到流结束
for {
data, err := stream.Recv()
if err == io.EOF {
log.Println("服务端流结束")
break
}
if err != nil {
log.Printf("接收出错: %v", err)
break
}
log.Printf("收到指标: %s=%.2f @ %s", data.MetricName, data.Value, data.Host)
}
}
// 演示客户端流:向服务端批量上报
func demoClientStreaming(client pb.MonitorServiceClient) {
log.Println("=== 客户端流示例 ===")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ReportMetrics(ctx)
if err != nil {
log.Fatalf("建立流失败: %v", err)
}
// 发送10条数据
for i := 0; i < 10; i++ {
err := stream.Send(&pb.ReportRequest{
Data: &pb.MetricData{
MetricName: "memory_usage",
Value: float64(60 + i),
Timestamp: time.Now().Unix(),
Host: "server-01",
},
})
if err != nil {
log.Printf("发送失败: %v", err)
break
}
time.Sleep(100 * time.Millisecond)
}
// 关闭发送端,等待服务端回复
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("等待响应失败: %v", err)
}
log.Printf("上报完成: %s", resp.Message)
}
// 演示双向流:心跳保活
func demoBidirectional(client pb.MonitorServiceClient) {
log.Println("=== 双向流心跳示例 ===")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.Heartbeat(ctx)
if err != nil {
log.Fatalf("建立心跳流失败: %v", err)
}
// 并发:一个 goroutine 发 Ping,一个接收 Pong
done := make(chan struct{})
// 接收 Pong
go func() {
defer close(done)
for {
pong, err := stream.Recv()
if err != nil {
log.Printf("心跳流关闭: %v", err)
return
}
log.Printf("收到 Pong: %d", pong.Timestamp)
}
}()
// 发送 Ping,每秒一次,共5次
for i := 0; i < 5; i++ {
err := stream.Send(&pb.Ping{Timestamp: time.Now().UnixMilli()})
if err != nil {
log.Printf("发送 Ping 失败: %v", err)
break
}
time.Sleep(time.Second)
}
stream.CloseSend()
<-done
log.Println("心跳演示结束")
}踩坑实录
坑1:服务端流里忘检查 ctx.Done(),goroutine 泄漏
现象: 客户端正常断开后,服务端仍在执行 StreamMetrics,CPU 飙高,日志还在不停打印。
原因: 服务端流是在 goroutine 里运行的,如果客户端断开,stream.Context() 会取消,但你的代码如果不检查这个信号,就会永远循环下去。
解法: 在发送循环里,用 select 监听 stream.Context().Done():
select {
case <-stream.Context().Done():
return nil // 客户端已断开,退出
case <-ticker.C:
// 正常发送逻辑
}坑2:双向流里发送和接收要并发,不能串行
现象: 双向流写法如下,结果程序阻塞,双方都在等对方先发:
stream.Send(ping)
stream.Recv() // 永远等不到原因: 双向流的发送和接收是独立的通道,必须并发执行。如果你先 Send 再 Recv,而服务端也是先 Recv 再 Send,就会形成死锁。
解法: 发送和接收必须在两个 goroutine 里并发运行,如上面客户端代码所示。
坑3:大消息流没有背压控制,内存爆炸
现象: 服务端推送速度远大于客户端处理速度,一段时间后客户端 OOM。
原因: gRPC 流本身没有内置背压机制。服务端可以无限制地发消息,消息会在客户端的内部缓冲区堆积。
解法: 服务端在发送时加速率控制,或者使用双向流让客户端通过 ACK 机制反馈进度:
// 服务端加速率控制
limiter := time.NewTicker(time.Millisecond * 10) // 每10ms最多发一条
defer limiter.Stop()
for {
<-limiter.C
stream.Send(data)
}坑4:流断开后没有重连机制
现象: 网络抖动后,服务端流断开,客户端收到 io.EOF 或错误,整个业务功能停止工作。
原因: gRPC 流不像连接池那样自动重连,流断开后必须由业务代码重新发起调用。
解法: 在客户端实现带退避的重连逻辑:
func connectWithRetry(client pb.MonitorServiceClient) {
for retries := 0; ; retries++ {
err := doStreamMetrics(client)
if err == nil {
return
}
backoff := time.Duration(min(1<<retries, 30)) * time.Second
log.Printf("流断开,%v 后重连 (第%d次)", backoff, retries+1)
time.Sleep(backoff)
}
}四种模式选型建议
根据我的实际使用经验,给出选型建议:
- 实时监控/推送通知:用 Server Streaming。服务端主动推,客户端只需接收。
- 日志/指标批量上报:用 Client Streaming。客户端积攒一批再发,减少 RPC 调用次数。
- 在线聊天/协同编辑:用 Bidirectional Streaming。双方都需要随时发消息。
- 心跳保活:可以用 Bidirectional Streaming,也可以用定时 Unary RPC(更简单,推荐优先考虑)。
我的一个原则:能用 Unary 解决的,不用流式。流式代码更复杂,出了问题更难排查。只有当数据量大、实时性要求高、Unary 代价太大时,才考虑流式。
