第2111篇:向量数据库选型与迁移——从原型到生产的完整指南
大约 11 分钟
第2111篇:向量数据库选型与迁移——从原型到生产的完整指南
适读人群:构建RAG系统和向量搜索应用的工程师 | 阅读时长:约20分钟 | 核心价值:系统性对比主流向量数据库的适用场景,掌握数据迁移和平滑切换的工程方案
换向量数据库是一件非常痛苦的事情——除非你事先想清楚了。
我有一个朋友,在一个RAG项目里最开始用Chroma(因为上手快),数据量涨到500万向量之后性能不行了,想换Milvus,结果发现没有现成的数据迁移方案,代码耦合又很深,最终花了整整两周才完成迁移,期间服务中断了好几次。
这篇文章分两个部分:一是帮你在一开始就选好合适的向量数据库,二是如果你已经在用某个数据库了,如何平稳迁移。
向量数据库的核心能力对比
/**
* 主流向量数据库对比
*
* 从实际工程角度出发,关注最重要的几个维度
*
* ┌──────────────┬────────┬────────┬─────────┬──────────┬──────────┐
* │ │ Chroma │ Milvus │ Qdrant │ Weaviate │ pgvector │
* ├──────────────┼────────┼────────┼─────────┼──────────┼──────────┤
* │ 部署复杂度 │ 极低 │ 中 │ 低 │ 中 │ 低 │
* │ 扩展性 │ 低 │ 高 │ 中高 │ 中高 │ 中 │
* │ 元数据过滤 │ 基本 │ 强 │ 强 │ 强 │ SQL全能 │
* │ 持久化 │ 文件 │ S3/分布式│ 文件 │ 内置 │ PostgreSQL│
* │ 最大推荐规模 │ <100万 │ >亿 │ <1亿 │ <1亿 │ <5000万 │
* │ 生产成熟度 │ 低 │ 高 │ 中 │ 中 │ 高 │
* │ 托管服务 │ 无 │ Zilliz │ Cloud │ WCS │ Supabase │
* └──────────────┴────────┴────────┴─────────┴──────────┴──────────┘
*
* 推荐策略:
* - 原型/开发阶段:Chroma(零配置)
* - 中小规模生产(<1000万向量):Qdrant 或 pgvector
* - 大规模生产(>1000万向量):Milvus
* - 数据库统一管理偏好:pgvector(利用现有PostgreSQL)
*/向量存储抽象层设计
/**
* 向量存储抽象接口
*
* 这是最重要的架构决策:
* 不要在业务代码里直接依赖具体的向量数据库SDK
* 通过接口隔离,可以随时换底层实现
*
* 如果你从项目第一天就做好这个抽象,
* 迁移向量数据库就只需要换一个实现类
*/
public interface VectorStore {
/**
 * Adds a single vector together with its metadata and original text.
 *
 * @param vector   the embedding vector
 * @param metadata arbitrary key/value metadata stored alongside the vector
 * @param content  the original text the vector was derived from
 * @return the id assigned to the stored document
 */
String add(float[] vector, Map<String, Object> metadata, String content);
/**
 * Bulk insert; implementations may batch the underlying writes.
 *
 * @return the ids of the stored documents, in input order
 */
List<String> addAll(List<VectorDocument> documents);
/**
 * Similarity search over the stored vectors.
 *
 * @param queryVector the query embedding
 * @param topK        maximum number of results to return
 * @param filter      optional metadata filter; null means no filtering
 * @return hits ordered by descending similarity score
 */
List<SearchResult> search(float[] queryVector, int topK, SearchFilter filter);
/**
 * Deletes a single document by id.
 */
void delete(String id);
/**
 * Deletes every document matching the filter.
 */
void deleteByFilter(SearchFilter filter);
/**
 * @return total number of stored documents
 */
long count();
/** Input document for bulk writes. */
@Data
@Builder
class VectorDocument {
private String id; // may be null; the store auto-generates an id
private float[] vector;
private String content; // original text content
private Map<String, Object> metadata;
}
/** A single search hit with its similarity score (higher is more similar). */
@Data
@Builder
class SearchResult {
private String id;
private float[] vector;
private String content;
private Map<String, Object> metadata;
private double score;
}
/** Metadata filter: a list of field conditions combined with AND or OR. */
@Data
@Builder
class SearchFilter {
// conditions are combined using the logic below (AND / OR)
private List<FilterCondition> conditions;
private FilterLogic logic; // AND / OR
/** One field comparison, e.g. field "lang" EQ "java". */
@Data
@Builder
static class FilterCondition {
private String field;
private FilterOperator operator;
private Object value;
}
enum FilterOperator { EQ, NEQ, GT, GTE, LT, LTE, IN, CONTAINS }
enum FilterLogic { AND, OR }
}
}Qdrant实现
/**
* Qdrant向量存储实现
*
* Qdrant是目前工程质量最高的向量数据库之一
* Rust实现,性能稳定,过滤能力强,部署简单
*/
@Service
@Slf4j
public class QdrantVectorStore implements VectorStore {
private final QdrantClient qdrantClient;
private final String collectionName;
private final int vectorSize;
/**
 * Builds a gRPC client from Spring configuration and eagerly ensures the
 * target collection exists. NOTE: the constructor blocks on a Qdrant
 * round-trip, so bean creation fails fast if Qdrant is unreachable.
 */
@Autowired
public QdrantVectorStore(
@Value("${qdrant.host:localhost}") String host,
@Value("${qdrant.port:6334}") int port,
@Value("${qdrant.collection:documents}") String collectionName,
@Value("${qdrant.vector-size:1024}") int vectorSize) {
this.qdrantClient = new QdrantClient(
QdrantGrpcClient.newBuilder(host, port, false).build()
);
this.collectionName = collectionName;
this.vectorSize = vectorSize;
ensureCollectionExists();
}
/**
 * Creates the collection (cosine distance, configured dimensionality) if it
 * does not exist yet. Any failure is rethrown as RuntimeException because
 * the store is unusable without its collection.
 */
private void ensureCollectionExists() {
try {
// check whether the collection already exists
boolean exists = qdrantClient.collectionExistsAsync(collectionName)
.get();
if (!exists) {
// create it with the configured size and cosine distance
qdrantClient.createCollectionAsync(
collectionName,
VectorsConfig.newBuilder()
.setParams(VectorParams.newBuilder()
.setSize(vectorSize)
.setDistance(Distance.Cosine)
.build())
.build()
).get();
log.info("创建Qdrant Collection: {}", collectionName);
}
} catch (Exception e) {
throw new RuntimeException("Qdrant初始化失败", e);
}
}
/**
 * Upserts a single point under a fresh random UUID.
 * The text content is stored in the payload under the reserved "content"
 * key; metadata values of type String/Number/Boolean are kept, any other
 * type is silently dropped.
 */
@Override
public String add(float[] vector, Map<String, Object> metadata, String content) {
String id = UUID.randomUUID().toString();
try {
Map<String, Value> payload = new HashMap<>();
payload.put("content", Value.newBuilder().setStringValue(content).build());
metadata.forEach((k, v) -> {
if (v instanceof String s) payload.put(k, Value.newBuilder().setStringValue(s).build());
else if (v instanceof Number n) payload.put(k, Value.newBuilder().setDoubleValue(n.doubleValue()).build());
else if (v instanceof Boolean b) payload.put(k, Value.newBuilder().setBoolValue(b).build());
});
qdrantClient.upsertAsync(collectionName,
List.of(PointStruct.newBuilder()
.setId(PointId.newBuilder().setUuid(id))
.setVectors(Vectors.newBuilder()
.setVector(Vector.newBuilder()
.addAllData(toFloatList(vector))))
.putAllPayload(payload)
.build())
).get();
return id;
} catch (Exception e) {
throw new RuntimeException("向量添加失败", e);
}
}
/**
 * Bulk upsert. Ids are taken from the documents when present, otherwise
 * generated; the returned list is in input order.
 *
 * NOTE(review): unlike add(), Boolean metadata values are dropped here —
 * this looks unintentional; confirm and align the two payload mappings.
 */
@Override
public List<String> addAll(List<VectorDocument> documents) {
if (documents.isEmpty()) return List.of();
List<PointStruct> points = new ArrayList<>();
List<String> ids = new ArrayList<>();
for (VectorDocument doc : documents) {
String id = doc.getId() != null ? doc.getId() : UUID.randomUUID().toString();
ids.add(id);
Map<String, Value> payload = new HashMap<>();
if (doc.getContent() != null) {
payload.put("content", Value.newBuilder().setStringValue(doc.getContent()).build());
}
if (doc.getMetadata() != null) {
doc.getMetadata().forEach((k, v) -> {
if (v instanceof String s) payload.put(k, Value.newBuilder().setStringValue(s).build());
else if (v instanceof Number n) payload.put(k, Value.newBuilder().setDoubleValue(n.doubleValue()).build());
});
}
points.add(PointStruct.newBuilder()
.setId(PointId.newBuilder().setUuid(id))
.setVectors(Vectors.newBuilder()
.setVector(Vector.newBuilder().addAllData(toFloatList(doc.getVector()))))
.putAllPayload(payload)
.build());
}
try {
// upsert in fixed batches of 1000 points to bound request size
for (int i = 0; i < points.size(); i += 1000) {
List<PointStruct> batch = points.subList(i, Math.min(i + 1000, points.size()));
qdrantClient.upsertAsync(collectionName, batch).get();
}
} catch (Exception e) {
throw new RuntimeException("批量向量添加失败", e);
}
return ids;
}
/**
 * Similarity search with optional metadata filtering.
 * Only String/Double/Bool payload values are copied back into metadata;
 * the reserved "content" entry is extracted out of the metadata map.
 */
@Override
public List<SearchResult> search(float[] queryVector, int topK, SearchFilter filter) {
try {
SearchPoints.Builder searchBuilder = SearchPoints.newBuilder()
.setCollectionName(collectionName)
.addAllVector(toFloatList(queryVector))
.setLimit(topK)
.setWithPayload(WithPayloadSelector.newBuilder()
.setEnable(true).build());
// attach the filter only when it actually has conditions
if (filter != null && filter.getConditions() != null && !filter.getConditions().isEmpty()) {
searchBuilder.setFilter(buildQdrantFilter(filter));
}
List<ScoredPoint> hits = qdrantClient.searchAsync(searchBuilder.build()).get();
return hits.stream()
.map(hit -> {
Map<String, Object> metadata = new HashMap<>();
hit.getPayloadMap().forEach((k, v) -> {
if (v.hasStringValue()) metadata.put(k, v.getStringValue());
else if (v.hasDoubleValue()) metadata.put(k, v.getDoubleValue());
else if (v.hasBoolValue()) metadata.put(k, v.getBoolValue());
});
String content = (String) metadata.remove("content");
return SearchResult.builder()
.id(hit.getId().getUuid())
.content(content)
.metadata(metadata)
.score(hit.getScore())
.build();
})
.toList();
} catch (Exception e) {
throw new RuntimeException("向量搜索失败", e);
}
}
/**
 * Translates the abstract filter into Qdrant's Filter message.
 * Only EQ is implemented; other operators throw UnsupportedOperationException.
 * NOTE(review): the Match value comes from the stubbed toQdrantValue() below,
 * so EQ filters almost certainly do not carry the intended value — verify
 * against the Qdrant Java client API before relying on filtered search.
 */
private Filter buildQdrantFilter(SearchFilter filter) {
// convert abstract conditions into Qdrant Filter conditions
List<Condition> conditions = filter.getConditions().stream()
.map(c -> {
switch (c.getOperator()) {
case EQ -> {
return Condition.newBuilder()
.setField(FieldCondition.newBuilder()
.setKey(c.getField())
.setMatch(Match.newBuilder()
.setValue(toQdrantValue(c.getValue()))))
.build();
}
default -> throw new UnsupportedOperationException(
"Unsupported operator: " + c.getOperator());
}
})
.toList();
Filter.Builder filterBuilder = Filter.newBuilder();
if (filter.getLogic() == SearchFilter.FilterLogic.AND) {
filterBuilder.addAllMust(conditions);
} else {
filterBuilder.addAllShould(conditions);
}
return filterBuilder.build();
}
/** Deletes one point by its UUID id. */
@Override
public void delete(String id) {
try {
qdrantClient.deleteAsync(collectionName,
Points.newBuilder()
.addIds(PointId.newBuilder().setUuid(id))
.build()
).get();
} catch (Exception e) {
throw new RuntimeException("向量删除失败", e);
}
}
/**
 * Bulk delete by filter.
 * NOTE(review): this is a silent no-op — Qdrant does support filtered
 * deletes, but the implementation was omitted. Callers get no error and no
 * effect; implement or throw UnsupportedOperationException.
 */
@Override
public void deleteByFilter(SearchFilter filter) {
// Qdrant supports deleting points by filter
// implementation omitted
}
/**
 * @return the number of points in the collection, or -1 on failure.
 * NOTE(review): the exception is swallowed here while every other method
 * rethrows — the -1 sentinel is undocumented in the interface; confirm
 * callers handle it.
 */
@Override
public long count() {
try {
return qdrantClient.countAsync(
CountPoints.newBuilder()
.setCollectionName(collectionName)
.build()
).get().getCount();
} catch (Exception e) {
return -1;
}
}
/** Boxes a float[] into the List&lt;Float&gt; the gRPC builders expect. */
private List<Float> toFloatList(float[] array) {
List<Float> list = new ArrayList<>(array.length);
for (float f : array) list.add(f);
return list;
}
/**
 * NOTE(review): placeholder — returns an empty PointId.Builder that ignores
 * the input value entirely. Match conditions built from this cannot match
 * anything; replace with a proper keyword/integer/bool Match value.
 */
private PointId.Builder toQdrantValue(Object value) {
// simplified stub implementation
return PointId.newBuilder();
}
}pgvector实现(适合已有PostgreSQL的场景)
/**
* pgvector实现
*
* 如果团队已经在用PostgreSQL,pgvector是零额外运维成本的选择
* 优势:可以和业务数据JOIN查询(这是其他向量数据库做不到的)
* 劣势:超大规模(>5000万)性能不如专用向量数据库
*/
@Service
@Slf4j
public class PgVectorStore implements VectorStore {

