API网关限流:令牌桶、漏桶、滑动窗口的代码实现与Redis整合
API网关限流:令牌桶、漏桶、滑动窗口的代码实现与Redis整合
适读人群:中高级Java工程师 | 阅读时长:约20分钟 | 技术栈:Spring Boot 3.x、Spring Cloud Gateway、Redis、Lua
开篇故事
2021年黑色星期五,我们的电商 API 网关在晚上8点被一波突发流量打崩了。
事情的经过是这样的:我们对一个热门商品做了推广活动,但没想到一个大 V 博主在晚上8点整发了一条带链接的微博,粉丝疯狂涌入。原本预计的峰值是 5000 QPS,实际在 8 点整的那一分钟内,QPS 飙到了 47000。
我们有限流配置,但用的是固定窗口算法(每分钟最多 300000 次请求,等于 5000 QPS),固定窗口在临界点会有双倍流量的问题:7:59:59 打进来 300000 个请求,8:00:00 计数器清零,又打进来 300000 个请求,两秒内处理了 600000 个请求,实际是 300000 QPS,是设计容量的 60 倍。后端服务集体超时,数据库连接耗尽,恢复用了约 20 分钟。
那次事故让我深刻理解了几种限流算法的本质差异,尤其是固定窗口的临界问题。
一、核心问题分析
限流算法要解决的核心问题是:如何以平滑、可控的方式拒绝超额请求,同时在允许突发的场景下不过度限制正常流量。
固定窗口(时间窗口计数):最简单,但有临界问题,两个窗口交界处可能出现双倍流量。 滑动窗口:解决了临界问题,但需要存储每个请求的时间戳,内存开销大。 漏桶:请求以固定速率流出,绝对平滑,不允许突发。 令牌桶:以固定速率生产令牌,允许在令牌积累时处理突发流量,是目前最常用的方案。
二、原理深度解析
令牌桶算法
令牌桶是一个"桶",系统以固定速率(如每秒 1000 个)往桶里放令牌,桶有最大容量(如 2000 个)。每个请求来时取一个令牌,取到令牌则放行,取不到则拒绝。
令牌桶的关键特性:桶中积累的令牌允许突发流量,系统空闲时积累了 2000 个令牌,突发 2000 个请求时全部放行。之后恢复到每秒 1000 个的稳定速率。
漏桶算法
漏桶的请求进入队列,以固定速率(如每秒 1000 个)流出处理。队列满了则丢弃。
漏桶的本质是固定速率输出,完全不允许突发。适合保护下游不稳定的系统(不管上游流量多大,对下游始终是匀速)。
滑动窗口算法
滑动窗口不以固定的时间点切割窗口,而是以"当前时间往前 N 秒"作为统计窗口,随时间滑动。每次请求来时,统计过去 N 秒内的请求总数,超过阈值则拒绝。
滑动窗口解决了固定窗口的临界问题,但实现复杂,常见实现是用 Redis Sorted Set 存储请求时间戳。
三、完整代码实现
令牌桶(Guava RateLimiter 本地版)
@Service
@Slf4j
public class TokenBucketRateLimiter {
// 每个 API 路径对应一个令牌桶
private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();
private final RateLimiterConfigService configService;
public TokenBucketRateLimiter(RateLimiterConfigService configService) {
this.configService = configService;
}
/**
* 尝试获取令牌(非阻塞)
* @param apiPath API 路径
* @return true 表示获取成功,可以放行
*/
public boolean tryAcquire(String apiPath) {
RateLimiter rateLimiter = rateLimiters.computeIfAbsent(apiPath, path -> {
double qps = configService.getQpsLimit(path);
return RateLimiter.create(qps);
});
boolean acquired = rateLimiter.tryAcquire();
if (!acquired) {
log.warn("限流触发,apiPath={}, qps={}", apiPath,
configService.getQpsLimit(apiPath));
}
return acquired;
}
/**
* 动态更新 QPS 限制
*/
public void updateQpsLimit(String apiPath, double newQps) {
rateLimiters.compute(apiPath, (path, existing) -> {
if (existing == null) {
return RateLimiter.create(newQps);
}
existing.setRate(newQps);
return existing;
});
log.info("更新 QPS 限制,apiPath={}, newQps={}", apiPath, newQps);
}
}基于 Redis 的令牌桶(分布式版)
Lua 脚本保证原子性:
-- 令牌桶 Lua 脚本
-- KEYS[1]: 令牌桶 key
-- ARGV[1]: 令牌生产速率(每秒)
-- ARGV[2]: 桶的最大容量
-- ARGV[3]: 当前时间戳(毫秒)
-- ARGV[4]: 每次消耗的令牌数(通常为1)
-- 返回:1 表示允许,0 表示拒绝
local key = KEYS[1]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill_time')
local tokens = tonumber(bucket[1])
local last_refill_time = tonumber(bucket[2])
if tokens == nil then
tokens = capacity
last_refill_time = now
end
-- 计算自上次填充以来产生的新令牌
local elapsed = math.max(0, now - last_refill_time)
local new_tokens = elapsed * rate / 1000.0
tokens = math.min(capacity, tokens + new_tokens)
local allowed = 0
if tokens >= requested then
tokens = tokens - requested
allowed = 1
end
redis.call('HMSET', key, 'tokens', tokens, 'last_refill_time', now)
redis.call('EXPIRE', key, math.ceil(capacity / rate) + 1)
return allowed@Service
@Slf4j
public class RedisTokenBucketRateLimiter {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String TOKEN_BUCKET_SCRIPT;
static {
try {
TOKEN_BUCKET_SCRIPT = new String(
RedisTokenBucketRateLimiter.class
.getResourceAsStream("/lua/token_bucket.lua")
.readAllBytes()
);
} catch (Exception e) {
throw new RuntimeException("加载 Lua 脚本失败", e);
}
}
private final DefaultRedisScript<Long> redisScript;
public RedisTokenBucketRateLimiter() {
this.redisScript = new DefaultRedisScript<>();
this.redisScript.setScriptText(TOKEN_BUCKET_SCRIPT);
this.redisScript.setResultType(Long.class);
}
/**
* 分布式令牌桶限流
* @param key 限流 key(如 api:path:/order/create)
* @param rate 令牌生产速率(QPS)
* @param capacity 桶的最大容量(允许的最大突发量)
*/
public boolean tryAcquire(String key, double rate, int capacity) {
String bucketKey = "rate:token_bucket:" + key;
long now = System.currentTimeMillis();
Long result = redisTemplate.execute(
redisScript,
Collections.singletonList(bucketKey),
String.valueOf(rate),
String.valueOf(capacity),
String.valueOf(now),
"1"
);
return Long.valueOf(1L).equals(result);
}
}基于 Redis Sorted Set 的滑动窗口
@Service
@Slf4j
public class RedisSlidingWindowRateLimiter {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String SLIDING_WINDOW_SCRIPT =
"local key = KEYS[1] " +
"local now = tonumber(ARGV[1]) " +
"local window = tonumber(ARGV[2]) " +
"local limit = tonumber(ARGV[3]) " +
"local expire_time = now - window " +
// 删除窗口之前的请求记录
"redis.call('ZREMRANGEBYSCORE', key, '-inf', expire_time) " +
// 统计窗口内的请求数
"local count = redis.call('ZCARD', key) " +
"if count < limit then " +
" redis.call('ZADD', key, now, now .. '-' .. math.random(100000)) " +
" redis.call('EXPIRE', key, math.ceil(window / 1000) + 1) " +
" return 1 " +
"else " +
" return 0 " +
"end";
private final DefaultRedisScript<Long> slidingWindowScript;
public RedisSlidingWindowRateLimiter() {
this.slidingWindowScript = new DefaultRedisScript<>();
this.slidingWindowScript.setScriptText(SLIDING_WINDOW_SCRIPT);
this.slidingWindowScript.setResultType(Long.class);
}
/**
* 滑动窗口限流
* @param key 限流 key
* @param windowMs 窗口大小(毫秒)
* @param maxRequests 窗口内最大请求数
*/
public boolean tryAcquire(String key, long windowMs, int maxRequests) {
String windowKey = "rate:sliding_window:" + key;
long now = System.currentTimeMillis();
Long result = redisTemplate.execute(
slidingWindowScript,
Collections.singletonList(windowKey),
String.valueOf(now),
String.valueOf(windowMs),
String.valueOf(maxRequests)
);
return Long.valueOf(1L).equals(result);
}
}Spring Cloud Gateway 限流过滤器
@Component
@Slf4j
public class RateLimitGatewayFilter implements GlobalFilter, Ordered {
@Autowired
private RedisTokenBucketRateLimiter tokenBucketLimiter;
@Autowired
private RateLimiterConfigService configService;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String path = request.getPath().value();
String clientIp = getClientIp(request);
// 构建限流 key:按接口路径 + 客户端 IP 限流
String rateLimitKey = path + ":" + clientIp;
RateLimiterRule rule = configService.getRule(path);
if (rule == null) {
return chain.filter(exchange);
}
boolean allowed = tokenBucketLimiter.tryAcquire(
rateLimitKey,
rule.getQps(),
rule.getBurstCapacity()
);
if (!allowed) {
log.warn("请求被限流,path={}, clientIp={}, qps={}", path, clientIp, rule.getQps());
return writeRateLimitResponse(exchange);
}
return chain.filter(exchange);
}
private Mono<Void> writeRateLimitResponse(ServerWebExchange exchange) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
String body = "{\"code\":429,\"message\":\"请求过于频繁,请稍后再试\"}";
DataBuffer buffer = response.bufferFactory()
.wrap(body.getBytes(StandardCharsets.UTF_8));
return response.writeWith(Mono.just(buffer));
}
private String getClientIp(ServerHttpRequest request) {
String ip = request.getHeaders().getFirst("X-Real-IP");
if (ip == null || ip.isBlank()) {
ip = request.getHeaders().getFirst("X-Forwarded-For");
if (ip != null && ip.contains(",")) {
ip = ip.split(",")[0].trim();
}
}
if (ip == null || ip.isBlank()) {
ip = request.getRemoteAddress() != null
? request.getRemoteAddress().getAddress().getHostAddress()
: "unknown";
}
return ip;
}
@Override
public int getOrder() {
return -100; // 限流过滤器要在路由过滤器之前执行
}
}限流规则配置(支持动态更新)
@Component
@RefreshScope // Nacos 动态刷新
@ConfigurationProperties(prefix = "rate-limiter")
@Getter
@Setter
public class RateLimiterConfigService {
// 全局默认规则
private int defaultQps = 1000;
private int defaultBurstCapacity = 2000;
// 接口级规则
private Map<String, RateLimiterRule> rules = new HashMap<>();
public RateLimiterRule getRule(String path) {
// 精确匹配
if (rules.containsKey(path)) {
return rules.get(path);
}
// 前缀匹配
return rules.entrySet().stream()
.filter(e -> path.startsWith(e.getKey()))
.findFirst()
.map(Map.Entry::getValue)
.orElse(new RateLimiterRule(defaultQps, defaultBurstCapacity));
}
public double getQpsLimit(String path) {
RateLimiterRule rule = getRule(path);
return rule != null ? rule.getQps() : defaultQps;
}
}# Nacos 配置
rate-limiter:
default-qps: 1000
default-burst-capacity: 2000
rules:
"/api/order/create":
qps: 500
burst-capacity: 1000
"/api/product/detail":
qps: 5000
burst-capacity: 10000
"/api/user/login":
qps: 200
burst-capacity: 400四、生产调优与配置
限流维度选择
限流维度决定了限流的粒度:
全局限流:保护整个系统,key = "global",简单但不公平(一个用户疯狂请求会影响所有用户)。
接口维度:按 API 路径限流,key = path,保护特定接口,推荐基础配置。
用户维度:按 userId 或 clientIp 限流,key = path + userId,防止单用户滥用,更精细但 key 数量多,Redis 内存开销大。
租户维度:多租户系统按 tenantId 限流,为不同租户提供不同的配额。
生产建议:接口维度 + 用户维度双重限流,接口维度做整体保护,用户维度防止个别用户滥用。
Redis Lua 脚本的性能
Redis 执行 Lua 脚本是单线程原子的,不需要额外的分布式锁,性能很好。但要注意:
Lua 脚本执行时间过长会阻塞 Redis,导致其他操作超时。令牌桶脚本是 O(1) 的,滑动窗口脚本的 ZREMRANGEBYSCORE 是 O(log N + M),M 是被删除的元素数,正常情况下很快,但如果窗口内积累了大量元素(比如 100 万个请求),可能会慢。
建议对 Sorted Set 的大小做上限控制。
五、踩坑实录
坑一:固定窗口的临界问题(开篇故事的本质)
固定窗口在窗口切换时会重置计数,导致两个窗口的交界处实际流量是限制值的两倍。换成滑动窗口后,这个问题彻底解决。但滑动窗口 Redis Sorted Set 的内存开销是固定窗口的 N 倍(N = 窗口内请求数),对于高并发接口,选令牌桶更合适。
坑二:Redis 故障时的降级策略
Redis 挂了,限流逻辑无法执行,有两种策略:
降级放行(fail open):Redis 故障时不限流,直接放行所有请求。好处是服务可用性高,缺点是 Redis 故障时可能被刷爆。
降级拒绝(fail closed):Redis 故障时拒绝所有请求。好处是保护了下游,缺点是服务不可用。
我们的做法是分级处理:核心支付接口 fail closed,其他接口 fail open,同时用本地 Guava RateLimiter 做兜底,Redis 故障时自动切到本地限流。
坑三:令牌桶容量设置不当导致突发流量穿透
我们设了 QPS=500,但 burst_capacity 设了 5000(积累 10 秒的量),结果在系统空闲 10 秒后,一波 5000 个请求瞬间都被放行,后端数据库被打了 5000 QPS,触发了连接池超时。
教训:burst_capacity 不应该设太大,推荐是 QPS 的 2-3 倍,而不是 10 倍。给系统留有足够的突发空间,但不能允许无限突发。
六、总结
三种算法的选型建议:
令牌桶:最推荐,允许合理突发,适合大多数 API 限流场景。burst_capacity = QPS * 2~3。
滑动窗口:解决了固定窗口的临界问题,适合需要精确统计的场景(如每小时最多 N 次),但内存开销大。
漏桶:适合保护下游不稳定的系统,绝对平滑输出。通常在消息队列消费侧使用,而不是 API 网关。
无论哪种算法,生产必须考虑:Redis 故障降级策略、限流维度(全局/接口/用户)、动态调整(Nacos 配置中心),以及完善的限流监控(被拒绝的请求数、拒绝率、按接口统计)。
