第1885篇:当模型API限流时——我们如何撑过双十一流量高峰
第1885篇:当模型API限流时——我们如何撑过双十一流量高峰
双十一前一周,我们开了一个会。
会上,运营同学把预测的流量曲线给我们看了。峰值流量是日常的40倍。
然后所有人都看向了我。
背景:我们的AI功能依赖哪些接口
我们做的是一个电商导购助手,用户在选购时可以问AI"这款产品适合我吗""跟那款相比哪个性价比更高"之类的问题。后端是调用OpenAI的API,用的是GPT-4o。
OpenAI的限流规则是分层的,主要有两个维度:
- TPM(Tokens Per Minute):每分钟的token消耗上限
- RPM(Requests Per Minute):每分钟的请求次数上限
我们当时的Tier是3,TPM上限是450,000,RPM上限是5,000。
日常流量下,我们的峰值RPM大概是800,TPM大概是120,000,都在限额的30%以内,相当宽裕。
但如果双十一峰值是日常的40倍……算了,不用算,肯定超。
分析:哪些请求必须实时,哪些可以等
应对限流,最错误的思路是"怎么发更多请求",正确的思路是"哪些请求可以不发"。
我们把AI功能的调用场景做了一个分类:
A类:用户主动提问(必须实时) 用户手动输入问题,等待回答。这类请求不能等待,必须在5秒内响应,否则用户会直接关掉。占总请求的约25%。
B类:商品详情页的自动分析(可以稍等) 用户打开商品页时,会自动触发一个"AI分析该商品优缺点"的请求。用户打开页面后通常会先看图片和参数,有3-5秒的自然等待时间。占总请求的约45%。
C类:搜索结果的智能摘要(可以大幅延迟) 搜索结果页会对每个商品生成一句话摘要。这个功能是锦上添花,没有也不影响核心体验。占总请求的约30%。
这个分类成了我们整个策略的基础。
策略一:请求优先级队列
我们在服务内部实现了一个优先级感知的请求队列,A类请求优先处理,B类次之,C类最低。
@Component
public class PriorityLlmRequestQueue {
// 三个优先级的队列
private final BlockingQueue<LlmRequest> criticalQueue = new LinkedBlockingQueue<>(1000);
private final BlockingQueue<LlmRequest> normalQueue = new LinkedBlockingQueue<>(5000);
private final BlockingQueue<LlmRequest> lowPriorityQueue = new LinkedBlockingQueue<>(10000);
// 令牌桶:控制整体对外调用速率,避免超过RPM限制
private final RateLimiter apiRateLimiter;
@PostConstruct
public void init() {
// 初始化令牌桶,留20%余量:实际上限RPM×0.8
int rpmLimit = Integer.parseInt(env.getProperty("openai.rpm.limit", "5000"));
this.apiRateLimiter = RateLimiter.create(rpmLimit * 0.8 / 60.0); // 转换为每秒速率
// 启动消费线程
startConsumerThreads();
}
private void startConsumerThreads() {
// 消费线程,按优先级取请求
Thread consumer = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
LlmRequest request = pollWithPriority();
if (request != null) {
// 获取令牌(这里会阻塞,起到限速作用)
apiRateLimiter.acquire();
processRequest(request);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "llm-queue-consumer");
consumer.setDaemon(true);
consumer.start();
}
private LlmRequest pollWithPriority() throws InterruptedException {
// 优先从高优先级队列取,高优先级队列空了再取低优先级
LlmRequest request = criticalQueue.poll();
if (request != null) return request;
request = normalQueue.poll();
if (request != null) return request;
// 低优先级队列,等待最多100ms
return lowPriorityQueue.poll(100, TimeUnit.MILLISECONDS);
}
/**
* 提交请求,返回Future,调用方可以设置不同的等待超时
*/
public CompletableFuture<String> submit(LlmRequest request) {
CompletableFuture<String> future = new CompletableFuture<>();
request.setFuture(future);
// 根据优先级选择队列
BlockingQueue<LlmRequest> targetQueue = switch (request.getPriority()) {
case CRITICAL -> criticalQueue;
case NORMAL -> normalQueue;
case LOW -> lowPriorityQueue;
};
// 队列满了就拒绝低优先级请求,而不是阻塞
if (!targetQueue.offer(request)) {
if (request.getPriority() == Priority.LOW) {
// 低优先级请求直接降级,不等了
future.complete(request.getFallbackResponse());
} else {
future.completeExceptionally(new QueueFullException("Request queue full"));
}
}
return future;
}
}调用方的使用方式
@Service
public class ProductAiService {
@Autowired
private PriorityLlmRequestQueue requestQueue;
/**
* A类:用户主动提问,高优先级,超时5秒
*/
public String answerUserQuestion(String question, String productContext) {
LlmRequest request = LlmRequest.builder()
.prompt(buildQuestionPrompt(question, productContext))
.priority(Priority.CRITICAL)
.fallbackResponse("抱歉,当前咨询量较大,请稍后重试。")
.build();
try {
return requestQueue.submit(request).get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.warn("User question timed out, returning fallback");
return "当前咨询量较大,我的回答可能需要稍等片刻,请稍候再试。";
}
}
/**
* B类:商品自动分析,普通优先级,超时15秒
*/
public CompletableFuture<String> analyzeProduct(String productId, String productInfo) {
LlmRequest request = LlmRequest.builder()
.prompt(buildAnalysisPrompt(productInfo))
.priority(Priority.NORMAL)
.fallbackResponse(null) // 返回null表示没有分析结果,前端不显示AI模块
.build();
return requestQueue.submit(request);
}
/**
* C类:搜索摘要,低优先级,可接受失败
*/
public void generateSearchSummaryAsync(String productId, String productBrief) {
LlmRequest request = LlmRequest.builder()
.prompt(buildSummaryPrompt(productBrief))
.priority(Priority.LOW)
.fallbackResponse("") // 降级时返回空字符串,前端不显示摘要
.build();
requestQueue.submit(request)
.thenAccept(summary -> {
if (!summary.isEmpty()) {
summaryCache.put(productId, summary, Duration.ofMinutes(30));
}
})
.exceptionally(e -> {
log.debug("Search summary generation failed for product {}: {}", productId, e.getMessage());
return null;
});
}
}策略二:多级缓存,大幅削减实际请求量
分析我们的历史数据发现,很多用户在问同样或类似的问题。比如同一款手机,会有很多用户都问"这款手机适合打游戏吗"。
我们设计了两级缓存:
第一级:精确缓存(问题+商品ID完全相同)
@Service
public class LlmResponseCache {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 精确缓存:相同的productId + question,直接命中
*/
public Optional<String> getExactMatch(String productId, String question) {
String key = "llm:exact:" + productId + ":" + DigestUtils.md5Hex(question);
String cached = redisTemplate.opsForValue().get(key);
return Optional.ofNullable(cached);
}
public void putExactMatch(String productId, String question, String answer) {
String key = "llm:exact:" + productId + ":" + DigestUtils.md5Hex(question);
// 商品问答缓存1小时(商品信息基本不变)
redisTemplate.opsForValue().set(key, answer, Duration.ofHours(1));
}
}第二级:语义缓存(相似问题共用一个答案)
这是更有意思的一层。用户可能会用不同的方式问同一个问题,比如"这手机能打游戏吗"和"这款手机游戏性能怎么样",语义上是一样的,但文字不同。
@Service
public class SemanticResponseCache {
@Autowired
private EmbeddingService embeddingService;
@Autowired
private VectorStoreClient vectorStore;
private static final String CACHE_COLLECTION = "response_cache";
private static final float SIMILARITY_THRESHOLD = 0.92f; // 相似度阈值
public Optional<String> findSimilar(String productId, String question) {
float[] questionVector = embeddingService.embed(question);
// 只在同一个商品的缓存中搜索
Map<String, Object> filter = Map.of("product_id", productId);
List<SearchResult> results = vectorStore.search(
CACHE_COLLECTION, questionVector, 1, filter
);
if (results.isEmpty()) return Optional.empty();
SearchResult top = results.get(0);
if (top.getScore() >= SIMILARITY_THRESHOLD) {
log.debug("Semantic cache hit for product {}, similarity={}", productId, top.getScore());
return Optional.of(top.getMetadata().get("answer").toString());
}
return Optional.empty();
}
public void put(String productId, String question, String answer) {
float[] questionVector = embeddingService.embed(question);
Map<String, Object> metadata = Map.of(
"product_id", productId,
"question", question,
"answer", answer,
"created_at", System.currentTimeMillis()
);
vectorStore.upsert(CACHE_COLLECTION, UUID.randomUUID().toString(), questionVector, metadata);
}
}语义缓存的引入,让重复问题的命中率从30%提升到了62%。这意味着实际发到OpenAI的请求量减少了将近一半。
策略三:备用模型切换
只依赖一个模型提供商是很脆弱的。我们预备了两个备选方案:
- 降级到GPT-3.5-turbo:速度更快,价格更低,质量略差但可接受
- 降级到本地部署的Qwen模型:完全不受外部限流影响,但质量更有限
@Service
public class AdaptiveLlmRouter {
private final Map<String, LlmClient> clients;
// 当前主用模型
private volatile String primaryModel = "gpt-4o";
// 各模型的实时错误率(滑动窗口)
private final Map<String, SlidingWindowCounter> errorRates = new ConcurrentHashMap<>();
public String complete(String prompt, Priority priority) {
String modelToUse = selectModel(priority);
try {
String response = clients.get(modelToUse).complete(prompt);
errorRates.get(modelToUse).recordSuccess();
return response;
} catch (RateLimitException e) {
// 限流了,升级降级到下一个模型
String fallbackModel = getFallbackModel(modelToUse);
log.warn("Model {} rate limited, falling back to {}", modelToUse, fallbackModel);
errorRates.get(modelToUse).recordError();
return clients.get(fallbackModel).complete(prompt);
} catch (Exception e) {
errorRates.get(modelToUse).recordError();
throw e;
}
}
private String selectModel(Priority priority) {
// 高优先级请求始终用最好的模型(只要不超限)
if (priority == Priority.CRITICAL) {
double errorRate = errorRates.get("gpt-4o").getRate();
return errorRate < 0.3 ? "gpt-4o" : "gpt-3.5-turbo";
}
// 普通和低优先级,根据当前负载选择
if (isHighLoad()) {
return priority == Priority.NORMAL ? "gpt-3.5-turbo" : "qwen-local";
}
return "gpt-4o";
}
private boolean isHighLoad() {
// 检查队列积压情况
return requestQueue.getNormalQueueSize() > 500
|| requestQueue.getLowQueueSize() > 2000;
}
private String getFallbackModel(String current) {
return switch (current) {
case "gpt-4o" -> "gpt-3.5-turbo";
case "gpt-3.5-turbo" -> "qwen-local";
default -> "qwen-local";
};
}
}双十一当天:实际发生了什么
凌晨零点,流量开始上来。
我们在监控大盘前坐了一整夜。
0:00-0:05:流量快速攀升,RPM从日常的800冲到了8000。精确缓存命中率85%,很多预热的商品问题直接命中缓存。
0:05-0:15:RPM超过了12000,超出了我们设定的安全阈值(5000×0.8=4000 RPS)。优先级队列开始工作,C类请求(搜索摘要)开始积压并降级。
0:15-0:30:OpenAI出现了第一次429限流响应。我们的AdaptiveLlmRouter捕获到了这个异常,部分B类请求自动切换到了GPT-3.5-turbo。
0:30:峰值到了。实测RPM达到18000,但因为缓存的存在,实际打到OpenAI的RPM只有约3500,在阈值之内。A类请求的P95响应时间是3.8秒,卡在了5秒边界以内。
整个双十一期间,A类请求(用户主动提问)的成功率是97.3%,有2.7%的请求因为队列等待超时返回了降级文案。这个结果比预期要好。
事后总结:几个关键的决策点
决策一:提前做缓存预热,而不是等用户触发。
双十一前两天,我们用爬虫把所有促销商品的常见问题提前跑了一遍,把结果存进缓存。这个预热让双十一当天的缓存命中率高出了约20%。
决策二:限流的单位要细化到模型和用户维度。
不同模型有不同的限额,粗粒度的限流会导致某个模型明明还有余量但没被充分利用。此外,我们给重度用户设置了单用户的请求限速(每分钟最多20个AI请求),防止个别用户消耗掉大量配额。
决策三:降级要做得优雅,不能让用户感受到明显的体验断层。
C类请求降级时,前端直接隐藏AI摘要模块,而不是显示错误信息。B类请求降级到GPT-3.5时,对用户完全透明,回答质量差异在商品导购场景下用户几乎感受不到。
限流不是"挡住用户",是"在资源有限时做出最优的用户体验决策"。这两种思路出来的方案,差别很大。