    /**
     * Shared, thread-safe JSON mapper. The original code created a new
     * ObjectMapper on every (de)serialization call, which is needlessly
     * expensive; Jackson mappers are designed to be cached and reused.
     */
    private static final com.fasterxml.jackson.databind.ObjectMapper MAPPER =
            new com.fasterxml.jackson.databind.ObjectMapper();

    /**
     * Shape every identifier spliced into SQL text must match. Table and
     * filter-field names cannot be bound as parameters, so they are
     * validated against this pattern to rule out SQL injection.
     */
    private static final java.util.regex.Pattern SAFE_IDENTIFIER =
            java.util.regex.Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    private final JdbcTemplate jdbc;
    private final String tableName;
    private final int vectorSize;

    /**
     * @param jdbc       Spring JDBC access to the PostgreSQL instance
     * @param tableName  table used for vector documents (plain identifier only)
     * @param vectorSize dimensionality of the stored embeddings
     * @throws IllegalArgumentException if tableName is not a plain identifier
     */
    @Autowired
    public PgVectorStore(JdbcTemplate jdbc,
                         @Value("${pgvector.table:vector_documents}") String tableName,
                         @Value("${pgvector.vector-size:1024}") int vectorSize) {
        // tableName comes from configuration, but it is still concatenated
        // into SQL text below — validate it once up front.
        if (!SAFE_IDENTIFIER.matcher(tableName).matches()) {
            throw new IllegalArgumentException("Illegal pgvector table name: " + tableName);
        }
        this.jdbc = jdbc;
        this.tableName = tableName;
        this.vectorSize = vectorSize;
        ensureTableExists();
    }

