RAG系统压测实战:如何验证你的知识库能扛住生产流量
RAG系统压测实战:如何验证你的知识库能扛住生产流量
上线第三天的噩梦
2026年1月,广州某零售集团的Java工程师小周,经历了职业生涯中最紧张的72小时。
公司内部知识库系统1月5日正式上线。第一天,50个用户试用,没有任何问题。第二天,公司全员推广,用户增长到400人,系统偶尔有些卡顿,但还算稳定。
第三天,一篇内部推广邮件出发了——IT部门在全司通讯里推荐了这个AI助手,并附上了几个"神奇的使用案例"。
邮件发出去2小时后,系统开始报错。
首先是嵌入API开始超时,然后Milvus查询延迟从80ms飙升到8000ms,最后Spring Boot应用线程池被打满,全线503。
小周疯狂看日志,才发现:一时间涌入了1000多个并发用户,每个用户连续发了好几条消息。系统在完全没有压测的情况下上线,根本没想到会有这种流量。
他不得不发公告:"系统暂时维护,两小时后恢复。"
两小时实际变成了一整个通宵。
凌晨5点,他终于把系统限流、队列、缓存全部加好,重新上线,勉强撑住了。但在那个通宵里,他反复想的一件事是:
"如果上线前做过压测,这一切都不会发生。"
后来他写了一份复盘报告,第一条经验教训就是:RAG系统上线前,必须做压测。不是可选项,是必选项。
本文就是他那份压测体系的完整实现。
先说结论(TL;DR)
| 压测维度 | 工具推荐 | 关键指标 | 基准参考值 |
|---|---|---|---|
| 接口性能压测 | Gatling | TPS、P99延迟 | P99<5s,TPS>50 |
| 向量检索专项 | 自定义脚本 | 检索P99 | <500ms |
| 嵌入服务专项 | Gatling | 批量吞吐量 | >100 doc/s |
| 端到端场景 | Gatling | 全链路P99 | <10s |
| LLM并发 | 观测为主 | 等待队列长度 | <20 |
压测黄金三角: 找到系统的性能上限 → 确认性能瓶颈在哪里 → 验证优化措施的效果。
RAG系统的性能瓶颈分析
各环节耗时拆解
在做压测之前,先理解RAG系统的延迟来自哪里:
各环节耗时分析:
| 环节 | P50 | P99 | 是否可并发 | 瓶颈风险 |
|---|---|---|---|---|
| Query嵌入 | 50ms | 300ms | 是 | API限流 |
| 向量检索 | 80ms | 500ms | 是 | 内存/索引 |
| Reranker | 300ms | 1500ms | 是 | API限流/网络 |
| LLM生成 | 2000ms | 15000ms | 否(流式) | API限流 |
| 总计 | ~2500ms | ~17000ms | - | LLM占主导 |
关键发现:
- LLM调用占总延迟的80%以上,但它是"天花板"——你无法让LLM更快(只能并发)
- 向量检索和嵌入是可以优化的瓶颈
- 高并发下,嵌入API的限流最先成为瓶颈(OpenAI有RPM/TPM限制)
压测工具选型
Gatling vs JMeter vs k6
| 工具 | 语言 | 优点 | 缺点 | RAG压测适合度 |
|---|---|---|---|---|
| Gatling | Scala DSL | 报告漂亮、异步架构、性能好 | 学习曲线稍高 | ★★★★★ |
| JMeter | XML+GUI | 入门简单、插件多 | 内存占用大、异步支持弱 | ★★★☆☆ |
| k6 | JavaScript | 轻量、云端集成 | 功能相对简单 | ★★★★☆ |
| Locust | Python | 脚本灵活、可视化 | 性能有限 | ★★★☆☆ |
推荐选Gatling的原因:
- Scala DSL表达力强,RAG的复杂场景(条件分支、数据准备)写起来方便
- 原生异步,能模拟RAG的流式响应(SSE/WebSocket)
- 内置HTML报告,P99延迟、TPS直接可视化
- 团队可以把压测脚本提交到Git,作为CI/CD的一部分
用Gatling写RAG压测场景
项目结构
rag-load-test/
├── pom.xml
├── src/
│ └── test/
│ ├── scala/
│ │ └── simulation/
│ │ ├── RagBasicLoadSimulation.scala # 基础负载测试
│ │ ├── RagSpikeTestSimulation.scala # 突发流量测试
│ │ ├── RagEnduranceSimulation.scala # 耐久性测试
│ │ └── RagVectorDbSimulation.scala # 向量库专项
│ └── resources/
│ ├── gatling.conf
│ └── data/
│ └── test_questions.csv # 测试问题库Maven配置
<!-- pom.xml -->
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rag-load-test</artifactId>
<version>1.0.0</version>
<properties>
<gatling.version>3.10.3</gatling.version>
<scala.version>2.13.12</scala.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.gatling.highcharts</groupId>
<artifactId>gatling-charts-highcharts</artifactId>
<version>${gatling.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.gatling</groupId>
<artifactId>gatling-maven-plugin</artifactId>
<version>4.9.0</version>
<configuration>
<simulationClass>simulation.RagBasicLoadSimulation</simulationClass>
<resultsFolder>${project.build.directory}/gatling-results</resultsFolder>
</configuration>
</plugin>
</plugins>
</build>
</project>基础负载测试脚本
// RagBasicLoadSimulation.scala
package simulation
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._
import scala.util.Random
/**
* RAG系统基础负载测试
*
* 测试目标:
* - 验证系统在正常并发下的响应性能
* - 找出P99延迟和最大TPS
* - 验证错误率是否在可接受范围内
*/
class RagBasicLoadSimulation extends Simulation {
// ============== 配置 ==============
val baseUrl = System.getProperty("baseUrl", "http://localhost:8080")
val authToken = System.getProperty("authToken", "test-token-for-load-test")
// 负载参数(可通过系统属性覆盖)
val rampUsers = System.getProperty("rampUsers", "50").toInt
val rampDuration = System.getProperty("rampDuration", "60").toInt // 秒
val steadyDuration = System.getProperty("steadyDuration", "300").toInt // 秒
val maxUsers = System.getProperty("maxUsers", "100").toInt
// ============== HTTP配置 ==============
val httpProtocol = http
.baseUrl(baseUrl)
.acceptHeader("application/json")
.contentTypeHeader("application/json")
.authorizationHeader(s"Bearer $authToken")
.userAgentHeader("Gatling-RAG-LoadTest/1.0")
// 连接超时:10s,响应超时:60s(LLM可能很慢)
.connectionHeader("keep-alive")
// ============== 测试数据 ==============
// 从CSV加载真实问题(更接近生产流量)
val questionFeeder = csv("data/test_questions.csv").circular
// 也可以使用程序生成的问题
val syntheticQuestions = List(
"公司的年假政策是什么?",
"如何申请报销差旅费用?",
"绩效考核的评分标准是什么?",
"新员工试用期是多久?",
"IT设备采购流程是什么?",
"如何申请加班费?",
"公司的保密协议有哪些要求?",
"项目管理的标准流程是什么?",
"数据安全规范有哪些?",
"如何提交代码review?"
)
// ============== 场景定义 ==============
/**
* 场景1:单次简单查询
*/
val simpleQueryScenario = scenario("简单知识查询")
.feed(questionFeeder)
.exec(
http("RAG查询")
.post("/api/rag/query")
.body(StringBody("""{"question": "${question}"}"""))
.asJson
.check(status.is(200))
.check(jsonPath("$.answer").exists)
.check(jsonPath("$.answer").not(""))
.check(responseTimeInMillis.lte(15000)) // 15秒超时
)
.pause(1, 3) // 模拟用户思考时间(1-3秒)
/**
* 场景2:多轮对话(模拟真实用户行为)
*/
val multiTurnChatScenario = scenario("多轮对话")
.exec(
http("第1轮:入门问题")
.post("/api/agent/query")
.body(StringBody("""{"question": "公司内部有什么知识库可以用?"}"""))
.asJson
.check(status.is(200))
.check(jsonPath("$.answer").exists)
)
.pause(2, 5)
.exec(
http("第2轮:深入问题")
.post("/api/agent/query")
.body(StringBody("""{"question": "具体的报销流程是什么,需要准备什么材料?"}"""))
.asJson
.check(status.is(200))
)
.pause(3, 8)
.exec(
http("第3轮:确认问题")
.post("/api/agent/query")
.body(StringBody("""{"question": "报销金额有上限吗?"}"""))
.asJson
.check(status.is(200))
)
/**
* 场景3:混合压测(70%简单查询 + 20%多轮对话 + 10%复杂查询)
*/
val mixedScenario = scenario("混合场景压测")
.randomSwitch(
70.0 -> exec(
http("简单查询")
.post("/api/rag/query")
.body(StringBody(s"""{"question": "${syntheticQuestions(Random.nextInt(syntheticQuestions.size))}"}"""))
.asJson
.check(status.in(200, 429)) // 允许限流(429)
.check(responseTimeInMillis.lte(15000))
),
20.0 -> exec(
http("Agent查询")
.post("/api/agent/query")
.body(StringBody("""{"question": "最近一周有什么重要通知?"}"""))
.asJson
.check(status.in(200, 429))
.check(responseTimeInMillis.lte(30000))
),
10.0 -> exec(
http("复杂查询")
.post("/api/agent/query")
.body(StringBody("""{"question": "公司AI工具采购的完整流程是什么,最近有什么变化,需要找哪个部门审批?"}"""))
.asJson
.check(status.in(200, 429))
.check(responseTimeInMillis.lte(60000))
)
)
.pause(1, 5)
// ============== 负载模型定义 ==============
/**
* 压测阶段1:阶梯加压(找系统上限)
* 从0开始,每30秒增加10个用户,直到100并发
*/
val stairCaseLoad = setUp(
simpleQueryScenario.inject(
// 预热阶段:5个用户跑30秒
atOnceUsers(5),
nothingFor(30.seconds),
// 阶梯加压
rampUsersPerSec(0).to(10).during(60.seconds),
nothingFor(60.seconds),
rampUsersPerSec(10).to(30).during(60.seconds),
nothingFor(60.seconds),
rampUsersPerSec(30).to(50).during(60.seconds),
nothingFor(60.seconds),
rampUsersPerSec(50).to(100).during(60.seconds)
)
).protocols(httpProtocol)
.assertions(
global.responseTime.percentile(99).lte(15000),
global.successfulRequests.percent.gte(95.0),
global.requestsPerSec.gte(10.0)
)
/**
* 压测阶段2:稳定负载测试
* 50并发持续5分钟,验证系统稳定性
*/
val steadyStateLoad = setUp(
mixedScenario.inject(
rampUsers(rampUsers).during(rampDuration.seconds),
constantUsersPerSec(maxUsers / 2).during(steadyDuration.seconds)
)
).protocols(httpProtocol)
.assertions(
global.responseTime.percentile(50).lte(5000),
global.responseTime.percentile(99).lte(15000),
global.successfulRequests.percent.gte(95.0),
forAll.responseTime.percentile(95).lte(20000)
)
// 选择要运行的负载模型(注释掉其他的)
// setUp(stairCaseLoad) // 阶梯加压测上限
setUp(
mixedScenario.inject(
rampUsers(rampUsers).during(rampDuration.seconds),
constantUsersPerSec(maxUsers / 2).during(steadyDuration.seconds)
).protocols(httpProtocol)
).assertions(
global.responseTime.percentile(50).lte(5000),
global.responseTime.percentile(99).lte(15000),
global.successfulRequests.percent.gte(95.0)
)
}突发流量测试(Spike Test)
// RagSpikeTestSimulation.scala
package simulation
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._
/**
* RAG系统突发流量测试
*
* 模拟场景:平时50人用,突然来了500人(全公司同时收到推广邮件)
* 验证:系统的限流、队列、降级机制是否生效
*/
class RagSpikeTestSimulation extends Simulation {
val baseUrl = System.getProperty("baseUrl", "http://localhost:8080")
val authToken = System.getProperty("authToken", "test-token")
val httpProtocol = http
.baseUrl(baseUrl)
.acceptHeader("application/json")
.contentTypeHeader("application/json")
.authorizationHeader(s"Bearer $authToken")
val spikeScenario = scenario("突发流量测试")
.exec(
http("知识库查询(突发)")
.post("/api/rag/query")
.body(StringBody("""{"question": "差旅报销标准"}"""))
.asJson
// 突发场景下,允许429(限流)和503(超载),但不允许500(服务崩溃)
.check(status.in(200, 429, 503))
// 突发场景,超时放宽到30秒
.check(responseTimeInMillis.lte(30000))
)
setUp(
spikeScenario.inject(
// 基准:50个用户的正常负载
rampUsersPerSec(0).to(10).during(30.seconds),
constantUsersPerSec(10).during(60.seconds),
// 突发:1秒内注入500个用户
atOnceUsers(200),
// 观察:突发后系统的恢复情况
nothingFor(10.seconds),
constantUsersPerSec(10).during(120.seconds),
// 第二次突发
atOnceUsers(300),
nothingFor(10.seconds),
// 恢复阶段
constantUsersPerSec(5).during(60.seconds)
)
).protocols(httpProtocol)
.assertions(
// 突发场景的断言更宽松
global.successfulRequests.percent.gte(80.0), // 允许20%失败(限流)
global.responseTime.percentile(99).lte(30000),
// 关键:不允许出现500错误(服务崩溃)
details("知识库查询(突发)").failedRequests.percent.lte(25.0)
)
}耐久性测试(Soak Test)
// RagEnduranceSimulation.scala
package simulation
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._
/**
* RAG系统耐久性测试
*
* 测试目标:
* - 内存泄漏(JVM堆是否持续增长?)
* - 连接池耗尽(数据库/Redis连接是否会耗尽?)
* - 向量数据库长时间运行的稳定性
*
* 运行时间:2小时(生产环境前至少跑1小时)
*/
class RagEnduranceSimulation extends Simulation {
val baseUrl = System.getProperty("baseUrl", "http://localhost:8080")
val httpProtocol = http
.baseUrl(baseUrl)
.acceptHeader("application/json")
.contentTypeHeader("application/json")
.authorizationHeader("Bearer test-token")
// 轮询多种问题类型,模拟长时间的多样化流量
val questionRotation = List(
"""{"question": "年假政策"}""",
"""{"question": "报销流程"}""",
"""{"question": "绩效考核标准"}""",
"""{"question": "IT设备采购"}""",
"""{"question": "数据安全规范"}"""
)
var questionIndex = 0
val enduranceScenario = scenario("耐久性测试")
.exec(session => {
val question = questionRotation(questionIndex % questionRotation.size)
questionIndex += 1
session.set("question", question)
})
.exec(
http("长时间稳定查询")
.post("/api/rag/query")
.body(StringBody("${question}"))
.asJson
.check(status.is(200))
.check(responseTimeInMillis.lte(15000))
)
.pause(2, 8) // 2-8秒的用户思考时间
setUp(
enduranceScenario.inject(
// 慢慢加到20个用户,然后保持2小时
rampUsers(20).during(5.minutes),
constantUsersPerSec(5).during(115.minutes) // ~2小时
)
).protocols(httpProtocol)
.assertions(
// 耐久性测试最重要:成功率不能随时间下降
global.successfulRequests.percent.gte(98.0),
// P99不能随时间增长(排除内存泄漏等问题)
global.responseTime.percentile(99).lte(12000),
// 不允许任何错误率超过2%
global.failedRequests.percent.lte(2.0)
)
}测试数据准备
// TestQuestionGenerator.java
package com.example.ragloadtest;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
/**
* 压测问题数据生成器
*
* 生成test_questions.csv,供Gatling feeder使用
*
* 重要原则:
* 1. 使用真实的业务问题(从生产日志中采样)
* 2. 覆盖不同类型的问题(短问题/长问题/模糊问题/精确问题)
* 3. 包含一些"找不到答案"的问题(测试空结果处理)
*/
public class TestQuestionGenerator {
private static final List<String> QUESTIONS = List.of(
// ===== HR相关(30%流量)=====
"公司年假怎么算",
"病假工资怎么发",
"新员工试用期多久",
"绩效考核一年几次",
"哺乳假有多少天",
"婚假几天有薪",
// ===== 财务/报销(25%流量)=====
"差旅费用怎么报销",
"出差补贴标准是什么",
"报销需要提交哪些材料",
"发票丢了怎么处理",
"超标消费能不能报",
// ===== IT/技术(20%流量)=====
"申请电脑需要什么流程",
"VPN怎么连接",
"代码提交规范是什么",
"软件采购审批流程",
"数据备份策略是什么",
// ===== 项目管理(15%流量)=====
"项目上线需要哪些审批",
"代码review标准是什么",
"需求变更如何处理",
"项目复盘怎么做",
// ===== 边界测试(10%流量)=====
"xxxxxx随机不存在的内容xxxx", // 测试空结果
"???", // 测试特殊字符
"a", // 测试超短查询
"公司的发展历史最近的变化趋势和未来战略规划方向以及核心竞争力分析" // 测试超长查询
);
public static void generateCsv(String outputPath) throws IOException {
try (FileWriter writer = new FileWriter(outputPath)) {
writer.write("question\n"); // CSV头
for (String q : QUESTIONS) {
writer.write(escapeCsv(q) + "\n");
}
}
System.out.println("生成测试数据: " + outputPath + ",共" + QUESTIONS.size() + "条");
}
private static String escapeCsv(String value) {
if (value.contains(",") || value.contains("\"") || value.contains("\n")) {
return "\"" + value.replace("\"", "\"\"") + "\"";
}
return value;
}
public static void main(String[] args) throws IOException {
generateCsv("src/test/resources/data/test_questions.csv");
}
}向量数据库专项压测
// MilvusLoadTest.java
package com.example.ragloadtest;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.SearchResults;
import io.milvus.param.ConnectParam;
import io.milvus.param.R;
import io.milvus.param.dml.SearchParam;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Milvus向量数据库专项压测
*
* 直接压测向量检索,排除嵌入API和LLM的干扰,
* 测试向量数据库本身的性能上限
*/
@Slf4j
public class MilvusLoadTest {
private static final int VECTOR_DIM = 1536;
private static final String COLLECTION_NAME = "knowledge_base";
/**
* 并发向量检索压测
*
* @param host Milvus地址
* @param port Milvus端口
* @param concurrency 并发线程数
* @param duration 测试持续时间(秒)
*/
public static void runConcurrentSearchTest(String host, int port,
int concurrency, int duration) {
MilvusServiceClient client = new MilvusServiceClient(
ConnectParam.newBuilder()
.withHost(host)
.withPort(port)
.build()
);
ExecutorService executor = Executors.newFixedThreadPool(concurrency);
// 统计指标
AtomicLong totalRequests = new AtomicLong(0);
AtomicLong successRequests = new AtomicLong(0);
AtomicLong failedRequests = new AtomicLong(0);
ConcurrentLinkedQueue<Long> latencies = new ConcurrentLinkedQueue<>();
long startTime = System.currentTimeMillis();
long endTime = startTime + (duration * 1000L);
log.info("开始Milvus向量检索压测: 并发={}, 持续={}秒", concurrency, duration);
// 启动并发测试线程
List<Future<?>> futures = IntStream.range(0, concurrency)
.mapToObj(i -> executor.submit(() -> {
Random random = new Random(i);
while (System.currentTimeMillis() < endTime) {
// 生成随机查询向量
List<Float> queryVector = generateRandomVector(VECTOR_DIM, random);
long reqStart = System.nanoTime();
try {
SearchParam searchParam = SearchParam.newBuilder()
.withCollectionName(COLLECTION_NAME)
.withMetricType(io.milvus.param.MetricType.COSINE)
.withOutFields(List.of("content", "title"))
.withTopK(10)
.withVectors(List.of(queryVector))
.withVectorFieldName("vector")
.withParams("{\"nprobe\": 32}")
.build();
R<SearchResults> result = client.search(searchParam);
long latencyMs = (System.nanoTime() - reqStart) / 1_000_000;
latencies.add(latencyMs);
if (result.getStatus() == 0) {
successRequests.incrementAndGet();
} else {
failedRequests.incrementAndGet();
}
totalRequests.incrementAndGet();
} catch (Exception e) {
failedRequests.incrementAndGet();
totalRequests.incrementAndGet();
log.warn("检索异常: {}", e.getMessage());
}
}
}))
.collect(Collectors.toList());
// 等待所有线程完成
futures.forEach(f -> {
try { f.get(); } catch (Exception e) { log.error("线程异常", e); }
});
executor.shutdown();
client.close();
// 输出统计结果
long actualDuration = System.currentTimeMillis() - startTime;
printReport(totalRequests.get(), successRequests.get(), failedRequests.get(),
latencies, actualDuration);
}
private static void printReport(long total, long success, long failed,
ConcurrentLinkedQueue<Long> latencies, long durationMs) {
List<Long> sortedLatencies = new ArrayList<>(latencies);
Collections.sort(sortedLatencies);
double qps = total * 1000.0 / durationMs;
double successRate = success * 100.0 / total;
System.out.println("\n========== Milvus压测报告 ==========");
System.out.printf("总请求数: %,d%n", total);
System.out.printf("成功请求数: %,d (%.1f%%)%n", success, successRate);
System.out.printf("失败请求数: %,d%n", failed);
System.out.printf("测试时长: %.1f秒%n", durationMs / 1000.0);
System.out.printf("QPS: %.1f/s%n", qps);
System.out.println();
System.out.printf("延迟统计(ms):%n");
System.out.printf(" P50: %d ms%n", getPercentile(sortedLatencies, 50));
System.out.printf(" P75: %d ms%n", getPercentile(sortedLatencies, 75));
System.out.printf(" P95: %d ms%n", getPercentile(sortedLatencies, 95));
System.out.printf(" P99: %d ms%n", getPercentile(sortedLatencies, 99));
System.out.printf(" P999: %d ms%n", getPercentile(sortedLatencies, 99.9));
System.out.printf(" Max: %d ms%n", sortedLatencies.get(sortedLatencies.size() - 1));
System.out.println("=====================================\n");
// 给出评估
long p99 = getPercentile(sortedLatencies, 99);
if (p99 < 200) {
System.out.println("评估: ✓ 向量检索P99 < 200ms,性能优秀");
} else if (p99 < 500) {
System.out.println("评估: ⚠ 向量检索P99 < 500ms,性能良好");
} else {
System.out.println("评估: ✗ 向量检索P99 >= 500ms,建议优化索引参数");
}
}
private static List<Float> generateRandomVector(int dim, Random random) {
List<Float> vector = new ArrayList<>(dim);
float norm = 0;
for (int i = 0; i < dim; i++) {
float val = random.nextFloat() * 2 - 1;
vector.add(val);
norm += val * val;
}
// 归一化
norm = (float) Math.sqrt(norm);
for (int i = 0; i < dim; i++) {
vector.set(i, vector.get(i) / norm);
}
return vector;
}
private static long getPercentile(List<Long> sorted, double percentile) {
if (sorted.isEmpty()) return 0;
int index = (int) Math.ceil(percentile / 100.0 * sorted.size()) - 1;
return sorted.get(Math.max(0, Math.min(index, sorted.size() - 1)));
}
public static void main(String[] args) {
String host = System.getProperty("milvus.host", "localhost");
int port = Integer.parseInt(System.getProperty("milvus.port", "19530"));
int concurrency = Integer.parseInt(System.getProperty("concurrency", "20"));
int duration = Integer.parseInt(System.getProperty("duration", "60"));
runConcurrentSearchTest(host, port, concurrency, duration);
}
}嵌入服务专项压测
// EmbeddingServiceLoadTest.java
package com.example.ragloadtest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.embedding.EmbeddingModel;
import org.springframework.ai.embedding.EmbeddingRequest;
import org.springframework.ai.embedding.EmbeddingResponse;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* 嵌入服务专项压测
*
* 测试目标:
* 1. 单次嵌入的延迟分布
* 2. 批量嵌入的吞吐量
* 3. 在API限流下的降级表现
*
* OpenAI text-embedding-3-small的限制:
* - RPM: 3000 (requests per minute)
* - TPM: 1,000,000 (tokens per minute)
* - 建议QPS上限: ~50/s(留有余量)
*/
@Slf4j
public class EmbeddingServiceLoadTest {
private final EmbeddingModel embeddingModel;
public EmbeddingServiceLoadTest(EmbeddingModel embeddingModel) {
this.embeddingModel = embeddingModel;
}
/**
* 测试单次嵌入延迟
*/
public void testSingleEmbeddingLatency(int iterations) {
String testText = "这是一个用于测试嵌入性能的示例文本,包含中英文混合内容。This is a test embedding.";
List<Long> latencies = new ArrayList<>();
for (int i = 0; i < iterations; i++) {
long start = System.nanoTime();
try {
embeddingModel.embed(testText);
long latencyMs = (System.nanoTime() - start) / 1_000_000;
latencies.add(latencyMs);
} catch (Exception e) {
log.warn("嵌入计算失败: {}", e.getMessage());
}
// 控制请求频率(避免被限流)
try { Thread.sleep(20); } catch (InterruptedException ignored) {}
}
Collections.sort(latencies);
System.out.printf("单次嵌入延迟(%d次): P50=%dms, P95=%dms, P99=%dms%n",
iterations,
getPercentile(latencies, 50),
getPercentile(latencies, 95),
getPercentile(latencies, 99));
}
/**
* 测试并发嵌入吞吐量
*/
public void testConcurrentEmbedding(int concurrency, int durationSeconds) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(concurrency);
AtomicLong successCount = new AtomicLong(0);
AtomicLong failCount = new AtomicLong(0);
AtomicLong rateLimitCount = new AtomicLong(0);
ConcurrentLinkedQueue<Long> latencies = new ConcurrentLinkedQueue<>();
long endTime = System.currentTimeMillis() + durationSeconds * 1000L;
List<String> testTexts = List.of(
"公司年假政策相关内容",
"差旅报销审批流程文档",
"技术架构设计规范说明",
"项目管理标准化流程",
"员工绩效考核评定方案"
);
log.info("开始嵌入服务压测: 并发={}, 持续={}秒", concurrency, durationSeconds);
// 启动并发请求
List<Future<?>> futures = IntStream.range(0, concurrency)
.mapToObj(i -> executor.submit(() -> {
Random random = new Random(i);
while (System.currentTimeMillis() < endTime) {
String text = testTexts.get(random.nextInt(testTexts.size()));
long start = System.nanoTime();
try {
embeddingModel.embed(text);
long latencyMs = (System.nanoTime() - start) / 1_000_000;
latencies.add(latencyMs);
successCount.incrementAndGet();
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().contains("429")) {
rateLimitCount.incrementAndGet();
// 限流时退避
try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
} else {
failCount.incrementAndGet();
}
}
}
}))
.collect(Collectors.toList());
futures.forEach(f -> {
try { f.get(); } catch (Exception e) { log.error("压测线程异常", e); }
});
executor.shutdown();
// 统计输出
List<Long> sortedLatencies = new ArrayList<>(latencies);
Collections.sort(sortedLatencies);
long total = successCount.get() + failCount.get() + rateLimitCount.get();
double qps = (double) successCount.get() / durationSeconds;
System.out.println("\n========== 嵌入服务压测报告 ==========");
System.out.printf("成功请求: %,d (QPS: %.1f/s)%n", successCount.get(), qps);
System.out.printf("失败请求: %,d%n", failCount.get());
System.out.printf("限流次数: %,d%n", rateLimitCount.get());
if (!sortedLatencies.isEmpty()) {
System.out.printf("P50延迟: %dms%n", getPercentile(sortedLatencies, 50));
System.out.printf("P95延迟: %dms%n", getPercentile(sortedLatencies, 95));
System.out.printf("P99延迟: %dms%n", getPercentile(sortedLatencies, 99));
}
System.out.println("==========================================");
}
private long getPercentile(List<Long> sorted, double percentile) {
if (sorted.isEmpty()) return 0;
int index = (int) Math.ceil(percentile / 100.0 * sorted.size()) - 1;
return sorted.get(Math.max(0, Math.min(index, sorted.size() - 1)));
}
}端到端压测:模拟真实用户场景
// RagE2ESimulation.scala
package simulation
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._
/**
* RAG系统端到端场景压测
*
* 模拟真实用户的完整使用路径:
* 1. 登录获取Token
* 2. 问一个简单问题
* 3. 基于回答追问
* 4. 查询相关文档
*
* 更接近生产流量特征
*/
class RagE2ESimulation extends Simulation {
val baseUrl = System.getProperty("baseUrl", "http://localhost:8080")
val httpProtocol = http
.baseUrl(baseUrl)
.acceptHeader("application/json")
.contentTypeHeader("application/json")
// 共享HTTP连接(模拟真实浏览器行为)
.shareConnections
// 用户账号feeder
val userFeeder = Iterator.continually(Map(
"username" -> s"test_user_${scala.util.Random.nextInt(100)}",
"password" -> "test_password_123"
))
val e2eScenario = scenario("完整用户会话")
.feed(userFeeder)
// 步骤1:登录
.exec(
http("用户登录")
.post("/api/auth/login")
.body(StringBody("""{"username": "${username}", "password": "${password}"}"""))
.asJson
.check(status.is(200))
.check(jsonPath("$.token").saveAs("authToken"))
)
.pause(1, 3) // 用户看完登录界面的停留时间
// 步骤2:首次查询
.exec(
http("首次知识查询")
.post("/api/rag/query")
.header("Authorization", "Bearer ${authToken}")
.body(StringBody("""{"question": "公司的报销政策是什么"}"""))
.asJson
.check(status.is(200))
.check(jsonPath("$.answer").exists)
.check(jsonPath("$.answer").saveAs("firstAnswer"))
)
.pause(5, 15) // 用户阅读答案的时间
// 步骤3:基于回答追问
.exec(
http("追问细节")
.post("/api/rag/query")
.header("Authorization", "Bearer ${authToken}")
.body(StringBody("""{"question": "发票超过报销金额上限怎么处理"}"""))
.asJson
.check(status.is(200))
)
.pause(3, 10)
// 步骤4:查询相关文档(如果系统有文档列表功能)
.exec(
http("查看关联文档")
.get("/api/documents/search?keyword=报销")
.header("Authorization", "Bearer ${authToken}")
.check(status.in(200, 404)) // 可能没有这个接口
)
.pause(2, 5)
// 步骤5:最后一个问题后退出
.exec(
http("最终查询")
.post("/api/rag/query")
.header("Authorization", "Bearer ${authToken}")
.body(StringBody("""{"question": "感谢,再见"}"""))
.asJson
.check(status.in(200, 400)) // 可能返回400(无效查询)
)
setUp(
e2eScenario.inject(
// 渐进加压,模拟工作日上午的流量增长
rampUsers(10).during(2.minutes),
constantUsersPerSec(5).during(10.minutes),
rampUsersPerSec(5).to(20).during(5.minutes),
constantUsersPerSec(20).during(10.minutes)
)
).protocols(httpProtocol)
.assertions(
global.successfulRequests.percent.gte(95.0),
global.responseTime.percentile(50).lte(5000),
global.responseTime.percentile(99).lte(30000),
// 关键:登录成功率必须100%
details("用户登录").successfulRequests.percent.is(100)
)
}找瓶颈:压测结果定位优化点
性能瓶颈诊断工具
// PerformanceBottleneckAnalyzer.java
package com.example.ragloadtest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
/**
* 性能瓶颈分析器
*
* 通过分段测量找出端到端延迟中的最大瓶颈
*/
@Slf4j
public class PerformanceBottleneckAnalyzer {
private final VectorStore vectorStore;
private final WebClient openAiClient;
public PerformanceBottleneckAnalyzer(VectorStore vectorStore,
WebClient.Builder webClientBuilder) {
this.vectorStore = vectorStore;
this.openAiClient = webClientBuilder
.baseUrl("https://api.openai.com")
.build();
}
/**
* 对各环节单独压测,找出最慢的瓶颈
*/
public BottleneckReport analyze(int iterations, int concurrency) throws InterruptedException {
System.out.println("开始性能瓶颈分析...\n");
// 测量各环节延迟
long embeddingP99 = benchmarkEmbedding(iterations);
long vectorSearchP99 = benchmarkVectorSearch(iterations, concurrency);
long llmP99 = estimateLlmLatency(); // LLM通常无法直接压测,估算
return BottleneckReport.builder()
.embeddingP99Ms(embeddingP99)
.vectorSearchP99Ms(vectorSearchP99)
.llmP99Ms(llmP99)
.totalEstimatedP99Ms(embeddingP99 + vectorSearchP99 + llmP99)
.bottleneck(identifyBottleneck(embeddingP99, vectorSearchP99, llmP99))
.build();
}
private long benchmarkEmbedding(int iterations) {
System.out.print("测量嵌入延迟...");
List<Long> latencies = new ArrayList<>();
for (int i = 0; i < iterations; i++) {
long start = System.nanoTime();
try {
// 实际测试嵌入调用
Thread.sleep(60); // 模拟:实际应该调用embeddingModel.embed()
latencies.add((System.nanoTime() - start) / 1_000_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (i % 10 == 0) System.out.print(".");
}
Collections.sort(latencies);
long p99 = latencies.get((int)(latencies.size() * 0.99));
System.out.printf(" P99=%dms%n", p99);
return p99;
}
private long benchmarkVectorSearch(int iterations, int concurrency) throws InterruptedException {
System.out.print("测量向量检索延迟...");
ConcurrentLinkedQueue<Long> latencies = new ConcurrentLinkedQueue<>();
ExecutorService executor = Executors.newFixedThreadPool(concurrency);
CountDownLatch latch = new CountDownLatch(iterations);
for (int i = 0; i < iterations; i++) {
executor.submit(() -> {
long start = System.nanoTime();
try {
vectorStore.similaritySearch("测试向量检索查询");
latencies.add((System.nanoTime() - start) / 1_000_000);
} catch (Exception e) {
latencies.add(9999L); // 标记失败
} finally {
latch.countDown();
}
});
}
latch.await(60, TimeUnit.SECONDS);
executor.shutdown();
List<Long> sorted = new ArrayList<>(latencies);
Collections.sort(sorted);
long p99 = sorted.get((int)(sorted.size() * 0.99));
System.out.printf(" P99=%dms%n", p99);
return p99;
}
private long estimateLlmLatency() {
// LLM延迟通过观测生产日志估算,不直接压测(会产生费用)
System.out.println("LLM延迟(估算): P99=8000ms(基于历史观测)");
return 8000;
}
private String identifyBottleneck(long embedding, long vectorSearch, long llm) {
if (llm > embedding + vectorSearch) return "LLM生成(正常,LLM本身慢)";
if (embedding > vectorSearch) return "嵌入服务(检查API限流)";
return "向量检索(检查索引和内存配置)";
}
@lombok.Builder @lombok.Data
public static class BottleneckReport {
private long embeddingP99Ms;
private long vectorSearchP99Ms;
private long llmP99Ms;
private long totalEstimatedP99Ms;
private String bottleneck;
public void print() {
System.out.println("\n========== 性能瓶颈分析报告 ==========");
System.out.printf("嵌入服务 P99: %,dms (%.1f%%)%n", embeddingP99Ms,
100.0 * embeddingP99Ms / totalEstimatedP99Ms);
System.out.printf("向量检索 P99: %,dms (%.1f%%)%n", vectorSearchP99Ms,
100.0 * vectorSearchP99Ms / totalEstimatedP99Ms);
System.out.printf("LLM生成 P99: %,dms (%.1f%%)%n", llmP99Ms,
100.0 * llmP99Ms / totalEstimatedP99Ms);
System.out.printf("端到端估算P99: %,dms%n", totalEstimatedP99Ms);
System.out.printf("主要瓶颈: %s%n", bottleneck);
System.out.println("==========================================");
}
}
}压测报告模板
// LoadTestReportGenerator.java
package com.example.ragloadtest;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 压测报告生成器
*
* 生成可以直接汇报给老板/技术领导的格式化报告
*/
public class LoadTestReportGenerator {
/**
* 生成完整压测报告(Markdown格式)
*/
public static String generateReport(LoadTestResult result) {
return String.format("""
# RAG系统压测报告
**测试时间:** %s
**测试环境:** %s
**测试人员:** %s
---
## 一、测试结论
> **系统能否上线:%s**
%s
---
## 二、测试结果摘要
| 指标 | 测试值 | 目标值 | 是否达标 |
|------|--------|--------|---------|
| 最大TPS | %,.1f | ≥%d | %s |
| P50延迟 | %dms | ≤5000ms | %s |
| P99延迟 | %dms | ≤15000ms | %s |
| 错误率 | %.2f%% | ≤5%% | %s |
| 最大并发 | %d | ≥%d | %s |
---
## 三、性能瓶颈
**主要瓶颈:** %s
**瓶颈原因:** %s
---
## 四、优化建议
%s
---
## 五、风险评估
| 风险项 | 风险描述 | 缓解措施 |
|--------|---------|---------|
| API限流 | OpenAI嵌入API在高并发下可能限流 | 增加本地嵌入缓存,异步队列控制并发 |
| LLM慢响应 | LLM P99延迟达15s,用户体验差 | 前端流式显示,设置30s超时 |
| 内存增长 | 长时间压测后Milvus内存使用率升高 | 上线前监控内存,设置告警阈值80%% |
---
## 六、上线建议
**建议上线配置:**
- 并发限制:最大 %d 个同时在线用户
- 限流策略:每用户每分钟最多 20 次查询
- 超时设置:嵌入 10s,检索 5s,LLM 60s
- 扩容触发:P99延迟 > 10s 时自动扩容
---
*本报告由压测工具自动生成,具体数据以Gatling HTML报告为准。*
""",
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")),
result.getEnvironment(),
result.getTester(),
result.isPassedOverall() ? "✅ 可以上线" : "❌ 不建议上线",
result.getOverallConclusion(),
result.getMaxTps(), result.getTargetTps(),
result.getMaxTps() >= result.getTargetTps() ? "✅" : "❌",
result.getP50LatencyMs(), result.getP50LatencyMs() <= 5000 ? "✅" : "❌",
result.getP99LatencyMs(), result.getP99LatencyMs() <= 15000 ? "✅" : "❌",
result.getErrorRate(), result.getErrorRate() <= 5.0 ? "✅" : "❌",
result.getMaxConcurrency(), result.getTargetConcurrency(),
result.getMaxConcurrency() >= result.getTargetConcurrency() ? "✅" : "❌",
result.getBottleneck(),
result.getBottleneckReason(),
result.getOptimizationSuggestions(),
result.getRecommendedMaxUsers()
);
}
@lombok.Builder @lombok.Data
public static class LoadTestResult {
private String environment;
private String tester;
private boolean passedOverall;
private String overallConclusion;
private double maxTps;
private int targetTps;
private long p50LatencyMs;
private long p99LatencyMs;
private double errorRate;
private int maxConcurrency;
private int targetConcurrency;
private String bottleneck;
private String bottleneckReason;
private String optimizationSuggestions;
private int recommendedMaxUsers;
}
}生产注意事项
压测执行清单
#!/bin/bash
# pre_load_test_checklist.sh
# 压测前检查清单
echo "===== RAG系统压测前检查清单 ====="
# 1. 确认是测试环境(不要在生产压测!)
read -p "确认当前环境是测试环境(非生产)?(y/n): " confirm
if [ "$confirm" != "y" ]; then
echo "已取消:请在测试环境执行压测"
exit 1
fi
# 2. 检查测试数据是否准备好
if [ ! -f "src/test/resources/data/test_questions.csv" ]; then
echo "ERROR: 测试问题文件不存在,请先运行 TestQuestionGenerator"
exit 1
fi
echo "✓ 测试数据文件存在"
# 3. 确认基础服务健康
for service in "http://localhost:8081/actuator/health" "http://localhost:9091/healthz"; do
status=$(curl -s -o /dev/null -w "%{http_code}" "$service")
if [ "$status" = "200" ]; then
echo "✓ $service 健康"
else
echo "✗ $service 异常 (HTTP $status)"
exit 1
fi
done
# 4. 确认监控系统正在运行(压测期间要看监控)
grafana_status=$(curl -s -o /dev/null -w "%{http_code}" "http://localhost:3000")
if [ "$grafana_status" = "200" ]; then
echo "✓ Grafana监控系统正常"
else
echo "⚠ Grafana不可用,建议先启动监控再压测"
fi
# 5. 记录压测前的基准状态
echo "记录压测前基准状态..."
curl -s "http://localhost:9090/api/v1/query?query=jvm_memory_used_bytes{area='heap'}" > /tmp/before_heap.json
echo ""
echo "===== 检查通过,可以开始压测 ====="
echo "运行命令: mvn gatling:test -DsimulationClass=simulation.RagBasicLoadSimulation"常见问题解答
Q1:压测时应该用真实Token还是测试Token?
生产前的压测建议使用专门的压测账号Token,不要用真实用户Token(避免产生审计日志干扰、避免触发安全告警)。压测账号应该有正常的权限,能访问大部分功能,但在日志中可以通过user_id识别为压测流量。
Q2:压测会产生OpenAI费用吗?
会。每次压测都会产生嵌入调用费用(text-embedding-3-small约$0.02/1000次请求)和LLM生成费用。对于50并发、5分钟的压测,费用大约在$5-20之间,可以接受。如果想节省费用,可以在嵌入层加一层mock,只压测向量检索和应用层的性能。
Q3:压测结果P99很高(15s),是正常的吗?
RAG系统的P99高是正常的,主要因为LLM生成本身就慢(5-15s),加上偶尔的API限流和网络抖动。关键是P99不能随时间持续增长——如果1小时耐久测试后P99从15s增长到30s,说明有内存泄漏或连接池耗尽问题。
Q4:压测时Milvus内存使用率不断增长怎么办?
Milvus在运行过程中会缓存查询结果和向量数据,内存使用率增长一定程度后会稳定,这是正常的。但如果一直线性增长不停止,可能是索引未正确加载或存在内存泄漏。压测时观察Milvus内存是否会在某个水位线稳定下来,不稳定则需要排查。
Q5:如何模拟更真实的用户行为?
使用Gatling的feeder加载真实生产问题(从日志中采样),而不是用固定的测试问题。另外,加入真实的用户停留时间(pause),比如用户阅读答案通常需要5-30秒,不是连续发请求。这些细节让压测结果更接近生产实际。
Q6:压测多高的并发才算"够了"?
计算公式:目标并发 = 预期每日活跃用户 * 峰值系数 / 平均会话时长(分钟)。例如:5000日活用户,峰值是均值的3倍,平均会话10分钟 = 5000 * 3 / 144 ≈ 104并发。建议目标并发设为计算值的1.5倍作为安全余量(约150并发)。
总结
"没有压测的系统,是在用用户来压测。"
行动清单:
压测的终点不是"系统没挂",而是"我们知道系统能撑多少,以及哪里是瓶颈"。
