第2477篇:AI系统的全链路压测——模拟真实AI负载的压测设计
2026/4/30大约 5 分钟
第2477篇:AI系统的全链路压测——模拟真实AI负载的压测设计
适读人群:性能工程师、后端工程师、SRE | 阅读时长:约16分钟 | 核心价值:针对AI系统特有的流式响应、Token消耗、模型延迟等特征设计专项压测方案
前几篇写了AI系统的压测,有读者问:既然普通HTTP压测的思路用在AI系统上有问题,那应该怎么设计AI系统的压测?
这篇专门讲这个。
我们有一套RAG知识库系统,在上线前做了传统方式的压测:用JMeter模拟100并发,跑了30分钟,P99延迟680ms,通过了。上线后,真实用户使用时,有时候一个问题要等15秒才出来第一个字。
后来分析,发现问题出在我们的压测没有模拟流式响应的TTFT(Time to First Token),也没有模拟真实的prompt多样性——JMeter里发的都是短prompt,而真实用户的问题长得多,触发了更多的RAG检索,LLM的输入token量更大。
AI系统压测的五个特殊性
1. TTFT是核心用户体验指标
/**
* 对于流式AI接口,需要单独测量TTFT
* 而不是总响应时间
*/
public class StreamingMetricsCollector {
public StreamingRequestMetrics measureStreamingRequest(
String endpoint,
String requestBody) throws Exception {
long requestStart = System.currentTimeMillis();
long firstTokenTime = -1;
long lastTokenTime = -1;
int totalTokens = 0;
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(endpoint))
.header("Content-Type", "application/json")
.header("Accept", "text/event-stream")
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
.build();
// 流式接收响应
HttpResponse<InputStream> response = client.send(
request, HttpResponse.BodyHandlers.ofInputStream()
);
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(response.body()))) {
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("data: ") && !line.equals("data: [DONE]")) {
String data = line.substring(6);
// 解析SSE数据,检查是否有token
if (containsToken(data)) {
if (firstTokenTime == -1) {
firstTokenTime = System.currentTimeMillis();
}
lastTokenTime = System.currentTimeMillis();
totalTokens += countTokens(data);
}
}
}
}
long requestEnd = System.currentTimeMillis();
return StreamingRequestMetrics.builder()
.ttft(firstTokenTime > 0 ? firstTokenTime - requestStart : -1)
.totalTime(requestEnd - requestStart)
.generationTime(lastTokenTime > 0 ? lastTokenTime - firstTokenTime : -1)
.totalTokens(totalTokens)
.throughputTokensPerSecond(
lastTokenTime > firstTokenTime ?
totalTokens * 1000.0 / (lastTokenTime - firstTokenTime) : 0
)
.statusCode(response.statusCode())
.build();
}
}2. 多样化的真实prompt数据集
@Component
public class PromptDatasetGenerator {
private final ChatClient chatClient;
/**
* 生成多样化的压测prompt,模拟真实用户行为分布
* 真实用户的问题长度、复杂度、主题都是多样化的
*/
public PromptDataset generateDataset(
String domain,
int totalPrompts,
PromptDistribution distribution) {
List<String> prompts = new ArrayList<>();
// 按照分布生成不同类型的prompt
int shortCount = (int)(totalPrompts * distribution.getShortRatio());
int mediumCount = (int)(totalPrompts * distribution.getMediumRatio());
int longCount = totalPrompts - shortCount - mediumCount;
prompts.addAll(generatePromptsOfType(domain, "短问题(10-30字)", shortCount));
prompts.addAll(generatePromptsOfType(domain, "中等问题(50-100字,带上下文)", mediumCount));
prompts.addAll(generatePromptsOfType(domain, "长问题(200字以上,包含详细背景)", longCount));
// 随机打乱
Collections.shuffle(prompts);
return PromptDataset.of(prompts, computeDatasetStats(prompts));
}
private List<String> generatePromptsOfType(String domain, String type, int count) {
String generationPrompt = """
为以下领域生成%d个真实的用户问题:
领域: %s
问题类型: %s
要求:
1. 问题要真实,模拟真实用户会问的内容
2. 问题之间要有多样性,不要重复
3. 用中文
直接返回问题列表,每行一个问题,不要编号。
""".formatted(count, domain, type);
String response = chatClient.call(new Prompt(new UserMessage(generationPrompt)))
.getResult().getOutput().getContent();
return Arrays.stream(response.split("\n"))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(toList());
}
}3. Token消耗建模
@Service
public class TokenConsumptionModeler {
/**
* 统计历史请求的token消耗分布
* 用于在压测中模拟真实的token消耗模式
*/
public TokenConsumptionModel buildModel(List<HistoricalRequest> historicalRequests) {
IntSummaryStatistics inputStats = historicalRequests.stream()
.mapToInt(r -> r.getInputTokens())
.summaryStatistics();
IntSummaryStatistics outputStats = historicalRequests.stream()
.mapToInt(r -> r.getOutputTokens())
.summaryStatistics();
// 构建token消耗的分布模型(正态分布近似)
double inputMean = inputStats.getAverage();
double inputStdDev = computeStdDev(
historicalRequests.stream().mapToInt(r -> r.getInputTokens()).boxed().collect(toList()),
inputMean
);
double outputMean = outputStats.getAverage();
double outputStdDev = computeStdDev(
historicalRequests.stream().mapToInt(r -> r.getOutputTokens()).boxed().collect(toList()),
outputMean
);
return TokenConsumptionModel.builder()
.inputTokenMean(inputMean)
.inputTokenStdDev(inputStdDev)
.outputTokenMean(outputMean)
.outputTokenStdDev(outputStdDev)
.p50InputTokens(computePercentile(historicalRequests, r -> r.getInputTokens(), 50))
.p95InputTokens(computePercentile(historicalRequests, r -> r.getInputTokens(), 95))
.p99InputTokens(computePercentile(historicalRequests, r -> r.getInputTokens(), 99))
.build();
}
}4. 压测执行框架
@Service
public class AILoadTestRunner {
private final StreamingMetricsCollector metricsCollector;
private final PromptDataset promptDataset;
public LoadTestResult runLoadTest(LoadTestConfig config) {
ExecutorService executor = Executors.newFixedThreadPool(config.getConcurrency());
List<Future<StreamingRequestMetrics>> futures = new ArrayList<>();
AtomicInteger requestCount = new AtomicInteger(0);
long testStart = System.currentTimeMillis();
long testDuration = config.getDuration().toMillis();
// 持续发送请求直到达到时间限制
while (System.currentTimeMillis() - testStart < testDuration) {
// 控制发送速率
int targetRPS = config.getTargetRPS();
long sleepInterval = 1000L / targetRPS;
String prompt = promptDataset.getNext(); // 循环取多样化prompt
String requestBody = buildRequestBody(prompt, config);
Future<StreamingRequestMetrics> future = executor.submit(() ->
metricsCollector.measureStreamingRequest(config.getEndpoint(), requestBody)
);
futures.add(future);
requestCount.incrementAndGet();
try { Thread.sleep(sleepInterval); } catch (InterruptedException e) { break; }
}
// 等待所有请求完成
executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);
// 收集结果
List<StreamingRequestMetrics> results = futures.stream()
.map(f -> { try { return f.get(); } catch (Exception e) { return null; } })
.filter(Objects::nonNull)
.collect(toList());
return analyzeResults(results, config);
}
private LoadTestResult analyzeResults(
List<StreamingRequestMetrics> results,
LoadTestConfig config) {
// TTFT分布
LongSummaryStatistics ttftStats = results.stream()
.filter(r -> r.getTtft() > 0)
.mapToLong(StreamingRequestMetrics::getTtft)
.summaryStatistics();
// 总响应时间分布
LongSummaryStatistics totalTimeStats = results.stream()
.mapToLong(StreamingRequestMetrics::getTotalTime)
.summaryStatistics();
// Token吞吐量
double totalTokens = results.stream()
.mapToInt(StreamingRequestMetrics::getTotalTokens)
.sum();
double testDurationSeconds = config.getDuration().toSeconds();
double tokenThroughput = totalTokens / testDurationSeconds;
// 错误率
long errorCount = results.stream()
.filter(r -> r.getStatusCode() >= 400)
.count();
double errorRate = (double) errorCount / results.size();
return LoadTestResult.builder()
.totalRequests(results.size())
.errorRate(errorRate)
.ttftP50(computePercentile(results, StreamingRequestMetrics::getTtft, 50))
.ttftP95(computePercentile(results, StreamingRequestMetrics::getTtft, 95))
.ttftP99(computePercentile(results, StreamingRequestMetrics::getTtft, 99))
.totalTimeP50(computePercentile(results, StreamingRequestMetrics::getTotalTime, 50))
.totalTimeP99(computePercentile(results, StreamingRequestMetrics::getTotalTime, 99))
.tokenThroughput(tokenThroughput)
.successRate(1 - errorRate)
.build();
}
}压测结果的AI辅助分析
压测跑完之后,让LLM来帮助解读结果:
@Service
public class LoadTestResultAnalyzer {
private final ChatClient chatClient;
public String analyzeResults(LoadTestResult result, LoadTestConfig config) {
String prompt = """
分析以下AI系统压测结果:
压测配置:
- 目标并发: %d
- 目标QPS: %d
- 持续时长: %s
压测结果:
- 实际QPS: %.1f
- 错误率: %.2f%%
- TTFT P50: %dms, P95: %dms, P99: %dms
- 总响应时间 P50: %dms, P99: %dms
- Token吞吐量: %.0f tokens/s
请分析:
1. 系统是否达到了预期的性能目标?
2. TTFT的表现是否符合用户体验要求(通常要求P95 < 2000ms)?
3. 在这个负载下,系统是否处于正常工作状态,还是已经过载?
4. 如果系统没有达到预期,可能的瓶颈在哪里?
5. 系统能支撑多少并发用户?
""".formatted(
config.getConcurrency(), config.getTargetRPS(), config.getDuration(),
result.getActualQPS(), result.getErrorRate() * 100,
result.getTtftP50(), result.getTtftP95(), result.getTtftP99(),
result.getTotalTimeP50(), result.getTotalTimeP99(),
result.getTokenThroughput()
);
return chatClient.call(new Prompt(new UserMessage(prompt)))
.getResult().getOutput().getContent();
}
}压测的实际经验
对于RAG系统,要分别测向量检索延迟和LLM推理延迟。这两部分的延迟来源不同,优化手段也不同,混在一起测分析不出问题所在。
高并发下的模型限流是主要瓶颈。大多数LLM API都有RPM(请求/分钟)和TPM(Token/分钟)的限制。压测时往往不是服务器撑不住,而是LLM API先限流了。要在压测前确认API的限制,在压测配置里考虑这个上限。
压测环境和生产环境的差异要量化。压测环境的GPU配置、模型部署方式可能和生产不同,要记录下来,在分析结果时考虑这个差异系数。