    /** Installs the pgvector extension, creates the table and an HNSW cosine index. */
    private void ensureTableExists() {
        jdbc.execute("CREATE EXTENSION IF NOT EXISTS vector");
        jdbc.execute("""
                CREATE TABLE IF NOT EXISTS %s (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                content TEXT,
                embedding vector(%d),
                metadata JSONB,
                created_at TIMESTAMP DEFAULT NOW()
                )
                """.formatted(tableName, vectorSize));
        // HNSW index: good recall/latency trade-off for cosine similarity
        jdbc.execute("""
                CREATE INDEX IF NOT EXISTS %s_embedding_idx
                ON %s USING hnsw (embedding vector_cosine_ops)
                WITH (m = 16, ef_construction = 64)
                """.formatted(tableName, tableName));
    }

    /**
     * Inserts one document and returns the database-generated UUID.
     */
    @Override
    public String add(float[] vector, Map<String, Object> metadata, String content) {
        String metaJson = toJson(metadata);
        String vectorStr = toVectorString(vector);
        return jdbc.queryForObject(
                "INSERT INTO " + tableName + " (content, embedding, metadata) VALUES (?, ?::vector, ?::jsonb) RETURNING id",
                String.class,
                content, vectorStr, metaJson
        );
    }

    /**
     * Bulk insert.
     *
     * Fix over the original: the ids list was declared but never populated,
     * so callers always received an empty list in violation of the
     * VectorStore contract. Ids are now taken from the document (or
     * generated client-side) and inserted explicitly, so the returned list
     * matches the stored rows in input order.
     */
    @Override
    public List<String> addAll(List<VectorDocument> documents) {
        if (documents.isEmpty()) return List.of();
        List<String> ids = new ArrayList<>(documents.size());
        List<Object[]> batchArgs = new ArrayList<>(documents.size());
        for (VectorDocument doc : documents) {
            String id = doc.getId() != null ? doc.getId() : UUID.randomUUID().toString();
            ids.add(id);
            batchArgs.add(new Object[]{
                    id,
                    doc.getContent(),
                    toVectorString(doc.getVector()),
                    toJson(doc.getMetadata())
            });
        }
        jdbc.batchUpdate(
                "INSERT INTO " + tableName + " (id, content, embedding, metadata) VALUES (?::uuid, ?, ?::vector, ?::jsonb)",
                batchArgs
        );
        return ids;
    }

