Go 消息队列实战——sarama 操作 Kafka、消费者组、偏移量管理
Go 消息队列实战——sarama 操作 Kafka、消费者组、偏移量管理
适读人群:需要在 Go 服务中集成 Kafka 的工程师、熟悉 Java Kafka Client 想了解 Go 方案的同学 | 阅读时长:约20分钟 | 核心价值:掌握 sarama 库的生产者、消费者组完整用法,搞清楚偏移量管理和消息幂等性
一个被 Kafka 消费者坑了整整三天的故事
2022年年底,我刚转 Go 不久,需要把一个 Java 的 Kafka 消费者用 Go 重写。Java 那边用的是 spring-kafka,注解一写就完事。
Go 这边我选了 sarama 库,翻了翻 README,看起来不难。然后就开始踩坑。
第一天:消费者组 rebalance 太频繁,消息处理延迟高。 第二天:重启服务后,某些分区的消息重复消费了,数据库里出现了重复记录。 第三天:提交偏移量的方式不对,服务重启后消息大量丢失。
三天踩了三个经典坑,但也因此把 Kafka 消费者的原理彻底搞懂了。
今天把这三个坑以及正确做法,完整写出来。
sarama vs confluent-kafka-go:选哪个?
Go 有两个主流 Kafka 客户端:
| 库 | 特点 | 适用场景 |
|---|---|---|
github.com/IBM/sarama(原 Shopify/sarama) | 纯 Go 实现,不依赖 librdkafka | 大多数场景 |
github.com/confluentinc/confluent-kafka-go | CGo 封装 librdkafka,性能更强 | 超高吞吐量场景 |
我选 sarama:纯 Go、无 CGo 依赖,交叉编译方便,Docker 镜像也更小。
go get github.com/IBM/sarama生产者实战
package producer
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/IBM/sarama"
)
// OrderEvent 订单事件
type OrderEvent struct {
OrderID int64 `json:"order_id"`
UserID int64 `json:"user_id"`
EventType string `json:"event_type"`
Amount int64 `json:"amount"`
CreatedAt time.Time `json:"created_at"`
}
// KafkaProducer 异步生产者(高吞吐量场景推荐)
type KafkaProducer struct {
asyncProducer sarama.AsyncProducer
topic string
}
func NewKafkaProducer(brokers []string, topic string) (*KafkaProducer, error) {
config := sarama.NewConfig()
// 设置 Kafka 版本(要和你的 Kafka 服务端版本对应)
config.Version = sarama.V2_8_0_0
// 生产者确认模式:
// WaitForAll = 等待所有副本确认(最强可靠性,但延迟较高)
// WaitForLocal = 等待 leader 确认(默认,平衡)
// NoResponse = 不等待确认(最高吞吐量,可能丢消息)
config.Producer.RequiredAcks = sarama.WaitForAll
// 开启幂等生产者(Kafka 0.11+ 支持),防止网络重试导致的重复消息
config.Producer.Idempotent = true
config.Net.MaxOpenRequests = 1 // 幂等模式必须设置
// 重试次数
config.Producer.Retry.Max = 3
config.Producer.Retry.Backoff = 100 * time.Millisecond
// 开启成功/失败回调 channel(异步模式必须设置)
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
// 消息压缩(SNAPPY 比 GZIP 快,推荐)
config.Producer.Compression = sarama.CompressionSnappy
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
return nil, fmt.Errorf("创建 Kafka 生产者失败: %w", err)
}
kp := &KafkaProducer{asyncProducer: producer, topic: topic}
// 启动 goroutine 处理成功/失败回调(异步模式必须消费这两个 channel)
go kp.handleCallbacks()
return kp, nil
}
// handleCallbacks 处理发送结果(异步)
func (kp *KafkaProducer) handleCallbacks() {
for {
select {
case msg, ok := <-kp.asyncProducer.Successes():
if !ok {
return
}
log.Printf("消息发送成功: topic=%s partition=%d offset=%d",
msg.Topic, msg.Partition, msg.Offset)
case err, ok := <-kp.asyncProducer.Errors():
if !ok {
return
}
log.Printf("消息发送失败: %v, key=%v", err.Err, err.Msg.Key)
// 生产中这里应该做告警和重试记录
}
}
}
// SendOrderEvent 发送订单事件
func (kp *KafkaProducer) SendOrderEvent(event *OrderEvent) error {
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("序列化失败: %w", err)
}
// 使用 OrderID 作为 Key,保证同一订单的消息发到同一分区(顺序保证)
msg := &sarama.ProducerMessage{
Topic: kp.topic,
Key: sarama.StringEncoder(fmt.Sprintf("%d", event.OrderID)),
Value: sarama.ByteEncoder(data),
// 可以设置 Headers 传递元数据
Headers: []sarama.RecordHeader{
{Key: []byte("event_type"), Value: []byte(event.EventType)},
{Key: []byte("source"), Value: []byte("order-service")},
},
}
// 异步发送(不阻塞)
kp.asyncProducer.Input() <- msg
return nil
}
// Close 优雅关闭生产者
func (kp *KafkaProducer) Close() error {
return kp.asyncProducer.Close()
}消费者组实战(重点)
消费者组是 Kafka 的精华,也是最容易搞错的地方。
package consumer
import (
"context"
"encoding/json"
"log"
"sync"
"time"
"github.com/IBM/sarama"
)
// OrderEventHandler 处理订单事件
type OrderEventHandler interface {
Handle(event *OrderEvent) error
}
// ConsumerGroup Kafka 消费者组
type ConsumerGroup struct {
client sarama.ConsumerGroup
topics []string
groupID string
handler OrderEventHandler
wg sync.WaitGroup
}
func NewConsumerGroup(brokers []string, groupID string, topics []string, handler OrderEventHandler) (*ConsumerGroup, error) {
config := sarama.NewConfig()
config.Version = sarama.V2_8_0_0
// 消费位移重置策略:
// OffsetNewest = 只消费新消息(适合新服务接入,不处理历史数据)
// OffsetOldest = 从最早的消息开始消费(适合数据补全)
config.Consumer.Offsets.Initial = sarama.OffsetNewest
// 偏移量自动提交(危险!后面会解释为什么)
// config.Consumer.Offsets.AutoCommit.Enable = true // 不推荐
config.Consumer.Offsets.AutoCommit.Enable = false // 推荐手动提交
// 消费者组 Rebalance 策略:
// Sticky 策略减少 rebalance 时的分区重新分配
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
sarama.NewBalanceStrategySticky(),
}
// 会话超时:如果超过这个时间没有心跳,认为消费者挂了
config.Consumer.Group.Session.Timeout = 30 * time.Second
// 心跳间隔(应该是 Session.Timeout 的1/3)
config.Consumer.Group.Heartbeat.Interval = 10 * time.Second
// Fetch 配置(影响吞吐量)
config.Consumer.Fetch.Default = 1 * 1024 * 1024 // 1MB per fetch
config.Consumer.MaxWaitTime = 500 * time.Millisecond
client, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
return nil, err
}
return &ConsumerGroup{
client: client,
topics: topics,
groupID: groupID,
handler: handler,
}, nil
}
// Start 启动消费(阻塞,传入 ctx 控制退出)
func (cg *ConsumerGroup) Start(ctx context.Context) error {
// sarama 的消费者组 handler 接口实现
cgHandler := &consumerGroupHandler{handler: cg.handler}
cg.wg.Add(1)
go func() {
defer cg.wg.Done()
for {
// Consume 是阻塞调用,rebalance 会自动重连
if err := cg.client.Consume(ctx, cg.topics, cgHandler); err != nil {
log.Printf("消费者组错误: %v", err)
}
// ctx 取消时退出
if ctx.Err() != nil {
return
}
}
}()
// 等待退出信号
cg.wg.Wait()
return nil
}
// consumerGroupHandler 实现 sarama.ConsumerGroupHandler 接口
type consumerGroupHandler struct {
handler OrderEventHandler
}
// Setup 在 rebalance 开始后、消费开始前调用
func (h *consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
log.Println("消费者组 Setup:rebalance 完成,开始消费")
return nil
}
// Cleanup 在 rebalance 开始前调用(停止消费)
func (h *consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
log.Println("消费者组 Cleanup:准备 rebalance,停止消费")
return nil
}
// ConsumeClaim 消费分配到的分区
func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
// 处理消息
var event OrderEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("消息反序列化失败: %v,跳过", err)
// 即使反序列化失败,也要提交偏移量,避免卡住
session.MarkMessage(msg, "")
continue
}
// 调用业务处理逻辑
if err := h.handler.Handle(&event); err != nil {
log.Printf("处理消息失败: orderID=%d, err=%v", event.OrderID, err)
// 注意:失败是否要重试?这取决于业务,这里跳过继续消费
// 如果需要重试,可以把消息发到 retry topic
}
// 手动提交偏移量(标记这条消息已处理)
// MarkMessage 只是标记,不立即提交到 Kafka
// 真正提交发生在 session.Commit() 或 AutoCommit 间隔
session.MarkMessage(msg, "")
}
return nil
}踩坑实录
坑1:开启了自动提交偏移量,消费者重启后大量消息丢失
现象: 消费者处理消息时发生 panic 重启,重启后发现很多消息没被处理,但 Kafka 里已经提交了更高的偏移量,无法重新消费。
原因: AutoCommit.Enable = true 时,sarama 会每隔1秒(默认)自动提交当前的偏移量,不管消息是否真正处理完成。如果处理到一半崩溃,已经提交的偏移量就回不来了。
解法: 关闭自动提交,改用 session.MarkMessage() 手动标记已处理的消息。sarama 会在下次提交时提交所有已标记的偏移量:
config.Consumer.Offsets.AutoCommit.Enable = false
// 消息处理成功后
session.MarkMessage(msg, "")
// 批量提交(可选,减少提交频率)
session.Commit()坑2:Session.Timeout 设置太短,频繁触发 Rebalance
现象: 消费者在处理耗时较长的消息(比如需要调用外部 API)时,心跳超时,触发 Rebalance,消费中断,消息被重新分配给其他消费者处理,导致重复消费。
原因: 默认的 Session.Timeout 是10秒,心跳间隔是3秒。如果单条消息处理超过10秒,心跳会停,触发 Rebalance。
解法: 把消息处理放到独立 goroutine,让 ConsumeClaim 的循环只负责接收消息和发送确认,不阻塞:
func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
sem := make(chan struct{}, 10) // 并发处理限制
for msg := range claim.Messages() {
sem <- struct{}{} // 获取信号量
go func(m *sarama.ConsumerMessage) {
defer func() { <-sem }()
// 处理消息...
session.MarkMessage(m, "")
}(msg)
}
return nil
}坑3:消费者组 ID 写错,多个服务共用同一个 group,消息被瓜分
现象: 两个不同业务的服务,consumer group ID 恰好相同,结果一个服务只消费到了部分消息,另一个也只消费到部分。
原因: 同一个 consumer group 里的消费者,Kafka 会自动分配分区,每个分区只被组内一个消费者消费。如果两个本不相关的服务用了同一个 group ID,它们会竞争分区。
解法: consumer group ID 要按服务名唯一命名,推荐格式:{服务名}-{环境}-{业务功能},比如 order-service-prod-payment-event-consumer。
消费幂等性:必须在业务层保证
Kafka 的 at-least-once 语义意味着消息可能被消费多次(网络抖动、重启等),必须在业务层实现幂等。
最常用的方案:用 Redis 或数据库记录已处理的消息 ID:
func (h *OrderEventHandler) Handle(event *OrderEvent) error {
// 用消息的唯一 key 做幂等检查
idempotentKey := fmt.Sprintf("kafka:order:event:%d:%s", event.OrderID, event.EventType)
// 用 Redis SET NX(不存在才设置)做幂等锁
ok, err := redisClient.SetNX(ctx, idempotentKey, "1", 24*time.Hour).Result()
if err != nil {
return fmt.Errorf("幂等检查失败: %w", err)
}
if !ok {
// 已经处理过,跳过
log.Printf("重复消息,跳过: orderID=%d", event.OrderID)
return nil
}
// 实际业务处理
return h.processOrderEvent(event)
}