Go 限流算法实战——令牌桶、漏桶、滑动窗口的代码实现与对比
Go 限流算法实战——令牌桶、漏桶、滑动窗口的代码实现与对比
适读人群:需要在 Go 服务中实现限流的工程师 | 阅读时长:约17分钟 | 核心价值:三种限流算法的实现原理和使用场景,加上生产踩坑
那个差点被刷爆的接口
大概半年前,我们的开放 API 遭遇了一次异常流量——一个接入方的 SDK 有 bug,在某个错误情况下会无限重试,每秒给我们发了 8400 个请求(正常应该是 50-100 QPS)。
那时候我们只有一个简单的固定窗口计数限流,阈值设的是每分钟 3000 次请求。然而固定窗口有个致命缺陷:在窗口切换的瞬间,可以允许接近 2 倍的突发流量通过。那8400 QPS 的流量来了整整 17 秒,后端 PostgreSQL 的连接数被打满,整个服务响应时间飙到了 6800ms。
那次之后,我把限流这块认认真真重新设计了一遍。
三种算法的本质差异
| 算法 | 允许突发 | 平滑程度 | 实现复杂度 | 适合场景 |
|---|---|---|---|---|
| 固定窗口 | 是(窗口边界2倍突发) | 差 | 简单 | 粗粒度统计 |
| 滑动窗口 | 有限 | 中等 | 中等 | 精确限流 |
| 漏桶 | 否(强制均匀) | 最好 | 中等 | 保护下游稳定输入 |
| 令牌桶 | 是(受桶容量限制) | 好 | 中等 | API 限流(兼顾正常突发) |
令牌桶实现
令牌桶是 API 限流的最佳选择:以固定速率产生令牌,请求消耗令牌,桶满时多余的令牌丢弃。允许短时突发(桶里积攒的令牌),但突发量受桶容量限制。
Go 标准库的 golang.org/x/time/rate 就是令牌桶实现,可以直接用。但理解原理很重要,这里手写一个简化版:
package ratelimit
import (
"context"
"sync"
"time"
)
// TokenBucket 令牌桶限流器
type TokenBucket struct {
mu sync.Mutex
rate float64 // 每秒产生的令牌数
capacity float64 // 桶的容量(最大突发量)
tokens float64 // 当前令牌数
lastTime time.Time // 上次填充时间
}
// NewTokenBucket 创建令牌桶
// rate: 每秒令牌数(即 QPS 限制)
// burst: 允许的最大突发量(桶容量)
func NewTokenBucket(rate, burst float64) *TokenBucket {
return &TokenBucket{
rate: rate,
capacity: burst,
tokens: burst, // 初始时桶是满的
lastTime: time.Now(),
}
}
// Allow 尝试消耗1个令牌,不阻塞
// 返回 true 表示允许,false 表示被限流
func (tb *TokenBucket) Allow() bool {
return tb.AllowN(1)
}
// AllowN 尝试消耗 n 个令牌
func (tb *TokenBucket) AllowN(n float64) bool {
tb.mu.Lock()
defer tb.mu.Unlock()
// 根据时间差补充令牌
now := time.Now()
elapsed := now.Sub(tb.lastTime).Seconds()
tb.tokens += elapsed * tb.rate
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity // 不超过桶容量
}
tb.lastTime = now
// 检查令牌是否足够
if tb.tokens < n {
return false
}
tb.tokens -= n
return true
}
// Wait 等待直到令牌可用(阻塞版本)
func (tb *TokenBucket) Wait(ctx context.Context) error {
for {
if tb.Allow() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Duration(1000/tb.rate) * time.Millisecond):
// 估算等待时间后重试
}
}
}
// SetRate 动态调整速率(支持热更新)
func (tb *TokenBucket) SetRate(rate float64) {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.rate = rate
}滑动窗口实现
滑动窗口解决了固定窗口的边界突发问题,通过维护细粒度的时间槽来统计请求数:
package ratelimit
import (
"sync"
"time"
)
// SlidingWindow 滑动窗口限流器
type SlidingWindow struct {
mu sync.Mutex
windowSize time.Duration // 窗口大小
limit int // 窗口内最大请求数
buckets []int // 细粒度时间槽的计数
bucketSize time.Duration // 每个槽的时间粒度
lastBucket int // 上一次使用的槽索引
lastTime time.Time // 上一次记录时间
}
// NewSlidingWindow 创建滑动窗口
// windowSize: 窗口大小(如 1 * time.Second)
// limit: 窗口内最大请求数
// precision: 精度,窗口被划分为几个槽(槽越多精度越高,内存开销越大)
func NewSlidingWindow(windowSize time.Duration, limit, precision int) *SlidingWindow {
bucketSize := windowSize / time.Duration(precision)
return &SlidingWindow{
windowSize: windowSize,
limit: limit,
buckets: make([]int, precision),
bucketSize: bucketSize,
lastTime: time.Now(),
}
}
// Allow 判断当前请求是否被允许
func (sw *SlidingWindow) Allow() bool {
sw.mu.Lock()
defer sw.mu.Unlock()
now := time.Now()
currentBucket := int(now.UnixNano()/int64(sw.bucketSize)) % len(sw.buckets)
// 清除过期的时间槽
sw.clearExpired(now, currentBucket)
// 统计窗口内的总请求数
total := 0
for _, count := range sw.buckets {
total += count
}
if total >= sw.limit {
return false
}
sw.buckets[currentBucket]++
sw.lastBucket = currentBucket
sw.lastTime = now
return true
}
func (sw *SlidingWindow) clearExpired(now time.Time, currentBucket int) {
// 计算需要清除多少个槽
elapsed := now.Sub(sw.lastTime)
bucketsToClean := int(elapsed / sw.bucketSize)
if bucketsToClean > len(sw.buckets) {
bucketsToClean = len(sw.buckets)
}
// 从上一个槽开始,向前清除
for i := 1; i <= bucketsToClean; i++ {
idx := (sw.lastBucket + i) % len(sw.buckets)
sw.buckets[idx] = 0
}
}漏桶实现
漏桶强制请求以固定速率流出,适合保护下游系统接受稳定的请求速率:
package ratelimit
import (
"context"
"sync"
"time"
)
// LeakyBucket 漏桶限流器
type LeakyBucket struct {
mu sync.Mutex
rate float64 // 漏出速率(请求/秒)
capacity int // 桶容量(最大排队数)
queue chan struct{} // 排队通道
lastLeakAt time.Time
}
// NewLeakyBucket 创建漏桶
func NewLeakyBucket(rate float64, capacity int) *LeakyBucket {
lb := &LeakyBucket{
rate: rate,
capacity: capacity,
queue: make(chan struct{}, capacity),
lastLeakAt: time.Now(),
}
go lb.leak()
return lb
}
// leak 以固定速率"漏水"
func (lb *LeakyBucket) leak() {
interval := time.Duration(float64(time.Second) / lb.rate)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
select {
case <-lb.queue:
// 漏出一个请求(允许一个请求通过)
default:
// 桶空了,等下一个 tick
}
}
}
// Allow 将请求放入漏桶,桶满则拒绝
func (lb *LeakyBucket) Allow() bool {
select {
case lb.queue <- struct{}{}:
// 等待被 leak goroutine 处理
// 注意:这里只是入队,实际通过是 leak goroutine 决定的
// 这个简化实现不是完整的漏桶
return true
default:
return false
}
}踩坑实录
坑一:用标准库 rate.Limiter 做分布式限流,实际没有效果
现象: 设了每秒 1000 QPS 的限制,但实际通过的 QPS 是 1000 × 实例数。
原因: golang.org/x/time/rate 是进程内限流,每个实例有自己独立的令牌桶。3个实例就是 3000 QPS 的实际吞吐。
解法: 分布式限流必须用 Redis 做共享状态。用 Lua 脚本在 Redis 里实现令牌桶:
// 分布式令牌桶:Lua 脚本在 Redis 里原子执行
var tokenBucketScript = redis.NewScript(`
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2]) -- 每秒产生的令牌数
local now = tonumber(ARGV[3]) -- 当前时间戳(毫秒)
local requested = tonumber(ARGV[4]) -- 请求的令牌数
local last_time = tonumber(redis.call("HGET", key, "last_time") or now)
local tokens = tonumber(redis.call("HGET", key, "tokens") or capacity)
-- 根据时间差补充令牌
local elapsed = (now - last_time) / 1000.0
tokens = math.min(capacity, tokens + elapsed * rate)
-- 检查是否有足够令牌
if tokens >= requested then
tokens = tokens - requested
redis.call("HSET", key, "tokens", tokens, "last_time", now)
redis.call("EXPIRE", key, math.ceil(capacity / rate) + 1)
return 1
else
redis.call("HSET", key, "tokens", tokens, "last_time", now)
return 0
end
`)坑二:令牌桶的 burst 设太小,正常业务高峰被限流了
现象: 早上 9 点上班高峰,用户集中登录,1分钟内触发了限流,但这是正常的业务峰值。
原因: burst(桶容量)设的是 100,rate 是 500 QPS,但早高峰瞬时请求是 1800 QPS,桶里的令牌在 0.2 秒内就被耗尽。
解法: burst 要根据业务高峰特征设置,不是越小越好。我们最终设 burst = rate × 3(允许 3 秒的突发积累)。
坑三:每个用户独立的限流器没有清理,内存泄漏
现象: 服务运行 48 小时后,内存从 200MB 涨到了 1.8GB。
原因: 我用了 map[string]*TokenBucket 存每个用户的限流器,map 只增不减,历史用户的限流器一直留在内存里。
解法: 用带 TTL 的缓存来存限流器,不活跃的用户的限流器自动过期删除。
如何选择
- 对外 API 限流:令牌桶,允许合理突发,用 Redis 做分布式版本
- 下游系统保护:漏桶,强制均匀输出,避免冲击数据库
- 精确统计(如计费):滑动窗口,最准确,没有边界问题
- 日志、监控等非核心流量:固定窗口够用,简单
