第2301篇:大规模AI系统的压测——LLM服务的性能基准测试方法
第2301篇:大规模AI系统的压测——LLM服务的性能基准测试方法
适读人群:负责AI系统性能验证的工程师和QA工程师 | 阅读时长:约14分钟 | 核心价值:掌握LLM服务压测的特殊方法论,设计有效的基准测试方案
普通HTTP服务的压测我做过很多次,但第一次给AI服务做压测时,我发现很多经验用不上。
最大的差别是:普通服务的每个请求响应时间在毫秒级,可以轻松发几千QPS来观察系统行为。但LLM调用每次可能需要5-60秒,如果按同样的逻辑发请求,短时间内就会积累几千个未完成的连接,搞崩JMeter甚至测试机本身。
更麻烦的是,LLM服务的性能不只是"能处理多少QPS",还涉及TTFT、TBT(每token时间)、吞吐量(tokens/秒)等专门指标,这些在普通压测工具里没有内置支持。
LLM服务压测的核心指标
TTFT(Time To First Token):用户发出请求到看到第一个字符的时间。这是影响用户感知最重要的指标,直接影响"感觉快不快"。
TPOT(Time Per Output Token):每生成一个token需要的时间,越低越好。
Throughput(吞吐量):系统每秒能生成多少token,衡量整体处理能力。
并发用户数vs延迟曲线:随着并发增加,延迟如何变化?找到"膝盖点"——延迟开始急剧上升的并发量。
P50/P95/P99延迟:长尾延迟特别重要,因为慢请求会影响真实用户体验。
压测工具选择
Locust(Python):最常用的AI压测工具,支持异步HTTP,容易编写自定义的压测逻辑。
llmperf:专门针对LLM服务的开源压测工具,内置TTFT等指标。
k6:JavaScript编写压测脚本,支持流式响应测试。
对于Java项目,可以用JMeter结合自定义Sampler,或者用Gatling(Scala)。这里我用Java写一个自定义的压测框架:
@Component
public class LlmLoadTester {
private final WebClient webClient;
private final MetricsCollector metricsCollector;
/**
* 执行负载测试
* @param targetUrl AI服务地址
* @param testScenario 测试场景配置
*/
public LoadTestReport runLoadTest(String targetUrl, LoadTestScenario scenario)
throws InterruptedException {
log.info("开始压测: 并发={}, 持续时间={}s",
scenario.getConcurrency(), scenario.getDurationSeconds());
List<RequestMetrics> allMetrics = new CopyOnWriteArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(scenario.getConcurrency());
AtomicBoolean running = new AtomicBoolean(true);
AtomicInteger requestCount = new AtomicInteger(0);
// 启动并发用户
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < scenario.getConcurrency(); i++) {
final int userId = i;
futures.add(executor.submit(() -> {
while (running.get()) {
// 按用户思考时间控制请求速率
String prompt = scenario.getPromptGenerator().generate(userId);
RequestMetrics metrics = sendStreamingRequest(targetUrl, prompt);
allMetrics.add(metrics);
requestCount.incrementAndGet();
// 模拟用户思考时间
try {
Thread.sleep(scenario.getThinkTimeMs());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}));
}
// 运行指定时间后停止
Thread.sleep(scenario.getDurationSeconds() * 1000L);
running.set(false);
executor.shutdown();
executor.awaitTermination(120, TimeUnit.SECONDS);
return analyzeMetrics(allMetrics, scenario);
}
/**
* 发送流式请求并测量各阶段延迟
*/
private RequestMetrics sendStreamingRequest(String url, String prompt) {
RequestMetrics metrics = new RequestMetrics();
metrics.setPromptLength(estimateTokens(prompt));
metrics.setRequestStartTime(System.currentTimeMillis());
Map<String, Object> body = Map.of(
"messages", List.of(Map.of("role", "user", "content", prompt)),
"stream", true,
"max_tokens", 512
);
AtomicBoolean firstTokenReceived = new AtomicBoolean(false);
AtomicInteger tokenCount = new AtomicInteger(0);
AtomicLong lastTokenTime = new AtomicLong(System.currentTimeMillis());
try {
webClient.post()
.uri(url)
.bodyValue(body)
.retrieve()
.bodyToFlux(String.class)
.filter(line -> line.startsWith("data: ") && !line.contains("[DONE]"))
.doOnNext(line -> {
long now = System.currentTimeMillis();
if (!firstTokenReceived.getAndSet(true)) {
// 记录TTFT
metrics.setTtftMs(now - metrics.getRequestStartTime());
}
// 记录每个token的时间(TBT)
long timeSinceLastToken = now - lastTokenTime.get();
metrics.addTokenLatency(timeSinceLastToken);
lastTokenTime.set(now);
tokenCount.incrementAndGet();
})
.blockLast(Duration.ofMinutes(2));
metrics.setTotalTokens(tokenCount.get());
metrics.setTotalLatencyMs(System.currentTimeMillis() - metrics.getRequestStartTime());
metrics.setSuccess(true);
} catch (Exception e) {
metrics.setSuccess(false);
metrics.setErrorMessage(e.getMessage());
metrics.setTotalLatencyMs(System.currentTimeMillis() - metrics.getRequestStartTime());
}
return metrics;
}
/**
* 分析并生成报告
*/
private LoadTestReport analyzeMetrics(List<RequestMetrics> metrics, LoadTestScenario scenario) {
List<RequestMetrics> successful = metrics.stream()
.filter(RequestMetrics::isSuccess)
.collect(Collectors.toList());
if (successful.isEmpty()) {
return LoadTestReport.allFailed(metrics.size());
}
LongSummaryStatistics ttftStats = successful.stream()
.mapToLong(RequestMetrics::getTtftMs)
.summaryStatistics();
LongSummaryStatistics latencyStats = successful.stream()
.mapToLong(RequestMetrics::getTotalLatencyMs)
.summaryStatistics();
// 计算百分位数
long[] ttftValues = successful.stream()
.mapToLong(RequestMetrics::getTtftMs)
.sorted()
.toArray();
long totalTokens = successful.stream()
.mapToLong(RequestMetrics::getTotalTokens)
.sum();
double throughputTokensPerSecond = totalTokens / (scenario.getDurationSeconds() * 1.0);
return LoadTestReport.builder()
.totalRequests(metrics.size())
.successfulRequests(successful.size())
.errorRate((double)(metrics.size() - successful.size()) / metrics.size())
.ttftP50Ms(percentile(ttftValues, 50))
.ttftP95Ms(percentile(ttftValues, 95))
.ttftP99Ms(percentile(ttftValues, 99))
.avgTotalLatencyMs(latencyStats.getAverage())
.p99TotalLatencyMs(percentile(latencyStats, 99))
.throughputTokensPerSecond(throughputTokensPerSecond)
.concurrency(scenario.getConcurrency())
.build();
}
}压测场景设计
光测"能处理多少并发"不够,要设计有代表性的测试场景:
@Configuration
public class LoadTestScenarios {
/**
* 场景1:阶梯加压测试
* 从低并发开始,逐步加到高并发,找拐点
*/
public void runRampUpTest(LlmLoadTester tester, String url) throws Exception {
int[] concurrencyLevels = {1, 5, 10, 20, 30, 50};
for (int concurrency : concurrencyLevels) {
LoadTestScenario scenario = LoadTestScenario.builder()
.concurrency(concurrency)
.durationSeconds(60)
.thinkTimeMs(1000)
.promptGenerator(userId -> selectRepresentativePrompt(userId))
.build();
LoadTestReport report = tester.runLoadTest(url, scenario);
log.info("并发={}: TTFT P95={}ms, P99={}ms, 吞吐量={}tokens/s, 错误率={}%",
concurrency,
report.getTtftP95Ms(),
report.getTtftP99Ms(),
report.getThroughputTokensPerSecond(),
report.getErrorRate() * 100
);
// 如果P99超过10秒,停止加压(达到性能瓶颈)
if (report.getTtftP99Ms() > 10000) {
log.warn("并发{}时P99 TTFT超过10秒,停止加压测试", concurrency);
break;
}
Thread.sleep(30000); // 等30秒让系统恢复
}
}
/**
* 场景2:混合请求类型测试
* 模拟真实流量:短问答60% + 长文档分析30% + 代码生成10%
*/
public PromptGenerator buildMixedPromptGenerator() {
List<PromptTemplate> templates = List.of(
new PromptTemplate(0.6, "短问答",
() -> randomShortQuestion()),
new PromptTemplate(0.3, "长文档分析",
() -> "分析以下合同条款:" + generateLongDocument()),
new PromptTemplate(0.1, "代码生成",
() -> "写一个Java " + randomCodeTask())
);
return userId -> weightedRandomChoice(templates).getPrompt();
}
/**
* 场景3:峰值突刺测试
* 模拟大促等突发流量
*/
public void runSpikeTest(LlmLoadTester tester, String url) throws Exception {
// 正常负载5分钟
runAtConcurrency(tester, url, 10, 300);
// 突然10倍流量30秒
log.info("触发流量突刺!");
runAtConcurrency(tester, url, 100, 30);
// 恢复正常,观察是否能恢复
runAtConcurrency(tester, url, 10, 120);
}
}压测结果分析
压测数据要结合服务端指标一起分析:
@Service
public class LoadTestAnalyzer {
/**
* 分析压测报告,给出优化建议
*/
public AnalysisReport analyze(LoadTestReport report, SystemMetrics systemMetrics) {
List<String> findings = new ArrayList<>();
List<String> recommendations = new ArrayList<>();
// 检查TTFT
if (report.getTtftP95Ms() > 5000) {
findings.add("TTFT P95超过5秒,用户体验差");
recommendations.add("考虑启用流式输出以改善感知延迟");
recommendations.add("检查prompt token数量是否过多");
}
// 检查错误率
if (report.getErrorRate() > 0.01) {
findings.add("错误率" + report.getErrorRate() * 100 + "%,超过1%阈值");
if (systemMetrics.getAiApiRateLimitHitCount() > 0) {
recommendations.add("遭遇AI API速率限制,需要增加请求间隔或扩大API配额");
}
}
// 检查吞吐量瓶颈
if (systemMetrics.getCpuUtilization() > 0.8) {
findings.add("CPU利用率超过80%,计算资源不足");
recommendations.add("增加应用实例数量");
}
if (systemMetrics.getRedisLatencyP99Ms() > 50) {
findings.add("Redis P99延迟超过50ms,上下文加载成为瓶颈");
recommendations.add("检查Redis网络配置,考虑本地缓存热点对话上下文");
}
PerformanceGrade grade = calculateGrade(report);
return AnalysisReport.builder()
.grade(grade)
.findings(findings)
.recommendations(recommendations)
.maxSafeC oncurrency(findMaxSafeConcurrency(report))
.build();
}
}LLM服务的压测是一门新技术,但基本方法论和传统压测一致:找瓶颈、量化指标、迭代优化。特殊之处在于指标体系(TTFT/TBT/tokens-per-second)和流式请求的处理方式。建议每次重大变更前都跑一轮压测,把性能基准固化下来。
