第1954篇:AI应用的压测方法——模拟真实流量分布的负载测试方案
第1954篇:AI应用的压测方法——模拟真实流量分布的负载测试方案
去年我们上线了一个AI客服系统,性能测试阶段做了压测,500并发稳稳的没问题。上线第一天,实际流量峰值才200并发,结果响应时间飙到了30秒,一半请求超时。
事后复盘,找到了问题所在:我们的压测用的是均匀流量——500个并发请求同时开始,每个请求内容大同小异,Prompt长度都差不多。但真实用户的请求完全不是这样:有的Prompt超长(用户粘贴了一整篇文档),有的超短("帮我总结一下"五个字);有时候流量是脉冲式涌入(上午9点的早高峰),有时候是平稳的细水长流;有的用户在同一会话里连续发了20条消息,有的发完一条就走了。
这些"真实流量特征"在传统压测里根本不考虑,但在AI应用里,它们会让系统以完全不同的方式崩溃。
今天就来讲,怎么做一次真正有效的AI应用压测。
AI应用压测的特殊性
先说清楚AI应用和普通Web应用在压测上的本质区别。
第一,请求的"重量"差异极大。 普通Web接口,请求体可能就几十字节,响应也差不多。但AI应用的Prompt可能从100 Token到10000 Token,响应从50 Token到4000 Token,这种差异会导致LLM推理时间差几十倍。如果你的压测用的是均匀大小的请求,你测的只是一个虚假的"平均场景",既不能反映轻流量下的表现,也不能反映重流量下的崩溃点。
第二,LLM调用有固有的随机性。 同一个Prompt,LLM的响应时间每次都不一样,分布可能很宽。这对压测的统计分析提出了要求:你需要足够的样本量才能得到稳定的P95/P99数字。
第三,Token限流是压测的隐藏边界。 LLM API的限流不只是"每秒多少请求(RPS)",还有"每分钟多少Token(TPM)"。如果你的压测只盯着RPS,可能压测通过了,但生产环境里因为某次流量带了大量长请求,把TPM打满了,然后一切都开始排队。
第四,流式响应影响连接占用时间。 开了Streaming的接口,一个请求会占用一个HTTP连接几秒到几十秒,这对连接池的压力远大于一个50ms就返回的普通接口。
真实流量分布建模
在写压测代码之前,先要分析生产流量,建立流量模型。
@Service
public class TrafficProfileAnalyzer {
private final ProductionLogRepository logRepo;
/**
* 分析历史流量,构建流量画像
* 这是压测的基础,要先搞清楚"真实流量长什么样"
*/
public TrafficProfile analyzeTrafficProfile(LocalDateTime from, LocalDateTime to) {
List<RequestLog> logs = logRepo.findByTimeRange(from, to);
// 1. Prompt Token分布
List<Integer> promptTokens = logs.stream()
.map(RequestLog::getPromptTokens)
.collect(Collectors.toList());
TokenDistribution promptDist = analyzeDistribution(promptTokens);
// 2. 响应Token分布
List<Integer> completionTokens = logs.stream()
.map(RequestLog::getCompletionTokens)
.collect(Collectors.toList());
TokenDistribution completionDist = analyzeDistribution(completionTokens);
// 3. 并发模式(按小时分析)
Map<Integer, Long> concurrencyByHour = logs.stream()
.collect(Collectors.groupingBy(
l -> l.getTimestamp().getHour(),
Collectors.counting()
));
// 4. 会话长度分布(用户在一个会话里发了几条消息)
Map<String, Long> sessionMessageCounts = logs.stream()
.collect(Collectors.groupingBy(
RequestLog::getSessionId,
Collectors.counting()
));
List<Long> sessionLengths = new ArrayList<>(sessionMessageCounts.values());
// 5. 流量突发模式:找出峰值倍数
long avgRps = logs.size() / ((to.toEpochSecond(ZoneOffset.UTC) -
from.toEpochSecond(ZoneOffset.UTC)));
long peakRps = findPeakRps(logs);
double burstRatio = (double) peakRps / avgRps;
return TrafficProfile.builder()
.promptTokenDistribution(promptDist)
.completionTokenDistribution(completionDist)
.concurrencyByHour(concurrencyByHour)
.sessionLengthDistribution(analyzeDistribution(
sessionLengths.stream().map(Long::intValue).collect(Collectors.toList())))
.avgRps(avgRps)
.peakRps(peakRps)
.burstRatio(burstRatio)
.build();
}
private TokenDistribution analyzeDistribution(List<Integer> values) {
if (values.isEmpty()) return TokenDistribution.empty();
List<Integer> sorted = new ArrayList<>(values);
Collections.sort(sorted);
int size = sorted.size();
return TokenDistribution.builder()
.min(sorted.get(0))
.p25(sorted.get(size / 4))
.p50(sorted.get(size / 2))
.p75(sorted.get(size * 3 / 4))
.p90(sorted.get((int)(size * 0.9)))
.p95(sorted.get((int)(size * 0.95)))
.p99(sorted.get((int)(size * 0.99)))
.max(sorted.get(size - 1))
.mean(values.stream().mapToInt(i -> i).average().orElse(0))
.stdDev(calculateStdDev(values))
.build();
}
}拿到流量画像之后,就可以构建压测场景了。核心原则是:压测数据要反映真实分布,而不是用你方便构造的假数据。
压测数据集的生成
@Service
public class LoadTestDataGenerator {
private final TrafficProfile trafficProfile;
private final Random random = new SecureRandom();
/**
* 根据真实流量分布生成压测请求
*/
public List<LoadTestRequest> generateRequests(int count) {
List<LoadTestRequest> requests = new ArrayList<>();
for (int i = 0; i < count; i++) {
// 按真实分布采样Prompt长度
int promptTokenTarget = sampleFromDistribution(
trafficProfile.getPromptTokenDistribution());
// 生成对应长度的Prompt
String prompt = generatePromptWithTokenCount(promptTokenTarget);
// 是否是多轮对话的continuation
boolean isMultiTurn = random.nextDouble() < getMultiTurnProbability();
List<ChatMessage> history = isMultiTurn ?
generateConversationHistory() : Collections.emptyList();
requests.add(LoadTestRequest.builder()
.requestId(UUID.randomUUID().toString())
.prompt(prompt)
.conversationHistory(history)
.estimatedPromptTokens(promptTokenTarget)
.scenario(classifyScenario(promptTokenTarget, isMultiTurn))
.build());
}
return requests;
}
/**
* 从分布中按概率采样
* 使用逆CDF方法,保证采样结果符合真实分布
*/
private int sampleFromDistribution(TokenDistribution dist) {
double p = random.nextDouble();
if (p < 0.25) {
// P0-P25:均匀分布在min到p25之间
return dist.getMin() + (int)(random.nextDouble() * (dist.getP25() - dist.getMin()));
} else if (p < 0.50) {
return dist.getP25() + (int)(random.nextDouble() * (dist.getP50() - dist.getP25()));
} else if (p < 0.75) {
return dist.getP50() + (int)(random.nextDouble() * (dist.getP75() - dist.getP50()));
} else if (p < 0.90) {
return dist.getP75() + (int)(random.nextDouble() * (dist.getP90() - dist.getP75()));
} else if (p < 0.95) {
return dist.getP90() + (int)(random.nextDouble() * (dist.getP95() - dist.getP90()));
} else if (p < 0.99) {
return dist.getP95() + (int)(random.nextDouble() * (dist.getP99() - dist.getP95()));
} else {
// P99以上:少量超长请求
return dist.getP99() + (int)(random.nextDouble() * (dist.getMax() - dist.getP99()));
}
}
/**
* 生成接近指定Token数的Prompt
* 实际项目里应该用真实业务场景的脱敏数据
*/
private String generatePromptWithTokenCount(int targetTokens) {
// 选择对应的场景模板
PromptTemplate template = selectTemplate(targetTokens);
return template.generate(targetTokens);
}
private double getMultiTurnProbability() {
// 从历史数据中得出的多轮对话比例
long multiTurnSessions = trafficProfile.getSessionLengthDistribution().getP75() > 1 ?
1 : 0;
// 假设45%的会话是多轮
return 0.45;
}
}并发模式模拟
真实流量不是均匀分布的,有早晚高峰,有突发流量。要在压测里模拟这些。
@Service
public class ConcurrencyPatternSimulator {
/**
* 模拟工作日的典型流量曲线
* 9-10点:早高峰
* 12-13点:午休小高峰
* 14-18点:工作时段稳定流量
* 20-22点:晚间高峰
*/
public ConcurrencySchedule buildWorkdaySchedule(int baseConcurrency) {
List<ConcurrencyStage> stages = new ArrayList<>();
// 预热阶段
stages.add(ConcurrencyStage.rampUp(
Duration.ofMinutes(5),
0,
(int)(baseConcurrency * 0.3)));
// 平稳阶段
stages.add(ConcurrencyStage.steady(
Duration.ofMinutes(10),
(int)(baseConcurrency * 0.3)));
// 模拟早高峰突发
stages.add(ConcurrencyStage.rampUp(
Duration.ofMinutes(2),
(int)(baseConcurrency * 0.3),
(int)(baseConcurrency * 1.5))); // 1.5倍的突发
stages.add(ConcurrencyStage.steady(
Duration.ofMinutes(15),
(int)(baseConcurrency * 1.5)));
// 高峰后回落
stages.add(ConcurrencyStage.rampDown(
Duration.ofMinutes(5),
(int)(baseConcurrency * 1.5),
baseConcurrency));
// 稳定工作时段
stages.add(ConcurrencyStage.steady(
Duration.ofMinutes(20),
baseConcurrency));
return new ConcurrencySchedule(stages);
}
/**
* 模拟瞬时流量突发(比如某个活动推送触发的流量尖刺)
*/
public ConcurrencySchedule buildBurstSchedule(int baseConcurrency,
double burstMultiplier) {
List<ConcurrencyStage> stages = new ArrayList<>();
// 正常流量
stages.add(ConcurrencyStage.steady(Duration.ofMinutes(5), baseConcurrency));
// 极速上升(30秒内达到峰值)
stages.add(ConcurrencyStage.rampUp(
Duration.ofSeconds(30),
baseConcurrency,
(int)(baseConcurrency * burstMultiplier)));
// 短暂峰值
stages.add(ConcurrencyStage.steady(
Duration.ofMinutes(3),
(int)(baseConcurrency * burstMultiplier)));
// 缓慢回落
stages.add(ConcurrencyStage.rampDown(
Duration.ofMinutes(10),
(int)(baseConcurrency * burstMultiplier),
baseConcurrency));
return new ConcurrencySchedule(stages);
}
}压测执行引擎
@Service
public class AILoadTestRunner {
private final AIServiceClient aiClient;
private final LoadTestMetricsCollector metricsCollector;
private final LoadTestDataGenerator dataGenerator;
public LoadTestReport runTest(LoadTestConfig config) throws Exception {
log.info("开始AI压测: {}", config.getName());
// 生成压测数据
List<LoadTestRequest> testData = dataGenerator.generateRequests(
config.getRequestCount());
ConcurrencySchedule schedule = config.getConcurrencyPattern();
// 执行各阶段
List<StageResult> stageResults = new ArrayList<>();
for (ConcurrencyStage stage : schedule.getStages()) {
log.info("执行阶段: type={}, duration={}s, concurrency={}",
stage.getType(), stage.getDuration().getSeconds(),
stage.getTargetConcurrency());
StageResult result = executeStage(stage, testData, config);
stageResults.add(result);
// 检查是否需要提前终止(错误率过高)
if (result.getErrorRate() > config.getAbortThreshold()) {
log.error("错误率超过阈值{},终止压测", config.getAbortThreshold());
break;
}
}
return buildReport(config, stageResults);
}
private StageResult executeStage(ConcurrencyStage stage,
List<LoadTestRequest> testData,
LoadTestConfig config) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(
stage.getTargetConcurrency() * 2); // 留一些余量
Semaphore concurrencyLimit = new Semaphore(stage.getTargetConcurrency());
List<RequestResult> results = Collections.synchronizedList(new ArrayList<>());
long stageEnd = System.currentTimeMillis() + stage.getDuration().toMillis();
AtomicInteger requestIndex = new AtomicInteger(0);
List<Future<?>> futures = new ArrayList<>();
while (System.currentTimeMillis() < stageEnd) {
// 获取并发许可
if (!concurrencyLimit.tryAcquire(100, TimeUnit.MILLISECONDS)) {
continue; // 并发已满,等待
}
int idx = requestIndex.getAndIncrement() % testData.size();
LoadTestRequest request = testData.get(idx);
futures.add(pool.submit(() -> {
try {
RequestResult result = executeRequest(request, config);
results.add(result);
metricsCollector.record(result);
} finally {
concurrencyLimit.release();
}
}));
}
// 等待所有正在飞行的请求完成
pool.shutdown();
pool.awaitTermination(60, TimeUnit.SECONDS);
return aggregateResults(stage, results);
}
private RequestResult executeRequest(LoadTestRequest request,
LoadTestConfig config) {
long startTime = System.currentTimeMillis();
long firstTokenTime = -1;
try {
if (config.isStreamingMode()) {
// 流式模式:记录首token时间(TTFT)
StringBuilder fullResponse = new StringBuilder();
aiClient.streamComplete(
request.getPrompt(),
request.getConversationHistory(),
chunk -> {
if (firstTokenTime == -1) {
// 记录首token延迟
}
fullResponse.append(chunk);
}
);
long endTime = System.currentTimeMillis();
return RequestResult.success(
request.getRequestId(),
startTime, endTime,
firstTokenTime - startTime,
fullResponse.toString().length(),
request.getScenario()
);
} else {
String response = aiClient.complete(
request.getPrompt(),
request.getConversationHistory()
);
long endTime = System.currentTimeMillis();
return RequestResult.success(
request.getRequestId(),
startTime, endTime,
-1, // 非流式没有TTFT
response.length(),
request.getScenario()
);
}
} catch (RateLimitException e) {
return RequestResult.rateLimited(request.getRequestId(),
System.currentTimeMillis() - startTime);
} catch (TimeoutException e) {
return RequestResult.timeout(request.getRequestId(), config.getTimeoutMs());
} catch (Exception e) {
return RequestResult.error(request.getRequestId(), e.getMessage(),
System.currentTimeMillis() - startTime);
}
}
}关键指标的收集与分析
AI应用的压测指标比普通Web应用多:
@Component
public class LoadTestMetricsCollector {
// 按场景分别统计(长Prompt和短Prompt的延迟差别很大,混在一起没意义)
private final Map<RequestScenario, List<Long>> latencyByScenario =
new ConcurrentHashMap<>();
private final Map<RequestScenario, List<Long>> ttftByScenario =
new ConcurrentHashMap<>();
private final AtomicLong totalRequests = new AtomicLong(0);
private final AtomicLong successRequests = new AtomicLong(0);
private final AtomicLong timeoutRequests = new AtomicLong(0);
private final AtomicLong rateLimitedRequests = new AtomicLong(0);
private final AtomicLong totalPromptTokens = new AtomicLong(0);
private final AtomicLong totalCompletionTokens = new AtomicLong(0);
public void record(RequestResult result) {
totalRequests.incrementAndGet();
switch (result.getStatus()) {
case SUCCESS -> {
successRequests.incrementAndGet();
latencyByScenario
.computeIfAbsent(result.getScenario(), k ->
Collections.synchronizedList(new ArrayList<>()))
.add(result.getLatencyMs());
if (result.getTtftMs() > 0) {
ttftByScenario
.computeIfAbsent(result.getScenario(), k ->
Collections.synchronizedList(new ArrayList<>()))
.add(result.getTtftMs());
}
}
case TIMEOUT -> timeoutRequests.incrementAndGet();
case RATE_LIMITED -> rateLimitedRequests.incrementAndGet();
}
}
public MetricsSummary getSummary() {
Map<RequestScenario, LatencyStats> latencyStats = new HashMap<>();
for (Map.Entry<RequestScenario, List<Long>> entry :
latencyByScenario.entrySet()) {
latencyStats.put(entry.getKey(),
computeLatencyStats(entry.getValue()));
}
long total = totalRequests.get();
return MetricsSummary.builder()
.totalRequests(total)
.successRate(total > 0 ? (double)successRequests.get() / total : 0)
.timeoutRate(total > 0 ? (double)timeoutRequests.get() / total : 0)
.rateLimitRate(total > 0 ? (double)rateLimitedRequests.get() / total : 0)
.latencyByScenario(latencyStats)
.ttftByScenario(computeTtftStats())
.avgTokensPerRequest(total > 0 ?
(totalPromptTokens.get() + totalCompletionTokens.get()) / total : 0)
.build();
}
private LatencyStats computeLatencyStats(List<Long> latencies) {
if (latencies.isEmpty()) return LatencyStats.empty();
List<Long> sorted = new ArrayList<>(latencies);
Collections.sort(sorted);
int size = sorted.size();
return LatencyStats.builder()
.p50(sorted.get(size / 2))
.p75(sorted.get(size * 3 / 4))
.p90(sorted.get((int)(size * 0.9)))
.p95(sorted.get((int)(size * 0.95)))
.p99(sorted.get((int)(size * 0.99)))
.max(sorted.get(size - 1))
.avg(latencies.stream().mapToLong(l -> l).average().orElse(0))
.build();
}
}压测报告的解读
压测完了,怎么看报告?几个关键判断标准:
P99延迟比P50更重要。 P50延迟是大多数用户的体验,P99是倒霉用户的体验。AI应用的P99/P50比值通常比普通接口大得多,因为LLM对长Prompt的处理时间可以是短Prompt的十几倍。
区分TTFT和总延迟。 对流式接口,用户感知的延迟主要是"第一个字出来要多久"(TTFT),而不是"全部内容出来要多久"。优化TTFT比优化总延迟对用户体验改善更明显。
关注限流比率。 如果有10%以上的请求被限流(HTTP 429),说明你的系统已经进入了"被流控"的状态,需要加排队机制或者申请更高的限流额度。
我踩过的坑
坑1:忘记考虑流式响应的连接占用
压测时没有开Streaming,所有请求都是同步等待整个响应。实际上生产环境开了Streaming,一个请求会占用连接15秒。上线后连接池很快耗尽,大量请求排队。后来专门加了Streaming模式的压测场景。
坑2:压测数据被LLM缓存命中
用固定的测试Prompt做压测,发现延迟异常低,以为系统性能很好。后来发现是LLM API层有缓存,完全相同的Prompt会直接返回缓存结果,根本没走推理。生产环境每个用户的问题都不一样,不存在这种缓存。所以压测数据要保证足够的多样性,避免触发缓存。
坑3:多轮对话的上下文累积被忽略
多轮对话越谈越长,第10轮的Prompt里包含了前9轮的历史,Token数可能是第1轮的5-10倍。我们最初只用单轮请求压测,结果上线后多轮对话场景下延迟远超预期。后来专门设计了"长会话压测场景",模拟用户连续20轮对话的情况。
AI应用的压测没有捷径,只有认真分析真实流量分布,构建贴近真实的测试场景,才能找到真正的性能边界。
你跑过的那些"500并发稳定"的压测数据,如果用的是均匀的假数据,大概率是在骗自己。
