第1922篇:API网关的AI智能限流——基于语义相似度的请求聚合与合并
第1922篇:API网关的AI智能限流——基于语义相似度的请求聚合与合并
传统限流的思路很简单:每秒最多多少个请求,超了就拒绝。这个策略对普通业务接口够用,但放到AI接口上,简直是在自我伤害。
我跟你说一个真实场景。
某天下午三点,我们的内容生成服务突然被上游系统刷爆了。查了一下日志,是一个推荐系统的Bug导致它在一分钟内重复请求了同一个内容类别的生成任务六千多次。这六千多次请求,如果用传统令牌桶,会直接返回429——用户看到的就是功能不可用。
但更关键的问题是:这六千多个请求,实际上内容高度重复。"给我生成一段关于手机评测的文章开头"这样的请求,前后两次的差异可能就是几个参数略有不同,或者关键词换了一两个字。如果我们能把语义相似的请求聚合起来,用一次推理结果回复多个调用方,既减少了GPU算力消耗,又提高了系统吞吐。
这就是AI智能限流要解决的问题:不是简单地说"你不能发这么多请求",而是说"我帮你把重复的请求合并掉,然后用最高效的方式执行"。
为什么传统限流对AI不够用
先梳理一下传统限流的几个问题:
固定窗口限流:每分钟100次,超了拒绝。问题是窗口临界处会有突发:59秒发了100次,60秒又发了100次,实际上一秒内承受了200次请求压力。
滑动窗口限流:改进了临界问题,但本质还是基于"数量"的控制,对AI这种"每次请求资源消耗差异极大"的场景不适合——一个生成1000token的请求消耗的GPU算力是生成100token的请求的好几倍,但它们在限流计数上是一样的。
令牌桶限流:解决了突发问题,但同样没有考虑请求的"语义价值"。
AI接口需要的是:
- 基于资源消耗而非请求数量的限流
- 语义相似请求的去重与聚合
- 动态调整限流阈值(根据GPU负载实时调整)
整体架构设计
核心有三个组件:
- 语义相似度计算器:判断两个请求是否"说的是同一件事"
- 请求聚合器:把相似请求合并成一个任务,用一次推理结果服务多个调用方
- 资源感知令牌桶:根据当前GPU负载动态调整放行速率
语义相似度计算
这是整个方案的核心难点。要判断两个请求是否语义相似,最直接的方式是用Embedding模型把请求向量化,然后计算余弦相似度。
但这里有个鸡生蛋蛋生鸡的问题:计算语义相似度本身也需要调用Embedding模型,也消耗资源。如果每个请求都做一次Embedding,相当于每个AI请求又多了一次Embedding请求,反而更重了。
解决方案是分层判断:
大多数重复请求会在第一层(精确哈希)就被命中,只有少量差异的请求进入第二层,真正需要语义理解的才到第三层。这样计算开销是可控的。
请求规范化与哈希
在做哈希匹配之前,需要先做请求规范化,因为两个"相同"的请求可能因为空白字符、大小写、无意义参数差异而哈希不一样:
@Component
public class RequestNormalizer {
public String normalize(AIRequest request) {
// 提取核心语义字段,忽略请求ID、时间戳等噪音字段
StringBuilder sb = new StringBuilder();
// 对prompt做规范化
String normalizedPrompt = request.getPrompt()
.trim()
.replaceAll("\\s+", " ") // 合并多余空白
.toLowerCase();
sb.append("model:").append(request.getModelId()).append("|");
sb.append("task:").append(request.getTaskType()).append("|");
sb.append("prompt:").append(normalizedPrompt).append("|");
// 推理参数要参与哈希,temperature=0.7和temperature=0.8是不同请求
if (request.getTemperature() != null) {
// 对temperature做分桶,0.65-0.75都算0.7
double bucketed = Math.round(request.getTemperature() * 10.0) / 10.0;
sb.append("temp:").append(bucketed).append("|");
}
if (request.getMaxTokens() != null) {
sb.append("tokens:").append(request.getMaxTokens()).append("|");
}
return sb.toString();
}
public String computeHash(String normalized) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] hash = md.digest(normalized.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(hash);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
}基于TF-IDF的关键词相似度
第二层用的是简化的TF-IDF余弦相似度,不需要模型,纯Java计算,速度快:
@Component
public class KeywordSimilarityCalculator {
// 简单的IDF字典(可以离线计算好)
private final Map<String, Double> idfDict;
public double computeSimilarity(String text1, String text2) {
Map<String, Double> vec1 = toTfIdfVector(text1);
Map<String, Double> vec2 = toTfIdfVector(text2);
return cosineSimilarity(vec1, vec2);
}
private Map<String, Double> toTfIdfVector(String text) {
// 简单分词:按字和词分割
List<String> tokens = tokenize(text);
Map<String, Long> termFreq = tokens.stream()
.collect(Collectors.groupingBy(t -> t, Collectors.counting()));
Map<String, Double> vector = new HashMap<>();
int totalTerms = tokens.size();
for (Map.Entry<String, Long> entry : termFreq.entrySet()) {
String term = entry.getKey();
double tf = (double) entry.getValue() / totalTerms;
double idf = idfDict.getOrDefault(term, Math.log(10000.0)); // 未知词给高IDF
vector.put(term, tf * idf);
}
return vector;
}
private double cosineSimilarity(Map<String, Double> vec1, Map<String, Double> vec2) {
Set<String> allTerms = new HashSet<>(vec1.keySet());
allTerms.addAll(vec2.keySet());
double dotProduct = 0.0;
double norm1 = 0.0;
double norm2 = 0.0;
for (String term : allTerms) {
double v1 = vec1.getOrDefault(term, 0.0);
double v2 = vec2.getOrDefault(term, 0.0);
dotProduct += v1 * v2;
norm1 += v1 * v1;
norm2 += v2 * v2;
}
if (norm1 == 0 || norm2 == 0) return 0.0;
return dotProduct / (Math.sqrt(norm1) * Math.sqrt(norm2));
}
private List<String> tokenize(String text) {
// 实际生产中可以用jieba等分词器
List<String> tokens = new ArrayList<>();
// 简单按字符分割,实际需要替换为分词库
for (int i = 0; i < text.length(); i++) {
tokens.add(String.valueOf(text.charAt(i)));
}
return tokens;
}
}请求聚合器:等待窗口机制
请求聚合的核心是"等待窗口":新请求进来,先不急着执行,等一小段时间(比如50ms),看看这段时间里有没有相似请求可以一起合并处理。
@Component
public class RequestAggregator {
private final Map<String, AggregatedTask> pendingTasks = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
private static final long AGGREGATION_WINDOW_MS = 50; // 等待窗口
private static final int MAX_BATCH_SIZE = 10; // 最多合并10个请求
@Autowired
private AIInferenceClient inferenceClient;
public CompletableFuture<AIResponse> submitRequest(AIRequest request, String requestHash) {
// 检查是否有可以合并的等待中任务
AggregatedTask existingTask = findMergeableTask(request, requestHash);
if (existingTask != null) {
// 有可合并的任务,挂载到现有任务上
CompletableFuture<AIResponse> future = new CompletableFuture<>();
existingTask.addWaiter(request, future);
log.debug("请求{}合并到任务{}", request.getRequestId(), existingTask.getTaskId());
return future;
} else {
// 创建新的聚合任务
AggregatedTask newTask = new AggregatedTask(request, requestHash);
pendingTasks.put(requestHash, newTask);
// 设置执行时间:等待窗口到了或者批次满了就执行
scheduleExecution(newTask);
return newTask.getPrimaryFuture();
}
}
private AggregatedTask findMergeableTask(AIRequest request, String requestHash) {
// 先精确匹配
AggregatedTask exact = pendingTasks.get(requestHash);
if (exact != null && exact.getWaiters().size() < MAX_BATCH_SIZE) {
return exact;
}
// 没有精确匹配,找语义相似的(这里简化处理,实际可以维护一个相似度索引)
return pendingTasks.values().stream()
.filter(task -> task.getWaiters().size() < MAX_BATCH_SIZE)
.filter(task -> isSimilarEnough(request, task.getPrimaryRequest()))
.findFirst()
.orElse(null);
}
private boolean isSimilarEnough(AIRequest req1, AIRequest req2) {
// 同一个模型、同一个任务类型
if (!req1.getModelId().equals(req2.getModelId())) return false;
if (!req1.getTaskType().equals(req2.getTaskType())) return false;
// 关键词相似度检查(实际调用KeywordSimilarityCalculator)
// 这里简化
return false;
}
private void scheduleExecution(AggregatedTask task) {
// 等待窗口后执行
ScheduledFuture<?> scheduledFuture = scheduler.schedule(
() -> executeTask(task),
AGGREGATION_WINDOW_MS,
TimeUnit.MILLISECONDS
);
task.setScheduledFuture(scheduledFuture);
}
private void executeTask(AggregatedTask task) {
pendingTasks.remove(task.getRequestHash());
List<AIRequest> allRequests = task.getAllRequests();
log.info("执行聚合任务{},合并了{}个请求", task.getTaskId(), allRequests.size());
try {
// 执行主请求
AIResponse primaryResponse = inferenceClient.infer(task.getPrimaryRequest());
// 将主请求结果分发给所有等待者
// 对于相似但不完全相同的请求,可以做轻微调整
task.getPrimaryFuture().complete(primaryResponse);
for (Map.Entry<AIRequest, CompletableFuture<AIResponse>> waiter :
task.getWaiterEntries()) {
AIResponse adaptedResponse = adaptResponse(primaryResponse, waiter.getKey());
waiter.getValue().complete(adaptedResponse);
}
} catch (Exception e) {
task.getPrimaryFuture().completeExceptionally(e);
task.getWaiterEntries().forEach(entry ->
entry.getValue().completeExceptionally(e));
}
}
private AIResponse adaptResponse(AIResponse primary, AIRequest request) {
// 对于语义相似但略有不同的请求,可以对结果做简单适配
// 比如替换几个关键词
// 这里简化处理,直接返回相同结果
return primary;
}
}资源感知令牌桶
传统令牌桶的速率是固定的,但AI服务的处理能力随GPU负载动态变化。我们需要一个能根据实时GPU状态调整速率的令牌桶:
@Component
public class ResourceAwareTokenBucket {
private final AtomicLong availableTokens = new AtomicLong(0);
private final AtomicLong lastRefillTime = new AtomicLong(System.currentTimeMillis());
@Autowired
private GpuMonitor gpuMonitor;
// 基础速率:GPU满负载时的最大处理速率(token/秒)
private static final double BASE_RATE = 100.0;
// 桶容量
private static final long BUCKET_CAPACITY = 200;
@Scheduled(fixedDelay = 1000) // 每秒补充令牌
public void refillTokens() {
long now = System.currentTimeMillis();
long lastRefill = lastRefillTime.getAndSet(now);
double elapsed = (now - lastRefill) / 1000.0; // 转换为秒
// 根据GPU空闲率计算当前速率
double gpuIdleRate = gpuMonitor.getGpuIdleRate(); // 0.0-1.0
double currentRate = BASE_RATE * (0.2 + 0.8 * gpuIdleRate);
// GPU完全空闲时速率为BASE_RATE,GPU满载时速率为0.2*BASE_RATE
long tokensToAdd = (long) (elapsed * currentRate);
long current = availableTokens.get();
long newTokens = Math.min(current + tokensToAdd, BUCKET_CAPACITY);
availableTokens.set(newTokens);
}
public boolean tryAcquire(int requestedTokens) {
long current;
do {
current = availableTokens.get();
if (current < requestedTokens) {
return false; // 令牌不足
}
} while (!availableTokens.compareAndSet(current, current - requestedTokens));
return true;
}
public boolean tryAcquireWithCost(AIRequest request) {
// 根据请求复杂度估算消耗的令牌数
int estimatedCost = estimateRequestCost(request);
return tryAcquire(estimatedCost);
}
private int estimateRequestCost(AIRequest request) {
// 简单估算:基于max_tokens和任务类型
int baseTokens = 1;
if (request.getMaxTokens() != null) {
baseTokens = Math.max(1, request.getMaxTokens() / 100);
}
// 生成任务比分类任务消耗更多资源
int taskMultiplier = switch (request.getTaskType()) {
case "generation" -> 5;
case "summarization" -> 3;
case "classification" -> 1;
case "embedding" -> 1;
default -> 2;
};
return baseTokens * taskMultiplier;
}
}在Spring Cloud Gateway中集成
所有上面的组件,需要通过一个统一的Gateway Filter串起来:
@Component
public class AISmartRateLimitFilter implements GlobalFilter, Ordered {
@Autowired
private RequestNormalizer normalizer;
@Autowired
private RequestAggregator aggregator;
@Autowired
private ResourceAwareTokenBucket tokenBucket;
@Autowired
private ResultCache resultCache;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 只对AI推理接口生效
if (!request.getPath().toString().startsWith("/api/ai/")) {
return chain.filter(exchange);
}
return readRequestBody(exchange)
.flatMap(body -> {
AIRequest aiRequest = parseRequest(body);
// 第一步:规范化和哈希
String normalized = normalizer.normalize(aiRequest);
String hash = normalizer.computeHash(normalized);
// 第二步:查结果缓存(最近5分钟的相同请求直接返回缓存)
Optional<AIResponse> cached = resultCache.get(hash);
if (cached.isPresent()) {
return writeResponse(exchange, cached.get());
}
// 第三步:检查资源令牌
if (!tokenBucket.tryAcquireWithCost(aiRequest)) {
return writeRateLimitResponse(exchange);
}
// 第四步:提交到聚合器
return Mono.fromFuture(aggregator.submitRequest(aiRequest, hash))
.flatMap(response -> {
// 将结果放入缓存
resultCache.put(hash, response, Duration.ofMinutes(5));
return writeResponse(exchange, response);
})
.onErrorResume(e -> writeErrorResponse(exchange, e));
});
}
@Override
public int getOrder() {
return -100; // 优先级较高,在大部分其他filter之前执行
}
private Mono<Void> writeRateLimitResponse(ServerWebExchange exchange) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
response.getHeaders().add("Retry-After", "1");
response.getHeaders().add("X-RateLimit-Reason", "GPU资源不足,请稍后重试");
Map<String, Object> body = Map.of(
"code", 429,
"message", "请求太多,AI服务正在繁忙,请稍后重试",
"retryAfterMs", 1000
);
DataBuffer buffer = response.bufferFactory()
.wrap(JsonUtils.toBytes(body));
return response.writeWith(Mono.just(buffer));
}
}结果缓存:语义缓存的设计
这里的缓存不是普通的key-value缓存,而是"语义缓存"——相似的请求可以复用之前的结果。
@Component
public class SemanticResultCache {
// 精确匹配缓存
private final Cache<String, AIResponse> exactCache;
// 语义缓存(存储hash -> 向量的映射,用于相似查找)
private final List<CacheEntry> semanticEntries = new CopyOnWriteArrayList<>();
@Autowired
private EmbeddingService embeddingService;
public SemanticResultCache() {
this.exactCache = Caffeine.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build();
}
public Optional<AIResponse> get(String exactHash) {
// 先查精确缓存
AIResponse exact = exactCache.getIfPresent(exactHash);
if (exact != null) {
return Optional.of(exact);
}
return Optional.empty();
}
public Optional<AIResponse> getSemanticMatch(AIRequest request, double threshold) {
// 异步语义查找(不阻塞主流程)
try {
float[] queryVector = embeddingService.embed(request.getPrompt());
return semanticEntries.stream()
.filter(entry -> !entry.isExpired())
.max(Comparator.comparingDouble(
entry -> cosineSimilarity(queryVector, entry.getVector())))
.filter(entry -> cosineSimilarity(queryVector, entry.getVector()) > threshold)
.map(CacheEntry::getResponse);
} catch (Exception e) {
log.warn("语义缓存查找失败: {}", e.getMessage());
return Optional.empty();
}
}
public void put(String hash, AIResponse response, Duration ttl) {
exactCache.put(hash, response);
// 异步将向量化结果加入语义缓存
CompletableFuture.runAsync(() -> {
try {
// 这里的prompt从哪来?需要在cache时一起存入
// 简化处理
} catch (Exception e) {
log.warn("语义缓存写入失败: {}", e.getMessage());
}
});
}
private double cosineSimilarity(float[] vec1, float[] vec2) {
double dot = 0, norm1 = 0, norm2 = 0;
for (int i = 0; i < vec1.length; i++) {
dot += vec1[i] * vec2[i];
norm1 += vec1[i] * vec1[i];
norm2 += vec2[i] * vec2[i];
}
return dot / (Math.sqrt(norm1) * Math.sqrt(norm2));
}
private record CacheEntry(float[] vector, AIResponse response, long expireAt) {
boolean isExpired() {
return System.currentTimeMillis() > expireAt;
}
}
}监控指标:怎么知道聚合效果好不好
上了这套系统,需要有指标来衡量效果:
@Component
public class AggregationMetrics {
private final Counter totalRequests;
private final Counter exactCacheHits;
private final Counter aggregatedRequests;
private final Counter rateLimitedRequests;
private final Histogram aggregationBatchSize;
private final Histogram savedInferenceCount;
public AggregationMetrics(MeterRegistry registry) {
this.totalRequests = Counter.builder("ai.gateway.requests.total")
.description("总请求数")
.register(registry);
this.exactCacheHits = Counter.builder("ai.gateway.cache.hits")
.tag("type", "exact")
.description("精确缓存命中数")
.register(registry);
this.aggregatedRequests = Counter.builder("ai.gateway.requests.aggregated")
.description("被聚合的请求数")
.register(registry);
this.rateLimitedRequests = Counter.builder("ai.gateway.requests.ratelimited")
.description("被限流的请求数")
.register(registry);
this.aggregationBatchSize = Histogram.builder("ai.gateway.aggregation.batch.size")
.description("聚合批次大小分布")
.register(registry);
this.savedInferenceCount = Histogram.builder("ai.gateway.saved.inferences")
.description("通过聚合节省的推理次数")
.register(registry);
}
// 关键指标:聚合率
// 聚合率 = (aggregatedRequests / totalRequests) * 100%
// 目标:聚合率在流量高峰时达到30%以上
// 缓存命中率
// 命中率 = (exactCacheHits / totalRequests) * 100%
// 目标:稳定流量下达到50%以上
}踩坑记录
做这套系统的时候,有几个坑我想特别提一下:
坑一:等待窗口设置不合理。我最开始设了200ms的等待窗口,结果用户体验很差,因为每个请求都要多等200ms才开始处理。后来改成50ms,效果好了很多。等待窗口是聚合率和响应延迟的权衡,要根据业务接受的延迟上限来定。
坑二:聚合结果不能无脑复用。我曾经把一个"帮我写一篇关于苹果手机的评测"的请求结果,返回给了一个"帮我写一篇关于苹果电脑的评测"的请求(因为语义相似度较高)。这种情况下,聚合不是优化,是Bug。对于生成类任务,语义缓存要非常保守,相似度阈值应该调高到0.95以上甚至只做精确匹配。对于分类、摘要这类"给定文本求答案"的任务才适合做语义聚合。
坑三:令牌桶的令牌估算要定期校准。我最初的估算函数是靠经验拍的,上线后发现实际GPU消耗和估算差了很多,导致要么过度保守(GPU实际还有余量,但令牌桶已满)、要么过度激进(令牌充足但GPU已满)。最终的解决方案是定期用实际GPU利用率反馈来校准估算函数。
效果数据
这套系统在我们生产环境跑了三个月之后,数据如下:
- 在下午流量高峰期,请求聚合率约28%(大约每4个请求能合并成3个推理任务)
- 精确缓存命中率约41%
- 综合下来,AI推理的GPU算力消耗降低了约35%
- 平均响应延迟略有增加(+38ms,主要来自聚合等待窗口),但P99延迟反而降低了,因为减少了GPU排队等待时间
对我们来说,35%的算力节省相当于每个月省了十几万的GPU费用,这个收益非常可观。
