Article #2382: Redundancy and Disaster Recovery for RAG Systems - High-Availability Design for Vector Databases
Intended audience: architects and engineers responsible for RAG system reliability | Reading time: ~18 minutes | Core value: high-availability architecture for vector databases, from a single node to multi-replica engineering designs
Late one night last year I got the call: the primary node of our Milvus cluster had filled its disk, and the entire vector retrieval service was down.
With retrieval gone, every downstream AI customer-service bot degraded to "Sorry, I can't answer that right now." Customer service was out for 45 minutes, and negative feedback that night ran ten times the normal level.
The root cause: we had treated the vector database as a "not that important" component and never designed it for high availability. In a RAG system the vector database matters as much as MySQL does, and its availability deserves the same seriousness.
The High-Availability Challenges of Vector Databases
High availability is more complicated for vector databases than for relational databases:
/**
 * Why vector store HA is special
 *
 * Challenge 1: data volume
 * Vector stores routinely hold hundreds of millions of vectors,
 * each 512-1536 dimensions. Full replication takes a long time
 * and burns a lot of network bandwidth.
 *
 * Challenge 2: memory pressure
 * Vector indexes (HNSW, IVF, etc.) need large amounts of memory.
 * After a failover, the replacement node's index takes time to warm up.
 *
 * Challenge 3: different consistency requirements
 * Vector stores generally do not need strong consistency (this is not
 * financial data). Briefly failing to read the latest writes is
 * acceptable (eventual consistency), which leaves the HA design
 * more room to maneuver.
 *
 * Challenge 4: query profile
 * Vector similarity search is compute-intensive, so uneven load
 * across nodes shows up as large performance differences.
 */
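Challenge 2 is the one that bites during failover: a node that has just loaded its index answers slowly until memory and caches are warm, so it should not take read traffic immediately. Below is a minimal warm-up sketch; the node.search call and the representative query set are assumptions for illustration, not a specific vector database API.
import java.util.List;

public class NodeWarmUp {

    private static final int WARMUP_ROUNDS = 3; // illustrative, not a tuned value

    /**
     * Replays representative queries against a freshly started node so its
     * index pages and caches are hot before it rejoins the read pool.
     */
    public void warmUp(VectorStoreNode node, List<SearchRequest> representativeQueries) {
        for (int round = 0; round < WARMUP_ROUNDS; round++) {
            for (SearchRequest query : representativeQueries) {
                try {
                    node.search(query); // results are discarded; only the cache side effect matters
                } catch (Exception e) {
                    // A failing warm-up query means the node is not ready yet
                    throw new IllegalStateException("Warm-up failed on " + node.getHost(), e);
                }
            }
        }
    }
}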
Architecture 1: Primary-Replica Replication with Read/Write Splitting
@Configuration
public class VectorStoreHAConfig {

    /**
     * HA vector store configuration:
     * the primary node handles writes, replica nodes serve reads.
     */
    @Bean
    public HAVectorStore haVectorStore() {
        // Primary node (writes)
        VectorStoreNode primary = VectorStoreNode.builder()
                .host("vector-primary.internal")
                .port(19530)
                .role(NodeRole.PRIMARY)
                .build();

        // Replica nodes (reads; one or more)
        List<VectorStoreNode> replicas = List.of(
                VectorStoreNode.builder()
                        .host("vector-replica-1.internal")
                        .port(19530)
                        .role(NodeRole.REPLICA)
                        .build(),
                VectorStoreNode.builder()
                        .host("vector-replica-2.internal")
                        .port(19530)
                        .role(NodeRole.REPLICA)
                        .build()
        );

        return HAVectorStore.builder()
                .primary(primary)
                .replicas(replicas)
                .readStrategy(ReadStrategy.ROUND_ROBIN) // round-robin reads across replicas
                .writeTimeout(Duration.ofSeconds(5))
                .readTimeout(Duration.ofSeconds(2))
                .build();
    }
}
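The client below relies on circuit breakers that are never constructed in the config above. The CallNotPermittedException and CircuitBreaker.State.OPEN references suggest Resilience4j, so here is a minimal construction sketch under that assumption; the breaker names and thresholds are illustrative, not tuned values.
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import java.time.Duration;
import java.util.Map;
import java.util.stream.Collectors;

public class VectorStoreCircuitBreakers {

    // Thresholds are illustrative assumptions, not tuned recommendations
    private static final CircuitBreakerConfig CONFIG = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)                        // open once 50% of recent calls fail
            .slidingWindowSize(20)                           // ...measured over the last 20 calls
            .waitDurationInOpenState(Duration.ofSeconds(30)) // allow probe calls again after 30s
            .build();

    public static CircuitBreaker forPrimary() {
        return CircuitBreaker.of("vector-primary", CONFIG);
    }

    public static Map<String, CircuitBreaker> forReplicas(HAVectorStore haConfig) {
        return haConfig.getReplicas().stream()
                .collect(Collectors.toMap(
                        VectorStoreNode::getHost,
                        r -> CircuitBreaker.of("vector-replica-" + r.getHost(), CONFIG)));
    }
}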
@Service
public class HAVectorStoreClient {

    private final HAVectorStore haConfig;
    private final CircuitBreaker primaryCircuitBreaker;
    private final Map<String, CircuitBreaker> replicaCircuitBreakers;

    /**
     * Writes go to the primary node.
     */
    public void add(List<Document> documents) {
        try {
            primaryCircuitBreaker.executeSupplier(() -> {
                getPrimaryConnection().insert(documents);
                return null;
            });
        } catch (CallNotPermittedException e) {
            throw new VectorStoreUnavailableException("Primary node unavailable");
        }
    }

    /**
     * Reads go to the replicas; fall back to the primary
     * only when every replica is down.
     */
    public List<Document> similaritySearch(SearchRequest request) {
        // Try each available replica in turn
        for (VectorStoreNode replica : getAvailableReplicas()) {
            CircuitBreaker cb = replicaCircuitBreakers.get(replica.getHost());
            try {
                return cb.executeSupplier(() ->
                        getReplicaConnection(replica).search(request)
                );
            } catch (Exception e) { // also covers CallNotPermittedException from an open breaker
                log.warn("Replica {} unavailable, trying next", replica.getHost());
            }
        }

        // All replicas are down: degrade to reading from the primary
        log.warn("All replicas unavailable, falling back to primary for read");
        return getPrimaryConnection().search(request);
    }

    private List<VectorStoreNode> getAvailableReplicas() {
        return haConfig.getReplicas().stream()
                .filter(r -> replicaCircuitBreakers.get(r.getHost()).getState()
                        != CircuitBreaker.State.OPEN)
                .collect(Collectors.toList());
    }
}
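One consequence of this split follows from challenge 3's eventual consistency: a read issued right after a write may hit a replica that has not received the new vectors yet. When a flow needs read-your-writes behavior ("upload a document, then immediately ask about it"), a simple workaround is to pin reads to the primary for a short window after each write. A sketch of that idea; the 5-second window is an assumption to tune against observed replication lag, and the pinning here is process-wide rather than per user.
import java.time.Duration;
import java.time.Instant;

public class ReadYourWritesRouter {

    private static final Duration PIN_WINDOW = Duration.ofSeconds(5); // assumption

    private volatile Instant lastWriteAt = Instant.EPOCH;

    /** Call this after every successful write to the primary. */
    public void onWrite() {
        lastWriteAt = Instant.now();
    }

    /** True while recent writes may not have reached the replicas yet. */
    public boolean shouldReadFromPrimary() {
        return Instant.now().isBefore(lastWriteAt.plus(PIN_WINDOW));
    }
}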
Architecture 2: Multi-Active Clusters with Routing
/**
 * A higher-availability option: several fully independent clusters.
 * Each cluster holds a complete copy of the data and serves a share
 * of the traffic in normal operation. If any one cluster fails,
 * all traffic shifts to the remaining clusters.
 */
@Service
public class MultiClusterVectorStore {

    private final List<VectorStoreCluster> clusters;
    private final LoadBalancer loadBalancer;
    private final ClusterHealthChecker healthChecker;

    /**
     * Writes go synchronously to every cluster (keeps the copies consistent).
     * Note: this adds write latency, but it protects data completeness.
     */
    public void add(List<Document> documents) {
        List<CompletableFuture<Void>> futures = clusters.stream()
                .filter(c -> healthChecker.isHealthy(c))
                .map(cluster -> CompletableFuture.runAsync(() ->
                        cluster.insert(documents)
                ))
                .collect(Collectors.toList());

        // Wait for every cluster to finish. Depending on the business, waiting
        // for a majority is also an option (see the sketch after this class).
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .orTimeout(10, TimeUnit.SECONDS)
                .join();
    }

    /**
     * Reads pick a cluster via the load-balancing strategy.
     * If the chosen cluster is unavailable, switch to another automatically.
     */
    public List<Document> similaritySearch(SearchRequest request) {
        List<VectorStoreCluster> healthyClusters = clusters.stream()
                .filter(c -> healthChecker.isHealthy(c))
                .collect(Collectors.toList());
        if (healthyClusters.isEmpty()) {
            throw new AllClustersUnavailableException("No healthy vector store clusters");
        }

        // Pick one cluster
        VectorStoreCluster selected = loadBalancer.select(healthyClusters);
        try {
            return selected.search(request);
        } catch (Exception e) {
            // The chosen cluster failed: mark it unhealthy, drop it, and retry on another
            healthyClusters.remove(selected);
            healthChecker.markUnhealthy(selected);
            if (!healthyClusters.isEmpty()) {
                VectorStoreCluster fallback = healthyClusters.get(0);
                return fallback.search(request);
            }
            throw e;
        }
    }
}
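The add() above waits for every healthy cluster. As its comment notes, a majority quorum is often enough: acknowledge the write once most clusters have it and repair the stragglers asynchronously. Here is a sketch of that variant, written as a method that could sit next to add() in MultiClusterVectorStore; the repair path is assumed and not shown.
    /**
     * Majority-write variant: acknowledge once a quorum of healthy clusters
     * confirms the write. Clusters that miss it need asynchronous repair (not shown).
     */
    public void addWithQuorum(List<Document> documents) {
        List<VectorStoreCluster> targets = clusters.stream()
                .filter(c -> healthChecker.isHealthy(c))
                .collect(Collectors.toList());
        if (targets.isEmpty()) {
            throw new IllegalStateException("No healthy clusters to write to");
        }
        int quorum = targets.size() / 2 + 1;
        CountDownLatch acked = new CountDownLatch(quorum);

        for (VectorStoreCluster cluster : targets) {
            CompletableFuture.runAsync(() -> {
                try {
                    cluster.insert(documents);
                    acked.countDown(); // counting down past zero is a harmless no-op
                } catch (Exception e) {
                    log.warn("Write to cluster {} failed, needs repair", cluster.getId(), e);
                }
            });
        }

        try {
            // Return as soon as a majority has the write
            if (!acked.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException(
                        "Write quorum not reached: needed " + quorum + " of " + targets.size());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for write quorum", e);
        }
    }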
Health Checks and Automatic Failover
@Service
public class ClusterHealthChecker {

    private final Map<String, ClusterHealth> healthStatus = new ConcurrentHashMap<>();

    /**
     * Periodic health check.
     */
    @Scheduled(fixedRate = 10000) // every 10 seconds
    public void checkAllClusters() {
        for (VectorStoreCluster cluster : getAllClusters()) {
            ClusterHealth health = checkCluster(cluster);
            ClusterHealth previousHealth = healthStatus.put(cluster.getId(), health);

            // Alert on state transitions
            if (previousHealth != null && previousHealth.isHealthy() && !health.isHealthy()) {
                alertService.sendAlert(
                        AlertLevel.CRITICAL,
                        "Vector store cluster DOWN: " + cluster.getId() +
                                "\nReason: " + health.getFailureReason()
                );
            } else if (previousHealth != null && !previousHealth.isHealthy() && health.isHealthy()) {
                alertService.sendAlert(
                        AlertLevel.INFO,
                        "Vector store cluster RECOVERED: " + cluster.getId()
                );
            }
        }
    }

    private ClusterHealth checkCluster(VectorStoreCluster cluster) {
        long startTime = System.currentTimeMillis();
        try {
            // Send a lightweight probe query
            cluster.ping();

            // Check latency
            long latency = System.currentTimeMillis() - startTime;
            boolean latencyOk = latency < 500; // under 500 ms counts as healthy

            return ClusterHealth.builder()
                    .clusterId(cluster.getId())
                    .healthy(latencyOk)
                    .latencyMs(latency)
                    .checkedAt(LocalDateTime.now())
                    .failureReason(latencyOk ? null : "Latency too high: " + latency + "ms")
                    .build();
        } catch (Exception e) {
            return ClusterHealth.builder()
                    .clusterId(cluster.getId())
                    .healthy(false)
                    .checkedAt(LocalDateTime.now())
                    .failureReason(e.getMessage())
                    .build();
        }
    }
}
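A 500 ms threshold on a single probe makes the pool jittery: one garbage-collection pause can knock a healthy cluster out of rotation. A common refinement is to require several consecutive failures before marking a cluster down, and several consecutive successes before bringing it back. A small sketch of that debounce; the 3-failure/2-success thresholds are assumptions.
public class DebouncedHealthState {

    private static final int FAILURES_TO_MARK_DOWN = 3; // assumption
    private static final int SUCCESSES_TO_MARK_UP = 2;  // assumption

    private int consecutiveFailures = 0;
    private int consecutiveSuccesses = 0;
    private volatile boolean healthy = true;

    /** Feed each probe result in; state only flips after a streak. */
    public synchronized void record(boolean probeSucceeded) {
        if (probeSucceeded) {
            consecutiveSuccesses++;
            consecutiveFailures = 0;
            if (!healthy && consecutiveSuccesses >= SUCCESSES_TO_MARK_UP) {
                healthy = true;
            }
        } else {
            consecutiveFailures++;
            consecutiveSuccesses = 0;
            if (healthy && consecutiveFailures >= FAILURES_TO_MARK_DOWN) {
                healthy = false;
            }
        }
    }

    public boolean isHealthy() {
        return healthy;
    }
}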
Disaster Recovery: Data Backup and Restore
@Service
public class VectorStoreBackupService {

    /**
     * Periodically back up the vector store's source data.
     *
     * Note: the vectors themselves can be regenerated from the original
     * documents, so the backup focuses on:
     * 1. original document content
     * 2. document metadata (permissions, versions, etc.)
     * 3. vector index configuration (so rebuilds stay consistent)
     */
    @Scheduled(cron = "0 0 2 * * *") // daily at 2:00 AM
    public void dailyBackup() {
        String backupId = "backup-" + LocalDate.now();
        try {
            // 1. Export all documents (content + metadata)
            List<DocumentBackupRecord> records = exportAllDocuments();

            // 2. Serialize and upload to object storage
            String backupData = objectMapper.writeValueAsString(records);
            objectStorageClient.upload(
                    "vector-store-backups/" + backupId + ".json",
                    backupData.getBytes(StandardCharsets.UTF_8)
            );

            // 3. Record the backup's metadata
            BackupRecord backup = BackupRecord.builder()
                    .id(backupId)
                    .documentCount(records.size())
                    .createdAt(LocalDateTime.now())
                    .storagePath("vector-store-backups/" + backupId + ".json")
                    .status(BackupStatus.SUCCESS)
                    .build();
            backupRepository.save(backup);

            log.info("Backup completed: {} with {} documents", backupId, records.size());
        } catch (Exception e) {
            log.error("Backup failed: {}", backupId, e);
            alertService.sendAlert(AlertLevel.HIGH, "Vector store backup FAILED: " + e.getMessage());
        }
    }

    /**
     * Restore the knowledge base from a backup.
     *
     * Scenario: rebuild from the most recent backup after catastrophic data loss.
     * This is slow, because every document has to be re-embedded.
     */
    public RestoreResult restoreFromBackup(String backupId) {
        BackupRecord backup = backupRepository.findById(backupId)
                .orElseThrow(() -> new BackupNotFoundException(backupId));
        try {
            // Download the backup data
            byte[] backupData = objectStorageClient.download(backup.getStoragePath());
            List<DocumentBackupRecord> records = objectMapper.readValue(
                    backupData,
                    new TypeReference<List<DocumentBackupRecord>>() {}
            );

            int restored = 0;
            int failed = 0;
            for (DocumentBackupRecord record : records) {
                try {
                    // Re-embed and insert
                    List<Document> chunks = documentSplitter.split(record.getContent());
                    vectorStore.add(chunks);
                    restored++;
                } catch (Exception e) {
                    log.error("Failed to restore document: {}", record.getDocId(), e);
                    failed++;
                }
            }

            return RestoreResult.builder()
                    .backupId(backupId)
                    .totalDocuments(records.size())
                    .restoredDocuments(restored)
                    .failedDocuments(failed)
                    .build();
        } catch (Exception e) {
            throw new RestoreFailedException("Restore from backup " + backupId + " failed", e);
        }
    }
}
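The backup above serializes DocumentBackupRecord objects whose shape is never shown. Here is one sketch matching the three things the comment says are worth backing up; the field names are assumptions.
import java.util.Map;

public class DocumentBackupRecord {

    private String docId;
    private String content;               // original text, re-embedded on restore
    private Map<String, Object> metadata; // permissions, version, source, ...
    private String indexConfig;           // index parameters (e.g. HNSW settings) so rebuilds stay consistent

    public String getDocId() { return docId; }
    public String getContent() { return content; }
    public Map<String, Object> getMetadata() { return metadata; }
    public String getIndexConfig() { return indexConfig; }
}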
Capacity Planning and Scaling
@Service
public class VectorStoreCapacityPlanner {

    /**
     * Capacity monitoring and early warning,
     * so "we only noticed when the disk was full" never happens again.
     */
    @Scheduled(cron = "0 */30 * * * *") // every 30 minutes
    public void checkCapacity() {
        for (VectorStoreCluster cluster : getAllClusters()) {
            ClusterCapacity capacity = cluster.getCapacityInfo();
            double diskUsageRatio = (double) capacity.getDiskUsed() / capacity.getDiskTotal();
            double memoryUsageRatio = (double) capacity.getMemoryUsed() / capacity.getMemoryTotal();

            // Warn when disk usage passes 70%; escalate past 85%
            if (diskUsageRatio > 0.7) {
                alertService.sendAlert(
                        diskUsageRatio > 0.85 ? AlertLevel.CRITICAL : AlertLevel.HIGH,
                        String.format("Vector store disk usage: %.1f%% on cluster %s",
                                diskUsageRatio * 100, cluster.getId())
                );
            }

            // Warn when memory usage passes 80%
            if (memoryUsageRatio > 0.8) {
                alertService.sendAlert(
                        AlertLevel.HIGH,
                        String.format("Vector store memory usage: %.1f%% on cluster %s",
                                memoryUsageRatio * 100, cluster.getId())
                );
            }
        }
    }
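    /**
     * Rough sizing helper for scale-out planning, not a precise model.
     * Assumptions: float32 vectors in an HNSW-style index; the 1.5x cushion
     * for graph links is illustrative, not a measured constant.
     * Example: 100M vectors x 768 dims x 4 bytes is about 307 GB before overhead.
     */
    public long estimateIndexMemoryBytes(long vectorCount, int dimensions) {
        long rawVectorBytes = vectorCount * dimensions * 4L; // 4 bytes per float32 component
        double indexOverheadFactor = 1.5;                    // illustrative cushion for the index graph
        return (long) (rawVectorBytes * indexOverheadFactor);
    }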
}
High availability for vector databases is an engineering problem that is easy to underestimate. Spending a week during system design to get the HA architecture right is worth far more than firefighting in the middle of a production outage. At a minimum: have backups, have health checks, have alerts.