    /**
     * Cosine similarity search; score = 1 - cosine distance (higher is
     * better). An optional metadata filter becomes a WHERE clause over the
     * JSONB column; the ORDER BY reuses the query vector, hence the second
     * vector parameter.
     */
    @Override
    public List<SearchResult> search(float[] queryVector, int topK, SearchFilter filter) {
        String vectorStr = toVectorString(queryVector);
        StringBuilder sql = new StringBuilder("""
                SELECT id, content, metadata,
                1 - (embedding <=> ?::vector) AS score
                FROM %s
                """.formatted(tableName));
        List<Object> params = new ArrayList<>();
        params.add(vectorStr);
        if (filter != null && filter.getConditions() != null && !filter.getConditions().isEmpty()) {
            sql.append(" WHERE ");
            List<String> conditions = buildSqlConditions(filter, params);
            String logic = filter.getLogic() == SearchFilter.FilterLogic.AND ? " AND " : " OR ";
            sql.append(String.join(logic, conditions));
        }
        sql.append(" ORDER BY embedding <=> ?::vector LIMIT ?");
        params.add(vectorStr); // ORDER BY uses the query vector a second time
        params.add(topK);
        // varargs overload: query(String, Object[], RowMapper) is deprecated
        // since Spring 5.3; the RowMapper lambda already declares SQLException,
        // so the original's inner try/catch was redundant.
        return jdbc.query(sql.toString(), (rs, rowNum) -> SearchResult.builder()
                .id(rs.getString("id"))
                .content(rs.getString("content"))
                .metadata(fromJson(rs.getString("metadata")))
                .score(rs.getDouble("score"))
                .build(), params.toArray());
    }

