AI 系统的压测——和普通 HTTP 压测有哪些本质不同
AI 系统的压测——和普通 HTTP 压测有哪些本质不同
有一次我们的 AI 知识库服务要做上线前压测,负责这件事的同事用 JMeter 跑了一组压测,结论是:"100 并发下 P99 延迟 800ms,没问题,可以上线。"
我看了一眼他的 JMeter 配置,发现一个问题:他压的是 /health 接口。
不是开玩笑。他配错了 URL,压的是健康检查接口。
这件事当然是个乌龙,但它引出了一个真实的问题:很多工程师做 AI 系统压测的时候,用的是压普通 HTTP 服务的思路,压出来的结论要么没意义,要么明显错误。
这篇文章把 AI 压测和普通 HTTP 压测的区别讲清楚,以及怎么设计有意义的 AI 压测。
普通 HTTP 压测 vs AI 系统压测:五个本质差异
差异一:响应时间的含义不同
普通 HTTP 接口:请求进来,数据库查一下,几十毫秒返回。响应时间是一个数字,意义明确。
AI 流式接口:请求进来,2 秒后返回第一个 token,然后以 40 token/s 的速度持续输出,20 秒后输出完成。
这里有两个完全不同的指标:
- TTFT(Time to First Token):从发出请求到收到第一个 token 的时间。这代表用户感知到的"响应速度",是体验指标。
- TBT(Time Between Tokens):token 之间的间隔,决定输出是否流畅。
- 总完成时间:整个响应完成的时间,是吞吐量指标。
如果你用普通 HTTP 压测工具,测的是"总完成时间",但这个数字对用户体验的参考价值很有限——用户感知的快慢主要看 TTFT。
差异二:Token 消耗带来额外的资源约束
普通服务压测:并发越高,TPS 越高,直到到达系统上限。
AI 系统压测:高并发下,每秒处理的 token 数(Throughput,单位 token/s)更容易成为瓶颈,而不是请求数。
举个例子:你的 AI 服务最大吞吐量是 10000 token/s。
- 如果每个请求平均消耗 500 token,那极限是 20 QPS
- 如果每个请求平均消耗 2000 token,那极限是 5 QPS
用 QPS 来表征 AI 系统的容量是不够的,必须同时给出 Token/s 和平均 Token 消耗。
差异三:测试数据必须是真实的多样化数据
普通 HTTP 压测:可以用一个固定的请求体反复发送。
AI 系统压测:必须用多样化的、真实的 prompt。原因:
- 不同的 prompt 消耗的 token 数差异极大(短问题 50 token,长文档处理可能 4000 token)
- 某些 prompt 可能触发模型的特殊逻辑(内容过滤、超长输出等)
- RAG 场景下,不同的查询会命中不同数量的文档 chunk,处理时间差异显著
用一个固定 prompt 压出来的 P99 延迟,可能完全无法代表真实生产流量下的 P99。
差异四:成本是压测的约束条件
普通服务压测:基础设施成本是固定的,压测只消耗自己的机器资源。
AI 系统压测:每个压测请求都在消耗 API 费用(或者 GPU 算力)。压测 1000 个并发跑 10 分钟,可能花掉几百美元。
压测方案里必须有成本控制策略:
- 用便宜的模型做压测(比如用 GPT-3.5 模拟 GPT-4 的负载特征)
- 限制压测的 max_tokens(强制截断,减少费用)
- 在特定时间段(低峰期)运行压测
差异五:流式响应需要专门的压测工具支持
大多数压测工具(JMeter、Locust、k6)默认不支持 SSE(Server-Sent Events)流式响应的正确解析。你需要确保工具能正确:
- 接收 SSE 格式的流式数据
- 记录 TTFT(第一个 data: 行的时间)
- 记录总吞吐量(完整响应的总 token 数)
AI 系统压测指标体系
自定义压测脚本:用 Python 压测 AI 流式接口
JMeter 做 AI 流式压测比较麻烦,我通常用 Python + asyncio 写自定义压测脚本:
import asyncio
import aiohttp
import time
import json
import statistics
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime
@dataclass
class RequestMetrics:
request_id: str
start_time: float
ttft: Optional[float] = None # Time to First Token
total_time: Optional[float] = None # 总完成时间
total_tokens: int = 0
prompt_tokens: int = 0
completion_tokens: int = 0
error: Optional[str] = None
status_code: Optional[int] = None
@dataclass
class LoadTestResult:
total_requests: int
successful_requests: int
failed_requests: int
# 延迟统计
ttft_p50: float
ttft_p95: float
ttft_p99: float
total_time_p50: float
total_time_p95: float
total_time_p99: float
# 吞吐量
qps: float
total_token_throughput: float # token/s
# 成本(估算)
estimated_cost_usd: float
class AILoadTester:
def __init__(self, base_url: str, api_key: str, model: str = "gpt-3.5-turbo"):
self.base_url = base_url
self.api_key = api_key
self.model = model
self.metrics: List[RequestMetrics] = []
async def send_streaming_request(
self,
session: aiohttp.ClientSession,
prompt: str,
request_id: str,
max_tokens: int = 500 # 压测时限制输出长度,控制成本
) -> RequestMetrics:
metric = RequestMetrics(
request_id=request_id,
start_time=time.time()
)
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": max_tokens,
"stream": True,
"stream_options": {"include_usage": True} # 包含 token 使用量
}
try:
async with session.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload,
timeout=aiohttp.ClientTimeout(total=60)
) as response:
metric.status_code = response.status
if response.status != 200:
metric.error = f"HTTP {response.status}"
return metric
first_token_received = False
async for line in response.content:
line = line.decode('utf-8').strip()
if not line.startswith("data: "):
continue
data_str = line[6:] # 去掉 "data: " 前缀
if data_str == "[DONE]":
break
try:
chunk = json.loads(data_str)
except json.JSONDecodeError:
continue
# 记录 TTFT(第一个有内容的 token)
if not first_token_received:
choices = chunk.get("choices", [])
if choices and choices[0].get("delta", {}).get("content"):
metric.ttft = time.time() - metric.start_time
first_token_received = True
# 提取 token 使用量(stream_options 会在最后一个 chunk 里返回)
if "usage" in chunk and chunk["usage"]:
usage = chunk["usage"]
metric.prompt_tokens = usage.get("prompt_tokens", 0)
metric.completion_tokens = usage.get("completion_tokens", 0)
metric.total_tokens = usage.get("total_tokens", 0)
metric.total_time = time.time() - metric.start_time
except asyncio.TimeoutError:
metric.error = "timeout"
metric.total_time = time.time() - metric.start_time
except Exception as e:
metric.error = str(e)
metric.total_time = time.time() - metric.start_time
return metric
async def run_load_test(
self,
prompts: List[str],
concurrency: int,
duration_seconds: int,
max_tokens_per_request: int = 500
) -> LoadTestResult:
start_time = time.time()
request_counter = 0
all_metrics: List[RequestMetrics] = []
# 信号量控制并发数
semaphore = asyncio.Semaphore(concurrency)
async def bounded_request(prompt: str) -> RequestMetrics:
async with semaphore:
nonlocal request_counter
request_counter += 1
req_id = f"req-{request_counter}-{int(time.time() * 1000)}"
return await self.send_streaming_request(
session, prompt, req_id, max_tokens_per_request
)
connector = aiohttp.TCPConnector(limit=concurrency * 2)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = []
prompt_idx = 0
while time.time() - start_time < duration_seconds:
# 循环使用 prompts
prompt = prompts[prompt_idx % len(prompts)]
prompt_idx += 1
task = asyncio.create_task(bounded_request(prompt))
tasks.append(task)
# 控制任务提交速率,避免瞬间堆积太多任务
if len(tasks) >= concurrency * 3:
# 等待一部分完成
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for t in done:
all_metrics.append(t.result())
tasks = list(pending)
await asyncio.sleep(0.01) # 小间隔,避免 CPU 100%
# 等待所有剩余任务完成
if tasks:
remaining = await asyncio.gather(*tasks, return_exceptions=True)
for r in remaining:
if isinstance(r, RequestMetrics):
all_metrics.append(r)
return self._compute_result(all_metrics, duration_seconds)
def _compute_result(self, metrics: List[RequestMetrics], duration: float) -> LoadTestResult:
successful = [m for m in metrics if m.error is None and m.ttft is not None]
failed = [m for m in metrics if m.error is not None]
ttft_values = sorted([m.ttft for m in successful])
total_time_values = sorted([m.total_time for m in successful if m.total_time])
total_tokens = sum(m.total_tokens for m in successful)
def percentile(data, p):
if not data:
return 0.0
idx = int(len(data) * p / 100)
return data[min(idx, len(data) - 1)]
# 成本估算(GPT-3.5-turbo 价格)
prompt_tokens = sum(m.prompt_tokens for m in successful)
completion_tokens = sum(m.completion_tokens for m in successful)
estimated_cost = (prompt_tokens / 1_000_000 * 0.5 +
completion_tokens / 1_000_000 * 1.5) # 美元
return LoadTestResult(
total_requests=len(metrics),
successful_requests=len(successful),
failed_requests=len(failed),
ttft_p50=percentile(ttft_values, 50),
ttft_p95=percentile(ttft_values, 95),
ttft_p99=percentile(ttft_values, 99),
total_time_p50=percentile(total_time_values, 50),
total_time_p95=percentile(total_time_values, 95),
total_time_p99=percentile(total_time_values, 99),
qps=len(successful) / duration,
total_token_throughput=total_tokens / duration,
estimated_cost_usd=estimated_cost
)
# 使用示例
async def main():
tester = AILoadTester(
base_url="https://api.openai.com/v1",
api_key="your-api-key",
model="gpt-3.5-turbo" # 压测用便宜模型
)
# 压测用的 prompt 集合(必须是多样化的真实问题)
test_prompts = [
"什么是向量数据库?",
"解释一下 RAG 的工作原理",
"Java 和 Python 在 AI 开发中各有什么优势?",
"如何设计一个高可用的 AI 系统?",
"Transformer 架构的核心思想是什么?",
# ... 实际压测中应该有 100 个以上不同的 prompt
]
print("开始压测:50 并发,持续 120 秒")
result = await tester.run_load_test(
prompts=test_prompts,
concurrency=50,
duration_seconds=120,
max_tokens_per_request=300 # 限制输出,控制成本
)
print(f"\n=== 压测结果 ===")
print(f"总请求数: {result.total_requests}")
print(f"成功: {result.successful_requests} | 失败: {result.failed_requests}")
print(f"错误率: {result.failed_requests / result.total_requests * 100:.2f}%")
print(f"\nTTFT (Time to First Token):")
print(f" P50: {result.ttft_p50:.3f}s")
print(f" P95: {result.ttft_p95:.3f}s")
print(f" P99: {result.ttft_p99:.3f}s")
print(f"\n总完成时间:")
print(f" P50: {result.total_time_p50:.3f}s")
print(f" P95: {result.total_time_p95:.3f}s")
print(f" P99: {result.total_time_p99:.3f}s")
print(f"\n吞吐量:")
print(f" QPS: {result.qps:.2f}")
print(f" Token/s: {result.total_token_throughput:.0f}")
print(f"\n本次压测成本: ${result.estimated_cost_usd:.4f}")
asyncio.run(main())压测场景设计:不只是并发拉满
好的压测不是单纯把并发拉到最高,而是模拟真实的使用场景:
场景一:正常负载
- 并发:日常高峰的 80%
- 持续时间:30 分钟
- 目标:验证 P99 TTFT < 2s,错误率 < 0.1%
场景二:流量洪峰
- 并发:正常的 5 倍,持续 2 分钟
- 目标:验证系统不崩溃,有限流降级,不影响已建立的连接
场景三:长时间稳定性
- 并发:60% 负载
- 持续时间:4 小时
- 目标:验证没有内存泄漏,JVM GC 稳定
场景四:混合 Token 长度
- 30% 短问答(< 200 token 输出)
- 50% 中等长度(200-1000 token 输出)
- 20% 长文档处理(> 2000 token 输出)
- 目标:验证在真实负载分布下的表现
几个压测中常见的误区
误区一:只看 P50,不看 P99 AI 系统的延迟分布通常是重尾的——P50 可能是 1 秒,但 P99 可能是 15 秒(因为有些请求需要长思考)。用 P50 代表系统表现是自欺欺人。
误区二:不模拟真实的 token 长度分布 如果你的测试 prompt 全是短问题,压出来的吞吐量比真实场景好看得多。真实的长文档处理请求会消耗 10 倍以上的 GPU 算力,必须在压测中体现出来。
误区三:忽略预热时间 JVM 有 JIT 预热,AI 服务也有模型加载、KV Cache 预热等效果。刚启动时的前几分钟数据通常比稳态好很多(因为 KV Cache 是空的)或者差很多(JVM 没预热)。压测数据要丢弃前 5 分钟的预热数据。
误区四:在生产环境做压测 这个不用多说,但确实有人这么干过。应该用和生产同配置的独立压测环境。
小结
AI 系统压测和普通 HTTP 压测的核心差异:
- 关注 TTFT 而不只是总完成时间
- 用 Token/s 衡量吞吐量,而不只是 QPS
- 必须用多样化的真实 prompt,不能用固定数据
- 压测本身有 API 费用成本,需要规划
- 流式响应需要专门的工具支持
做好这五点,你的压测结论才有真实的参考价值。
