第1657篇:AI应用的水平扩展策略——基于请求队列深度的自动伸缩
第1657篇:AI应用的水平扩展策略——基于请求队列深度的自动伸缩
做了两年AI服务,我对"弹性扩缩容"这个词的理解越来越保守了。
刚开始的时候,我觉得只要配上HPA,服务就能自动处理流量波动,躺着就行。后来发现,HPA的配置如果不考虑AI服务的特殊性,要么扩容太慢(高峰期还没扩上来,请求就超时了),要么缩容太激进(正在处理的推理请求被强制中断,用户端直接报错)。
这篇文章讲的是AI应用水平扩展里最核心的一个决策:用什么指标来驱动扩缩容。我会从CPU指标的局限说起,到自定义队列指标的实现,再到扩缩容策略的精细化调优。
CPU指标为什么不适合驱动AI服务扩缩容
先做个实验。拿一个7B参数量的LLM推理服务,在以下几种场景下观察CPU和队列的变化:
场景A:100个用户同时发来很短的问题("今天天气如何?"),每个请求生成20个token。 场景B:100个用户同时发来很长的问题(附带2000字的文档让模型总结),每个请求生成500个token。
从用户视角看,两种场景的并发量是一样的——都是100个并发请求。但实际的资源消耗差了10倍以上,队列里积压的时间也差了10倍。
CPU指标对这两种场景的反应是:场景A下CPU可能60-70%,场景B下CPU可能80-90%,差距不大(GPU在跑,CPU反而不是瓶颈)。
如果用CPU驱动HPA,两个场景的扩容决策会差不多,但实际上场景B明显需要更多实例来消化积压。
队列深度指标则能直接反映"有多少工作在等待处理",与请求的具体大小无关。队列深一点,扩容;队列浅了,缩容。这是最符合AI推理服务特性的指标。
构建请求队列深度的监控体系
Java层面的队列实现
AI推理服务的请求处理通常是这样的流程:
- 请求进来,放入队列
- 工作线程从队列取请求,进行推理
- 推理完成,返回结果
我们需要一个可监控的队列实现:
@Component
@Slf4j
public class InferenceRequestQueue {
// 有界阻塞队列,防止无限积压
private final BlockingQueue<InferenceTask> pendingQueue;
private final AtomicInteger activeCount = new AtomicInteger(0);
private final AtomicInteger completedCount = new AtomicInteger(0);
private final AtomicInteger rejectedCount = new AtomicInteger(0);
// 滑动窗口统计延迟
private final SlidingWindowLatencyTracker latencyTracker =
new SlidingWindowLatencyTracker(1000); // 最近1000个请求
public InferenceRequestQueue(
@Value("${inference.queue.max-size:200}") int maxQueueSize) {
this.pendingQueue = new ArrayBlockingQueue<>(maxQueueSize);
}
/**
* 提交推理请求到队列
* @return CompletableFuture,完成后包含推理结果
*/
public CompletableFuture<InferenceResult> submit(InferenceRequest request) {
CompletableFuture<InferenceResult> future = new CompletableFuture<>();
InferenceTask task = new InferenceTask(request, future, System.currentTimeMillis());
boolean offered = pendingQueue.offer(task);
if (!offered) {
// 队列满了,拒绝请求
rejectedCount.incrementAndGet();
future.completeExceptionally(
new QueueFullException("推理队列已满,请稍后重试。当前队列深度: " +
pendingQueue.size()));
}
return future;
}
/**
* 工作线程调用:从队列取出下一个任务
*/
public InferenceTask poll(long timeout, TimeUnit unit) throws InterruptedException {
InferenceTask task = pendingQueue.poll(timeout, unit);
if (task != null) {
activeCount.incrementAndGet();
}
return task;
}
/**
* 工作线程调用:通知任务完成
*/
public void taskCompleted(InferenceTask task, InferenceResult result) {
activeCount.decrementAndGet();
completedCount.incrementAndGet();
long latency = System.currentTimeMillis() - task.getSubmitTime();
latencyTracker.record(latency);
task.getFuture().complete(result);
}
public void taskFailed(InferenceTask task, Throwable error) {
activeCount.decrementAndGet();
task.getFuture().completeExceptionally(error);
}
// 各种指标的getter,供Micrometer收集
public int getPendingCount() { return pendingQueue.size(); }
public int getActiveCount() { return activeCount.get(); }
public int getCompletedCount() { return completedCount.get(); }
public int getRejectedCount() { return rejectedCount.get(); }
public double getQueueDepth() {
// 有效队列深度 = 等待中的 + 正在处理的(都需要资源)
return pendingQueue.size() + activeCount.get();
}
public double getLatencyP99() {
return latencyTracker.getPercentile(0.99);
}
public boolean isAcceptingRequests() {
return pendingQueue.remainingCapacity() > 10; // 留10个位置的安全余量
}
}工作线程的实现:
@Component
@Slf4j
public class InferenceWorker {
private final InferenceRequestQueue queue;
private final InferenceEngine inferenceEngine;
private final ExecutorService workerPool;
private volatile boolean running = true;
public InferenceWorker(InferenceRequestQueue queue,
InferenceEngine inferenceEngine,
@Value("${inference.worker.threads:2}") int workerThreads) {
this.queue = queue;
this.inferenceEngine = inferenceEngine;
this.workerPool = Executors.newFixedThreadPool(workerThreads,
new ThreadFactoryBuilder().setNameFormat("inference-worker-%d").build());
}
@PostConstruct
public void startWorkers() {
int threadCount = ((ThreadPoolExecutor) workerPool).getCorePoolSize();
for (int i = 0; i < threadCount; i++) {
workerPool.submit(this::workerLoop);
}
log.info("已启动{}个推理工作线程", threadCount);
}
private void workerLoop() {
log.info("推理工作线程启动: {}", Thread.currentThread().getName());
while (running) {
try {
InferenceTask task = queue.poll(1, TimeUnit.SECONDS);
if (task == null) continue;
// 检查任务是否已超时(在队列里等太久)
long waitTime = System.currentTimeMillis() - task.getSubmitTime();
if (waitTime > task.getRequest().getTimeoutMs()) {
queue.taskFailed(task,
new TimeoutException("请求在队列中等待超时: " + waitTime + "ms"));
continue;
}
try {
InferenceResult result = inferenceEngine.infer(task.getRequest());
queue.taskCompleted(task, result);
} catch (Exception e) {
log.error("推理执行失败", e);
queue.taskFailed(task, e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
log.info("推理工作线程退出: {}", Thread.currentThread().getName());
}
@PreDestroy
public void stopWorkers() {
running = false;
workerPool.shutdown();
try {
if (!workerPool.awaitTermination(120, TimeUnit.SECONDS)) {
workerPool.shutdownNow();
}
} catch (InterruptedException e) {
workerPool.shutdownNow();
}
}
}把队列指标暴露给Prometheus
@Component
public class InferenceQueueMetricsExporter {
public InferenceQueueMetricsExporter(InferenceRequestQueue queue,
MeterRegistry registry) {
// 队列深度(等待 + 处理中)
Gauge.builder("ai.inference.queue.depth")
.description("推理请求总队列深度(等待+处理中)")
.register(registry, queue, InferenceRequestQueue::getQueueDepth);
// 等待队列深度(只算等待中的)
Gauge.builder("ai.inference.queue.pending")
.description("等待处理的推理请求数")
.register(registry, queue, q -> (double) q.getPendingCount());
// 活跃处理中的请求数
Gauge.builder("ai.inference.queue.active")
.description("当前正在处理的推理请求数")
.register(registry, queue, q -> (double) q.getActiveCount());
// P99延迟
Gauge.builder("ai.inference.latency.p99.ms")
.description("推理请求P99延迟(毫秒)")
.register(registry, queue, InferenceRequestQueue::getLatencyP99);
// 被拒绝的请求(队列满)
Counter.builder("ai.inference.requests.rejected")
.description("因队列满而被拒绝的请求数")
.register(registry);
}
}配置Prometheus Adapter和HPA
有了Prometheus指标,接下来让HPA能读到这些自定义指标。
Prometheus Adapter配置
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-adapter-config
namespace: monitoring
data:
config.yaml: |
rules:
# 队列深度指标
- seriesQuery: 'ai_inference_queue_depth{namespace!="",pod!=""}'
resources:
overrides:
namespace:
resource: namespace
pod:
resource: pod
name:
matches: "ai_inference_queue_depth"
as: "inference_queue_depth_per_pod"
metricsQuery: >
avg(ai_inference_queue_depth{<<.LabelMatchers>>}) by (<<.GroupBy>>)
# P99延迟指标
- seriesQuery: 'ai_inference_latency_p99_ms{namespace!="",pod!=""}'
resources:
overrides:
namespace:
resource: namespace
pod:
resource: pod
name:
matches: "ai_inference_latency_p99_ms"
as: "inference_latency_p99_ms"
metricsQuery: >
max(ai_inference_latency_p99_ms{<<.LabelMatchers>>}) by (<<.GroupBy>>)HPA配置
现在来配置一个多指标驱动的HPA:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: ai-inference-hpa
namespace: ai-prod
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: ai-inference-service
minReplicas: 2
maxReplicas: 20
metrics:
# 主要指标:队列深度
# 每个Pod最多处理5个排队中的请求,超过就扩容
- type: Pods
pods:
metric:
name: inference_queue_depth_per_pod
target:
type: AverageValue
averageValue: "5"
# 辅助指标:CPU(防止CPU真正成为瓶颈时无法感知)
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
# P99延迟过高时也触发扩容
- type: Pods
pods:
metric:
name: inference_latency_p99_ms
target:
type: AverageValue
averageValue: "10000" # P99超过10秒就扩容
behavior:
scaleUp:
# 扩容策略:稳定窗口30秒(避免频繁抖动),每次最多扩4个Pod
stabilizationWindowSeconds: 30
selectPolicy: Max
policies:
- type: Pods
value: 4
periodSeconds: 60
- type: Percent
value: 50 # 或者扩容50%(取较大值)
periodSeconds: 60
scaleDown:
# 缩容策略:稳定窗口10分钟(AI服务缩容要保守)
stabilizationWindowSeconds: 600
selectPolicy: Min
policies:
- type: Pods
value: 1 # 每次最多缩容1个Pod
periodSeconds: 120扩缩容行为的深入理解
behavior.scaleUp.stabilizationWindowSeconds: 30的含义是:在过去30秒内,HPA会取所有"建议副本数"的最大值作为实际目标。也就是说,扩容决策要稳定30秒(期间一直显示需要扩容)才会真正执行。这防止了短暂的流量尖峰触发不必要的扩容。
behavior.scaleDown.stabilizationWindowSeconds: 600的含义是:缩容前必须稳定10分钟。这对AI服务很重要——模型加载本来就慢,如果缩容后很快又需要扩容,那之前缩容的代价(冷启动延迟)就白费了。
高级策略:预测性扩容
被动式扩缩容(指标超阈值才扩容)有个天然的滞后性:需要等队列积压到阈值,才会触发扩容,而这段时间用户已经在感受延迟了。
更好的做法是预测性扩容:在业务高峰到来之前,就提前扩好容。
基于历史规律的定时扩容
很多AI服务的流量有明显的时间规律(工作日高峰、午间高峰)。可以用Kubernetes的KEDA(Kubernetes Event-Driven Autoscaler)的Cron Scaler来实现定时预扩容:
# 安装KEDA后,用ScaledObject替代HPA
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: ai-inference-scaledobject
namespace: ai-prod
spec:
scaleTargetRef:
name: ai-inference-service
minReplicaCount: 2
maxReplicaCount: 20
cooldownPeriod: 300 # 缩容冷却5分钟
triggers:
# 队列深度触发器(reactive)
- type: prometheus
metadata:
serverAddress: http://prometheus-server.monitoring:9090
metricName: inference_queue_depth
threshold: "5"
query: >
avg(ai_inference_queue_depth{namespace="ai-prod"})
# 定时触发器(proactive)
- type: cron
metadata:
timezone: Asia/Shanghai
start: 30 8 * * 1-5 # 工作日早上8:30
end: 00 10 * * 1-5 # 到10:00
desiredReplicas: "10" # 这段时间维持10个副本
- type: cron
metadata:
timezone: Asia/Shanghai
start: 00 13 * * 1-5 # 工作日下午1点(午后高峰)
end: 00 15 * * 1-5
desiredReplicas: "8"KEDA会取所有触发器中要求副本数最高的值,也就是说定时触发器和队列触发器是"取最大"的关系,不会冲突。
基于流量预测的动态预热
更复杂一些,可以用机器学习预测未来流量,提前扩容:
@Component
@Slf4j
public class PredictiveScaler {
private final TrafficPredictor trafficPredictor;
private final KubernetesClient k8sClient;
// 每5分钟执行一次预测和调整
@Scheduled(fixedDelay = 300_000)
public void predictAndScale() {
// 预测未来15分钟的请求量
TrafficForecast forecast = trafficPredictor.predict(Duration.ofMinutes(15));
int predictedQps = forecast.getMaxQps();
// 根据预测QPS计算需要的副本数
// 假设每个Pod能处理5 QPS
int requiredReplicas = (int) Math.ceil(predictedQps / 5.0);
// 至少保持最小副本数,最多不超过最大副本数
int targetReplicas = Math.max(2, Math.min(20, requiredReplicas));
// 获取当前副本数
Deployment deployment = k8sClient.apps().deployments()
.inNamespace("ai-prod")
.withName("ai-inference-service")
.get();
int currentReplicas = deployment.getSpec().getReplicas();
// 只有预测需要的副本数 > 当前数量才主动扩容
// 缩容仍然交给正常的HPA流程(保守)
if (targetReplicas > currentReplicas) {
log.info("预测性扩容:预测15分钟内最大QPS={},当前副本={},目标副本={}",
predictedQps, currentReplicas, targetReplicas);
k8sClient.apps().deployments()
.inNamespace("ai-prod")
.withName("ai-inference-service")
.scale(targetReplicas);
}
}
}流量预测模型可以简单到用移动平均(对周期性规律的业务效果不错),也可以用Prophet等时间序列模型。对于大多数内部AI服务,简单的"同一时段的历史平均值"就够了。
一个容易忽视的细节:排队超时的处理
队列扩容再完善,也有处理不过来的时候。当队列积压严重时,后来的请求应该尽快失败,而不是继续排队等很久最终超时。
一个请求进入队列等了30秒,轮到它处理时,用户端可能已经重试了(触发了重复请求),或者已经放弃了。这时候再处理这个请求不但浪费GPU资源,还可能造成重复响应。
解法是在队列里做超时检查:
// 在InferenceWorker的工作循环里,处理前检查是否已超时
InferenceTask task = queue.poll(1, TimeUnit.SECONDS);
if (task == null) continue;
long waitTimeMs = System.currentTimeMillis() - task.getSubmitTime();
long maxWaitMs = task.getRequest().getMaxQueueWaitMs(); // 客户端设置的最大等待时间
if (waitTimeMs > maxWaitMs) {
log.warn("请求在队列中等待过久({} ms > {} ms),直接超时返回",
waitTimeMs, maxWaitMs);
queue.taskFailed(task,
new QueueTimeoutException(
String.format("请求排队超时,等待了%dms,最大允许%dms",
waitTimeMs, maxWaitMs)));
// 不处理这个请求,继续下一个
continue;
}同时在HTTP层面设置合理的响应超时,让客户端尽早知道结果,而不是一直等着:
@PostMapping("/v1/inference")
public ResponseEntity<?> infer(@RequestBody InferenceRequest request,
@RequestParam(defaultValue = "30000") long timeoutMs) {
// 设置合理的客户端最大等待时间
request.setMaxQueueWaitMs(Math.min(timeoutMs, 60_000)); // 最多60秒
try {
CompletableFuture<InferenceResult> future = inferenceQueue.submit(request);
InferenceResult result = future.get(timeoutMs, TimeUnit.MILLISECONDS);
return ResponseEntity.ok(result);
} catch (TimeoutException e) {
return ResponseEntity.status(503)
.body(Map.of("error", "推理请求超时,当前服务负载较高,请稍后重试",
"retryAfterSeconds", 5));
} catch (ExecutionException e) {
if (e.getCause() instanceof QueueFullException) {
return ResponseEntity.status(429)
.body(Map.of("error", "服务繁忙,请稍后重试"));
}
return ResponseEntity.status(500)
.body(Map.of("error", "推理失败:" + e.getCause().getMessage()));
}
}总结
AI服务的水平扩展,核心在于用对指标。队列深度指标比CPU/内存指标更能反映AI推理服务的真实负载,是驱动HPA的首选。
扩缩容行为的精细化调优:
- 扩容要快,稳定窗口短,允许快速增加副本
- 缩容要慢,稳定窗口长(至少5-10分钟),给AI服务足够缓冲
- 结合定时扩容做预测性伸缩,在高峰到来前预热好实例
队列实现上:队列要有界(防止无限积压消耗内存)、要有超时(拒绝已无意义的超期请求)、要暴露充足的监控指标(给扩缩容决策提供数据基础)。
这些东西加起来,能让AI服务在流量波动时表现得更平稳,避免高峰期的请求堆积和低谷期的资源浪费。
