Data Backup for AI Systems: How to Back Up a Vector Database
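I used to hold a misconception: a vector database is just an index, the original data still lives in the relational database, so in the worst case we can simply rebuild it.
That held until our Milvus cluster ran into trouble: the data in two partitions was corrupted and the vectors had to be rebuilt from scratch. I did the math: 150,000 knowledge-base documents, one Embedding API call per document on average, roughly 8 hours at the throughput we had at the time, and about 40 US dollars in fees.
During those 8 hours the RAG system's retrieval capability was down by 30%, and the business features that depended on it were unavailable.
Since then I have taken backup design for vector databases seriously.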
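1. How vector database backups differ from relational database backups
You need to understand where the differences lie before you can design a suitable backup strategy.
A relational database holds "primary" data: you store text, numbers, and JSON; a backup writes that raw data out, and a restore reads it back in. The data is portable: a MySQL backup can be restored on any MySQL instance of the same version.
A vector database holds "derived" data: each vector is computed from the original text by an Embedding model. This brings one key constraint: for the same piece of text, vectors produced by different Embedding models (or even by different versions of the same model) are not interchangeable.
This means:
- If you back up only the vectors, the backup becomes unusable once you switch Embedding models
- If you back up only the original documents, a restore has to re-embed everything, which costs time and money
- The ideal strategy backs up the original documents and the vectors together, so that each serves as redundancy for the other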
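To put a number on "time and money", the figures from the incident described earlier (150,000 documents, about 8 hours, about 40 US dollars) can be turned into rough per-document rates. The sketch below is only back-of-the-envelope arithmetic. The class and method names are made up for illustration, and it assumes throughput and price scale linearly with document count, which an Embedding API with rate limits or tiered pricing may not honor.

```java
/** Back-of-the-envelope estimate of what a full re-embedding costs (illustrative only). */
final class RebuildEstimate {

    /** Documents embedded per second, derived from an observed rebuild. */
    static double docsPerSecond(long docs, double hours) {
        return docs / (hours * 3600.0);
    }

    /** Average API cost per document in USD, derived from an observed rebuild. */
    static double costPerDoc(long docs, double totalUsd) {
        return totalUsd / docs;
    }

    /** Projected rebuild time in hours for another corpus size, assuming the same throughput. */
    static double projectedHours(long docs, double docsPerSecond) {
        return docs / docsPerSecond / 3600.0;
    }

    public static void main(String[] args) {
        // Figures from the incident: 150,000 docs, ~8 hours, ~40 USD
        double rate = docsPerSecond(150_000, 8.0);   // roughly 5.2 docs per second
        double unit = costPerDoc(150_000, 40.0);     // roughly 0.00027 USD per doc

        // Linear extrapolation to a hypothetical corpus of one million documents
        long bigger = 1_000_000;
        System.out.println("throughput (docs/s): " + rate);
        System.out.println("cost per doc (USD):  " + unit);
        System.out.println("projected hours:     " + projectedHours(bigger, rate));
        System.out.println("projected cost USD:  " + bigger * unit);
    }
}
```

Under that linear assumption, a corpus of one million documents would take a little over two days and cost a few hundred dollars to re-embed, which is the outage window a vector backup is meant to avoid.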
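The other difference is the index structure. Vector indexes (HNSW, IVF_FLAT, and so on) are purpose-built and cannot be rebuilt as quickly as a B-Tree index. Rebuilding the index of a large vector store can take anywhere from tens of minutes to several hours.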
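2. Backup strategies in Milvus and Qdrant compared
2.1 Backing up Milvus
Option 1: Snapshot
Milvus 2.x supports collection-level snapshots (backups) through the milvus-backup tool:

    # Install the milvus-backup tool
    # (release asset names vary between versions; check the releases page if this URL does not resolve)
    wget https://github.com/zilliztech/milvus-backup/releases/latest/download/milvus-backup-linux-amd64
    chmod +x milvus-backup-linux-amd64
    mv milvus-backup-linux-amd64 /usr/local/bin/milvus-backup

    # Back up specific collections
    milvus-backup create \
      --config backup.yaml \
      --name "kb_backup_20241215" \
      --colls "customer_service_kb,product_kb"

    # Backup configuration file backup.yaml
    # (host and port are separate keys in the sample config shipped with the tool; verify against your version)
    # milvus:
    #   address: milvus-endpoint
    #   port: 19530
    # minio:
    #   address: minio-endpoint
    #   port: 9000
    #   accessKeyID: minio_user
    #   secretAccessKey: minio_password
    #   bucketName: milvus-backup

Option 2: Backing up the MinIO storage layer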
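Milvus actually stores its data in MinIO (or S3). Backing up the MinIO bucket directly also works, but this approach depends on Milvus's internal storage format and is tightly coupled to the Milvus version, so I do not recommend it as the primary backup method.
Option 3: Data export (vectors + metadata)
This is the most flexible but slowest approach, and it suits cross-version migration: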

    // Java export example
    // Assumed to exist elsewhere in the class: objectMapper (Jackson), log (SLF4J),
    // and the helper methods getTotalCount(...) and queryBatch(...).
    MilvusServiceClient milvusClient = new MilvusServiceClient(
        ConnectParam.newBuilder()
            .withHost("milvus-host")
            .withPort(19530)
            .build()
    );

    // Export in batches (avoids loading too much data into memory at once).
    // Caveat: Milvus caps offset + limit for a single query, so for large collections
    // page by primary key or use a query iterator instead of a growing offset.
    public void exportCollection(String collectionName, String outputPath) throws Exception {
        long totalCount = getTotalCount(collectionName);
        int batchSize = 1000;
        int offset = 0;

        try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(outputPath))) {
            while (offset < totalCount) {
                QueryResultsWrapper results = queryBatch(collectionName, offset, batchSize);

                // One JSON object per line (JSONL)
                for (QueryResultsWrapper.RowRecord record : results.getRowRecords()) {
                    Map<String, Object> row = new LinkedHashMap<>();
                    row.put("id", record.get("id"));
                    row.put("content", record.get("content"));     // original text
                    row.put("embedding", record.get("embedding")); // vector
                    row.put("metadata", record.get("metadata"));
                    writer.write(objectMapper.writeValueAsString(row));
                    writer.newLine();
                }

                offset += batchSize;
                log.info("Exported {}/{} records", Math.min(offset, totalCount), totalCount);
            }
        }
    }

2.2 Backing up Qdrant
Qdrant offers a simpler snapshot API:

    # Create a collection snapshot
    curl -X POST "http://qdrant-host:6333/collections/customer_service_kb/snapshots"

    # Example response
    {
      "result": {
        "name": "customer_service_kb-2024-12-15-14-30-00-snapshot",
        "creation_time": "2024-12-15T14:30:00.000Z",
        "size": 156789012
      },
      "status": "ok",
      "time": 0.567
    }

    # Download the snapshot
    curl -X GET "http://qdrant-host:6333/collections/customer_service_kb/snapshots/customer_service_kb-2024-12-15-14-30-00-snapshot" \
      --output backup.snapshot

    # Restore from the snapshot (into a new collection)
    curl -X POST "http://qdrant-host:6333/collections/customer_service_kb_restored/snapshots/upload" \
      -H "Content-Type: multipart/form-data" \
      -F "snapshot=@backup.snapshot"

3. Dual backup strategy: vectors + original documents
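This is the backup strategy I currently consider the most robust: back up the original documents and the vectors together, so that a restore can choose between two paths:
- Fast path: restore directly from the vector backup, with no re-embedding (provided the Embedding model has not changed)
- Fallback path: rebuild from the original documents, which is slower but always works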
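The choice between the two paths comes down to one comparison: does the Embedding model recorded with the backup match the model currently in production? Below is a minimal sketch of that decision. The class name, the `Route` enum, and the model identifier strings are all hypothetical, and a real check might also compare vector dimension or distance metric.

```java
/** Picks a restore path from backup metadata (illustrative sketch, names are hypothetical). */
final class RestorePathSelector {

    enum Route {
        FAST_FROM_SNAPSHOT,   // load the vector snapshot as-is
        FALLBACK_REEMBED      // re-embed the original documents
    }

    /**
     * @param backupModel       Embedding model identifier stored in the backup metadata (may be null)
     * @param currentModel      Embedding model identifier used in production right now
     * @param snapshotAvailable whether a vector snapshot exists for this backup
     */
    static Route choose(String backupModel, String currentModel, boolean snapshotAvailable) {
        // Vectors are only reusable when they came from exactly the same model version
        boolean sameModel = backupModel != null && backupModel.equals(currentModel);
        return (snapshotAvailable && sameModel) ? Route.FAST_FROM_SNAPSHOT : Route.FALLBACK_REEMBED;
    }

    public static void main(String[] args) {
        System.out.println(choose("embed-model-v1", "embed-model-v1", true));  // same model, snapshot present
        System.out.println(choose("embed-model-v1", "embed-model-v2", true));  // model changed since backup
        System.out.println(choose(null, "embed-model-v2", true));              // model was never recorded
    }
}
```

A missing model identifier is deliberately treated as a mismatch: if the backup does not say which model produced its vectors, the safe assumption is that they cannot be reused.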
4. Automated backup scripts (Java implementation)
4.1 Backup task scheduler

    // notifyBackupFailure(...) and getCurrentEmbeddingModel() are helper methods omitted here.
    @Component
    @Slf4j
    public class VectorDatabaseBackupScheduler {

        @Autowired
        private QdrantBackupService qdrantBackupService;

        @Autowired
        private DocumentBackupService documentBackupService;

        @Autowired
        private OssStorageService ossStorageService;

        @Autowired
        private BackupMetaRepository backupMetaRepo;

        // Run a full backup every day at 02:00
        @Scheduled(cron = "0 0 2 * * *")
        public void scheduledFullBackup() {
            String backupId = "backup-" + LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
            log.info("Starting scheduled full backup: {}", backupId);
            try {
                executeFullBackup(backupId);
            } catch (Exception e) {
                log.error("Scheduled backup failed: {}", backupId, e);
                notifyBackupFailure(backupId, e);
            }
        }

        public BackupResult executeFullBackup(String backupId) {
            BackupResult result = BackupResult.builder()
                .backupId(backupId)
                .startTime(LocalDateTime.now())
                .build();
            try {
                // Step 1: snapshot the vector database
                log.info("[{}] Step 1: Creating vector DB snapshot", backupId);
                List<SnapshotInfo> snapshots = qdrantBackupService.createSnapshots(
                    List.of("customer_service_kb", "product_kb", "faq_kb")
                );
                result.setVectorSnapshotCount(snapshots.size());

                // Step 2: upload the vector snapshots to OSS
                log.info("[{}] Step 2: Uploading vector snapshots to OSS", backupId);
                for (SnapshotInfo snapshot : snapshots) {
                    String ossPath = String.format("backups/%s/vectors/%s/%s",
                        backupId, snapshot.getCollectionName(), snapshot.getSnapshotName());
                    ossStorageService.upload(snapshot.getLocalPath(), ossPath);
                    log.info("[{}] Uploaded snapshot: {} -> {}", backupId, snapshot.getSnapshotName(), ossPath);
                }

                // Step 3: back up the original documents (incremental)
                log.info("[{}] Step 3: Backing up source documents (incremental)", backupId);
                IncrementalBackupResult docBackup = documentBackupService.incrementalBackup(backupId);
                result.setDocumentCount(docBackup.getDocumentCount());

                // Step 4: persist the backup metadata
                BackupMeta meta = BackupMeta.builder()
                    .backupId(backupId)
                    .type(BackupType.FULL)
                    .status(BackupStatus.SUCCESS)
                    .vectorSnapshotCount(result.getVectorSnapshotCount())
                    .documentCount(result.getDocumentCount())
                    .completedAt(LocalDateTime.now())
                    .embeddingModel(getCurrentEmbeddingModel()) // record the Embedding model version in use at backup time
                    .build();
                backupMetaRepo.save(meta);

                result.setStatus("SUCCESS");
                result.setEndTime(LocalDateTime.now());
                log.info("[{}] Full backup completed successfully: {} snapshots, {} documents",
                    backupId, result.getVectorSnapshotCount(), result.getDocumentCount());
                return result;
            } catch (Exception e) {
                result.setStatus("FAILED");
                result.setError(e.getMessage());
                result.setEndTime(LocalDateTime.now());

                // Record the failure in the backup metadata
                backupMetaRepo.save(BackupMeta.builder()
                    .backupId(backupId)
                    .status(BackupStatus.FAILED)
                    .errorMessage(e.getMessage())
                    .build());
                throw new RuntimeException("Backup failed: " + backupId, e);
            }
        }
    }

4.2 Qdrant snapshot service

    @Service
    @Slf4j
    public class QdrantBackupService {

        @Value("${qdrant.host}")
        private String qdrantHost;

        @Value("${qdrant.port:6333}")
        private int qdrantPort;

        @Autowired
        private RestTemplate restTemplate;

        /**
         * Create snapshots for several collections.
         */
        public List<SnapshotInfo> createSnapshots(List<String> collectionNames) {
            List<SnapshotInfo> results = new ArrayList<>();
            for (String collectionName : collectionNames) {
                try {
                    SnapshotInfo info = createSnapshot(collectionName);
                    results.add(info);
                } catch (Exception e) {
                    log.error("Failed to create snapshot for collection {}: {}", collectionName, e.getMessage());
                    throw new RuntimeException("Snapshot creation failed for: " + collectionName, e);
                }
            }
            return results;
        }

        @SuppressWarnings("unchecked")
        private SnapshotInfo createSnapshot(String collectionName) {
            String url = String.format("http://%s:%d/collections/%s/snapshots",
                qdrantHost, qdrantPort, collectionName);
            ResponseEntity<Map> response = restTemplate.postForEntity(url, null, Map.class);
            if (!response.getStatusCode().is2xxSuccessful() || response.getBody() == null) {
                throw new RuntimeException("Failed to create snapshot: " + response.getStatusCode());
            }
            Map<String, Object> result = (Map<String, Object>) response.getBody().get("result");
            String snapshotName = (String) result.get("name");

            // Download the snapshot to a local temporary directory
            String localPath = downloadSnapshot(collectionName, snapshotName);

            return SnapshotInfo.builder()
                .collectionName(collectionName)
                .snapshotName(snapshotName)
                .localPath(localPath)
                // JSON numbers may deserialize as Integer or Long, so go through Number
                .size(((Number) result.get("size")).longValue())
                .build();
        }

        private String downloadSnapshot(String collectionName, String snapshotName) {
            String downloadUrl = String.format("http://%s:%d/collections/%s/snapshots/%s",
                qdrantHost, qdrantPort, collectionName, snapshotName);
            String localPath = "/tmp/qdrant-snapshots/" + collectionName + "/" + snapshotName;

            // Create the target directory
            new File("/tmp/qdrant-snapshots/" + collectionName).mkdirs();

            // Stream the download (avoids OOM on large files)
            restTemplate.execute(downloadUrl, HttpMethod.GET, null, response -> {
                try (InputStream inputStream = response.getBody();
                     FileOutputStream outputStream = new FileOutputStream(localPath)) {
                    byte[] buffer = new byte[8192];
                    int bytesRead;
                    while ((bytesRead = inputStream.read(buffer)) != -1) {
                        outputStream.write(buffer, 0, bytesRead);
                    }
                }
                return null;
            });
            log.info("Snapshot downloaded: {} -> {}", snapshotName, localPath);
            return localPath;
        }

        /**
         * Restore a collection from a snapshot.
         */
        public void restoreFromSnapshot(String targetCollection, String snapshotLocalPath) {
            String url = String.format("http://%s:%d/collections/%s/snapshots/upload",
                qdrantHost, qdrantPort, targetCollection);

            MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
            body.add("snapshot", new FileSystemResource(snapshotLocalPath));

            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.MULTIPART_FORM_DATA);
            HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, headers);

            ResponseEntity<Map> response = restTemplate.postForEntity(url, requestEntity, Map.class);
            if (!response.getStatusCode().is2xxSuccessful()) {
                throw new RuntimeException("Failed to restore snapshot: " + response.getStatusCode());
            }
            log.info("Collection {} restored from snapshot {}", targetCollection, snapshotLocalPath);
        }

        /**
         * Delete a collection (the restore drill uses this to clean up its test collection).
         */
        public void deleteCollection(String collectionName) {
            String url = String.format("http://%s:%d/collections/%s",
                qdrantHost, qdrantPort, collectionName);
            restTemplate.delete(url);
            log.info("Collection {} deleted", collectionName);
        }
    }

4.3 Incremental backup of original documents

    @Service
    @Slf4j
    public class DocumentBackupService {

        @Autowired
        private KnowledgeDocumentRepository documentRepo;

        @Autowired
        private OssStorageService ossStorageService;

        @Autowired
        private BackupMetaRepository backupMetaRepo;

        // Jackson mapper used to serialize each document as one JSON line
        @Autowired
        private ObjectMapper objectMapper;

        /**
         * Incremental backup: only documents added or modified since the last backup.
         */
        public IncrementalBackupResult incrementalBackup(String backupId) throws IOException {
            // Find the time of the last successful backup.
            // Caveat: a document modified while a backup is running has updatedAt < completedAt
            // and would be skipped next time; a start-time watermark would close that gap.
            Optional<BackupMeta> lastBackup = backupMetaRepo.findLastSuccessful();
            LocalDateTime since = lastBackup
                .map(BackupMeta::getCompletedAt)
                .orElse(LocalDateTime.of(2000, 1, 1, 0, 0)); // no previous backup: back up everything
            log.info("Incremental backup since: {}", since);

            // Query the changed documents
            List<KnowledgeDocument> changedDocs = documentRepo.findByUpdatedAtAfter(since);
            log.info("Found {} changed documents to backup", changedDocs.size());
            if (changedDocs.isEmpty()) {
                return IncrementalBackupResult.builder()
                    .backupId(backupId)
                    .documentCount(0)
                    .build();
            }

            // Write the documents to a JSONL file and upload it
            String ossPath = String.format("backups/%s/documents/incremental.jsonl", backupId);
            Path tempFile = Files.createTempFile("doc-backup-", ".jsonl");
            try {
                try (BufferedWriter writer = Files.newBufferedWriter(tempFile)) {
                    for (KnowledgeDocument doc : changedDocs) {
                        Map<String, Object> row = new LinkedHashMap<>();
                        row.put("id", doc.getId());
                        row.put("kb_name", doc.getKbName());
                        row.put("content", doc.getContent());
                        row.put("metadata", doc.getMetadata());
                        row.put("created_at", doc.getCreatedAt().toString());
                        row.put("updated_at", doc.getUpdatedAt().toString());
                        row.put("is_deleted", doc.isDeleted());
                        writer.write(objectMapper.writeValueAsString(row));
                        writer.newLine();
                    }
                }
                ossStorageService.upload(tempFile.toString(), ossPath);
            } finally {
                // Remove the temp file even when writing or uploading fails
                Files.deleteIfExists(tempFile);
            }

            log.info("Document backup completed: {} docs -> {}", changedDocs.size(), ossPath);
            return IncrementalBackupResult.builder()
                .backupId(backupId)
                .documentCount(changedDocs.size())
                .ossPath(ossPath)
                .build();
        }
    }

5. Planning RTO/RPO in practice
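RPO (Recovery Point Objective): how much data can you afford to lose at most?
For an AI knowledge base:
- Core knowledge base (product descriptions, rule documents): RPO ≤ 24 hours (daily full backup)
- User-uploaded documents: RPO ≤ 1 hour (hourly incremental backup)
- Conversation history (stored as vectors): RPO ≤ 24 hours; if lost, it can be rebuilt from the relational database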
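These targets are easy to monitor: compare the age of the newest successful backup with the RPO of the data tier it protects. A minimal sketch follows, with the three tiers above hard-coded. The class, enum, and method names are assumptions for illustration and are not part of the backup services shown earlier.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Flags data tiers whose newest backup is older than the RPO allows (illustrative sketch). */
final class RpoCheck {

    /** RPO targets taken from the list above. */
    enum Tier {
        CORE_KB(Duration.ofHours(24)),
        USER_UPLOADS(Duration.ofHours(1)),
        CONVERSATION_HISTORY(Duration.ofHours(24));

        final Duration rpo;

        Tier(Duration rpo) {
            this.rpo = rpo;
        }
    }

    /** Returns true when the time since the last successful backup exceeds the tier's RPO. */
    static boolean violatesRpo(Tier tier, LocalDateTime lastSuccessfulBackup, LocalDateTime now) {
        Duration age = Duration.between(lastSuccessfulBackup, now);
        return age.compareTo(tier.rpo) > 0;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 12, 15, 14, 0);
        LocalDateTime lastBackup = now.minusMinutes(90); // newest backup is 90 minutes old

        for (Tier tier : Tier.values()) {
            System.out.println(tier + " violates RPO: " + violatesRpo(tier, lastBackup, now));
        }
    }
}
```

With a 90-minute-old backup, only the user-upload tier is out of bounds, which is the signal that the hourly incremental job has missed a run.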
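RTO (Recovery Time Objective): how long may recovery take at most once something goes wrong?
| Failure type | Recovery method | Expected RTO |
|---|---|---|
| Single collection corrupted | Restore from vector snapshot | 15-30 minutes |
| Rebuild required after an Embedding model change | Rebuild from original documents | 2-8 hours (depending on document volume) |
| Entire vector database down | Rebuild from snapshots + re-index | 1-3 hours |
| Both original documents and vectors lost | Restore from the OSS backup | 4-12 hours |
Hitting these RTOs depends on three things:
- Backups live on OSS, so they can be downloaded quickly during a restore
- The recovery procedure is documented and rehearsed regularly (at least once a quarter)
- The recovery scripts are automated and do not require step-by-step manual operation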
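A drill is only useful if its measured duration is compared with the target. The sketch below encodes the upper bounds from the RTO table and checks a measured restore time against them. The enum constants and method names are invented for this example, and the bounds are simply the right-hand ends of the ranges in the table.

```java
import java.time.Duration;

/** Compares a measured restore duration with the RTO upper bound for a failure type (sketch). */
final class RtoCheck {

    /** Upper bounds of the "Expected RTO" ranges in the table above. */
    enum Failure {
        SINGLE_COLLECTION_CORRUPTED(Duration.ofMinutes(30)),
        EMBEDDING_MODEL_CHANGED(Duration.ofHours(8)),
        VECTOR_DB_DOWN(Duration.ofHours(3)),
        DOCS_AND_VECTORS_LOST(Duration.ofHours(12));

        final Duration maxRto;

        Failure(Duration maxRto) {
            this.maxRto = maxRto;
        }
    }

    /** True when the measured restore time is within the target. */
    static boolean withinRto(Failure failure, Duration measured) {
        return measured.compareTo(failure.maxRto) <= 0;
    }

    /** Time left before the target is missed; negative when the drill overran. */
    static Duration headroom(Failure failure, Duration measured) {
        return failure.maxRto.minus(measured);
    }

    public static void main(String[] args) {
        Duration drill = Duration.ofMinutes(22); // hypothetical drill result
        Failure scenario = Failure.SINGLE_COLLECTION_CORRUPTED;
        System.out.println("within RTO: " + withinRto(scenario, drill));
        System.out.println("headroom (min): " + headroom(scenario, drill).toMinutes());
    }
}
```

Tracking the headroom from drill to drill shows whether restores are getting slower as the collections grow, before the target is actually missed.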
Restore drill script

    @Service
    @Slf4j
    public class BackupRestorationDrillService {

        @Autowired
        private QdrantBackupService qdrantBackupService;

        @Autowired
        private OssStorageService ossStorageService;

        // Hypothetical search wrapper (not shown): embeds the query text and runs a top-k search
        @Autowired
        private QdrantSearchClient qdrantClient;

        /**
         * Restore drill: restore a given backup into a test collection and verify the data.
         */
        public DrillResult runRestorationDrill(String backupId, String testCollectionSuffix) {
            DrillResult result = DrillResult.builder()
                .backupId(backupId)
                .startTime(LocalDateTime.now())
                .build();
            try {
                // Download the snapshot
                String ossPath = String.format("backups/%s/vectors/customer_service_kb", backupId);
                String localPath = ossStorageService.download(ossPath, "/tmp/drill-restore/");

                // Restore into a test collection (suffixed name, so production is untouched)
                String testCollection = "customer_service_kb_drill_" + testCollectionSuffix;
                qdrantBackupService.restoreFromSnapshot(testCollection, localPath);

                // Validate: run a few known queries and confirm the restored collection returns results
                boolean valid = validateRestoredCollection(testCollection);
                result.setSuccess(valid);
                result.setEndTime(LocalDateTime.now());

                long durationMinutes = Duration.between(result.getStartTime(), result.getEndTime()).toMinutes();
                log.info("Restoration drill completed: backupId={}, valid={}, duration={}min",
                    backupId, valid, durationMinutes);

                // Clean up the test collection
                qdrantBackupService.deleteCollection(testCollection);
                return result;
            } catch (Exception e) {
                result.setSuccess(false);
                result.setError(e.getMessage());
                result.setEndTime(LocalDateTime.now());
                log.error("Restoration drill failed: {}", e.getMessage(), e);
                return result;
            }
        }

        private boolean validateRestoredCollection(String collectionName) {
            // Run a few test queries to verify that retrieval works
            // (the query strings match the Chinese-language knowledge base content)
            List<String> testQueries = List.of(
                "产品退换货政策",
                "配送时间说明",
                "会员积分规则"
            );
            for (String query : testQueries) {
                List<SearchResult> results = qdrantClient.search(collectionName, query, 3);
                if (results.isEmpty()) {
                    log.warn("Validation failed: no results for query '{}'", query);
                    return false;
                }
            }
            return true;
        }
    }

Summary
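Key points for backing up a vector database:
- Dual backup principle: back up the original documents and the vector snapshots together, so that each is redundancy for the other
- Record the Embedding model version: the backup metadata must record which Embedding model was in use, otherwise at restore time you cannot tell whether the vector backup is directly usable
- Regular restore drills: a backup whose restore has never been verified is as good as no backup
- Align RTO/RPO with business expectations: the core knowledge base and user-uploaded content should not share the same backup frequency
The data in your vector database is your AI application's "memory". Losing it means spending time and money to rebuild it, which makes a solid backup the most cost-effective insurance you can buy.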
