Article #1751: Advanced Spring Cloud Gateway for AI Services - Request Routing and Rate-Limiting Strategies
I hit a pothole on an AI project once that left a deep impression.

It was the second day after a dialogue system went live. Alerts suddenly fired: one AI inference node had its CPU pegged while the other nodes sat idle. It took a long investigation to find the cause: the gateway's routing rules were too naive and had funneled all the traffic to a single instance. That was when I realized how different AI traffic is from traditional microservice traffic. A single GPT-style inference request can consume ten times the resources of an ordinary API call, so plain round-robin or random load balancing simply isn't enough.

Since then I have been studying how to adapt the gateway layer specifically for AI services, and this post is my attempt to lay that experience out systematically.
1. Why an AI Service Gateway Needs Special Treatment
In a traditional microservice setup, the gateway is just a traffic entry point: authentication, routing, some simple rate limiting, and you're done. AI services have several quirks that make the gateway's job far more complicated.

Request latency varies enormously. An ordinary REST endpoint responds in a few milliseconds to a few hundred; an AI inference request routinely takes seconds or tens of seconds, and a streaming response holds a long-lived connection for the duration. Connection pool settings and timeouts that work for traditional services all need to be re-evaluated.

Resource consumption is wildly uneven. A "write me a poem" request and an "analyze this 100-page contract" request can differ by tens of times in token consumption, inference time, and backend resource usage. Rate limiting by request count effectively indulges the heavyweight requests while punishing the lightweight ones, which is exactly backwards.

Backend instance health is not binary. A GPU inference node can slip into a degraded state at any moment because of memory pressure or thermal throttling. The gateway needs to sense these states and adjust routing dynamically, rather than relying on a plain HTTP health check as before.

The security surface is more complex. Prompt injection, jailbreak attempts, and malicious content submissions are AI-specific threats that the gateway should intercept as the first line of defense.

With those observations in mind, let's look at how Spring Cloud Gateway can meet these challenges.
2. Setting Up the Base Architecture

First, let's stand up the project skeleton. The stack is Spring Cloud Gateway plus Spring Boot 3.x, with these dependencies:
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-loadbalancer</artifactId>
    </dependency>
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-tracing-bridge-otel</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
</dependencies>

Basic route configuration:
spring:
  cloud:
    gateway:
      routes:
        - id: ai-chat-route
          uri: lb://ai-chat-service
          predicates:
            - Path=/api/v1/chat/**
          filters:
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 10
                redis-rate-limiter.burstCapacity: 20
                redis-rate-limiter.requestedTokens: 1
            - name: CircuitBreaker
              args:
                name: aiChatCircuitBreaker
                fallbackUri: forward:/fallback/ai-chat
            - AddRequestHeader=X-Gateway-Request-Id, #{T(java.util.UUID).randomUUID().toString()}
            - name: AIRequestEnhancer
        - id: ai-embedding-route
          uri: lb://ai-embedding-service
          predicates:
            - Path=/api/v1/embedding/**
          filters:
            - name: TokenBucketRateLimiter
              args:
                capacity: 100
                refillRate: 50
      default-filters:
        - DedupeResponseHeader=Access-Control-Allow-Credentials Access-Control-Allow-Origin
        - name: Retry
          args:
            retries: 2
            statuses: SERVICE_UNAVAILABLE
            methods: GET,POST
            backoff:
              firstBackoff: 100ms
              maxBackoff: 500ms
3. The Core Challenge: Token-Based Rate Limiting

This is the most important piece of the whole design. Plain QPS limiting means little for AI services: you can cap traffic at 10 requests per second, but each request might consume 1,000 tokens or only 10, so the actual load is completely unpredictable.

What you really want to limit is the token consumption rate. The catch: when a request reaches the gateway, you don't yet know how many tokens it will ultimately consume.

My solution works in two phases:

Phase one: pre-check the request, estimate its token cost from the request body (a rough characters-divided-by-four heuristic), and pre-deduct that amount from the bucket.
Phase two: calibrate after the response, using the actual token count to settle the difference against the pre-deduction.
@Component
@RequiredArgsConstructor
public class TokenBucketRateLimiter implements GatewayFilter, Ordered {

    private final ReactiveRedisTemplate<String, String> redisTemplate;
    private final TokenEstimator tokenEstimator;

    private static final String TOKEN_BUCKET_SCRIPT =
            "local key = KEYS[1]\n" +
            "local capacity = tonumber(ARGV[1])\n" +
            "local refillRate = tonumber(ARGV[2])\n" +
            "local requested = tonumber(ARGV[3])\n" +
            "local now = tonumber(ARGV[4])\n" +
            "local bucket = redis.call('hmget', key, 'tokens', 'lastRefill')\n" +
            "local tokens = tonumber(bucket[1]) or capacity\n" +
            "local lastRefill = tonumber(bucket[2]) or now\n" +
            "local elapsed = math.max(0, now - lastRefill)\n" +
            "local refilled = math.min(capacity, tokens + elapsed * refillRate)\n" +
            "if refilled >= requested then\n" +
            "  redis.call('hmset', key, 'tokens', refilled - requested, 'lastRefill', now)\n" +
            "  redis.call('expire', key, 3600)\n" +
            "  return {1, math.floor(refilled - requested)}\n" +
            "else\n" +
            "  redis.call('hmset', key, 'tokens', refilled, 'lastRefill', now)\n" +
            "  redis.call('expire', key, 3600)\n" +
            "  return {0, math.floor(refilled)}\n" +
            "end";

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String userId = extractUserId(exchange);
        String bucketKey = "token_bucket:" + userId;
        return exchange.getRequest().getBody()
                .collectList()
                .flatMap(dataBuffers -> {
                    // Read the request body and estimate its token cost
                    byte[] bytes = mergeDataBuffers(dataBuffers);
                    String requestBody = new String(bytes, StandardCharsets.UTF_8);
                    int estimatedTokens = tokenEstimator.estimate(requestBody);
                    // Rebuild the request body (it can only be read once)
                    ServerWebExchange mutatedExchange = rebuildExchange(exchange, bytes);
                    long now = System.currentTimeMillis() / 1000;
                    // Note: the reactive execute(...) variant takes script args as a List
                    return redisTemplate.execute(
                            RedisScript.of(TOKEN_BUCKET_SCRIPT, List.class),
                            Collections.singletonList(bucketKey),
                            List.of(String.valueOf(10000),  // capacity: 10000 tokens
                                    String.valueOf(100),    // refillRate: 100 tokens/s
                                    String.valueOf(estimatedTokens),
                                    String.valueOf(now))
                    ).next().flatMap(result -> {
                        List<Long> res = (List<Long>) result;
                        boolean allowed = res.get(0) == 1L;
                        long remaining = res.get(1);
                        exchange.getResponse().getHeaders()
                                .add("X-Token-Remaining", String.valueOf(remaining));
                        if (!allowed) {
                            exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
                            exchange.getResponse().getHeaders()
                                    .add("Retry-After", "60");
                            return exchange.getResponse().setComplete();
                        }
                        // Stash the pre-deducted token count on the exchange for post-response calibration
                        mutatedExchange.getAttributes()
                                .put("preDeductedTokens", estimatedTokens);
                        mutatedExchange.getAttributes()
                                .put("tokenBucketKey", bucketKey);
                        return chain.filter(mutatedExchange)
                                .then(calibrateAfterResponse(mutatedExchange));
                    });
                });
    }

    private Mono<Void> calibrateAfterResponse(ServerWebExchange exchange) {
        // Read the actual token count from the response header (written by the backend AI service)
        String actualTokensHeader = exchange.getResponse()
                .getHeaders().getFirst("X-Actual-Tokens-Used");
        if (actualTokensHeader == null) return Mono.empty();
        int actualTokens = Integer.parseInt(actualTokensHeader);
        int preDeducted = (Integer) exchange.getAttributes().get("preDeductedTokens");
        int diff = preDeducted - actualTokens;
        if (diff <= 0) return Mono.empty(); // under-deducted; let it slide to avoid extra complexity
        String bucketKey = (String) exchange.getAttributes().get("tokenBucketKey");
        // Refund the over-deducted tokens
        String refundScript =
                "local key = KEYS[1]\n" +
                "local capacity = tonumber(ARGV[1])\n" +
                "local refund = tonumber(ARGV[2])\n" +
                "local tokens = tonumber(redis.call('hget', key, 'tokens') or 0)\n" +
                "redis.call('hset', key, 'tokens', math.min(capacity, tokens + refund))\n" +
                "return 1";
        return redisTemplate.execute(
                RedisScript.of(refundScript, Long.class),
                Collections.singletonList(bucketKey),
                List.of(String.valueOf(10000), String.valueOf(diff))
        ).then();
    }

    @Override
    public int getOrder() {
        return -1;
    }
}
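Two things the class above leaves open. First, extractUserId depends on your auth scheme; the version below keys the bucket on an X-Api-Key header and falls back to the client IP, purely as an illustration. Second, the earlier YAML refers to this filter by name (name: TokenBucketRateLimiter), and Spring Cloud Gateway resolves such names against GatewayFilterFactory beans whose class name carries the GatewayFilterFactory suffix, so a thin wrapper is needed. A minimal sketch under those assumptions, not part of the original code:

import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.stereotype.Component;

@Component
public class TokenBucketRateLimiterGatewayFilterFactory
        extends AbstractGatewayFilterFactory<TokenBucketRateLimiterGatewayFilterFactory.Config> {

    private final TokenBucketRateLimiter rateLimiter;

    public TokenBucketRateLimiterGatewayFilterFactory(TokenBucketRateLimiter rateLimiter) {
        super(Config.class);
        this.rateLimiter = rateLimiter;
    }

    @Override
    public GatewayFilter apply(Config config) {
        // Delegates to the filter above; a fuller version would thread the
        // YAML capacity/refillRate into the Lua arguments instead of the
        // hard-coded 10000/100 shown earlier.
        return rateLimiter;
    }

    public static class Config {
        public int capacity;
        public int refillRate;
    }
}

// And a hypothetical extractUserId inside TokenBucketRateLimiter itself:
private String extractUserId(ServerWebExchange exchange) {
    String apiKey = exchange.getRequest().getHeaders().getFirst("X-Api-Key");
    if (apiKey != null) return apiKey;
    InetSocketAddress remote = exchange.getRequest().getRemoteAddress();
    return remote != null ? remote.getAddress().getHostAddress() : "anonymous";
}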
The TokenEstimator has some nuance of its own. It can't be too heavy (the gateway is a hot path and can't afford extra latency), but it can't be too naive either:

@Component
public class TokenEstimator {

    private static final int CHARS_PER_TOKEN_ZH = 2;  // Chinese: roughly 2 chars per token
    private static final int CHARS_PER_TOKEN_EN = 4;  // English: roughly 4 chars per token
    private static final int BASE_OVERHEAD = 50;      // fixed overhead for system messages etc.

    private final ObjectMapper objectMapper = new ObjectMapper();

    public int estimate(String requestBody) {
        try {
            JsonNode root = objectMapper.readTree(requestBody);
            // Handle OpenAI-style requests
            if (root.has("messages")) {
                int totalChars = 0;
                for (JsonNode message : root.get("messages")) {
                    String content = message.path("content").asText("");
                    totalChars += content.length();
                }
                return estimateFromChars(totalChars) + BASE_OVERHEAD;
            }
            // Handle the simple prompt format
            if (root.has("prompt")) {
                String prompt = root.get("prompt").asText("");
                return estimateFromChars(prompt.length()) + BASE_OVERHEAD;
            }
            return BASE_OVERHEAD;
        } catch (Exception e) {
            // If parsing fails, return a conservative estimate
            return requestBody.length() / 3 + BASE_OVERHEAD;
        }
    }

    private int estimateFromChars(int charCount) {
        // A blended estimate between CHARS_PER_TOKEN_ZH and CHARS_PER_TOKEN_EN;
        // production code could detect the Chinese/English character ratio
        // and pick the divisor accordingly.
        return charCount / 3;
    }
}

4. Smart Routing: Sensing Backend State
As noted earlier, AI inference nodes have more complicated state than ordinary services. We want the gateway to sense the backend's actual load and make smarter routing decisions.

First, a data structure for backend health state:
@Data
@Builder
public class AINodeStatus {

    private String instanceId;
    private String host;
    private int port;
    private double gpuUtilization;   // GPU utilization (0.0-1.0)
    private double gpuMemoryUsed;    // GPU memory usage ratio (0.0-1.0)
    private int activeConnections;   // current active connections
    private int queuedRequests;      // queued requests
    private long avgResponseTimeMs;  // average response time
    private NodeState state;         // node state

    public enum NodeState {
        HEALTHY, DEGRADED, OVERLOADED, UNHEALTHY
    }

    public double getLoadScore() {
        // Composite score; lower is better
        if (state == NodeState.UNHEALTHY) return Double.MAX_VALUE;
        double score = 0;
        score += gpuUtilization * 40;              // GPU utilization: 40% weight
        score += gpuMemoryUsed * 30;               // GPU memory: 30% weight
        score += (activeConnections / 10.0) * 20;  // connections: 20% weight
        score += (queuedRequests / 5.0) * 10;      // queue: 10% weight
        if (state == NodeState.DEGRADED) score *= 1.5;  // penalty factor for degraded nodes
        return score;
    }
}
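To make that scale concrete (my numbers, purely for illustration): a node at 80% GPU utilization, 70% GPU memory, 12 active connections, and 3 queued requests scores 0.8 × 40 + 0.7 × 30 + (12/10) × 20 + (3/5) × 10 = 83, and if it is DEGRADED the 1.5 penalty lifts that to 124.5.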
The custom load balancer:

@Component
@RequiredArgsConstructor
public class AIAwareLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    private final AINodeStatusRepository nodeStatusRepo;
    private final String serviceId;

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next()
                .map(serviceInstances -> processInstanceResponse(serviceInstances, request));
    }

    private Response<ServiceInstance> processInstanceResponse(
            List<ServiceInstance> instances, Request request) {
        if (instances.isEmpty()) {
            return new EmptyResponse();
        }
        // Fetch the status of every instance
        List<Pair<ServiceInstance, AINodeStatus>> candidates = instances.stream()
                .map(instance -> {
                    AINodeStatus status = nodeStatusRepo.getStatus(instance.getInstanceId());
                    return Pair.of(instance, status);
                })
                .filter(pair -> pair.getRight() == null ||
                        pair.getRight().getState() != AINodeStatus.NodeState.UNHEALTHY)
                .sorted(Comparator.comparingDouble((Pair<ServiceInstance, AINodeStatus> pair) -> {
                    AINodeStatus status = pair.getRight();
                    return status != null ? status.getLoadScore() : 0.0;
                }))
                .collect(Collectors.toList());
        if (candidates.isEmpty()) {
            // Every node reported unhealthy: degrade gracefully and fall back to the full list
            candidates = instances.stream()
                    .map(instance -> Pair.of(instance, nodeStatusRepo.getStatus(instance.getInstanceId())))
                    .collect(Collectors.toList());
        }
        // Weighted random selection (avoid piling everything onto the single best node)
        ServiceInstance selected = weightedRandomSelect(candidates);
        return new DefaultResponse(selected);
    }

    private ServiceInstance weightedRandomSelect(
            List<Pair<ServiceInstance, AINodeStatus>> candidates) {
        if (candidates.size() == 1) {
            return candidates.get(0).getLeft();
        }
        // Weight the top three so the best node doesn't absorb all the traffic
        int topN = Math.min(3, candidates.size());
        List<Pair<ServiceInstance, AINodeStatus>> topCandidates =
                candidates.subList(0, topN);
        // Weights: best node 60%, second 30%, third 10%
        double[] weights = {0.6, 0.3, 0.1};
        double rand = Math.random();
        double cumulative = 0;
        for (int i = 0; i < topCandidates.size(); i++) {
            cumulative += weights[i];
            if (rand <= cumulative) {
                return topCandidates.get(i).getLeft();
            }
        }
        return topCandidates.get(0).getLeft();
    }
}
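One piece the balancer depends on but that never appears above is AINodeStatusRepository. Here is a minimal sketch of one, assuming statuses live under the ai_node_status:{instanceId} keys written by the reporter shown below. The local cache exists because choose() runs on a reactor thread and must not block on Redis; the class shape and refresh interval are my own choices:

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class AINodeStatusRepository {

    private final RedisTemplate<String, Object> redisTemplate;
    private final Map<String, AINodeStatus> cache = new ConcurrentHashMap<>();

    // Refresh off the request path so getStatus() reads only from local memory.
    // KEYS is tolerable here because the node-status keyspace is tiny;
    // SCAN would be the safer choice on a shared Redis.
    @Scheduled(fixedDelay = 2000)
    public void refresh() {
        Set<String> keys = redisTemplate.keys("ai_node_status:*");
        if (keys == null) return;
        for (String key : keys) {
            Object value = redisTemplate.opsForValue().get(key);
            if (value instanceof AINodeStatus status) {
                cache.put(status.getInstanceId(), status);
            }
        }
    }

    public AINodeStatus getStatus(String instanceId) {
        return cache.get(instanceId);  // may be null if the node has not reported yet
    }
}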
Backend nodes report their own status periodically; we add a status reporter inside the AI service:

@Component
@Slf4j
@RequiredArgsConstructor
public class NodeStatusReporter {

    private final RedisTemplate<String, Object> redisTemplate;

    @Scheduled(fixedDelay = 5000)  // report every 5 seconds
    public void reportStatus() {
        AINodeStatus status = collectCurrentStatus();
        String key = "ai_node_status:" + getInstanceId();
        redisTemplate.opsForValue().set(key, status, Duration.ofSeconds(30));
        log.debug("Reported node status: GPU={}%, Memory={}%, Connections={}",
                status.getGpuUtilization(),
                status.getGpuMemoryUsed(),
                status.getActiveConnections());
    }

    private AINodeStatus collectCurrentStatus() {
        // In a real project, pull GPU metrics via NVML or nvidia-smi; the helpers
        // used below (getInstanceId, getGpuUtilization, ...) are elided here to
        // show the idea.
        return AINodeStatus.builder()
                .instanceId(getInstanceId())
                .gpuUtilization(getGpuUtilization())
                .gpuMemoryUsed(getGpuMemoryUsed())
                .activeConnections(getActiveConnectionCount())
                .queuedRequests(getQueuedRequestCount())
                .avgResponseTimeMs(getAvgResponseTime())
                .state(determineState())
                .build();
    }

    private AINodeStatus.NodeState determineState() {
        double gpuUtil = getGpuUtilization();
        double memUtil = getGpuMemoryUsed();
        int queue = getQueuedRequestCount();
        if (gpuUtil > 0.95 || memUtil > 0.95 || queue > 20) {
            return AINodeStatus.NodeState.OVERLOADED;
        } else if (gpuUtil > 0.8 || memUtil > 0.85 || queue > 10) {
            return AINodeStatus.NodeState.DEGRADED;
        } else {
            return AINodeStatus.NodeState.HEALTHY;
        }
    }
}
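Since the GPU helpers are elided above, here is one way getGpuUtilization could look, for concreteness. Shelling out to nvidia-smi is crude but dependency-free (NVML bindings would be the production route); this is a sketch assuming a single-GPU node:

// Hypothetical member of NodeStatusReporter; needs java.io.BufferedReader
// and java.io.InputStreamReader.
private double getGpuUtilization() {
    try {
        Process p = new ProcessBuilder(
                "nvidia-smi", "--query-gpu=utilization.gpu",
                "--format=csv,noheader,nounits").start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(p.getInputStream()))) {
            String line = reader.readLine();  // e.g. "83" on a single-GPU node
            return line != null ? Double.parseDouble(line.trim()) / 100.0 : 0.0;
        }
    } catch (Exception e) {
        return 0.0;  // treat an unreadable GPU as idle rather than failing the report
    }
}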
5. Special Handling for Streaming Responses

SSE (Server-Sent Events) and WebSocket streaming are table stakes for AI chat, but they cause real trouble at the gateway layer.

The most common trap: default response buffering accumulates the entire stream before forwarding it, so the user never sees any streaming effect at all.

The fix starts with disabling response buffering in the route configuration:
spring:
  cloud:
    gateway:
      routes:
        - id: ai-stream-route
          uri: lb://ai-chat-service
          predicates:
            - Path=/api/v1/chat/stream/**
          filters:
            - RemoveResponseHeader=Transfer-Encoding
            - name: ModifyResponseBody
              args:
                inClass: "#{T(java.lang.String)}"
                outClass: "#{T(java.lang.String)}"
                newContentType: "text/event-stream;charset=UTF-8"

We also need a custom filter to handle the SSE headers:
@Component
public class SSEPassThroughFilter implements GlobalFilter, Ordered {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        // Detect SSE requests
        String accept = request.getHeaders().getFirst(HttpHeaders.ACCEPT);
        if (accept != null && accept.contains("text/event-stream")) {
            // Adjust the response to make sure nothing buffers
            ServerHttpResponse response = exchange.getResponse();
            response.getHeaders().add(HttpHeaders.CACHE_CONTROL, "no-cache");
            response.getHeaders().add(HttpHeaders.CONNECTION, "keep-alive");
            response.getHeaders().add("X-Accel-Buffering", "no");  // tell Nginx not to buffer
            // Flag the request for a dedicated (longer) streaming timeout
            return chain.filter(exchange.mutate()
                    .request(request.mutate()
                            .header("X-Timeout-Override", "300000")  // 5 minutes
                            .build())
                    .build());
        }
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
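A caveat on that X-Timeout-Override header: Gateway itself won't act on it; it is only a marker for whatever downstream component you teach to read it. If you would rather stay within Gateway's built-in facilities, per-route timeouts can be set through route metadata, which the gateway's HTTP client honors natively. A sketch for the streaming route:

spring:
  cloud:
    gateway:
      routes:
        - id: ai-stream-route
          uri: lb://ai-chat-service
          predicates:
            - Path=/api/v1/chat/stream/**
          metadata:
            response-timeout: 300000  # per-route response timeout, milliseconds
            connect-timeout: 5000     # per-route connect timeout, milliseconds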
6. Circuit Breaking and Degradation

Circuit breaking for AI services differs a bit from ordinary services: degradation doesn't have to mean returning an error outright. You can serve a "degraded mode" response instead, for example a simplified answer from a local cache or rule base.
@RestController
@Slf4j
@RequiredArgsConstructor
public class FallbackController {

    private final LocalFallbackKnowledgeBase knowledgeBase;

    @PostMapping("/fallback/ai-chat")
    public Mono<ResponseEntity<Map<String, Object>>> chatFallback(
            ServerWebExchange exchange) {
        // Recover the original request information
        String originalPath = exchange.getAttribute(
                ServerWebExchangeUtils.GATEWAY_ORIGINAL_REQUEST_URL_ATTR) + "";
        log.warn("AI chat service circuit breaker opened, serving fallback for path: {}",
                originalPath);
        // Try to give a simple answer from the local knowledge base
        Map<String, Object> fallbackResponse = new HashMap<>();
        fallbackResponse.put("id", UUID.randomUUID().toString());
        fallbackResponse.put("object", "chat.completion");
        fallbackResponse.put("model", "fallback-local");
        fallbackResponse.put("choices", List.of(Map.of(
                "message", Map.of(
                        "role", "assistant",
                        "content", "Sorry, the AI service is busy right now; please retry shortly. " +
                                "If it's urgent, try simplifying your question or contact support."
                ),
                "finish_reason", "stop",
                "index", 0
        )));
        fallbackResponse.put("is_fallback", true);
        return Mono.just(ResponseEntity
                .status(HttpStatus.SERVICE_UNAVAILABLE)
                .header("X-Fallback-Reason", "circuit-breaker-open")
                .body(fallbackResponse));
    }
}

Resilience4j's circuit breaker settings need tuning for the characteristics of AI services:
}Resilience4j的熔断配置要根据AI服务的特点来调整:
@Configuration
public class CircuitBreakerConfig {
@Bean
public Customizer<ReactiveResilience4JCircuitBreakerFactory> aiChatCircuitBreakerCustomizer() {
return factory -> {
factory.configure(builder -> builder
.circuitBreakerConfig(
io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.custom()
// AI推理慢,慢调用阈值设高一点
.slowCallDurationThreshold(Duration.ofSeconds(30))
// 超过80%的请求慢或失败才开熔断
.slowCallRateThreshold(80)
.failureRateThreshold(50)
// 滑动窗口大小
.slidingWindowSize(20)
.minimumNumberOfCalls(10)
// 半开状态允许通过的请求数
.permittedNumberOfCallsInHalfOpenState(3)
// 熔断后等待60秒再尝试
.waitDurationInOpenState(Duration.ofSeconds(60))
.build()
)
.timeLimiterConfig(
TimeLimiterConfig.custom()
// 单个请求超时90秒
.timeoutDuration(Duration.ofSeconds(90))
.build()
),
"aiChatCircuitBreaker"
);
};
}
}
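Read together, these settings mean the breaker opens once at least 10 of the most recent 20 calls have been recorded and either half of them failed or 80% ran longer than 30 seconds; it then rejects traffic for 60 seconds before letting three probe calls through in the half-open state.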
7. Monitoring and Observability

Gateway-level monitoring matters even more for AI systems. Beyond the usual QPS and latency, you also want AI-specific metrics such as the distribution of token consumption and of request weight.
@Component
@RequiredArgsConstructor
public class AIGatewayMetricsFilter implements GlobalFilter, Ordered {

    private final MeterRegistry meterRegistry;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        long startTime = System.currentTimeMillis();
        String routeId = getRouteId(exchange);
        return chain.filter(exchange)
                .doFinally(signalType -> {
                    long duration = System.currentTimeMillis() - startTime;
                    int statusCode = exchange.getResponse().getStatusCode() != null ?
                            exchange.getResponse().getStatusCode().value() : 0;
                    // Record request latency
                    meterRegistry.timer("ai.gateway.request.duration",
                            "route", routeId,
                            "status", String.valueOf(statusCode),
                            "outcome", signalType.toString()
                    ).record(duration, TimeUnit.MILLISECONDS);
                    // Record token consumption
                    String tokensUsed = exchange.getResponse()
                            .getHeaders().getFirst("X-Actual-Tokens-Used");
                    if (tokensUsed != null) {
                        meterRegistry.summary("ai.gateway.tokens.used",
                                "route", routeId
                        ).record(Double.parseDouble(tokensUsed));
                    }
                    // Record rate-limit events
                    if (statusCode == 429) {
                        meterRegistry.counter("ai.gateway.rate.limited",
                                "route", routeId
                        ).increment();
                    }
                });
    }

    private String getRouteId(ServerWebExchange exchange) {
        // Resolve the matched route from the exchange attributes
        Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
        return route != null ? route.getId() : "unknown";
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}

8. The Pitfalls I Hit
Pitfall 1: the request body can't be re-read after it's consumed

Gateway handles the request body as a Flux, and by default the body can be read exactly once. If a filter reads it for token estimation, the body arrives empty at the backend. The fix is to rebuild the request (for example via ServerRequest.create()) or cache the body with CachedBodyOutputMessage. That is exactly what the rebuildExchange method in the code above does; don't skip it.
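For reference, here is a minimal sketch of what the mergeDataBuffers and rebuildExchange helpers used by the rate limiter could look like; the decorator re-serves the cached bytes and corrects Content-Length. One workable shape, not the only one:

// Members of TokenBucketRateLimiter; uses org.springframework.core.io.buffer.DataBuffer,
// DataBufferUtils, ServerHttpRequestDecorator, HttpHeaders, and reactor's Flux.
private byte[] mergeDataBuffers(List<DataBuffer> buffers) {
    int total = buffers.stream().mapToInt(DataBuffer::readableByteCount).sum();
    byte[] bytes = new byte[total];
    int offset = 0;
    for (DataBuffer buffer : buffers) {
        int count = buffer.readableByteCount();
        buffer.read(bytes, offset, count);
        offset += count;
        DataBufferUtils.release(buffer);  // release pooled buffers or they leak
    }
    return bytes;
}

private ServerWebExchange rebuildExchange(ServerWebExchange exchange, byte[] bytes) {
    ServerHttpRequest decorated = new ServerHttpRequestDecorator(exchange.getRequest()) {
        @Override
        public HttpHeaders getHeaders() {
            // The cached body may differ in length from the original declaration
            HttpHeaders headers = new HttpHeaders();
            headers.putAll(super.getHeaders());
            headers.setContentLength(bytes.length);
            return headers;
        }

        @Override
        public Flux<DataBuffer> getBody() {
            // Re-serve the cached bytes; defer so each subscription gets a fresh buffer
            return Flux.defer(() ->
                    Flux.just(exchange.getResponse().bufferFactory().wrap(bytes)));
        }
    };
    return exchange.mutate().request(decorated).build();
}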
Pitfall 2: SSE responses buffered by the gateway

When this bit us, users reported that AI answers appeared all at once after a long wait, with no streaming effect whatsoever. The culprit turned out to be the gateway's response buffering. Adding the X-Accel-Buffering: no header and stripping Transfer-Encoding fixed it.
Pitfall 3: circuit breaker timeouts clashing with AI inference times

The default breaker timeout is 1 second, while AI inference routinely takes 10 to 30 seconds, so every single request tripped the timeout and the breaker strangled the service. AI routes need their own timeout configuration; never reuse the generic one.
Pitfall 4: misunderstanding the atomicity of Redis Lua scripts

The token bucket's read-compute-write cycle has to happen inside a single Lua script. Splitting it into separate Redis commands creates a race under high concurrency, and this bug only surfaces under load testing; in day-to-day traffic everything looks fine.
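A concrete illustration of the race (numbers mine): two concurrent requests both GET tokens = 5, both decide 5 >= 3 and locally compute 5 - 3 = 2, and both SET tokens = 2. Six tokens of traffic were admitted, but the bucket only paid for three.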
Pitfall 5: load-balancing awareness lag

Nodes report status every 5 seconds, and the load balancer caches what it reads, so a node that has just tipped into overload may keep receiving traffic for another ten-odd seconds before the gateway notices. My answer is a fast-fail layer on the node itself: if a node detects it is overloaded, it returns 503 immediately, so the client (the gateway) retries against another node. Belt and suspenders.
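A minimal sketch of that node-side fast-fail, assuming the AI service itself is WebFlux-based; the stateProvider is a stand-in for however you expose the node's current state (for example, from the determineState() logic above):

import java.util.function.Supplier;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

@Component
public class OverloadFastFailFilter implements WebFilter {

    // Stand-in for however the node surfaces its own state
    private final Supplier<AINodeStatus.NodeState> stateProvider;

    public OverloadFastFailFilter(Supplier<AINodeStatus.NodeState> stateProvider) {
        this.stateProvider = stateProvider;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        if (stateProvider.get() == AINodeStatus.NodeState.OVERLOADED) {
            // Fail fast so the gateway retries this request against another node
            exchange.getResponse().setStatusCode(HttpStatus.SERVICE_UNAVAILABLE);
            exchange.getResponse().getHeaders().add("Retry-After", "5");
            return exchange.getResponse().setComplete();
        }
        return chain.filter(exchange);
    }
}

This pairs naturally with the Retry filter configured in section 2, which already retries on SERVICE_UNAVAILABLE.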
9. The Full Architecture View
10. Wrapping Up

Since this design went live, our AI service cluster has not had a single traffic-imbalance incident, and token consumption control has become far more precise. Looking back, the three core design principles are:

One: rate-limit on tokens, not request counts. This is the essential difference in AI-scenario rate limiting.

Two: make the gateway aware of backend state. AI nodes can't be treated as stateless black boxes; the gateway has to know about GPU utilization and memory usage.

Three: treat streaming responses separately. SSE and WebSocket need dedicated handling at the gateway layer; don't let generic filters touch them.

This design is still evolving, of course. Next I want to explore prompt caching at the gateway layer: when an identical prompt arrives, return the cached result straight from the gateway, cutting backend load and token spend at the same time. Once I've got that working, it will get its own write-up.
