Part 1764: AI-Assisted Capacity Planning - Engineering Predictive Autoscaling
How did our company do capacity planning before?
Once a quarter we held a meeting. The business side would say "next quarter our traffic is expected to grow 30%", the infrastructure team would multiply the current footprint by a safety factor (usually 1.5), and then go buy servers or add Kubernetes nodes.
This approach has two problems. First, the business estimate is often wrong: that 30% might actually be 300% (a big promotion) or -20% (a business contraction). Second, procurement lead times are long; by the time the servers arrive, the moment has long passed.
The cloud era brought elasticity, but HPA (Horizontal Pod Autoscaler)-style reactive scaling based on current metrics still has a noticeable lag when traffic spikes.
What actually solves the problem is predictive autoscaling: getting the resources ready before the traffic arrives.
Core Challenges of Predictive Autoscaling
This problem is harder than it looks, for a few reasons:
Challenge 1: Traffic patterns are driven by many factors
Many things move traffic: time of day (daily/weekly cycles), operational activities (big promotions, push campaigns), external events (breaking news), and seasonality (public holidays). A pure time-series model drifts badly as soon as an operational campaign starts.
Challenge 2: Getting the scaling timing right
Scaling too early wastes resources; scaling too late hurts users. Computing the right lead time requires knowing the service's scale-out duration, its cold-start time, and how much its request queue can absorb (see the sketch after this list).
Challenge 3: Coordinated planning across services
A single request path crosses many services, so scaling must be coordinated: you cannot scale only the frontend and forget the database connection pool, or scale the API gateway and forget the downstream RPC services.
All three of these challenges are where AI can help.
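To make the lead-time question in Challenge 2 concrete, here is a minimal standalone sketch. The peak time, latencies, and the 5-minute buffer are made-up illustrative values, not measurements from our system; the production version of this calculation appears later in ResourceCalculator.
import java.time.Duration;
import java.time.Instant;

// Illustrative only: when must a scale-out START so that new replicas are
// serving traffic BEFORE the predicted peak arrives?
public class LeadTimeExample {
    public static void main(String[] args) {
        Instant predictedPeak = Instant.parse("2024-06-18T20:00:00Z"); // assumed peak time
        Duration scaleOutLatency = Duration.ofMinutes(3);  // trigger -> new pods ready (measured per service)
        Duration coldStart = Duration.ofSeconds(40);       // warm-up before a new pod handles full QPS
        Duration safetyBuffer = Duration.ofMinutes(5);     // margin for scheduler / image-pull jitter

        Instant actionTime = predictedPeak
                .minus(scaleOutLatency)
                .minus(coldStart)
                .minus(safetyBuffer);

        System.out.println("Start scaling at: " + actionTime); // prints 2024-06-18T19:51:20Z
    }
}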
Overall Architecture
Traffic Forecast Model
We use a hybrid Prophet + LSTM forecasting scheme: Prophet handles periodicity and holiday effects, while the LSTM captures short-term patterns and recent trends.
@Service
@Slf4j
public class TrafficForecastService {
@Value("${forecast.prophet.api-url}")
private String prophetApiUrl; // the Prophet model is deployed as a standalone service (Python implementation)
@Autowired
private PrometheusQueryService prometheusService;
@Autowired
private BusinessCalendarService calendarService;
@Data
public static class ForecastResult {
private String serviceName;
private String metricName;
private List<ForecastPoint> predictions;
private double confidenceLevel; // 0-1
private String modelUsed;
}
@Data
@AllArgsConstructor
public static class ForecastPoint {
private Instant timestamp;
private double predictedValue;
private double lowerBound; // lower bound of the 90% confidence interval
private double upperBound; // upper bound of the 90% confidence interval
}
public ForecastResult forecast(String serviceName,
String metricName,
Duration forecastHorizon) {
// 1. Pull historical data (the past 30 days)
List<DataPoint> historicalData = prometheusService.queryRange(
String.format("sum(rate(%s{job=\"%s\"}[5m]))", metricName, serviceName),
Instant.now().minus(Duration.ofDays(30)),
Instant.now(),
Duration.ofMinutes(5)
);
// 2. Fetch upcoming holidays and operational events
List<CalendarEvent> events = calendarService.getEvents(
Instant.now(),
Instant.now().plus(forecastHorizon)
);
// 3. Call the Prophet forecast service
ProphetForecastRequest prophetReq = ProphetForecastRequest.builder()
.historicalData(toTimeSeries(historicalData))
.futureHorizonHours((int) forecastHorizon.toHours())
.holidays(toHolidayDf(events))
.build();
ProphetForecastResponse prophetResp = callProphetService(prophetReq);
// 4. If confidence is low (e.g. the recent data pattern has clearly shifted), fall back to the LSTM
if (prophetResp.getConfidence() < 0.7) {
log.info("Prophet confidence too low ({}), falling back to the LSTM forecast", prophetResp.getConfidence());
return forecastWithLSTM(serviceName, metricName, historicalData, forecastHorizon);
}
return convertResult(prophetResp, serviceName, metricName);
}
// Adjustment based on operational events
public ForecastResult applyActivityMultiplier(ForecastResult base,
List<CalendarEvent> events) {
List<ForecastPoint> adjusted = new ArrayList<>(base.getPredictions());
for (CalendarEvent event : events) {
if (event.getType() == EventType.PROMOTION) {
// During a big promotion, scale the forecast by the actual uplift seen in similar past events
double multiplier = event.getHistoricalMultiplier();
adjusted = adjusted.stream()
.map(p -> isInTimeRange(p.getTimestamp(), event)
? new ForecastPoint(
p.getTimestamp(),
p.getPredictedValue() * multiplier,
p.getLowerBound() * multiplier * 0.8, // promotions carry more uncertainty
p.getUpperBound() * multiplier * 1.2)
: p) // points outside the event window stay unchanged
.collect(Collectors.toList());
}
}
base.setPredictions(adjusted);
return base;
}
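// callProphetService is referenced above but its body is not shown in the article. The sketch
// below is an assumption, not the author's implementation: it simply POSTs the request to the
// Flask /forecast endpoint (see prophet_service.py below) with Spring's RestTemplate and maps
// the JSON response onto ProphetForecastResponse.
private final RestTemplate restTemplate = new RestTemplate();
private ProphetForecastResponse callProphetService(ProphetForecastRequest req) {
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.APPLICATION_JSON);
    // prophetApiUrl points at the standalone Python service, e.g. http://prophet-svc:8091/forecast
    ResponseEntity<ProphetForecastResponse> resp = restTemplate.postForEntity(
        prophetApiUrl, new HttpEntity<>(req, headers), ProphetForecastResponse.class);
    if (!resp.getStatusCode().is2xxSuccessful() || resp.getBody() == null) {
        throw new IllegalStateException("Prophet forecast service returned " + resp.getStatusCode());
    }
    return resp.getBody();
}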
}
Prophet Python Service (Key Component)
# prophet_service.py - deployed as a standalone Flask service
from prophet import Prophet
import pandas as pd
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/forecast', methods=['POST'])
def forecast():
data = request.json
# Build the DataFrame
df = pd.DataFrame({
'ds': pd.to_datetime(data['timestamps']),
'y': data['values']
})
# Holiday configuration
holidays = None
if data.get('holidays'):
holidays = pd.DataFrame(data['holidays'])
# Train the Prophet model
model = Prophet(
yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=True,
holidays=holidays,
changepoint_prior_scale=0.05, # controls how flexible the trend changes are
seasonality_prior_scale=10.0 # controls the strength of the seasonality
)
# Add a custom seasonality (e.g. the daily business-hours peak pattern)
model.add_seasonality(name='business_hours', period=1, fourier_order=8)
model.fit(df)
# Generate the forecast
future = model.make_future_dataframe(periods=data['future_hours'], freq='H')
forecast = model.predict(future)
# Return only the future portion
future_forecast = forecast[forecast['ds'] > df['ds'].max()]
confidence = calculate_confidence(df, model)
return jsonify({
'timestamps': future_forecast['ds'].astype(str).tolist(),
'yhat': future_forecast['yhat'].tolist(),
'yhat_lower': future_forecast['yhat_lower'].tolist(),
'yhat_upper': future_forecast['yhat_upper'].tolist(),
'confidence': confidence
})
def calculate_confidence(df, model):
"""基于历史数据的CV误差评估预测置信度"""
from sklearn.model_selection import TimeSeriesSplit
errors = []
tscv = TimeSeriesSplit(n_splits=5)
for train_idx, val_idx in tscv.split(df):
train_df = df.iloc[train_idx]
val_df = df.iloc[val_idx]
m = Prophet()
m.fit(train_df)
future = m.make_future_dataframe(periods=len(val_df), freq='5min')
pred = m.predict(future)
mape = ((val_df['y'] - pred['yhat'].tail(len(val_df))).abs() /
(val_df['y'] + 1e-9)).mean()
errors.append(mape)
avg_mape = sum(errors) / len(errors)
return max(0, 1 - avg_mape) # convert MAPE into a confidence score
if __name__ == '__main__':
app.run(port=8091)
Resource Requirement Calculation
Once traffic is forecast, we still have to translate it into a resource requirement. That needs performance baseline data for each service.
@Service
@Slf4j
public class ResourceCalculator {
@Data
public static class ServicePerformanceBaseline {
private String serviceName;
private double cpuPerRequest; // CPU consumed per request (millicores)
private double memoryPerInstance; // memory per instance (MB)
private int maxQpsPerInstance; // maximum QPS per instance
private Duration coldStartTime; // cold-start time
private Duration scaleOutLatency; // latency from trigger to new instances being ready
}
@Data
@Builder
public static class ScalingPlan {
private String serviceName;
private Instant effectiveTime; // when the scale-out must be complete
private Instant actionTime; // when to start scaling (effectiveTime - scaleOutLatency)
private int currentReplicas;
private int targetReplicas;
private ScalingReason reason;
private double expectedPeakQps;
}
public List<ScalingPlan> calculateScalingPlans(
Map<String, ForecastResult> forecasts,
Map<String, ServicePerformanceBaseline> baselines,
ServiceTopology topology) {
List<ScalingPlan> plans = new ArrayList<>();
for (Map.Entry<String, ForecastResult> entry : forecasts.entrySet()) {
String serviceName = entry.getKey();
ForecastResult forecast = entry.getValue();
ServicePerformanceBaseline baseline = baselines.get(serviceName);
if (baseline == null) {
log.warn("服务{}缺少性能基线数据,跳过规划", serviceName);
continue;
}
// Find the peak point in the forecast
ForecastPoint peak = forecast.getPredictions().stream()
.max(Comparator.comparingDouble(ForecastPoint::getUpperBound))
.orElseThrow();
// Plan against the upper bound of the confidence interval (pessimistic estimate to protect performance)
double planningQps = peak.getUpperBound();
// Compute the number of instances required
int requiredInstances = (int) Math.ceil(
planningQps / baseline.getMaxQpsPerInstance() * 1.2 // 20% headroom
);
int currentInstances = getCurrentReplicas(serviceName);
if (requiredInstances > currentInstances) {
// Scale-out needed
Instant effectiveTime = peak.getTimestamp();
Instant actionTime = effectiveTime
.minus(baseline.getScaleOutLatency())
.minus(Duration.ofMinutes(5)); // an extra 5 minutes of lead time
plans.add(ScalingPlan.builder()
.serviceName(serviceName)
.effectiveTime(effectiveTime)
.actionTime(actionTime)
.currentReplicas(currentInstances)
.targetReplicas(requiredInstances)
.reason(ScalingReason.PREDICTED_TRAFFIC_INCREASE)
.expectedPeakQps(planningQps)
.build());
}
}
// Reconcile against the service topology so services on a dependency chain scale together
return reconcileWithTopology(plans, topology);
}
private List<ScalingPlan> reconcileWithTopology(List<ScalingPlan> plans,
ServiceTopology topology) {
// If an upstream service is scaling out, check whether its downstream dependencies need to scale too
Set<String> servicesNeedingScale = plans.stream()
.map(ScalingPlan::getServiceName)
.collect(Collectors.toSet());
List<ScalingPlan> additionalPlans = new ArrayList<>();
for (ScalingPlan plan : plans) {
List<String> downstreams = topology.getDownstreamServices(plan.getServiceName());
for (String downstream : downstreams) {
if (!servicesNeedingScale.contains(downstream)) {
// Check whether the downstream can handle the traffic coming from the scaled-up upstream
double expectedDownstreamQps = plan.getExpectedPeakQps()
* topology.getCallRatio(plan.getServiceName(), downstream);
ServicePerformanceBaseline downstreamBaseline =
getBaseline(downstream);
if (downstreamBaseline != null) {
int currentDownstreamReplicas = getCurrentReplicas(downstream);
double currentCapacity = currentDownstreamReplicas *
downstreamBaseline.getMaxQpsPerInstance();
if (expectedDownstreamQps > currentCapacity * 0.8) {
// The downstream service also needs to scale out
int targetReplicas = (int) Math.ceil(
expectedDownstreamQps / downstreamBaseline.getMaxQpsPerInstance() * 1.2
);
additionalPlans.add(ScalingPlan.builder()
.serviceName(downstream)
.effectiveTime(plan.getEffectiveTime())
.actionTime(plan.getActionTime().minus(Duration.ofMinutes(2)))
.currentReplicas(currentDownstreamReplicas)
.targetReplicas(targetReplicas)
.reason(ScalingReason.CASCADE_FROM_UPSTREAM)
.expectedPeakQps(expectedDownstreamQps)
.build());
}
}
}
}
}
plans.addAll(additionalPlans);
return plans;
}
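// getCurrentReplicas and getBaseline are used above but not shown in the article. The sketch
// below is an assumption, not the author's implementation: replica counts are read from the
// Kubernetes API (the same fabric8 KubernetesClient used later in KubernetesScalingExecutor),
// and baselines come from an in-memory map that some other job refreshes from load-test
// results or Prometheus history.
@Autowired
private KubernetesClient k8sClient;
private final Map<String, ServicePerformanceBaseline> baselineStore = new ConcurrentHashMap<>();
private int getCurrentReplicas(String serviceName) {
    return k8sClient.apps().deployments()
        .inNamespace("production")
        .withName(serviceName)
        .get()
        .getSpec()
        .getReplicas();
}
private ServicePerformanceBaseline getBaseline(String serviceName) {
    return baselineStore.get(serviceName);
}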
}
LLM Review of the Scaling Plan
After the scaling plans are generated, an LLM reviews them end to end and produces a readable analysis report.
@Service
public class ScalingPlanReviewer {
@Autowired
private OpenAiService openAiService;
public ScalingPlanReview review(List<ScalingPlan> plans,
Map<String, ForecastResult> forecasts) {
String planSummary = buildPlanSummary(plans, forecasts);
String prompt = String.format("""
Below is a scaling plan generated from a traffic forecast. Please review it professionally:
%s
Evaluate it from the following angles and output the result as JSON:
1. Is the scaling timing reasonable (is the lead time sufficient, or so conservative that resources are wasted)?
2. Is the scaling magnitude reasonable (does it match the traffic forecast, are any services missing)?
3. Is the scaling order reasonable (does it account for service dependencies)?
4. Risk points (which services' scale-outs carry potential problems)?
5. Optimization suggestions (where cost efficiency could be improved)?
Output JSON fields: overallAssessment, timingAnalysis, capacityAnalysis,
riskPoints, optimizationSuggestions, recommendedAdjustments
""", planSummary);
ChatCompletionRequest request = ChatCompletionRequest.builder()
.model("gpt-4o")
.messages(List.of(
new ChatMessage("system", "你是一位资深的云原生架构师和容量规划专家。"),
new ChatMessage("user", prompt)
))
.temperature(0.1)
.responseFormat(new ResponseFormat("json_object"))
.build();
String response = openAiService.createChatCompletion(request)
.getChoices().get(0).getMessage().getContent();
return parseReview(response);
}
private String buildPlanSummary(List<ScalingPlan> plans,
Map<String, ForecastResult> forecasts) {
StringBuilder sb = new StringBuilder();
sb.append("扩容计划摘要:\n\n");
for (ScalingPlan plan : plans) {
sb.append(String.format(
"Service: %s\nAction time: %s\nCurrent replicas: %d → target replicas: %d\n" +
"Scaling reason: %s\nExpected peak QPS: %.0f\n\n",
plan.getServiceName(),
plan.getActionTime(),
plan.getCurrentReplicas(),
plan.getTargetReplicas(),
plan.getReason().getDescription(),
plan.getExpectedPeakQps()
));
}
sb.append("\n流量预测详情:\n");
forecasts.forEach((service, forecast) -> {
ForecastPoint peak = forecast.getPredictions().stream()
.max(Comparator.comparingDouble(ForecastPoint::getPredictedValue))
.orElseThrow();
sb.append(String.format("服务%s预测峰值: %.0f rps @ %s (置信度: %.1f%%)\n",
service, peak.getPredictedValue(), peak.getTimestamp(),
forecast.getConfidenceLevel() * 100));
});
return sb.toString();
}
}
Execution Layer: Adjusting the Kubernetes HPA
@Service
@Slf4j
public class KubernetesScalingExecutor {
@Autowired
private KubernetesClient k8sClient;
public void executeScalingPlan(ScalingPlan plan) {
log.info("执行扩容计划: service={}, {} -> {} 副本",
plan.getServiceName(), plan.getCurrentReplicas(), plan.getTargetReplicas());
// 方案一:直接调整Deployment副本数(立即生效)
if (plan.getReason() == ScalingReason.PREDICTED_TRAFFIC_INCREASE) {
scaleDeployment(plan);
}
// Also raise the HPA minReplicas so the HPA cannot autoscale back down below what we need
adjustHpaMinReplicas(plan);
// Schedule automatic reclamation (restore the original replica count 4 hours after the peak)
scheduleScaleDown(plan, Duration.ofHours(4));
}
private void scaleDeployment(ScalingPlan plan) {
k8sClient.apps().deployments()
.inNamespace("production")
.withName(plan.getServiceName())
.scale(plan.getTargetReplicas());
// Wait for the new pods to become ready
waitForPodsReady(plan.getServiceName(), plan.getTargetReplicas());
}
private void adjustHpaMinReplicas(ScalingPlan plan) {
HorizontalPodAutoscaler hpa = k8sClient.autoscaling().v2()
.horizontalPodAutoscalers()
.inNamespace("production")
.withName(plan.getServiceName() + "-hpa")
.get();
if (hpa != null) {
// Temporarily raise the HPA minReplicas
k8sClient.autoscaling().v2()
.horizontalPodAutoscalers()
.inNamespace("production")
.withName(plan.getServiceName() + "-hpa")
.edit(h -> {
h.getSpec().setMinReplicas(plan.getTargetReplicas());
return h;
});
}
}
private void waitForPodsReady(String serviceName, int targetReplicas) {
int maxWaitSeconds = 120;
int waited = 0;
while (waited < maxWaitSeconds) {
Deployment deployment = k8sClient.apps().deployments()
.inNamespace("production")
.withName(serviceName)
.get();
if (deployment != null &&
deployment.getStatus().getReadyReplicas() != null &&
deployment.getStatus().getReadyReplicas() >= targetReplicas) {
log.info("服务{}已扩容完成,就绪副本数: {}",
serviceName, deployment.getStatus().getReadyReplicas());
return;
}
try {
Thread.sleep(5000);
waited += 5;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
log.warn("服务{}扩容等待超时,当前状态需要人工检查", serviceName);
}
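// scheduleScaleDown is called above but not shown in the article. The sketch below is an
// assumption, not the author's implementation: a single-threaded scheduler restores the HPA
// minReplicas and the Deployment replica count some time after the predicted peak. A production
// version would likely persist the schedule so it survives a restart of this service.
private final ScheduledExecutorService scaleDownScheduler = Executors.newSingleThreadScheduledExecutor();
private void scheduleScaleDown(ScalingPlan plan, Duration delayAfterPeak) {
    Duration delay = Duration.between(Instant.now(), plan.getEffectiveTime().plus(delayAfterPeak));
    scaleDownScheduler.schedule(() -> {
        log.info("Restoring service {} to {} replicas after the predicted peak",
            plan.getServiceName(), plan.getCurrentReplicas());
        // Lower the HPA floor first so the HPA is allowed to shrink the deployment again
        k8sClient.autoscaling().v2()
            .horizontalPodAutoscalers()
            .inNamespace("production")
            .withName(plan.getServiceName() + "-hpa")
            .edit(h -> {
                h.getSpec().setMinReplicas(plan.getCurrentReplicas());
                return h;
            });
        // Then hand the replica count back to its pre-scale-out value; from here the HPA takes over
        k8sClient.apps().deployments()
            .inNamespace("production")
            .withName(plan.getServiceName())
            .scale(plan.getCurrentReplicas());
    }, Math.max(0L, delay.toSeconds()), TimeUnit.SECONDS);
}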
}
Actual Results and Lessons Learned
Three months after going live, compared with the old reactive HPA:
- Scale-out success rate during big promotions: up from 92% to 99.7% (previously every big promotion saw a few services hit brief timeouts because scaling came too late)
- Average resource utilization: up from 45% to 67% (predictive scaling avoids scaling out far too early and wastes fewer resources after the peak)
- Manual interventions in scaling operations: down from an average of 23 per month to 4
The biggest lesson: the accuracy of the traffic forecast model directly determines the value of the whole system. One forecast that was off by 50% led to under-provisioning that performed worse than reactive HPA. So we built continuous monitoring of forecast accuracy: when a service's forecast error exceeds 30% three times in a row, it is automatically downgraded to a conservative rule-based policy (the maximum over the same time window in the past 7 days × 1.5), and switched back once the model regains accuracy. The sketch below illustrates that fallback rule.
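A minimal sketch of that degradation rule, under stated assumptions: recentForecastErrors and the past-7-day same-window maximum QPS are hypothetical inputs (not helpers from the article), and the thresholds simply mirror the numbers described above.
import java.util.List;

// Fallback policy sketch: downgrade to rule-based scaling after repeated forecast misses.
public class FallbackPolicyExample {

    static final double ERROR_THRESHOLD = 0.30;  // 30% forecast error
    static final int CONSECUTIVE_BREACHES = 3;   // 3 misses in a row trigger the downgrade

    // recentForecastErrors: the last few |predicted - actual| / actual ratios for one service
    static boolean shouldDowngrade(List<Double> recentForecastErrors) {
        if (recentForecastErrors.size() < CONSECUTIVE_BREACHES) return false;
        return recentForecastErrors.subList(
                        recentForecastErrors.size() - CONSECUTIVE_BREACHES,
                        recentForecastErrors.size())
                .stream()
                .allMatch(e -> e > ERROR_THRESHOLD);
    }

    // Conservative rule-based target while downgraded: past-7-day same-window max × 1.5
    static double ruleBasedPlanningQps(double maxQpsSameWindowLast7Days) {
        return maxQpsSameWindowLast7Days * 1.5;
    }

    public static void main(String[] args) {
        System.out.println(shouldDowngrade(List.of(0.12, 0.35, 0.41, 0.38))); // true
        System.out.println(ruleBasedPlanningQps(4200));                       // 6300.0
    }
}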
