Go Redis 客户端实战——go-redis 连接池、Pipeline、Lua 脚本、分布式锁
2026/4/30大约 7 分钟
Go Redis 客户端实战——go-redis 连接池、Pipeline、Lua 脚本、分布式锁
适读人群:Go后端开发者、需要在项目中使用Redis的工程师 | 阅读时长:约18分钟 | 核心价值:go-redis的高级用法能让你的Redis操作更高效、更安全
一、小方的「原子性」事故
小方在一家游戏公司做Go后端,有个道具兑换功能:检查用户积分是否足够,然后扣减积分,发放道具。他用Redis存用户积分:
func redeemItem(userID string, cost int64) error {
// 步骤1:检查积分
points, _ := rdb.Get(ctx, "points:"+userID).Int64()
if points < cost {
return errors.New("积分不足")
}
// 步骤2:扣减积分
rdb.DecrBy(ctx, "points:"+userID, cost)
// 步骤3:发放道具
grantItem(userID)
return nil
}上线后没两天,有玩家发现可以重复兑换道具。问题出在步骤1和步骤2之间:高并发下,多个请求同时通过了步骤1的检查,然后全部执行了步骤2,积分变成了负数。
经典的TOCTOU(Time of Check to Time of Use)竞态条件。
解决方案:用Redis分布式锁或者Lua脚本保证原子性。
二、go-redis基础配置
package main
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func newRedisClient() *redis.Client {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
// 连接池配置
PoolSize: 100, // 最大连接数
MinIdleConns: 10, // 最小空闲连接
MaxIdleConns: 50, // 最大空闲连接
ConnMaxIdleTime: 5 * time.Minute, // 空闲连接超时
ConnMaxLifetime: time.Hour, // 连接最大存活时间
// 超时配置
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
// 连接池等待超时(等不到连接时)
PoolTimeout: 4 * time.Second,
})
// 测试连接
if err := rdb.Ping(ctx).Err(); err != nil {
panic("Redis连接失败: " + err.Error())
}
fmt.Println("Redis连接成功")
return rdb
}
func main() {
rdb := newRedisClient()
defer rdb.Close()
// 基本操作
rdb.Set(ctx, "key", "value", time.Hour)
val, _ := rdb.Get(ctx, "key").Result()
fmt.Println("get:", val)
}三、Pipeline:批量命令减少网络往返
每次Redis命令都需要一次网络往返(RTT)。如果你需要执行100个SET命令,不用Pipeline就是100次RTT;用Pipeline可以一次发送100个命令,一次接收所有结果。
Java对比: Jedis/Lettuce也有Pipeline,概念相同。
package main
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func pipelineDemo(rdb *redis.Client) {
// 方法1:Pipelined(自动执行)
_, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
// 这里的命令不立即执行,而是缓存在pipe里
for i := 0; i < 100; i++ {
pipe.Set(ctx, fmt.Sprintf("key:%d", i), i, time.Hour)
}
return nil
// 闭包结束时,所有命令一次性发送给Redis
})
if err != nil {
fmt.Println("Pipeline错误:", err)
return
}
fmt.Println("批量写入完成")
// 方法2:TxPipelined(事务性Pipeline)
// 注意:这是Redis的MULTI/EXEC,不是Go的事务
_, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, "tx_key1", "val1", time.Hour)
pipe.Set(ctx, "tx_key2", "val2", time.Hour)
return nil
})
if err != nil {
fmt.Println("TxPipeline错误:", err)
}
// 方法3:手动Pipeline(需要自己获取结果)
pipe := rdb.Pipeline()
cmds := make([]*redis.StringCmd, 100)
for i := 0; i < 100; i++ {
cmds[i] = pipe.Get(ctx, fmt.Sprintf("key:%d", i))
}
_, err = pipe.Exec(ctx)
if err != nil && err != redis.Nil {
fmt.Println("执行错误:", err)
return
}
for i, cmd := range cmds {
val, err := cmd.Result()
if err == redis.Nil {
continue
}
fmt.Printf("key:%d = %s\n", i, val)
}
}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
defer rdb.Close()
pipelineDemo(rdb)
}四、Lua脚本:Redis端原子操作
Lua脚本在Redis服务端原子执行,是解决竞态条件的最优雅方案。
解决小方的积分扣减问题
package main
import (
"context"
"errors"
"fmt"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
// Lua脚本:原子性检查并扣减积分
var deductPointsScript = redis.NewScript(`
local key = KEYS[1]
local cost = tonumber(ARGV[1])
local current = tonumber(redis.call('GET', key))
if current == nil then
return -1 -- key不存在
end
if current < cost then
return -2 -- 积分不足
end
redis.call('DECRBY', key, cost)
return redis.call('GET', key) -- 返回扣减后的积分
`)
func redeemItemSafe(rdb *redis.Client, userID string, cost int64) error {
key := "points:" + userID
result, err := deductPointsScript.Run(ctx, rdb, []string{key}, cost).Int64()
if err != nil {
return fmt.Errorf("Redis脚本执行失败: %w", err)
}
switch result {
case -1:
return errors.New("用户积分账户不存在")
case -2:
return errors.New("积分不足")
default:
fmt.Printf("扣减成功,剩余积分: %d\n", result)
return nil
}
}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
defer rdb.Close()
// 初始化积分
rdb.Set(ctx, "points:user001", 100, 0)
// 并发测试
err := redeemItemSafe(rdb, "user001", 30)
fmt.Println("第1次兑换:", err)
err = redeemItemSafe(rdb, "user001", 50)
fmt.Println("第2次兑换:", err)
err = redeemItemSafe(rdb, "user001", 40) // 剩余70,扣40后还有30
fmt.Println("第3次兑换:", err)
err = redeemItemSafe(rdb, "user001", 50) // 剩余30,扣50失败
fmt.Println("第4次兑换:", err)
}五、分布式锁:Redlock的Go实现
分布式锁用于多个服务实例之间的互斥访问。
简单版分布式锁
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
type DistributedLock struct {
rdb *redis.Client
key string
value string
ttl time.Duration
}
// Lua脚本:原子性释放锁(只释放自己的锁)
var releaseLockScript = redis.NewScript(`
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
`)
func NewLock(rdb *redis.Client, key string, ttl time.Duration) *DistributedLock {
return &DistributedLock{
rdb: rdb,
key: "lock:" + key,
value: fmt.Sprintf("%d", time.Now().UnixNano()), // 唯一标识
ttl: ttl,
}
}
// TryLock 尝试获取锁(非阻塞)
func (l *DistributedLock) TryLock(ctx context.Context) (bool, error) {
ok, err := l.rdb.SetNX(ctx, l.key, l.value, l.ttl).Result()
return ok, err
}
// Lock 阻塞获取锁(带超时)
func (l *DistributedLock) Lock(ctx context.Context, waitTimeout time.Duration) error {
deadline := time.Now().Add(waitTimeout)
for time.Now().Before(deadline) {
ok, err := l.TryLock(ctx)
if err != nil {
return err
}
if ok {
return nil
}
// 等待一段时间后重试
time.Sleep(50 * time.Millisecond)
}
return errors.New("获取锁超时")
}
// Unlock 释放锁(只能释放自己持有的锁)
func (l *DistributedLock) Unlock(ctx context.Context) error {
result, err := releaseLockScript.Run(ctx, l.rdb, []string{l.key}, l.value).Int64()
if err != nil {
return fmt.Errorf("释放锁失败: %w", err)
}
if result == 0 {
return errors.New("锁已过期或被其他持有者释放")
}
return nil
}
// 使用示例
func criticalSection(rdb *redis.Client, resourceID string) error {
lock := NewLock(rdb, "resource:"+resourceID, 10*time.Second)
if err := lock.Lock(ctx, 3*time.Second); err != nil {
return fmt.Errorf("获取锁失败: %w", err)
}
defer lock.Unlock(ctx) // 用defer确保锁被释放
fmt.Println("执行临界区操作...")
time.Sleep(100 * time.Millisecond) // 模拟操作
fmt.Println("临界区操作完成")
return nil
}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
defer rdb.Close()
err := criticalSection(rdb, "order-001")
fmt.Println("结果:", err)
}六、常用数据结构操作
package main
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func dataStructuresDemo(rdb *redis.Client) {
// Hash:用户信息
userKey := "user:10086"
rdb.HMSet(ctx, userKey, map[string]interface{}{
"name": "老张",
"email": "laowang@example.com",
"score": 1000,
})
rdb.Expire(ctx, userKey, 24*time.Hour)
name, _ := rdb.HGet(ctx, userKey, "name").Result()
fmt.Println("name:", name)
all, _ := rdb.HGetAll(ctx, userKey).Result()
fmt.Println("所有字段:", all)
// ZSet(有序集合):排行榜
leaderboardKey := "leaderboard:game001"
rdb.ZAdd(ctx, leaderboardKey,
redis.Z{Score: 9500, Member: "playerA"},
redis.Z{Score: 8800, Member: "playerB"},
redis.Z{Score: 9200, Member: "playerC"},
redis.Z{Score: 7600, Member: "playerD"},
)
// 获取前3名(从高到低)
top3, _ := rdb.ZRevRangeWithScores(ctx, leaderboardKey, 0, 2).Result()
fmt.Println("前3名:")
for i, z := range top3 {
fmt.Printf(" 第%d名: %s (%.0f分)\n", i+1, z.Member, z.Score)
}
// 获取某玩家排名(从1开始)
rank, _ := rdb.ZRevRank(ctx, leaderboardKey, "playerC").Result()
fmt.Printf("playerC排名: 第%d名\n", rank+1)
// List:消息队列(简单场景)
queueKey := "message:queue"
rdb.LPush(ctx, queueKey, "msg1", "msg2", "msg3")
// 阻塞读取(生产者消费者)
// msg, _ := rdb.BRPop(ctx, 5*time.Second, queueKey).Result()
// 非阻塞读取
msg, _ := rdb.RPop(ctx, queueKey).Result()
fmt.Println("消费消息:", msg)
// Set:标签系统
rdb.SAdd(ctx, "article:1:tags", "Go", "并发", "后端")
rdb.SAdd(ctx, "article:2:tags", "Go", "HTTP", "Gin")
// 求交集:两篇文章的共同标签
common, _ := rdb.SInter(ctx, "article:1:tags", "article:2:tags").Result()
fmt.Println("共同标签:", common)
}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
defer rdb.Close()
dataStructuresDemo(rdb)
}七、踩坑实录
坑1:忘记处理redis.Nil错误
// 错误:key不存在时,Get返回redis.Nil,不是nil
val, err := rdb.Get(ctx, "nonexistent_key").Result()
if err != nil {
fmt.Println("错误:", err) // "redis: nil",key不存在
}
// 正确:区分key不存在和其他错误
val, err = rdb.Get(ctx, "nonexistent_key").Result()
if err == redis.Nil {
fmt.Println("key不存在,使用默认值")
val = "default"
} else if err != nil {
fmt.Println("Redis错误:", err)
return
}
fmt.Println("值:", val)坑2:连接池耗尽导致请求超时
现象: 高并发时,请求报 redis: connection pool timeout。
原因: 连接池配置的 PoolSize 太小,或者有goroutine持有连接不释放(比如长时间的阻塞操作)。
解法:
- 合理配置PoolSize(通常等于goroutine峰值数量)
- 给所有Redis操作加上context超时
- 避免在Redis操作之间做长时间阻塞
// 给Redis操作加超时
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
val, err := rdb.Get(ctx, "key").Result()
if err != nil {
// 可能是超时,也可能是其他错误
}坑3:Key没有设置过期时间导致内存膨胀
// 危险:没有过期时间
rdb.Set(ctx, "cache:user:10086", userData, 0) // 0 = 永不过期
// 正确:缓存数据设置合理的TTL
rdb.Set(ctx, "cache:user:10086", userData, 30*time.Minute)八、Java Jedis/Lettuce vs go-redis
| 功能 | Java (Jedis/Lettuce) | go-redis |
|---|---|---|
| 连接池 | JedisPool/LettucePool | 内置连接池,Options配置 |
| Pipeline | jedis.pipelined() | rdb.Pipelined() |
| Lua脚本 | jedis.eval() | redis.NewScript() |
| 集群支持 | JedisCluster | redis.NewClusterClient() |
| Sentinel | JedisSentinelPool | redis.NewFailoverClient() |
| 异步操作 | Lettuce异步API | 天然goroutine |
| 发布订阅 | jedis.subscribe() | rdb.Subscribe() |
九、总结
go-redis使用的几个要点:
- 连接池配置:PoolSize、MinIdleConns、ConnMaxIdleTime都要配置
- Pipeline:批量操作时必用,减少网络往返
- Lua脚本:需要原子操作时使用,比WATCH+事务更简洁
- 分布式锁:SetNX + Lua释放,确保只能释放自己的锁
- 错误处理:redis.Nil和其他错误要区分处理
- TTL:缓存数据一定要设置过期时间
小方的积分扣减问题用Lua脚本解决后,连续并发100个请求测试,积分从来没有变成负数,问题彻底解决。
