第2380篇:RAG系统的并发优化——高并发场景下向量检索的性能瓶颈
大约 6 分钟
第2380篇:RAG系统的并发优化——高并发场景下向量检索的性能瓶颈
适读人群:需要支撑高并发的RAG系统工程师 | 阅读时长:约20分钟 | 核心价值:掌握RAG系统在高并发下的性能优化策略,从单机到集群的工程解法
双十一前一个月,我们的RAG客服系统开始压测。模拟1000个并发用户,结果系统直接崩了——不是内存溢出,而是向量检索超时导致的级联故障。
查了一下原因:向量数据库的QPS上限约是200,但我们设计时假设每个用户问题只触发一次检索,1000并发就是1000QPS,直接压垮了。
这件事让我深入研究了一次RAG系统在高并发下的性能瓶颈和优化策略。分享一下我们最终做了什么。
识别RAG的性能瓶颈
/**
* RAG系统的典型性能瓶颈分布
*
* 组件 典型延迟 扩展性
* --------------------------------------------------------
* 查询向量化 50-200ms 好(无状态,水平扩展容易)
* 向量数据库检索 100-500ms 一般(有状态,扩展有代价)
* Reranking模型 200-1000ms 好(无状态,CPU密集)
* LLM API调用 500-5000ms 好(外部服务,有Rate Limit)
* 上下文构建 <10ms 优秀(内存操作)
*
* 结论:
* - 向量数据库是最容易成为瓶颈的组件(有状态,扩展成本高)
* - LLM API有Rate Limit,是另一个主要瓶颈
* - 要针对这两个方向做优化
*/优化一:查询结果缓存
最高ROI的优化:相同或相似的查询,不需要重复检索。
@Service
public class CachedVectorRetriever {
private final VectorStore vectorStore;
private final EmbeddingModel embeddingModel;
// 精确查询缓存(完全相同的查询字符串)
@Autowired
private Cache<String, List<Document>> exactQueryCache;
// 语义查询缓存(相似向量的查询共用结果)
private final SemanticQueryCache semanticCache;
/**
* 带缓存的检索
*/
public List<Document> retrieve(String query) {
// 第一层:精确字符串缓存(最快,命中率中等)
List<Document> exactCached = exactQueryCache.getIfPresent(query);
if (exactCached != null) {
metrics.recordCacheHit("exact");
return exactCached;
}
// 计算查询向量
float[] queryVector = embeddingModel.embed(query);
// 第二层:语义缓存(稍慢,命中率更高)
Optional<List<Document>> semanticCached = semanticCache.get(queryVector, 0.98f);
if (semanticCached.isPresent()) {
metrics.recordCacheHit("semantic");
// 把结果也放入精确缓存
exactQueryCache.put(query, semanticCached.get());
return semanticCached.get();
}
// 缓存未命中,执行实际检索
List<Document> result = vectorStore.similaritySearchByVector(queryVector, 5);
// 写入两级缓存
exactQueryCache.put(query, result);
semanticCache.put(queryVector, result);
metrics.recordCacheMiss();
return result;
}
}
/**
* 语义缓存:缓存向量相似的查询结果
* 核心:查询A和查询B的向量相似度>0.98,就认为结果一样
*/
@Component
public class SemanticQueryCache {
// 存储:(向量, 结果) 对
private final List<CachedEntry> entries = new CopyOnWriteArrayList<>();
// 缓存大小限制
private static final int MAX_ENTRIES = 5000;
public Optional<List<Document>> get(float[] queryVector, float threshold) {
return entries.stream()
.filter(e -> !e.isExpired())
.filter(e -> cosineSimilarity(queryVector, e.getVector()) >= threshold)
.findFirst()
.map(CachedEntry::getResult);
}
public void put(float[] vector, List<Document> result) {
if (entries.size() >= MAX_ENTRIES) {
// LRU淘汰
entries.remove(0);
}
entries.add(new CachedEntry(vector, result,
Instant.now().plusSeconds(1800))); // 30分钟过期
}
}优化二:请求合并(Batching)
高并发下,把多个并发查询合并成一次批量检索。
@Service
public class BatchedVectorRetriever {
private final VectorStore vectorStore;
// 批处理队列
private final BlockingQueue<PendingQuery> queryQueue = new LinkedBlockingQueue<>();
// 批处理配置
private static final int MAX_BATCH_SIZE = 20;
private static final long MAX_BATCH_WAIT_MS = 10; // 最多等待10ms
@PostConstruct
public void startBatchProcessor() {
Thread batchThread = new Thread(this::processBatches, "rag-batch-processor");
batchThread.setDaemon(true);
batchThread.start();
}
/**
* 提交查询,等待批处理结果
*/
public CompletableFuture<List<Document>> retrieveAsync(String query, float[] queryVector) {
CompletableFuture<List<Document>> future = new CompletableFuture<>();
queryQueue.offer(new PendingQuery(query, queryVector, future));
return future;
}
/**
* 批处理循环
* 收集一批查询,一次性发给向量库
*/
private void processBatches() {
while (true) {
try {
List<PendingQuery> batch = new ArrayList<>();
// 等待第一个查询到来
PendingQuery first = queryQueue.poll(100, TimeUnit.MILLISECONDS);
if (first == null) continue;
batch.add(first);
// 继续收集,直到达到最大批大小或超时
long batchStart = System.currentTimeMillis();
while (batch.size() < MAX_BATCH_SIZE) {
long remaining = MAX_BATCH_WAIT_MS - (System.currentTimeMillis() - batchStart);
if (remaining <= 0) break;
PendingQuery next = queryQueue.poll(remaining, TimeUnit.MILLISECONDS);
if (next == null) break;
batch.add(next);
}
// 执行批量检索
executeBatch(batch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
private void executeBatch(List<PendingQuery> batch) {
// 注:实际向量库的批量接口需要根据具体产品调整
// 这里展示概念
try {
for (PendingQuery query : batch) {
List<Document> result = vectorStore.similaritySearchByVector(
query.getVector(), 5
);
query.getFuture().complete(result);
}
} catch (Exception e) {
batch.forEach(q -> q.getFuture().completeExceptionally(e));
}
}
}优化三:连接池和超时控制
@Configuration
public class VectorStoreConfig {
/**
* 向量数据库连接池配置
*
* 关键参数调优:
* - 连接数不是越多越好,要结合服务端能力
* - 超时要设合理,避免慢查询拖垮整个系统
*/
@Bean
public VectorStoreConnectionPool vectorStoreConnectionPool() {
return VectorStoreConnectionPool.builder()
.maxConnections(50) // 最大连接数
.minIdleConnections(10) // 最小空闲连接
.connectionTimeout(200) // 连接超时 200ms
.queryTimeout(1000) // 查询超时 1000ms
.idleTimeout(300000) // 空闲连接超时 5分钟
.build();
}
/**
* 熔断器配置
* 向量库故障时,快速失败而不是等待超时
*/
@Bean
public CircuitBreaker vectorStoreCircuitBreaker() {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50) // 50%失败率触发熔断
.waitDurationInOpenState(Duration.ofSeconds(30)) // 熔断30秒
.slidingWindowSize(10) // 10次调用的滑动窗口
.minimumNumberOfCalls(5) // 最少5次调用才统计
.build();
return CircuitBreakerRegistry.of(config)
.circuitBreaker("vectorStore");
}
}
@Service
public class ResilientVectorRetriever {
private final VectorStore vectorStore;
private final CircuitBreaker circuitBreaker;
private final Cache<String, List<Document>> emergencyCache;
/**
* 带熔断和降级的检索
*/
public List<Document> retrieve(String query) {
Supplier<List<Document>> retrieval = CircuitBreaker
.decorateSupplier(circuitBreaker, () ->
vectorStore.similaritySearch(SearchRequest.query(query).withTopK(5))
);
try {
return retrieval.get();
} catch (CallNotPermittedException e) {
// 熔断器打开,使用降级策略
log.warn("Vector store circuit breaker is OPEN, using fallback");
return fallbackRetrieval(query);
}
}
/**
* 降级检索:向量库不可用时的备用方案
* 使用关键词搜索或从缓存获取
*/
private List<Document> fallbackRetrieval(String query) {
// 方案1:从紧急缓存获取(最近的热点查询)
List<Document> cached = emergencyCache.getIfPresent(query);
if (cached != null) {
return cached;
}
// 方案2:关键词全文搜索(慢但总比没有好)
return fallbackKeywordSearch(query);
}
}优化四:向量索引预加热
@Component
public class VectorIndexWarmer {
/**
* 系统启动时预热向量索引
*
* 问题:向量数据库冷启动时,索引不在内存中,
* 第一批查询会非常慢(触发磁盘加载)
*
* 解决:启动时发送一批查询,让热点数据加载进内存
*/
@PostConstruct
public void warmUpIndex() {
log.info("Starting vector index warm-up...");
// 获取历史高频查询
List<String> topQueries = queryLogService.getTopQueries(100);
// 并发发送预热查询
ExecutorService warmupExecutor = Executors.newFixedThreadPool(10);
List<Future<?>> futures = new ArrayList<>();
for (String query : topQueries) {
futures.add(warmupExecutor.submit(() -> {
try {
vectorStore.similaritySearch(SearchRequest.query(query).withTopK(3));
} catch (Exception e) {
log.warn("Warmup query failed: {}", query);
}
}));
}
// 等待预热完成
futures.forEach(f -> {
try { f.get(30, TimeUnit.SECONDS); }
catch (Exception e) { /* ignore */ }
});
warmupExecutor.shutdown();
log.info("Vector index warm-up completed for {} queries", topQueries.size());
}
}压测指标体系
@Component
public class RAGPerformanceMetrics {
private final MeterRegistry meterRegistry;
/**
* 需要监控的关键指标
*/
public void recordRetrieval(String queryId, long embeddingMs,
long searchMs, int docCount) {
// 向量化延迟
meterRegistry.timer("rag.embedding.duration")
.record(embeddingMs, TimeUnit.MILLISECONDS);
// 检索延迟
meterRegistry.timer("rag.search.duration")
.record(searchMs, TimeUnit.MILLISECONDS);
// 检索文档数
meterRegistry.summary("rag.search.doc_count")
.record(docCount);
// 缓存命中率(自动由缓存组件记录)
}
public void recordEndToEnd(long totalMs, boolean fromCache) {
meterRegistry.timer("rag.total.duration")
.tag("cache", fromCache ? "hit" : "miss")
.record(totalMs, TimeUnit.MILLISECONDS);
}
}实际优化效果
经过这几个优化后,我们的压测结果:
| 优化措施 | P99延迟 | 最大QPS |
|---|---|---|
| 基础版本 | 4.2s | 200 |
| +查询缓存 | 2.1s | 800 |
| +请求合并 | 1.8s | 1200 |
| +连接池优化 | 1.5s | 1500 |
| +熔断降级 | 1.5s (stable) | 1500 |
最关键的一点是:缓存命中率要到60%以上,优化效果才明显。如果业务场景里每个用户的问题都是独一无二的,缓存帮不了多少,要从向量库本身的扩展去解决。
