Spring Cloud Gateway源码:路由断言、全局过滤器与自定义Filter
Spring Cloud Gateway源码:路由断言、全局过滤器与自定义Filter
适读人群:使用Spring Cloud Gateway做API网关的Java开发者 | 阅读时长:约20分钟
开篇故事
我们公司的网关是用 Spring Cloud Gateway 搭的,有一天运营说某个接口被刷了,一分钟几十万次请求,后端服务直接被打挂了。
当时我花了大概两小时,在网关层加了一个全局限流过滤器:读请求路径和 IP,打到 Redis 令牌桶,超阈值直接返回 429。
那次事故之后,我把 Spring Cloud Gateway 的源码翻了一遍,发现它的设计非常精妙——基于 Reactor 的响应式编程,整个请求处理链都是非阻塞的。今天把路由断言、过滤器机制以及自定义过滤器的完整实现分享出来。
一、Spring Cloud Gateway 整体架构
1.1 请求处理流程
1.2 核心组件
| 组件 | 职责 |
|---|---|
RouteDefinitionLocator | 从配置文件/注册中心加载路由定义 |
RoutePredicateFactory | 创建断言(Predicate),判断请求是否匹配路由 |
GatewayFilterFactory | 创建网关过滤器(针对特定路由) |
GlobalFilter | 全局过滤器(对所有路由生效) |
FilteringWebHandler | 组装过滤器链并执行 |
二、路由断言深度解析
2.1 断言(Predicate)的本质
每个 RoutePredicateFactory 生成的是一个 AsyncPredicate<ServerWebExchange> —— 本质上就是一个判断函数:输入请求,输出 true/false。
内置断言工厂:
spring:
cloud:
gateway:
routes:
- id: user-service
uri: lb://user-service # 负载均衡到注册中心的 user-service
predicates:
- Path=/api/user/** # PathRoutePredicateFactory:路径匹配
- Method=GET,POST # MethodRoutePredicateFactory:方法匹配
- Header=X-Request-Id,\d+ # HeaderRoutePredicateFactory:请求头匹配(支持正则)
- Query=type,premium # QueryRoutePredicateFactory:查询参数匹配
- After=2024-01-01T00:00:00+08:00[Asia/Shanghai] # 时间限定
- Weight=group1, 8 # WeightRoutePredicateFactory:按权重路由(灰度发布)2.2 自定义断言:按用户等级路由
package com.laozhang.gateway.predicate;
import lombok.Data;
import org.springframework.cloud.gateway.handler.predicate.AbstractRoutePredicateFactory;
import org.springframework.web.server.ServerWebExchange;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
/**
* 自定义断言工厂:按用户等级路由
* 配置:- UserLevel=VIP,SVIP
* 请求头 User-Level 必须包含在配置的等级列表里
*/
public class UserLevelRoutePredicateFactory
extends AbstractRoutePredicateFactory<UserLevelRoutePredicateFactory.Config> {
public UserLevelRoutePredicateFactory() {
super(Config.class);
}
@Override
public List<String> shortcutFieldOrder() {
return List.of("levels"); // 支持简写:- UserLevel=VIP,SVIP
}
@Override
public Predicate<ServerWebExchange> apply(Config config) {
return exchange -> {
String userLevel = exchange.getRequest().getHeaders().getFirst("User-Level");
if (userLevel == null || userLevel.isEmpty()) {
return false;
}
return config.getLevelList().contains(userLevel.toUpperCase());
};
}
@Data
public static class Config {
private String levels; // 逗号分隔的等级列表
public List<String> getLevelList() {
return Arrays.stream(levels.split(","))
.map(String::trim)
.map(String::toUpperCase)
.toList();
}
}
}三、完整代码实现
3.1 全局过滤器:链路追踪
package com.laozhang.gateway.filter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.UUID;
/**
* 全局链路追踪过滤器
* 为每个请求生成唯一的 traceId,并透传到下游服务
*/
@Slf4j
@Component
public class TraceGlobalFilter implements GlobalFilter, Ordered {
private static final String TRACE_ID_HEADER = "X-Trace-Id";
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 获取或生成 traceId
String traceId = exchange.getRequest().getHeaders().getFirst(TRACE_ID_HEADER);
if (traceId == null || traceId.isEmpty()) {
traceId = UUID.randomUUID().toString().replace("-", "");
}
long startTime = System.currentTimeMillis();
final String finalTraceId = traceId;
// 在请求里添加 traceId header,传给下游服务
ServerHttpRequest mutatedRequest = exchange.getRequest().mutate()
.header(TRACE_ID_HEADER, finalTraceId)
.build();
// 在响应里也加上 traceId
exchange.getResponse().getHeaders().add(TRACE_ID_HEADER, finalTraceId);
return chain.filter(exchange.mutate().request(mutatedRequest).build())
.then(Mono.fromRunnable(() -> {
long elapsed = System.currentTimeMillis() - startTime;
log.info("[Gateway] traceId={} method={} path={} status={} elapsed={}ms",
finalTraceId,
exchange.getRequest().getMethod(),
exchange.getRequest().getPath().value(),
exchange.getResponse().getStatusCode(),
elapsed
);
}));
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE; // 最高优先级,最先执行
}
}3.2 全局过滤器:基于 Redis 的限流
package com.laozhang.gateway.filter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.time.Duration;
/**
* 全局限流过滤器(响应式写法,不阻塞)
* 基于滑动窗口:每分钟每IP最多100次请求
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RateLimitGlobalFilter implements GlobalFilter, Ordered {
private final ReactiveStringRedisTemplate redisTemplate;
private static final int MAX_REQUESTS = 100;
private static final Duration WINDOW = Duration.ofMinutes(1);
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String clientIp = getClientIp(exchange);
String path = exchange.getRequest().getPath().value();
String key = "rate_limit:gateway:" + clientIp + ":" + path;
return redisTemplate.opsForValue().increment(key)
.flatMap(count -> {
if (count == 1) {
// 第一次访问,设置过期时间
return redisTemplate.expire(key, WINDOW)
.then(chain.filter(exchange));
} else if (count > MAX_REQUESTS) {
// 超过限制
log.warn("[RateLimit] 限流触发 ip={} path={} count={}", clientIp, path, count);
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
exchange.getResponse().getHeaders().add("X-Rate-Limit-Retry-After", "60");
return exchange.getResponse().setComplete();
} else {
return chain.filter(exchange);
}
});
}
private String getClientIp(ServerWebExchange exchange) {
String forwarded = exchange.getRequest().getHeaders().getFirst("X-Forwarded-For");
if (forwarded != null && !forwarded.isEmpty()) {
return forwarded.split(",")[0].trim();
}
var remoteAddress = exchange.getRequest().getRemoteAddress();
return remoteAddress != null ? remoteAddress.getAddress().getHostAddress() : "unknown";
}
@Override
public int getOrder() {
return 1; // TraceFilter(最高优先级) 之后
}
}3.3 路由级过滤器工厂:请求参数加签验证
package com.laozhang.gateway.filter;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HexFormat;
import java.util.List;
import java.util.TreeMap;
/**
* 请求签名验证过滤器工厂
* 只对需要签名验证的路由生效
*
* 使用方式(配置文件):
* filters:
* - SignatureVerify=my-secret-key
*/
@Slf4j
@Component
public class SignatureVerifyGatewayFilterFactory
extends AbstractGatewayFilterFactory<SignatureVerifyGatewayFilterFactory.Config> {
public SignatureVerifyGatewayFilterFactory() {
super(Config.class);
}
@Override
public List<String> shortcutFieldOrder() {
return List.of("secretKey");
}
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
var request = exchange.getRequest();
String timestamp = request.getHeaders().getFirst("X-Timestamp");
String nonce = request.getHeaders().getFirst("X-Nonce");
String signature = request.getHeaders().getFirst("X-Signature");
// 验证参数完整性
if (timestamp == null || nonce == null || signature == null) {
log.warn("[SignatureVerify] 缺少签名参数 path={}", request.getPath().value());
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
// 验证时间戳(防重放,5分钟内有效)
long requestTime = Long.parseLong(timestamp);
if (Math.abs(System.currentTimeMillis() - requestTime) > 5 * 60 * 1000) {
log.warn("[SignatureVerify] 请求已过期 timestamp={}", timestamp);
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
// 重新计算签名,与传入签名比对
String expectedSignature = calculateSignature(
config.getSecretKey(), timestamp, nonce,
request.getPath().value()
);
if (!expectedSignature.equalsIgnoreCase(signature)) {
log.warn("[SignatureVerify] 签名验证失败 path={}", request.getPath().value());
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
return chain.filter(exchange);
};
}
private String calculateSignature(String secretKey, String timestamp, String nonce, String path) {
// 签名算法:HmacSHA256(secretKey + timestamp + nonce + path)
// 简化演示:使用MD5
String raw = secretKey + timestamp + nonce + path;
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] hash = md.digest(raw.getBytes(StandardCharsets.UTF_8));
return HexFormat.of().formatHex(hash);
} catch (Exception e) {
throw new RuntimeException("签名计算失败", e);
}
}
@Data
public static class Config {
private String secretKey;
}
}3.4 完整的 Gateway 配置
spring:
application:
name: api-gateway
cloud:
gateway:
# 全局默认过滤器(对所有路由生效)
default-filters:
- DedupeResponseHeader=Access-Control-Allow-Origin Access-Control-Allow-Credentials, RETAIN_UNIQUE
- AddResponseHeader=X-Gateway-Version, 1.0.0
routes:
# 用户服务路由
- id: user-service
uri: lb://user-service
predicates:
- Path=/api/user/**
filters:
- StripPrefix=1 # 去掉/api前缀
- SignatureVerify=my-secret-key # 签名验证(自定义Filter)
- RequestRateLimiter=... # 内置限流
# 支付服务路由(更严格的安全要求)
- id: payment-service
uri: lb://payment-service
predicates:
- Path=/api/payment/**
- Header=Authorization, Bearer .+ # 必须有Bearer Token
filters:
- StripPrefix=1
- SignatureVerify=payment-secret-key
# 灰度发布路由(20%流量到新版本)
- id: order-service-v2
uri: lb://order-service-v2
predicates:
- Path=/api/order/**
- Weight=order-group, 20 # 20%流量
filters:
- StripPrefix=1
- id: order-service-v1
uri: lb://order-service-v1
predicates:
- Path=/api/order/**
- Weight=order-group, 80 # 80%流量
filters:
- StripPrefix=1四、踩坑实录
坑1:GlobalFilter 里阻塞操作导致线程饥饿
症状:加了一个 GlobalFilter,里面调用了 JDBC 或者 Thread.sleep(),高并发下网关吞吐量急剧下降,甚至挂死。
根因:Spring Cloud Gateway 基于 Reactor + Netty,使用少量的 I/O 线程处理所有请求。在这些线程上做阻塞操作会把线程占死。
解决方案:在过滤器里调用阻塞代码时,切换到独立的线程池:
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return Mono.fromCallable(() -> {
// 这里可以做阻塞操作
return blockingDatabaseCall();
})
.subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池里执行
.flatMap(result -> chain.filter(exchange));
}坑2:过滤器修改请求 Body 后下游收不到
症状:在过滤器里用 exchange.getRequest().mutate() 修改了请求 Body,但下游服务读到的还是原始 Body。
根因:ServerHttpRequest 的 Body 是一个 Flux<DataBuffer>,只能被订阅一次(类似流,读过就没了)。如果过滤器消费了 Body,下游就读不到了。
正确做法:缓存 Body,用 ServerRequest 包装后重新构建:
// 先把Body缓存下来
return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
// 处理body(修改、验证等)
String bodyStr = new String(bytes, StandardCharsets.UTF_8);
// ... 处理 bodyStr ...
// 重新构建 Request,用新的 Body
DataBuffer newBody = exchange.getResponse().bufferFactory().wrap(bytes);
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return Flux.just(newBody);
}
};
return chain.filter(exchange.mutate().request(mutatedRequest).build());
});坑3:断言配置中时间格式写错导致路由全部失效
症状:配置了 After=2024-01-01T00:00:00 后,所有请求都返回 404。
根因:After 断言使用 ZonedDateTime 格式,必须包含时区信息,否则解析失败,这条路由变为无效路由。
正确格式:
predicates:
# 正确:包含时区
- After=2024-01-01T00:00:00+08:00[Asia/Shanghai]
# 错误:缺少时区
- After=2024-01-01T00:00:00坑4:使用 lb:// 负载均衡但下游报 503
症状:配置了 uri: lb://user-service,有时返回 503 Service Unavailable。
根因:lb:// 协议依赖 Spring Cloud LoadBalancer(或旧版本的 Ribbon),如果没有找到可用的服务实例,会返回 503。
排查步骤:
- 检查
user-service是否已注册到注册中心(Nacos/Eureka) - 检查 Gateway 是否能正常连接注册中心
- 加日志确认 LoadBalancer 选择了哪个实例:
logging:
level:
org.springframework.cloud.gateway: DEBUG
reactor.netty.http.client: DEBUG五、总结与延伸
Spring Cloud Gateway 的核心设计:
- 响应式:基于 Reactor,全程非阻塞,适合高并发 API 网关
- 过滤器链:GlobalFilter(全局)+ GatewayFilter(路由级),组合灵活
- 断言优先:请求匹配由断言决定,多个断言 AND 逻辑,都满足才路由
自定义扩展的三个方向:
- 自定义断言:继承
AbstractRoutePredicateFactory - 自定义路由级过滤器:继承
AbstractGatewayFilterFactory - 自定义全局过滤器:实现
GlobalFilter + Ordered
