第1883篇:向量数据库迁移实录——从Chroma迁到Milvus的完整经历
第1883篇:向量数据库迁移实录——从Chroma迁到Milvus的完整经历
迁移向量数据库这件事,在做之前感觉没什么,做完之后感觉自己老了三岁。
这篇文章把我们从Chroma迁移到Milvus的全过程写下来,包括踩过的坑、走过的弯路,以及一些到现在还觉得值得分享的技术细节。
为什么要迁移:Chroma的局限性
我们最开始选Chroma,理由很简单:快,轻量,文档清晰,本地跑一行命令就能起来。在原型阶段和早期生产阶段,Chroma表现得相当不错。
但随着数据量的增长和业务场景的扩展,Chroma开始暴露出一些让人头疼的问题:
问题一:多租户支持很弱。
我们的平台需要给不同的企业客户做数据隔离。Chroma的Collection隔离机制可以用,但跨Collection的权限管理、配额管理都需要自己做,而且性能在数据量大的时候有明显下降。
问题二:水平扩展能力不足。
Chroma的分布式模式相对简单,当单个Collection的数据量超过5000万向量之后,查询延迟开始明显升高。我们有几个客户的数据量已经接近这个阈值。
问题三:企业级特性缺失。
备份恢复、细粒度权限控制、审计日志——这些在企业销售场景里客户必问的功能,Chroma支持得都不够好。
问题四:社区和维护的不确定性。
这条有点主观,但在选型一个要长期使用的基础设施时,这很重要。Chroma是一家初创公司维护的,而Milvus背后是Zilliz,已经有了很多大型生产案例。
综合以上几点,我们决定迁往Milvus。
迁移方案设计:不能停机,不能丢数据
迁移要求只有两条,但每一条都很硬:
- 业务不停机
- 数据零丢失
这两条加在一起,决定了迁移方案不能是"倒库停服",必须是在线迁移。
我们设计的迁移方案分四个阶段:
阶段一:双写准备
这一步最关键的是"不破坏现有服务"的前提下,让新数据同时写入两个库。
我们用了一个抽象层来封装向量数据库的操作:
public interface VectorStoreClient {
void upsert(String collection, String id, float[] vector, Map<String, Object> metadata);
List<SearchResult> search(String collection, float[] queryVector, int topK, Map<String, Object> filters);
void delete(String collection, String id);
}
// Chroma实现
@Component("chromaVectorStore")
public class ChromaVectorStoreClient implements VectorStoreClient {
// ...Chroma的具体实现
}
// Milvus实现
@Component("milvusVectorStore")
public class MilvusVectorStoreClient implements VectorStoreClient {
// ...Milvus的具体实现
}
// 双写实现:写入时同时写两个库
@Primary
@Component("dualWriteVectorStore")
public class DualWriteVectorStoreClient implements VectorStoreClient {
@Qualifier("chromaVectorStore")
private final VectorStoreClient primaryStore;
@Qualifier("milvusVectorStore")
private final VectorStoreClient shadowStore;
@Value("${vector.migration.dual-write.enabled:false}")
private boolean dualWriteEnabled;
@Override
public void upsert(String collection, String id, float[] vector, Map<String, Object> metadata) {
// 主库写入必须成功
primaryStore.upsert(collection, id, vector, metadata);
// Shadow写入失败不影响主流程,但记录告警
if (dualWriteEnabled) {
try {
shadowStore.upsert(collection, id, vector, metadata);
} catch (Exception e) {
log.error("Shadow write to Milvus failed for id={}, collection={}", id, collection, e);
metrics.incrementCounter("milvus.shadow_write.failure");
}
}
}
@Override
public List<SearchResult> search(String collection, float[] queryVector, int topK, Map<String, Object> filters) {
// 读请求暂时只走主库
return primaryStore.search(collection, queryVector, topK, filters);
}
}这里有一个重要的设计决策:双写时Shadow写入失败不能阻塞主流程。Milvus是新引入的系统,稳定性还没有经过充分验证,如果让它的失败影响主业务,迁移风险会很高。
双写配置通过开关控制,可以随时开启或关闭,不需要重新部署。
阶段二:存量数据迁移——比想象中麻烦得多
双写打开之后,新增数据会自动同步到Milvus,但历史数据还在Chroma里。历史数据的迁移是整个过程中最麻烦的部分。
我们的存量数据大概是3.2亿条向量,分布在几百个Collection里。
麻烦点一:Chroma的导出接口不好用。
Chroma没有官方的批量导出工具,需要自己用Python写。而且Chroma的查询接口在大量数据时性能很差,用collection.get()批量拉取时,每批超过1万条就会很慢。
最终我们的迁移脚本用了一个折中方案:通过SQLite文件直接读取Chroma的底层存储(Chroma默认用SQLite存储元数据,向量本身存在Parquet文件里)。
import sqlite3
import numpy as np
import chromadb
def export_collection_to_parquet(collection_name: str, output_path: str):
"""直接读Chroma底层存储,绕过Chroma的查询接口"""
chroma_path = "/data/chroma"
# 1. 从SQLite读取ID和元数据
conn = sqlite3.connect(f"{chroma_path}/chroma.sqlite3")
cursor = conn.cursor()
# 查询指定collection的所有记录
cursor.execute("""
SELECT e.id, e.embedding_id, e.string_value as doc_id
FROM embeddings e
JOIN collections c ON e.collection_id = c.id
WHERE c.name = ?
""", (collection_name,))
records = cursor.fetchall()
# 2. 从Parquet文件读取向量数据
# Chroma把向量存在 {collection_id}/data_level0.bin
cursor.execute("SELECT id FROM collections WHERE name = ?", (collection_name,))
collection_id = cursor.fetchone()[0]
# 这里省略了二进制文件解析的细节...
# 实际上需要根据Chroma的存储格式解析.bin文件
conn.close()
print(f"Exported {len(records)} records from {collection_name}")说实话,直接读底层存储这个做法风险挺高的,因为Chroma的内部存储格式没有官方文档。我们做了充分的校验才敢用,后面会讲校验的方法。
麻烦点二:向量维度和数据类型必须完全一致。
我们的Embedding模型输出的是float32格式、1536维的向量。Chroma在存储时对精度做了一些处理,导入Milvus时需要确保格式匹配。
// 导入脚本的Java部分(用于写入Milvus)
public class MilvusBatchImporter {
public void importBatch(List<VectorRecord> records, String collectionName) {
List<String> ids = records.stream().map(VectorRecord::getId).collect(Collectors.toList());
// 注意:Milvus的向量必须是float列表,维度必须严格匹配Collection定义
List<List<Float>> vectors = records.stream()
.map(r -> toFloatList(r.getVector()))
.collect(Collectors.toList());
List<JSONObject> metadataList = records.stream()
.map(r -> JSONObject.parseObject(r.getMetadataJson()))
.collect(Collectors.toList());
InsertParam insertParam = InsertParam.newBuilder()
.withCollectionName(collectionName)
.withFields(List.of(
new InsertParam.Field("id", ids),
new InsertParam.Field("vector", vectors),
new InsertParam.Field("metadata", metadataList)
))
.build();
R<MutationResult> result = milvusClient.insert(insertParam);
if (result.getStatus() != R.Status.Success.getCode()) {
throw new MilvusImportException(
"Batch insert failed: " + result.getMessage()
);
}
// 立即flush,确保数据持久化
milvusClient.flush(FlushParam.newBuilder()
.withCollectionNames(List.of(collectionName))
.build());
}
private List<Float> toFloatList(byte[] vectorBytes) {
// 将字节数组转为Float列表
FloatBuffer buffer = ByteBuffer.wrap(vectorBytes)
.order(ByteOrder.LITTLE_ENDIAN)
.asFloatBuffer();
List<Float> result = new ArrayList<>(buffer.capacity());
while (buffer.hasRemaining()) {
result.add(buffer.get());
}
return result;
}
}麻烦点三:迁移任务的断点续传。
3.2亿条数据,全量迁移大概要跑20多个小时。中间如果出了什么问题重头来过,不现实。所以迁移任务需要支持断点续传。
@Service
public class MigrationCheckpointService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String CHECKPOINT_PREFIX = "migration:checkpoint:";
/**
* 记录某个Collection的迁移进度
*/
public void saveCheckpoint(String collection, long lastMigratedOffset) {
redisTemplate.opsForValue().set(
CHECKPOINT_PREFIX + collection,
String.valueOf(lastMigratedOffset),
Duration.ofDays(7)
);
}
/**
* 获取断点位置,没有则从0开始
*/
public long getCheckpoint(String collection) {
String value = redisTemplate.opsForValue().get(CHECKPOINT_PREFIX + collection);
return value != null ? Long.parseLong(value) : 0L;
}
/**
* 迁移完成后清除checkpoint
*/
public void clearCheckpoint(String collection) {
redisTemplate.delete(CHECKPOINT_PREFIX + collection);
}
}
// 迁移主逻辑
public void migrateCollection(String collectionName) {
long offset = checkpointService.getCheckpoint(collectionName);
long totalCount = chromaClient.count(collectionName);
int batchSize = 10000;
log.info("Start migrating collection {}, total={}, resumeFrom={}",
collectionName, totalCount, offset);
while (offset < totalCount) {
List<VectorRecord> batch = chromaClient.getBatch(collectionName, offset, batchSize);
if (batch.isEmpty()) break;
milvusImporter.importBatch(batch, collectionName);
offset += batch.size();
// 每批完成后保存checkpoint
checkpointService.saveCheckpoint(collectionName, offset);
log.info("Collection {} progress: {}/{} ({:.1f}%)",
collectionName, offset, totalCount, (double) offset / totalCount * 100);
}
checkpointService.clearCheckpoint(collectionName);
log.info("Collection {} migration complete", collectionName);
}阶段三:双读验证——建立信任的关键步骤
数据迁移完成之后,不能直接切流量,要先做双读验证。
双读验证的目的是:用同样的查询分别打Chroma和Milvus,对比结果,确认Milvus的结果可信。
@Override
public List<SearchResult> search(String collection, float[] queryVector, int topK, Map<String, Object> filters) {
if (!dualReadEnabled) {
return primaryStore.search(collection, queryVector, topK, filters);
}
// 同时查两个库
CompletableFuture<List<SearchResult>> primaryFuture = CompletableFuture.supplyAsync(
() -> primaryStore.search(collection, queryVector, topK, filters)
);
CompletableFuture<List<SearchResult>> shadowFuture = CompletableFuture.supplyAsync(
() -> shadowStore.search(collection, queryVector, topK, filters)
);
List<SearchResult> primaryResults = primaryFuture.join();
// Shadow的结果只用于比较,不影响返回给用户的内容
shadowFuture.thenAccept(shadowResults -> {
compareAndLog(collection, primaryResults, shadowResults, topK);
});
return primaryResults; // 依然返回Chroma的结果
}
private void compareAndLog(String collection, List<SearchResult> chroma,
List<SearchResult> milvus, int topK) {
// 计算Top-K重叠率
Set<String> chromaIds = chroma.stream().map(r -> r.getId()).collect(Collectors.toSet());
Set<String> milvusIds = milvus.stream().map(r -> r.getId()).collect(Collectors.toSet());
Set<String> intersection = new HashSet<>(chromaIds);
intersection.retainAll(milvusIds);
double overlapRate = (double) intersection.size() / Math.max(chromaIds.size(), 1);
metrics.recordHistogram("migration.topk_overlap_rate", overlapRate,
Tags.of("collection", collection));
// 重叠率低于80%时告警
if (overlapRate < 0.8) {
log.warn("Low overlap rate for collection {}: {:.1f}% (chroma={} vs milvus={})",
collection, overlapRate * 100, chromaIds, milvusIds);
}
}双读跑了三天,我们持续观察各Collection的Top-K重叠率。大部分Collection稳定在92%-98%之间,这个偏差是可以接受的(向量相似度搜索本来就不是精确匹配,微小差异是正常的)。
有两个Collection的重叠率持续偏低(在65%-70%之间),排查后发现是这两个Collection在迁移时有数据损坏,需要重新迁移。
阶段四:流量切换——最紧张的时刻
流量切换分三步走:先切10%,观察24小时;再切50%,再观察24小时;最后全量。
每一步都有回滚预案:只需要把配置里的Milvus流量比例调回0,读流量就会立即全部回到Chroma。
实际切换过程出了一个小插曲:当流量切到50%时,Milvus的P99延迟从120ms升高到了380ms,超过了我们设定的阈值。
排查下来,原因是Milvus的索引参数配置不当。我们用的是HNSW索引,ef参数(查询时探索范围)默认值是64,这在小数据量时没问题,但数据量大了之后,这个参数需要调整。
// 查询时调整HNSW的ef参数
SearchParam searchParam = SearchParam.newBuilder()
.withCollectionName(collectionName)
.withVectors(Collections.singletonList(queryVector))
.withTopK(topK)
.withMetricType(MetricType.IP)
// 关键:HNSW查询时的ef参数,影响精度和速度的平衡
// ef越大精度越高但越慢,通常设为topK的2-4倍
.withParams("{\"ef\": " + Math.max(topK * 2, 64) + "}")
.build();同时,我们发现Milvus的内存配置也需要调整。默认情况下,Milvus会把部分索引数据放在磁盘上,查询时会触发磁盘IO。把常用Collection的索引固定在内存中之后,延迟显著下降。
# Milvus的queryNode配置
queryNode:
cache:
memoryLimit: 32768 # MB,根据服务器内存调整
# 把热点Collection的索引预加载到内存
gracefulTime: 5000调整之后P99延迟回到了130ms,然后继续推进流量切换。全量切换完成后,保持双写状态又跑了一周,确认没有问题后,才关闭了Chroma的Shadow写入。
迁移完成后的对比数据
经过完整的迁移和一段时间的生产运行,我来对比一下两个系统的实际表现。
| 指标 | Chroma | Milvus |
|---|---|---|
| 单Collection最大向量数 | ~5000万(开始劣化) | 经过测试超过3亿 |
| P50查询延迟(1000万向量) | 85ms | 35ms |
| P99查询延迟(1000万向量) | 320ms | 130ms |
| 多租户隔离 | Collection级别,手动管理 | 内置Partition和RBAC |
| 水平扩展 | 需要业务层分片 | 原生支持分布式扩展 |
| 备份能力 | 手动备份Parquet文件 | 内置备份恢复工具 |
从结果来看,迁移是值得的。但代价是两个月的工程投入,以及无数个手动对比结果的焦虑夜晚。
如果重来一次,我会做什么不一样
一、更早建立抽象层。
我们当初选Chroma时,直接用的是Chroma的Python客户端,没有做任何抽象封装。后来为了实现双写,花了很多时间在存量代码改造上。如果一开始就有VectorStoreClient这样的抽象层,迁移会容易得多。
二、迁移工具优先于迁移数据。
我们花了大量时间在"怎么把数据从Chroma导出来"上。正确的顺序应该是:先把迁移工具做完、测好、跑通,再开始大规模迁移。我们当时是边做工具边迁数据,结果返工了两次。
三、双读验证的时间要更长。
我们只做了三天双读验证,如果时间更长,也许能发现更多问题。向量搜索的正确性很难用简单指标衡量,重叠率只是一个近似指标,三天的样本量在某些低频场景上可能不够。
向量数据库迁移不是普通的数据库迁移,它涉及的不只是数据本身,还有索引参数、精度设置、查询策略。做之前要做好心理准备:这件事比看起来复杂两倍。
