Sentinel流量控制源码:滑动时间窗口算法的Java实现
Sentinel流量控制源码:滑动时间窗口算法的Java实现
适读人群:使用Sentinel做流量控制、想了解底层计数原理的Java开发者 | 阅读时长:约19分钟
开篇故事
接触 Sentinel 之前,我们限流用的是最简单的固定窗口计数:用 Redis 记每分钟的请求数,超过阈值就拒绝。
上线没多久,有人反馈说某些请求会无故被限流,明明流量不大。后来我分析了日志,发现问题在于固定窗口的边界效应:第59秒突发了 80 次请求,第 61 秒又来了 80 次,在不同的统计窗口里各自不超限,但实际上两秒内发生了 160 次请求,后端服务已经很吃力了。
固定窗口限流的本质缺陷:窗口切换瞬间会有流量峰值。
Sentinel 用了滑动时间窗口算法,把时间窗口分成多个小桶(Bucket),每次统计时只看最近一段时间内的所有桶,避免了边界问题。今天把这个算法拆开来讲。
一、固定窗口 vs 滑动时间窗口
1.1 固定窗口的问题
时间轴: [0s ---- 60s] [60s ---- 120s]
请求: 80次 80次
第59s~第61s:两秒内160次请求,但两个窗口各自都没超限!1.2 滑动时间窗口原理
Sentinel 的实现:把 60 秒窗口分成 6 个 10 秒的 Bucket,每个 Bucket 记录该时间片内的请求统计。计算当前时间窗口内的总量时,只累加最近 60 秒内的有效 Bucket。
二、Sentinel 滑动窗口源码解析
2.1 核心数据结构
// Sentinel 的 LeapArray:环形数组实现滑动窗口
// sampleCount: Bucket数量(窗口分成几份)
// intervalInMs: 整个窗口时间(毫秒)
// 每个Bucket代表 intervalInMs/sampleCount 毫秒的时间片
public abstract class LeapArray<T> {
// 每个Bucket覆盖的毫秒数
protected int windowLengthInMs;
// Bucket数量
protected int sampleCount;
// 整个窗口毫秒数
protected int intervalInMs;
// 底层数组(环形),索引取模复用
protected final AtomicReferenceArray<WindowWrap<T>> array;
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.sampleCount = sampleCount;
this.intervalInMs = intervalInMs;
this.array = new AtomicReferenceArray<>(sampleCount);
}
/**
* 获取当前时间所在的Bucket(核心方法)
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算当前时间对应的数组索引
int idx = calculateTimeIdx(timeMillis);
// 计算当前时间所在Bucket的起始时间
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 该位置还没有Bucket,创建新的
WindowWrap<T> window = new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
}
// CAS失败,说明别的线程抢先创建了,重试
Thread.yield();
} else if (windowStart == old.windowStart()) {
// 时间窗口吻合,直接返回已有的Bucket
return old;
} else if (windowStart > old.windowStart()) {
// 当前时间超过了这个Bucket的时间范围,需要重置
if (updateLock.tryLock()) {
try {
// 重置Bucket为新时间窗口
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 时钟回拨(不正常情况)
return new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
private int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int)(timeId % sampleCount); // 取模,实现环形复用
}
}三、手写滑动时间窗口限流
理解了 Sentinel 的原理,我们自己实现一个:
3.1 Bucket 数据结构
package com.laozhang.sentinel.slidingwindow;
import java.util.concurrent.atomic.AtomicLong;
/**
* 时间槽(桶)
* 记录一个时间片段内的请求统计
*/
public class Bucket {
/** 桶的起始时间戳(毫秒)*/
private volatile long startTime;
/** 桶内的请求数 */
private final AtomicLong passCount = new AtomicLong(0);
/** 桶内被拒绝的请求数 */
private final AtomicLong blockCount = new AtomicLong(0);
public Bucket(long startTime) {
this.startTime = startTime;
}
public long getStartTime() {
return startTime;
}
public long getPassCount() {
return passCount.get();
}
public long getBlockCount() {
return blockCount.get();
}
public void incrementPass() {
passCount.incrementAndGet();
}
public void incrementBlock() {
blockCount.incrementAndGet();
}
/**
* 重置桶(复用桶对象,避免频繁GC)
*/
public void reset(long newStartTime) {
this.startTime = newStartTime;
this.passCount.set(0);
this.blockCount.set(0);
}
}3.2 滑动时间窗口实现
package com.laozhang.sentinel.slidingwindow;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
/**
* 滑动时间窗口计数器
*
* @param windowSizeMs 整个时间窗口大小(毫秒),例如 60000
* @param bucketCount 分成多少个桶,例如 6(每桶 10 秒)
*/
public class SlidingWindowCounter {
/** 每个桶覆盖的时间(毫秒)*/
private final long bucketSizeMs;
/** 桶的数量 */
private final int bucketCount;
/** 整个窗口大小(毫秒)*/
private final long windowSizeMs;
/** 环形数组 */
private final AtomicReferenceArray<Bucket> buckets;
/** 用于更新过期桶的锁 */
private final ReentrantLock updateLock = new ReentrantLock();
public SlidingWindowCounter(long windowSizeMs, int bucketCount) {
if (windowSizeMs % bucketCount != 0) {
throw new IllegalArgumentException("windowSizeMs 必须能被 bucketCount 整除");
}
this.windowSizeMs = windowSizeMs;
this.bucketCount = bucketCount;
this.bucketSizeMs = windowSizeMs / bucketCount;
this.buckets = new AtomicReferenceArray<>(bucketCount);
}
/**
* 获取当前时间对应的桶
*/
private Bucket getCurrentBucket() {
long now = System.currentTimeMillis();
int idx = (int) ((now / bucketSizeMs) % bucketCount);
long bucketStartTime = (now / bucketSizeMs) * bucketSizeMs;
while (true) {
Bucket existing = buckets.get(idx);
if (existing == null) {
// 位置为空,创建新桶
Bucket newBucket = new Bucket(bucketStartTime);
if (buckets.compareAndSet(idx, null, newBucket)) {
return newBucket;
}
Thread.yield();
} else if (bucketStartTime == existing.getStartTime()) {
// 时间窗口匹配,直接用
return existing;
} else if (bucketStartTime > existing.getStartTime()) {
// 桶已过期,需要重置
if (updateLock.tryLock()) {
try {
existing.reset(bucketStartTime);
return existing;
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else {
// 时钟回拨,返回新桶(不写入数组)
return new Bucket(bucketStartTime);
}
}
}
/**
* 记录一次请求通过
*/
public void recordPass() {
getCurrentBucket().incrementPass();
}
/**
* 获取当前滑动窗口内的通过总数
*/
public long getWindowPassCount() {
long now = System.currentTimeMillis();
long windowStart = now - windowSizeMs;
long total = 0;
for (int i = 0; i < bucketCount; i++) {
Bucket bucket = buckets.get(i);
if (bucket != null && bucket.getStartTime() > windowStart) {
total += bucket.getPassCount();
}
}
return total;
}
}3.3 基于滑动窗口的限流器
package com.laozhang.sentinel.slidingwindow;
import java.util.concurrent.ConcurrentHashMap;
/**
* 基于滑动时间窗口的限流器
*/
public class SlidingWindowRateLimiter {
/** 每个资源对应一个计数器 */
private final ConcurrentHashMap<String, SlidingWindowCounter> counters =
new ConcurrentHashMap<>();
/** 时间窗口大小(毫秒)*/
private final long windowSizeMs;
/** 桶数量 */
private final int bucketCount;
/** 最大请求数 */
private final long maxRequests;
public SlidingWindowRateLimiter(long windowSizeMs, int bucketCount, long maxRequests) {
this.windowSizeMs = windowSizeMs;
this.bucketCount = bucketCount;
this.maxRequests = maxRequests;
}
/**
* 尝试通过(返回true表示允许,false表示被限流)
*/
public boolean tryAcquire(String resource) {
SlidingWindowCounter counter = counters.computeIfAbsent(resource,
k -> new SlidingWindowCounter(windowSizeMs, bucketCount));
long currentCount = counter.getWindowPassCount();
if (currentCount >= maxRequests) {
return false;
}
// 允许通过,记录本次请求
counter.recordPass();
return true;
}
/**
* 获取当前窗口内的请求数
*/
public long getCurrentCount(String resource) {
SlidingWindowCounter counter = counters.get(resource);
return counter == null ? 0 : counter.getWindowPassCount();
}
}3.4 单元测试验证
package com.laozhang.sentinel.slidingwindow;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
class SlidingWindowRateLimiterTest {
@Test
void testBasicRateLimit() throws InterruptedException {
// 创建限流器:60秒窗口,分6桶,最多100次请求
SlidingWindowRateLimiter limiter =
new SlidingWindowRateLimiter(60_000, 6, 100);
// 前100次应该通过
int passed = 0;
for (int i = 0; i < 100; i++) {
if (limiter.tryAcquire("test-api")) {
passed++;
}
}
assertEquals(100, passed, "前100次请求应该全部通过");
// 第101次应该被限流
assertFalse(limiter.tryAcquire("test-api"), "第101次请求应该被限流");
}
@Test
void testSlidingWindowEffect() throws InterruptedException {
// 使用更短的窗口来测试(1秒窗口,分10桶,每桶100ms)
SlidingWindowRateLimiter limiter =
new SlidingWindowRateLimiter(1_000, 10, 50);
// 发送50次请求
for (int i = 0; i < 50; i++) {
assertTrue(limiter.tryAcquire("test"));
}
// 等待1.1秒,老窗口的请求应该滑出
Thread.sleep(1100);
// 应该可以再次请求
assertTrue(limiter.tryAcquire("test"), "1秒后应该可以重新请求");
}
}四、踩坑实录
坑1:Sentinel 控制台规则不持久化
症状:在 Sentinel 控制台配置了限流规则,服务重启后规则消失。
根因:Sentinel 默认把规则存在内存里,重启丢失。
解决方案:集成 Nacos 做规则持久化:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>spring:
cloud:
sentinel:
datasource:
flow-rules:
nacos:
server-addr: ${nacos.server-addr}
data-id: ${spring.application.name}-flow-rules
group-id: SENTINEL_GROUP
data-type: json
rule-type: flow坑2:@SentinelResource 的异常不触发 blockHandler
症状:blockHandler 方法配了但没有执行,直接抛出异常了。
根因:blockHandler 只处理 BlockException(被 Sentinel 限流/熔断时抛出的异常);业务代码自己抛出的异常用 fallback 处理,不走 blockHandler。
@SentinelResource(
value = "getUserById",
blockHandler = "getUserByIdBlock", // 处理BlockException(限流/熔断)
fallback = "getUserByIdFallback" // 处理业务异常
)
public UserDTO getUserById(Long id) { ... }
public UserDTO getUserByIdBlock(Long id, BlockException ex) {
// 被Sentinel限流或熔断时执行
return UserDTO.defaultUser(id);
}
public UserDTO getUserByIdFallback(Long id, Throwable t) {
// 业务方法抛出异常时执行
return UserDTO.defaultUser(id);
}坑3:滑动窗口统计不准,多线程下计数丢失
症状:自己实现的滑动窗口,压测时发现实际通过的请求数超过了阈值。
根因:getWindowPassCount() 和 recordPass() 不是原子操作,存在 TOCTOU(检查时间和使用时间不一致)问题:
// 错误:判断和增加不是原子操作
long count = counter.getWindowPassCount(); // 线程A和B都读到 99
if (count < maxRequests) { // 都判断 99 < 100,都通过
counter.recordPass(); // 最终count变成 101
}解决方案:用 LongAdder 或原子操作,或者接受一定的超发(对于软限流场景可以接受):
// 方式1:接受少量超发(适合不精确的软限流)
// 方式2:用 Redis + Lua 做原子性限流(精确)
// 方式3:令牌桶算法,用AtomicLong原子地扣减令牌坑4:Sentinel 熔断后不恢复(探测请求始终失败)
症状:服务恢复后,Sentinel 一直处于熔断状态,流量无法恢复。
根因:熔断恢复期(Half-Open 状态)发送的探测请求失败了,导致重新进入 Open 状态,永远无法恢复。
排查:检查探测请求失败的原因(超时配置、服务未完全启动等),或者适当调大 slowRatioThreshold(慢调用占比阈值)。
五、总结与延伸
滑动时间窗口的核心思想:
- 把时间窗口分成多个等长的 Bucket
- 用环形数组存储 Bucket,到达末尾时复用(重置)旧 Bucket
- 统计时只累加时间范围内的 Bucket,过期的自然被排除
对比三种限流算法:
| 算法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 固定窗口 | 简单,内存小 | 边界突刺 | 精度要求不高 |
| 滑动时间窗口 | 无边界突刺,统计精准 | 内存稍大(需存多个Bucket) | 通用限流 |
| 令牌桶 | 允许突发(桶里有令牌就放行) | 实现复杂 | 需要允许短时突发的场景 |
| 漏桶 | 严格匀速,无突发 | 有延迟,不灵活 | 需要严格匀速输出的场景 |
Sentinel 同时支持这几种算法,通过 FlowRule.controlBehavior 选择。
