Go 和 Redis 的深度集成——连接池调优、Pipeline、Lua 脚本、分布式锁
Go 和 Redis 的深度集成——连接池调优、Pipeline、Lua 脚本、分布式锁
适读人群:在 Go 项目中深度使用 Redis 的工程师 | 阅读时长:约18分钟 | 核心价值:从连接池到分布式锁,Redis 在 Go 生产环境中的完整使用方案
那次凌晨3点的 Redis 报警
那是一个周四凌晨 3 点 17 分,我被告警电话叫醒了。监控显示 Redis 连接数从正常的 200 多个暴涨到了 9847 个,然后 Redis 开始拒绝新连接,API 大面积超时。
我翻身起来,花了 23 分钟才找到原因:有一个批量处理任务在深夜启动,里面每一个并发的处理 goroutine 都建了一个独立的 Redis 连接,任务跑了大概 45 分钟,产生了近 1 万个并发连接。
更蠢的是:这些连接里大多数都是在做单个 GET 操作,每次都有一次 TCP 往返的开销。如果用 Pipeline 批量,本来1秒的任务,可能3秒就处理完了,还不用建那么多连接。
这篇文章从这次事故出发,聊聊 Go 里 Redis 使用的完整方案。
连接池调优
go-redis 默认连接池配置在大多数场景下不够好,需要根据业务场景调整。
踩坑一:PoolSize 设了,但还是超连接
现象: 设了 PoolSize: 100,但 Redis 侧还是看到 800+ 连接。
原因: PoolSize 是每个 Redis 节点的连接数,如果用的是 Redis Cluster(3主3从),实际连接数是 PoolSize × 节点数 = 100 × 6 = 600。还有就是连接池是懒建立的,设了 PoolSize 是上限,不代表连接被复用了。
解法: 理清楚连接数计算方式,同时配置 MinIdleConns 保证连接预热:
package cache
import (
"context"
"crypto/tls"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// Config Redis 连接配置
type Config struct {
Addr string
Password string
DB int
PoolSize int // 连接池大小,建议: CPU核数 * 10
MinIdleConns int // 最小空闲连接数,保证预热
MaxIdleConns int // 最大空闲连接数
ConnMaxIdleTime time.Duration // 空闲连接最大存活时间
ConnMaxLifetime time.Duration // 连接最大存活时间
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
}
func NewClient(cfg Config) (*redis.Client, error) {
rdb := redis.NewClient(&redis.Options{
Addr: cfg.Addr,
Password: cfg.Password,
DB: cfg.DB,
// 连接池配置
PoolSize: cfg.PoolSize, // 最大活跃连接数
MinIdleConns: cfg.MinIdleConns, // 预建立的空闲连接
MaxIdleConns: cfg.MaxIdleConns,
ConnMaxIdleTime: cfg.ConnMaxIdleTime,
ConnMaxLifetime: cfg.ConnMaxLifetime,
// 超时配置
DialTimeout: cfg.DialTimeout,
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
// 连接池满时的等待时间
PoolTimeout: cfg.ReadTimeout + 100*time.Millisecond,
})
// 验证连接
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := rdb.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("redis ping failed: %w", err)
}
return rdb, nil
}
// DefaultConfig 生产环境推荐配置
func DefaultConfig(addr, password string) Config {
return Config{
Addr: addr,
Password: password,
DB: 0,
PoolSize: 50, // 根据实际 QPS 调整
MinIdleConns: 10,
MaxIdleConns: 30,
ConnMaxIdleTime: 10 * time.Minute,
ConnMaxLifetime: 1 * time.Hour,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
}
}Pipeline:批量操作的正确姿势
踩坑二:for 循环里一条条 GET,延迟累积
现象: 渲染一个列表页需要查 50 个用户的信息,每个用户都需要从 Redis 读一次缓存,总耗时 47ms,但每次 Redis 操作只需要 0.3ms。
原因: 50次 × 0.3ms = 15ms,但实际 47ms,多出来的 32ms 全是 RTT(往返延迟)×50 次网络往返。
解法: Pipeline 批量发送:
package cache
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
type UserCache struct {
rdb *redis.Client
}
// GetMultiple 批量获取用户缓存,用 Pipeline 减少 RTT
func (c *UserCache) GetMultiple(ctx context.Context, userIDs []string) (map[string]*User, error) {
if len(userIDs) == 0 {
return make(map[string]*User), nil
}
// 使用 Pipeline 批量发送所有 GET 命令
pipe := c.rdb.Pipeline()
cmds := make([]*redis.StringCmd, len(userIDs))
for i, id := range userIDs {
cmds[i] = pipe.Get(ctx, userCacheKey(id))
}
// 一次性发送,一次性接收
if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil {
return nil, fmt.Errorf("pipeline exec: %w", err)
}
result := make(map[string]*User, len(userIDs))
for i, cmd := range cmds {
val, err := cmd.Result()
if err == redis.Nil {
// cache miss,正常情况
continue
}
if err != nil {
return nil, fmt.Errorf("get user %s: %w", userIDs[i], err)
}
var user User
if err := json.Unmarshal([]byte(val), &user); err != nil {
// 缓存数据损坏,跳过这个 key
continue
}
result[userIDs[i]] = &user
}
return result, nil
}
// SetMultiple 批量写入用户缓存
func (c *UserCache) SetMultiple(ctx context.Context, users []*User, ttl time.Duration) error {
if len(users) == 0 {
return nil
}
pipe := c.rdb.Pipeline()
for _, user := range users {
data, err := json.Marshal(user)
if err != nil {
continue
}
pipe.Set(ctx, userCacheKey(user.ID), data, ttl)
}
if _, err := pipe.Exec(ctx); err != nil {
return fmt.Errorf("pipeline set: %w", err)
}
return nil
}
func userCacheKey(id string) string {
return "user:" + id
}Pipeline 之后,50次批量查询耗时从 47ms 降到了 2.3ms。
Lua 脚本:原子操作
有些操作需要原子性,比如"读-改-写",单纯用 WATCH/MULTI/EXEC 的乐观锁方案在高竞争时性能差。Lua 脚本在 Redis 里是原子执行的,是解决这类问题的利器。
package cache
import (
"context"
"errors"
"github.com/redis/go-redis/v9"
)
// 计数器限制脚本:原子地增加计数,如果超过限制则失败
// KEYS[1]: 计数器 key
// ARGV[1]: 最大值
// ARGV[2]: 过期时间(秒)
// 返回:0 表示超限,1 表示成功,返回当前值
var incrWithLimitScript = redis.NewScript(`
local current = redis.call("INCR", KEYS[1])
if current == 1 then
redis.call("EXPIRE", KEYS[1], tonumber(ARGV[2]))
end
if current > tonumber(ARGV[1]) then
return 0
end
return current
`)
// IncrWithLimit 原子增加计数,超过 limit 返回 false
func (c *UserCache) IncrWithLimit(ctx context.Context, key string, limit, expireSeconds int) (bool, error) {
result, err := incrWithLimitScript.Run(ctx, c.rdb, []string{key}, limit, expireSeconds).Int()
if err != nil {
return false, err
}
return result > 0, nil
}分布式锁:go-redis 官方方案
踩坑三:自己实现的分布式锁有 bug
现象: 早期我自己写了一个简单的分布式锁,用 SET key value NX PX timeout 实现,但在服务重启时偶尔出现"锁没释放,但服务已经没了"的情况,导致某些任务被阻塞。
原因: 释放锁时没有检查 value 是否是自己设的(只有持有锁的人才能释放),导致一种情况:A 设锁,锁过期,B 设锁,A 任务完成后删掉了 B 的锁。
解法: 用官方的 rueidis 库或者 go-redis/extra/redisotel,不要自己实现锁。如果要自己实现,释放时必须用 Lua 脚本原子检查:
package lock
import (
"context"
"errors"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var ErrLockNotObtained = errors.New("lock not obtained")
var ErrLockNotHeld = errors.New("lock not held by this instance")
// 释放锁的 Lua 脚本:检查 value 匹配才删除
var releaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`)
// DistributedLock 分布式锁
type DistributedLock struct {
rdb *redis.Client
key string
value string
ttl time.Duration
}
// NewDistributedLock 创建一个分布式锁(不立即获取)
func NewDistributedLock(rdb *redis.Client, key string, ttl time.Duration) *DistributedLock {
return &DistributedLock{
rdb: rdb,
key: key,
value: generateLockValue(), // 唯一标识持有者
ttl: ttl,
}
}
// Lock 尝试获取锁,失败立即返回错误
func (l *DistributedLock) Lock(ctx context.Context) error {
ok, err := l.rdb.SetNX(ctx, l.key, l.value, l.ttl).Result()
if err != nil {
return fmt.Errorf("redis setnx: %w", err)
}
if !ok {
return ErrLockNotObtained
}
return nil
}
// Unlock 释放锁(只有持有者才能释放)
func (l *DistributedLock) Unlock(ctx context.Context) error {
result, err := releaseScript.Run(ctx, l.rdb, []string{l.key}, l.value).Int()
if err != nil {
return fmt.Errorf("release lock: %w", err)
}
if result == 0 {
return ErrLockNotHeld
}
return nil
}
// TryLock 带重试的获取锁
func (l *DistributedLock) TryLock(ctx context.Context, retryInterval time.Duration) error {
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for {
err := l.Lock(ctx)
if err == nil {
return nil
}
if !errors.Is(err, ErrLockNotObtained) {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
// 继续重试
}
}
}
func generateLockValue() string {
// 生成唯一值,结合主机名 + 进程 ID + 随机数
return fmt.Sprintf("%d", time.Now().UnixNano())
}使用:
func processOrder(ctx context.Context, rdb *redis.Client, orderID string) error {
lock := NewDistributedLock(rdb, "order:lock:"+orderID, 30*time.Second)
// 最多等5秒获取锁
lockCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := lock.TryLock(lockCtx, 100*time.Millisecond); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("order %s is being processed by another instance", orderID)
}
return err
}
defer lock.Unlock(ctx) // 务必在 defer 里释放
// 处理订单
return doProcessOrder(ctx, orderID)
}Go vs Java:Jedis/Lettuce vs go-redis
Java 的 Redis 客户端主要有 Jedis(同步)和 Lettuce(异步/响应式)。Jedis 简单易用,Lettuce 性能更好但复杂。
go-redis 在设计上接近 Jedis 的简洁性,但因为 Go 的 goroutine 模型,它内部是非阻塞的,用起来是同步风格,实际上底层是多路复用。
一个明显的区别:go-redis 的 Pipeline 比 Jedis 的更直观,不需要 try-with-resources,不需要显式关闭 pipeline 对象,风格更符合 Go 的习惯。