    /** Deletes one row by UUID id. */
    @Override
    public void delete(String id) {
        jdbc.update("DELETE FROM " + tableName + " WHERE id = ?::uuid", id);
    }

    /**
     * Deletes every row matching the filter.
     *
     * Fixes over the original: the filter's AND/OR logic is now honored (it
     * was hard-coded to AND), and an empty filter is rejected instead of
     * producing the malformed statement {@code DELETE ... WHERE }.
     *
     * @throws IllegalArgumentException if the filter has no conditions
     */
    @Override
    public void deleteByFilter(SearchFilter filter) {
        if (filter == null || filter.getConditions() == null || filter.getConditions().isEmpty()) {
            throw new IllegalArgumentException("deleteByFilter requires at least one condition");
        }
        List<Object> params = new ArrayList<>();
        List<String> conditions = buildSqlConditions(filter, params);
        String logic = filter.getLogic() == SearchFilter.FilterLogic.OR ? " OR " : " AND ";
        jdbc.update("DELETE FROM " + tableName + " WHERE " + String.join(logic, conditions),
                params.toArray());
    }

    /** @return total number of stored documents (0 if the count query yields null). */
    @Override
    public long count() {
        Long count = jdbc.queryForObject("SELECT COUNT(*) FROM " + tableName, Long.class);
        return count != null ? count : 0;
    }

    /**
     * pgvector-specific capability: similarity search JOINed with a business
     * table — here, permission filtering by user — in a single SQL query.
     * Dedicated vector databases cannot express this.
     *
     * Fix over the original: the metadata column was selected but never
     * mapped into the result; it is now populated like in {@link #search}.
     */
    public List<SearchResult> searchWithBusinessJoin(float[] queryVector, int topK,
                                                     String userId) {
        String vectorStr = toVectorString(queryVector);
        String sql = """
                SELECT v.id, v.content, v.metadata,
                1 - (v.embedding <=> ?::vector) AS score
                FROM %s v
                INNER JOIN document_permissions dp ON dp.document_id = v.id
                WHERE dp.user_id = ?
                OR dp.is_public = true
                ORDER BY v.embedding <=> ?::vector
                LIMIT ?
                """.formatted(tableName);
        return jdbc.query(sql, (rs, rowNum) -> SearchResult.builder()
                .id(rs.getString("id"))
                .content(rs.getString("content"))
                .metadata(fromJson(rs.getString("metadata")))
                .score(rs.getDouble("score"))
                .build(), vectorStr, userId, vectorStr, topK);
    }

    /** Serializes a float[] into pgvector's literal form: {@code [v1,v2,...]}. */
    private String toVectorString(float[] vector) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < vector.length; i++) {
            if (i > 0) sb.append(",");
            sb.append(vector[i]);
        }
        return sb.append("]").toString();
    }

    /** Serializes metadata to JSON; best-effort — falls back to "{}" on failure. */
    private String toJson(Map<String, Object> metadata) {
        if (metadata == null || metadata.isEmpty()) return "{}";
        try {
            return MAPPER.writeValueAsString(metadata);
        } catch (Exception e) {
            log.warn("metadata序列化失败: {}", e.getMessage());
            return "{}";
        }
    }

    /** Parses a JSONB column back into a map; best-effort — empty map on failure. */
    @SuppressWarnings("unchecked")
    private Map<String, Object> fromJson(String json) {
        if (json == null) return new HashMap<>();
        try {
            return MAPPER.readValue(json, Map.class);
        } catch (Exception e) {
            return new HashMap<>();
        }
    }

    /**
     * Translates abstract filter conditions into SQL fragments over the JSONB
     * metadata column, appending bind values to {@code params}.
     *
     * Security fix: the field name is interpolated into the SQL text, so it
     * is validated as a plain identifier first — without this, a crafted
     * field name (e.g. {@code a' = '' OR '1'='1}) would inject SQL.
     *
     * @throws IllegalArgumentException      on an unsafe field name
     * @throws UnsupportedOperationException for operators other than EQ
     */
    private List<String> buildSqlConditions(SearchFilter filter, List<Object> params) {
        return filter.getConditions().stream()
                .map(c -> {
                    String field = c.getField();
                    if (field == null || !SAFE_IDENTIFIER.matcher(field).matches()) {
                        throw new IllegalArgumentException("Unsafe filter field: " + field);
                    }
                    switch (c.getOperator()) {
                        case EQ -> {
                            params.add(c.getValue());
                            return "metadata->>'%s' = ?".formatted(field);
                        }
                        default -> throw new UnsupportedOperationException(
                                "Unsupported filter operator: " + c.getOperator());
                    }
                })
                .toList();
    }
}向量数据库迁移方案
/**
* 向量数据库迁移服务
*
* 零停机迁移策略:
* 1. 双写阶段:同时写入旧库和新库
* 2. 回填阶段:把旧库的历史数据迁移到新库
* 3. 读切换阶段:流量逐步切换到新库
* 4. 清理阶段:确认稳定后停写旧库
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class VectorStoreMigrationService {

    private final VectorStore sourceStore;
    private final VectorStore targetStore;

    /** Documents copied per batch. */
    @Value("${migration.batch-size:500}")
    private int batchSize;

    /** Emit an INFO progress line roughly every this many processed documents. */
    private static final long PROGRESS_INTERVAL = 10_000;

    /**
     * Full migration: pages through the source store and bulk-writes each
     * batch into the target store. Suitable for moderate data sizes
     * (&lt;1M vectors); run during off-peak hours in production.
     *
     * Fix over the original: the fetch offset was keyed off {@code migrated},
     * which does not advance when a batch write fails — a single failing
     * batch was therefore re-fetched forever (infinite loop). The read
     * position now advances by every batch actually read, regardless of
     * write success.
     *
     * @return counters for total, migrated and failed documents
     */
    public MigrationReport migrateAll() {
        log.info("开始全量迁移: source={}, target={}",
                sourceStore.getClass().getSimpleName(),
                targetStore.getClass().getSimpleName());
        long totalCount = sourceStore.count();
        log.info("源数据库总量: {}", totalCount);
        long offset = 0;    // read position in the source store
        long migrated = 0;  // documents successfully written to the target
        long failed = 0;    // documents in batches whose write failed
        long nextReport = PROGRESS_INTERVAL;
        List<VectorStore.VectorDocument> batch = fetchNextBatch(offset, batchSize);
        while (!batch.isEmpty()) {
            try {
                targetStore.addAll(batch);
                migrated += batch.size();
            } catch (Exception e) {
                failed += batch.size();
                log.error("批量迁移失败: batch size={}, error={}", batch.size(), e.getMessage());
            }
            // always advance past the batch just read (see Javadoc)
            offset += batch.size();
            if (offset >= nextReport && totalCount > 0) {
                // SLF4J only supports plain {} placeholders; the original's
                // {:.1f} would have been printed literally.
                log.info("迁移进度: {}/{} ({}%)", migrated, totalCount,
                        String.format("%.1f", migrated * 100.0 / totalCount));
                nextReport += PROGRESS_INTERVAL;
            }
            batch = fetchNextBatch(offset, batchSize);
        }
        log.info("迁移完成: migrated={}, failed={}", migrated, failed);
        return new MigrationReport(totalCount, migrated, failed);
    }

    /**
     * Incremental migration: copies only documents created after
     * {@code since}. Requires the source store to support filtering on a
     * "created_at" field.
     */
    public MigrationReport migrateIncremental(LocalDateTime since) {
        VectorStore.SearchFilter timeFilter = VectorStore.SearchFilter.builder()
                .conditions(List.of(
                        VectorStore.SearchFilter.FilterCondition.builder()
                                .field("created_at")
                                .operator(VectorStore.SearchFilter.FilterOperator.GT)
                                .value(since.toString())
                                .build()
                ))
                .logic(VectorStore.SearchFilter.FilterLogic.AND)
                .build();
        log.info("增量迁移: since={}", since);
        // Pulling the matching documents is store-specific; the filter above
        // is what a concrete implementation would hand to the source store.
        long migrated = 0;
        // ... implementation omitted, same principle as migrateAll()
        return new MigrationReport(0, migrated, 0);
    }

    /**
     * Sample-based migration quality check: runs the same random queries
     * against both stores and compares the top-5 result ids. Some drift is
     * expected because ANN index structures differ between databases.
     *
     * Fix over the original: sampleSize 0 no longer divides by zero, and the
     * SLF4J message uses valid {} placeholders instead of {:.1f}.
     *
     * @param sampleSize number of random probe queries to run
     */
    public ValidationReport validateMigration(int sampleSize) {
        log.info("开始迁移验证: sampleSize={}", sampleSize);
        if (sampleSize <= 0) {
            return new ValidationReport(0, 0, 0, 0.0, List.of());
        }
        int consistent = 0;
        int inconsistent = 0;
        List<String> inconsistentIds = new ArrayList<>();
        for (int i = 0; i < sampleSize; i++) {
            float[] randomVector = generateRandomVector(1024);
            List<VectorStore.SearchResult> sourceResults =
                    sourceStore.search(randomVector, 10, null);
            List<VectorStore.SearchResult> targetResults =
                    targetStore.search(randomVector, 10, null);
            // compare the top-5 ids from each store
            Set<String> sourceIds = sourceResults.stream()
                    .limit(5)
                    .map(VectorStore.SearchResult::getId)
                    .collect(Collectors.toSet());
            Set<String> targetIds = targetResults.stream()
                    .limit(5)
                    .map(VectorStore.SearchResult::getId)
                    .collect(Collectors.toSet());
            long intersection = sourceIds.stream()
                    .filter(targetIds::contains).count();
            if (intersection >= 3) { // 3 of 5 overlapping counts as consistent
                consistent++;
            } else {
                inconsistent++;
                if (inconsistentIds.size() < 10) { // keep a few examples for debugging
                    inconsistentIds.add("query_" + i + ": source=" + sourceIds + " target=" + targetIds);
                }
            }
        }
        double consistencyRate = (double) consistent / sampleSize;
        log.info("迁移验证结果: consistencyRate={}%, inconsistent={}",
                String.format("%.1f", consistencyRate * 100), inconsistent);
        return new ValidationReport(sampleSize, consistent, inconsistent,
                consistencyRate, inconsistentIds);
    }

    /** Pages through the source store; concrete paging depends on its API. */
    private List<VectorStore.VectorDocument> fetchNextBatch(long offset, int limit) {
        return List.of();
    }

    /** Uniform random probe vector for sampling-based validation. */
    private float[] generateRandomVector(int size) {
        float[] v = new float[size];
        java.util.Random random = java.util.concurrent.ThreadLocalRandom.current();
        for (int i = 0; i < size; i++) v[i] = random.nextFloat();
        return v;
    }

    /** Outcome counters for a migration run. */
    record MigrationReport(long totalCount, long migratedCount, long failedCount) {}

    /** Outcome of sample-based validation, with up to 10 mismatch examples. */
    record ValidationReport(int sampleSize, int consistent, int inconsistent,
                            double consistencyRate, List<String> inconsistentExamples) {}
}实践建议
抽象层是一次性投资,价值极高
从项目第一天就设计好VectorStore接口,而不是直接用具体SDK,几乎不需要额外工作量,但以后带来的灵活性是巨大的。我在2022年底的一个项目里坚持了这个原则,结果18个月后我们从Chroma平滑迁移到了Qdrant,只改了一个Spring Bean的配置,业务代码零修改,整个迁移过程不到两天,服务零停机。
别在原型阶段就用生产级数据库
Chroma的价值在于:零配置、可以直接跑、适合验证想法。在探索阶段用Chroma,验证方案可行再换生产级数据库,是很合理的路径。很多团队的问题是反过来的——原型用了Milvus,结果配置和运维消耗了大量时间,反而慢了验证速度。
迁移时一定要做双写+验证,不要直接割接
哪怕数据量很小,直接停写旧库、迁移数据、启用新库这种"一刀切"操作风险很高。双写至少要持续24-48小时,观察新库的查询结果和旧库是否一致。向量索引的构建需要时间,新库刚启动时可能搜索质量不稳定,要等索引完全建好再切流量。
