Post #2082: Spring AI VectorStore Integration - A pgvector-to-Qdrant Migration in Practice
Audience: Java engineers selecting or migrating a vector database | Reading time: ~20 minutes | Core value: a deep look at Spring AI's VectorStore abstraction, the integration differences between pgvector and Qdrant, and a production migration plan without downtime
Last year our team's RAG system ran on pgvector. The reasoning was simple: we already had PostgreSQL, so adding one extension cost almost nothing to operate.
But as the vector count grew to 30 million, problems surfaced: query P99 latency climbed from 80ms to 450ms. We added indexes, tuned parameters, and upgraded hardware; we did everything we could, and still couldn't bring it down.
We eventually decided to migrate to Qdrant. The migration had plenty of pitfalls, and this post documents the full process.
The Spring AI VectorStore abstraction
Spring AI defines a unified VectorStore interface; in theory, swapping the underlying implementation requires no business-code changes:
```java
/**
 * Spring AI VectorStore core interface.
 * A unified abstraction that hides the differences between vector DBs.
 */
public interface VectorStore {

    /**
     * Add documents (embedded automatically).
     */
    void add(List<Document> documents);

    /**
     * Delete documents by ID.
     */
    Optional<Boolean> delete(List<String> idList);

    /**
     * Similarity search.
     */
    List<Document> similaritySearch(SearchRequest request);

    /**
     * Convenience overload using default search settings.
     */
    default List<Document> similaritySearch(String query) {
        return similaritySearch(SearchRequest.query(query));
    }
}
```
```java
/**
 * SearchRequest: building search criteria.
 */
SearchRequest request = SearchRequest.query("how to configure a connection pool")
    .withTopK(5)                                  // return the top 5 hits
    .withSimilarityThreshold(0.75)                // minimum similarity
    .withFilterExpression("category == 'java'");  // metadata filter
```

The interface is lean, but the differences beneath it are large: pgvector and Qdrant differ fundamentally in filter syntax, indexing strategy, and metadata storage. That is the root of every pitfall we hit during the migration.
pgvector integration: the limits of an old friend
```java
/**
 * pgvector configuration.
 * Good fit when you already run PostgreSQL, hold < 5M vectors,
 * and latency requirements are not extreme.
 */
@Configuration
public class PgVectorConfig {

    @Bean
    public VectorStore pgVectorStore(
            JdbcTemplate jdbcTemplate,
            EmbeddingModel embeddingModel) {
        return PgVectorStore.builder()
            .jdbcTemplate(jdbcTemplate)
            .embeddingModel(embeddingModel)
            // table name (default: vector_store)
            .tableName("knowledge_vectors")
            // vector dimensions: MUST match the EmbeddingModel!
            .dimensions(1536)
            // index type: IVFFLAT (fast, slightly lower recall) or HNSW (high recall, more memory)
            .indexType(PgVectorStore.PgIndexType.HNSW)
            // distance metric (keep consistent with the embedding model)
            .distanceType(PgVectorStore.PgDistanceType.COSINE_DISTANCE)
            // create the table and index automatically
            .initializeSchema(true)
            .build();
    }
}
```

pgvector's HNSW index parameters have to be tuned by hand:
```sql
-- index-build parameters
CREATE INDEX ON knowledge_vectors
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);

-- query-time parameter
SET hnsw.ef_search = 100;  -- higher = more accurate but slower
```

Here is a trap: hnsw.ef_search is a session-level setting. With a connection pool, different connections can end up with different ef_search values. We once saw a subset of queries return visibly worse results, and it took half a day of digging to discover that some pooled connections were still on the default of 40.
Our fix:
```java
/**
 * Custom JdbcTemplate that sets ef_search before each search query,
 * fixing the inconsistent-configuration problem caused by pooling.
 * Note: the SET and the SELECT must run on the SAME connection; two
 * separate JdbcTemplate calls may be served by different pooled
 * connections, so both statements go through one ConnectionCallback.
 * (A pool-level init statement, e.g. HikariCP's connection-init-sql,
 * achieves the same effect.)
 */
@Configuration
public class PgVectorJdbcConfig {

    @Bean
    @Primary
    public JdbcTemplate vectorJdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource) {
            @Override
            public <T> T query(String sql, ResultSetExtractor<T> rse, Object... args) {
                return execute((Connection con) -> {
                    try (Statement st = con.createStatement()) {
                        st.execute("SET hnsw.ef_search = 100");
                    }
                    try (PreparedStatement ps = con.prepareStatement(sql)) {
                        for (int i = 0; i < args.length; i++) {
                            ps.setObject(i + 1, args[i]);
                        }
                        try (ResultSet rs = ps.executeQuery()) {
                            return rse.extractData(rs);
                        }
                    }
                });
            }
        };
    }
}
```

How pgvector implements metadata filtering
Spring AI's filter expression `category == 'java' AND year >= 2024` is translated into a PostgreSQL WHERE clause for pgvector.
```java
/**
 * pgvector stores metadata in a JSONB column.
 *
 * Table layout:
 * - id: UUID
 * - content: TEXT
 * - metadata: JSONB  ← all custom fields live here
 * - embedding: vector(1536)
 */

// Spring AI automatically stores metadata in the JSONB column
Document doc = new Document(
    "Spring AI is an AI engineering framework",
    Map.of(
        "category", "java",
        "year", 2024,
        "author", "老张",
        "tags", List.of("spring", "ai", "java")  // note: filtering on list types has limited support
    )
);
vectorStore.add(List.of(doc));

// filter at search time
List<Document> results = vectorStore.similaritySearch(
    SearchRequest.query("how to configure Spring AI")
        .withFilterExpression("category == 'java' AND year >= 2024")
);
```

But a GIN index on the JSONB column does little to speed up vector search: pgvector still uses the vector index to find candidates and then applies the metadata filter on the result set (post-filtering), which gets inefficient at scale.
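The string DSL above is parsed into Spring AI's portable filter expression; there is also a typed builder that skips string parsing. A minimal sketch, assuming the FilterExpressionBuilder API from Spring AI's filter package (verify the exact signatures against your version):

```java
import java.util.List;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.SearchRequest;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.ai.vectorstore.filter.FilterExpressionBuilder;

public class TypedFilterExample {

    /** Same semantics as "category == 'java' AND year >= 2024", built without string parsing. */
    public static List<Document> search(VectorStore vectorStore) {
        FilterExpressionBuilder b = new FilterExpressionBuilder();
        return vectorStore.similaritySearch(
            SearchRequest.query("how to configure Spring AI")
                .withTopK(5)
                .withFilterExpression(
                    b.and(
                        b.eq("category", "java"),
                        b.gte("year", 2024)
                    ).build()));
    }
}
```

Either form goes through the same per-store translation layer, so both inherit the same store-specific quirks (see pitfall 2 below).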
Qdrant integration: what a dedicated vector database buys you
```xml
<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-qdrant-store-spring-boot-starter</artifactId>
</dependency>
```

```yaml
spring:
  ai:
    vectorstore:
      qdrant:
        host: localhost
        port: 6334
        collection-name: knowledge_base
        use-tls: false
        api-key: ${QDRANT_API_KEY:}
    openai:
      api-key: ${OPENAI_API_KEY}
```

```java
/**
 * The Qdrant collection must exist up front, or Spring AI can create it.
 * When created automatically, the vector dimensions must be specified.
 */
@Configuration
public class QdrantVectorStoreConfig {

    @Bean
    public VectorStore qdrantVectorStore(
            QdrantClient qdrantClient,
            EmbeddingModel embeddingModel) {
        return QdrantVectorStore.builder()
            .qdrantClient(qdrantClient)
            .embeddingModel(embeddingModel)
            .collectionName("knowledge_base")
            // Qdrant supports multiple distance metrics
            .initializeSchema(true)
            .build();
    }

    @Bean
    public QdrantClient qdrantClient() {
        return new QdrantClient(
            QdrantGrpcClient.newBuilder("localhost", 6334, false)
                .build()
        );
    }
}
```

Qdrant's core advantage: payload filtering happens at the index level (pre-filtering), not after retrieval. The more selective the filter, the faster the query, which is exactly the opposite of pgvector.
```java
/**
 * Qdrant accepts exactly the same filter expression syntax as pgvector,
 * because Spring AI unifies the filter DSL.
 *
 * The underlying implementations differ completely, though:
 * - pgvector: WHERE (metadata->>'category')::text = 'java' AND (metadata->>'year')::int >= 2024
 * - Qdrant: a payload filter that excludes non-matching nodes during HNSW graph traversal
 */
List<Document> results = vectorStore.similaritySearch(
    SearchRequest.query("how to configure Spring AI")
        .withTopK(5)
        .withSimilarityThreshold(0.75)
        .withFilterExpression("category == 'java' AND year >= 2024")
);
```

The migration plan: dual writes plus gradual traffic cutover
A stop-the-world migration was too risky, so we went with dual writes:
```java
/**
 * Dual-write VectorStore.
 * During migration, writes go to both pgvector and Qdrant,
 * while read traffic shifts gradually from pgvector to Qdrant.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class MigrationVectorStore implements VectorStore {

    private final VectorStore pgVectorStore;
    private final VectorStore qdrantVectorStore;

    // Migration phase: 0 = pgvector only, 1 = dual write / read pgvector,
    // 2 = dual write / read Qdrant (by percentage), 3 = Qdrant only
    @Value("${migration.phase:1}")
    private int migrationPhase;

    // Share of read traffic routed to Qdrant (0-100)
    @Value("${migration.qdrant-read-percentage:0}")
    private int qdrantReadPercentage;

    @Override
    public void add(List<Document> documents) {
        if (migrationPhase == 3) {
            // migration finished: write Qdrant only
            qdrantVectorStore.add(documents);
            return;
        }
        // phases 0-2: pgvector is the primary store (synchronous write)
        try {
            pgVectorStore.add(documents);
        } catch (Exception e) {
            log.error("pgvector write failed: {}", e.getMessage());
            throw e; // primary store failed, fail the whole call
        }
        // phases 1-2: also write Qdrant asynchronously, off the hot path
        if (migrationPhase >= 1 && migrationPhase <= 2) {
            CompletableFuture.runAsync(() -> {
                try {
                    qdrantVectorStore.add(documents);
                } catch (Exception e) {
                    log.error("async Qdrant write failed, needs compensation: documentIds={}",
                        documents.stream().map(Document::getId).toList(), e);
                    // record the failure for later compensation
                    recordFailedDocuments(documents);
                }
            });
        }
    }

    @Override
    public List<Document> similaritySearch(SearchRequest request) {
        // route reads according to the traffic split
        if (shouldReadFromQdrant()) {
            try {
                List<Document> qdrantResults = qdrantVectorStore.similaritySearch(request);
                log.debug("Qdrant search returned {} results", qdrantResults.size());
                return qdrantResults;
            } catch (Exception e) {
                log.warn("Qdrant search failed, falling back to pgvector: {}", e.getMessage());
                return pgVectorStore.similaritySearch(request);
            }
        }
        return pgVectorStore.similaritySearch(request);
    }

    @Override
    public Optional<Boolean> delete(List<String> idList) {
        if (migrationPhase == 3) {
            // migration finished: delete from Qdrant only
            return qdrantVectorStore.delete(idList);
        }
        Optional<Boolean> pgResult = pgVectorStore.delete(idList);
        // during dual writes, delete from both stores
        if (migrationPhase >= 1 && migrationPhase <= 2) {
            CompletableFuture.runAsync(() -> {
                try {
                    qdrantVectorStore.delete(idList);
                } catch (Exception e) {
                    log.error("async Qdrant delete failed: ids={}", idList, e);
                }
            });
        }
        return pgResult;
    }

    private boolean shouldReadFromQdrant() {
        if (migrationPhase == 3) return true;
        if (migrationPhase < 2) return false;
        // phase 2: split traffic by percentage
        return ThreadLocalRandom.current().nextInt(100) < qdrantReadPercentage;
    }

    private void recordFailedDocuments(List<Document> documents) {
        // record failed document IDs in Redis for a later compensation job
        // (implementation omitted)
    }
}
```

A tool for migrating the historical data
Dual writes only cover new data; historical data needs a batch migration:
```java
/**
 * Migrates historical data from pgvector to Qdrant.
 * Supports resume-from-checkpoint, bounded concurrency, and progress tracking.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class VectorStoreMigrationJob {

    private final JdbcTemplate jdbcTemplate;
    private final QdrantClient qdrantClient;
    private final RedisTemplate<String, String> redisTemplate;

    private static final String PROGRESS_KEY = "migration:pgvector_to_qdrant:progress";
    private static final int BATCH_SIZE = 500;
    private static final int CONCURRENT_BATCHES = 4;

    /**
     * Run the migration.
     * Resumable: reads the last successful offset from Redis.
     */
    public MigrationResult migrate() {
        long startOffset = getLastSuccessOffset();
        long totalCount = getTotalCount();
        log.info("starting migration: total={}, resuming from offset={}", totalCount, startOffset);

        AtomicLong successCount = new AtomicLong(0);
        AtomicLong failCount = new AtomicLong(0);
        ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_BATCHES);
        try {
            long offset = startOffset;
            while (offset < totalCount) {
                long batchOffset = offset;
                // submit one batch task
                executor.submit(() -> {
                    try {
                        migrateBatch(batchOffset, BATCH_SIZE);
                        successCount.addAndGet(BATCH_SIZE);
                        // note: batches finish out of order, so this checkpoint is approximate
                        updateProgress(batchOffset + BATCH_SIZE);
                        log.info("migration progress: {}/{} ({}%)",
                            batchOffset + BATCH_SIZE, totalCount,
                            String.format("%.1f", (batchOffset + BATCH_SIZE) * 100.0 / totalCount));
                    } catch (Exception e) {
                        failCount.addAndGet(BATCH_SIZE);
                        log.error("batch migration failed: offset={}, error={}", batchOffset, e.getMessage());
                    }
                });
                offset += BATCH_SIZE;
                // throttle submissions to avoid overloading Qdrant
                if ((offset / BATCH_SIZE) % CONCURRENT_BATCHES == 0) {
                    Thread.sleep(100); // brief pause
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
            try {
                executor.awaitTermination(1, TimeUnit.HOURS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new MigrationResult(totalCount, successCount.get(), failCount.get());
    }

    private void migrateBatch(long offset, int limit) throws Exception {
        // read the raw rows from pgvector, including the stored embedding vectors
        // (for very large tables, keyset pagination on id beats OFFSET)
        List<Map<String, Object>> rows = jdbcTemplate.queryForList(
            "SELECT id, content, metadata, embedding::text AS embedding_str " +
            "FROM knowledge_vectors ORDER BY id OFFSET ? LIMIT ?",
            offset, limit
        );
        // The rows already contain embeddings, but the Spring AI VectorStore
        // interface would re-embed every Document on add()!
        // So we write through the native Qdrant API instead, skipping the
        // embedding step entirely.
        insertDirectlyToQdrant(rows);
    }

    /**
     * Write via the Qdrant client directly to avoid re-embedding
     * (saves a lot of OpenAI spend).
     */
    private void insertDirectlyToQdrant(List<Map<String, Object>> rows) throws Exception {
        List<PointStruct> points = rows.stream()
            .map(row -> {
                String id = row.get("id").toString();
                float[] embedding = parseEmbeddingVector(row.get("embedding_str").toString());
                Map<String, Object> metadata = parseJsonbMetadata(row.get("metadata").toString());
                // build a Qdrant point
                return PointStruct.newBuilder()
                    .setId(PointId.newBuilder().setUuid(id))
                    .setVectors(Vectors.newBuilder()
                        .setVector(Vector.newBuilder()
                            .addAllData(toFloatList(embedding))))
                    .putAllPayload(convertToQdrantPayload(metadata,
                        row.get("content").toString()))
                    .build();
            })
            .toList();
        // bulk upsert into Qdrant
        qdrantClient.upsertAsync("knowledge_base", points).get();
    }

    private float[] parseEmbeddingVector(String vectorStr) {
        // pgvector text format: [0.1,0.2,0.3,...]
        String cleaned = vectorStr.replace("[", "").replace("]", "");
        String[] parts = cleaned.split(",");
        float[] result = new float[parts.length];
        for (int i = 0; i < parts.length; i++) {
            result[i] = Float.parseFloat(parts[i].trim());
        }
        return result;
    }

    private Map<String, Value> convertToQdrantPayload(
            Map<String, Object> metadata, String content) {
        Map<String, Value> payload = new HashMap<>();
        payload.put("content", Value.newBuilder().setStringValue(content).build());
        for (Map.Entry<String, Object> entry : metadata.entrySet()) {
            Value value = convertToQdrantValue(entry.getValue());
            if (value != null) {
                payload.put(entry.getKey(), value);
            }
        }
        return payload;
    }

    private Value convertToQdrantValue(Object obj) {
        if (obj instanceof String s) {
            return Value.newBuilder().setStringValue(s).build();
        } else if (obj instanceof Number n) {
            return Value.newBuilder().setDoubleValue(n.doubleValue()).build();
        } else if (obj instanceof Boolean b) {
            return Value.newBuilder().setBoolValue(b).build();
        }
        return null;
    }

    private long getTotalCount() {
        return jdbcTemplate.queryForObject(
            "SELECT COUNT(*) FROM knowledge_vectors", Long.class);
    }

    private long getLastSuccessOffset() {
        String progress = redisTemplate.opsForValue().get(PROGRESS_KEY);
        return progress != null ? Long.parseLong(progress) : 0L;
    }

    private void updateProgress(long offset) {
        redisTemplate.opsForValue().set(PROGRESS_KEY, String.valueOf(offset));
    }

    private Map<String, Object> parseJsonbMetadata(String jsonb) {
        // parse the JSONB metadata string (implementation omitted)
        return new HashMap<>();
    }

    private List<Float> toFloatList(float[] array) {
        List<Float> list = new ArrayList<>(array.length);
        for (float f : array) list.add(f);
        return list;
    }

    public record MigrationResult(long total, long success, long failed) {}
}
```

Migration validation: checking result consistency
Copying the data over is not the finish line; the two stores' search results need a consistency check:
```java
/**
 * Migration validation: compare search results from pgvector and Qdrant
 * using real production queries.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class MigrationValidator {

    private final VectorStore pgVectorStore;
    private final VectorStore qdrantVectorStore;

    /**
     * Query both stores and compute the result overlap rate.
     */
    public ValidationReport validate(List<String> testQueries) {
        List<QueryComparison> comparisons = testQueries.stream()
            .map(this::compareQuery)
            .toList();

        // aggregate the overlap statistics
        double avgOverlap = comparisons.stream()
            .mapToDouble(QueryComparison::topKOverlapRate)
            .average()
            .orElse(0.0);
        long lowOverlapCount = comparisons.stream()
            .filter(c -> c.topKOverlapRate() < 0.6)
            .count();

        log.info("validation result: avg topK overlap={}%, low-overlap queries={}",
            String.format("%.2f", avgOverlap * 100), lowOverlapCount);
        return new ValidationReport(comparisons, avgOverlap, lowOverlapCount);
    }

    private QueryComparison compareQuery(String query) {
        SearchRequest request = SearchRequest.query(query).withTopK(10);
        List<Document> pgResults = pgVectorStore.similaritySearch(request);
        List<Document> qdrantResults = qdrantVectorStore.similaritySearch(request);

        // intersect the two ID sets
        Set<String> pgIds = pgResults.stream()
            .map(Document::getId)
            .collect(Collectors.toSet());
        Set<String> qdrantIds = qdrantResults.stream()
            .map(Document::getId)
            .collect(Collectors.toSet());
        long intersection = pgIds.stream().filter(qdrantIds::contains).count();
        double overlapRate = (double) intersection /
            Math.max(pgIds.size(), qdrantIds.size());

        if (overlapRate < 0.6) {
            log.warn("low-overlap query: query='{}', pgCount={}, qdrantCount={}, overlap={}",
                query, pgIds.size(), qdrantIds.size(), String.format("%.2f", overlapRate));
        }
        return new QueryComparison(query, pgIds.size(), qdrantIds.size(),
            (int) intersection, overlapRate);
    }

    public record QueryComparison(
        String query,
        int pgCount,
        int qdrantCount,
        int intersection,
        double topKOverlapRate
    ) {}

    public record ValidationReport(
        List<QueryComparison> comparisons,
        double averageOverlapRate,
        long lowOverlapQueryCount
    ) {}
}
```

Our acceptance bar: average overlap rate ≥ 85% at TopK=10. The first run came out at only 79%; we traced it to Qdrant's default ef parameter being much lower than pgvector's, and after raising it the overlap reached 91%.
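Spring AI's QdrantVectorStore did not expose search-time HNSW parameters, so the ef experiments ran through the native client. A rough sketch of the idea; the gRPC names (SearchPoints, SearchParams, hnsw_ef) follow the Qdrant Java client, but verify them against your client version:

```java
import java.util.List;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.grpc.Points.ScoredPoint;
import io.qdrant.client.grpc.Points.SearchParams;
import io.qdrant.client.grpc.Points.SearchPoints;

public class TunedQdrantSearch {

    /**
     * Search with an explicit hnsw_ef: larger ef = better recall, higher latency.
     * The query vector must come from the same embedding model used at index time.
     */
    public static List<ScoredPoint> search(QdrantClient client,
                                           List<Float> queryVector) throws Exception {
        SearchPoints request = SearchPoints.newBuilder()
            .setCollectionName("knowledge_base")
            .addAllVector(queryVector)
            .setLimit(10)
            .setParams(SearchParams.newBuilder()
                .setHnswEf(256)   // raised until topK overlap passed our 85% bar
                .setExact(false)) // exact=true bypasses HNSW entirely (precise but slow)
            .build();
        return client.searchAsync(request).get();
    }
}
```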
Performance comparison (measured)
After the migration we compared both sides on the same test set:
```java
/**
 * Performance benchmark.
 * Replays production-like traffic against both stores.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class VectorStoreBenchmark {

    private final VectorStore pgVectorStore;
    private final VectorStore qdrantVectorStore;

    public BenchmarkResult runBenchmark(
            List<String> testQueries,
            int concurrency,
            int durationSeconds) {
        return BenchmarkResult.builder()
            .pgvectorStats(benchmark(pgVectorStore, testQueries, concurrency, durationSeconds))
            .qdrantStats(benchmark(qdrantVectorStore, testQueries, concurrency, durationSeconds))
            .build();
    }

    private LatencyStats benchmark(
            VectorStore store,
            List<String> queries,
            int concurrency,
            int durationSeconds) {
        List<Long> latencies = new CopyOnWriteArrayList<>();
        AtomicLong requestCount = new AtomicLong(0);
        AtomicLong errorCount = new AtomicLong(0);

        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
        long endTime = System.currentTimeMillis() + durationSeconds * 1000L;
        for (int i = 0; i < concurrency; i++) {
            executor.submit(() -> {
                Random random = new Random();
                while (System.currentTimeMillis() < endTime) {
                    String query = queries.get(random.nextInt(queries.size()));
                    long start = System.currentTimeMillis();
                    try {
                        store.similaritySearch(
                            SearchRequest.query(query).withTopK(5));
                        latencies.add(System.currentTimeMillis() - start);
                        requestCount.incrementAndGet();
                    } catch (Exception e) {
                        errorCount.incrementAndGet();
                    }
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(durationSeconds + 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // compute latency statistics (assumes at least one successful request)
        List<Long> sorted = latencies.stream().sorted().toList();
        int size = sorted.size();
        return new LatencyStats(
            sorted.get((int) (size * 0.50)),  // P50
            sorted.get((int) (size * 0.90)),  // P90
            sorted.get((int) (size * 0.99)),  // P99
            sorted.stream().mapToLong(Long::longValue).average().orElse(0),
            requestCount.get(),
            errorCount.get(),
            (double) requestCount.get() / durationSeconds  // QPS
        );
    }

    public record LatencyStats(
        long p50Ms, long p90Ms, long p99Ms, double avgMs,
        long totalRequests, long errors, double qps
    ) {}

    @Builder
    public record BenchmarkResult(
        LatencyStats pgvectorStats,
        LatencyStats qdrantStats
    ) {}
}
```

Measured results (30M vectors, concurrency 50, topK=5, with a filter condition):
| Metric | pgvector | Qdrant | Improvement |
|---|---|---|---|
| P50 | 45ms | 12ms | 73% lower |
| P90 | 180ms | 28ms | 84% lower |
| P99 | 450ms | 65ms | 86% lower |
| QPS | 620 | 2100 | 239% higher |
The gap widens further when filters are involved, because Qdrant's payload filter is a pre-filter rather than a post-filter.
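One related note: pre-filtering only pays off when the payload fields used in filters are indexed. A sketch of creating payload indexes with the native client; the method and enum names follow the Qdrant Java client, and the trailing nulls leave the optional arguments (index params, wait flag, ordering, timeout) at their defaults, so double-check the signature for your client version:

```java
import io.qdrant.client.QdrantClient;
import io.qdrant.client.grpc.Collections.PayloadSchemaType;

public class PayloadIndexSetup {

    /**
     * Index the payload fields that appear in filter expressions,
     * so Qdrant can pre-filter during HNSW traversal instead of scanning.
     */
    public static void createIndexes(QdrantClient client) throws Exception {
        client.createPayloadIndexAsync(
            "knowledge_base", "category", PayloadSchemaType.Keyword,
            null, null, null, null).get();
        client.createPayloadIndexAsync(
            "knowledge_base", "year", PayloadSchemaType.Integer,
            null, null, null, null).get();
    }
}
```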
Pitfalls, summarized
Pitfall 1: vector dimensions are locked in
Once a Qdrant collection is created, its vector dimensions cannot be changed. If you later switch embedding models, all data must be re-embedded, even when the dimensions match (text-embedding-ada-002 and text-embedding-3-small are both 1536-dimensional, but their semantic spaces differ).
Our advice: put the model version in the collection name, e.g. knowledge_base_ada002_v1.
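Creating the versioned collection up front with the native client might look like this (a sketch; createCollectionAsync and VectorParams follow the Qdrant Java client, and the values are illustrative):

```java
import io.qdrant.client.QdrantClient;
import io.qdrant.client.grpc.Collections.Distance;
import io.qdrant.client.grpc.Collections.VectorParams;

public class CollectionSetup {

    /**
     * Create a collection whose name encodes the embedding model version,
     * so a model switch becomes a new collection plus a backfill, not a mutation.
     */
    public static void create(QdrantClient client) throws Exception {
        client.createCollectionAsync(
            "knowledge_base_ada002_v1",
            VectorParams.newBuilder()
                .setSize(1536)               // must match the embedding model
                .setDistance(Distance.Cosine)
                .build()
        ).get();
    }
}
```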
Pitfall 2: Spring AI filter syntax support differs by store
Support for the IN operator differs between pgvector and Qdrant:
```java
// works on pgvector; may need rewriting for Qdrant
.withFilterExpression("category IN ['java', 'python']")

// the more reliable form for Qdrant
.withFilterExpression("category == 'java' OR category == 'python'")
```

Pitfall 3: Qdrant payload fields must have consistent types
Qdrant's payload type inference is strict: a field cannot mix strings and numbers. If some historical documents stored year as the string "2024" and others as the integer 2024, filter queries will misbehave.
Clean the data before migrating:
```java
/**
 * Pre-migration cleanup: normalize field types.
 */
private Map<String, Object> normalizeMetadata(Map<String, Object> metadata) {
    Map<String, Object> normalized = new HashMap<>(metadata);
    // force year to an integer
    if (normalized.containsKey("year")) {
        Object year = normalized.get("year");
        if (year instanceof String s) {
            try {
                normalized.put("year", Integer.parseInt(s));
            } catch (NumberFormatException e) {
                normalized.remove("year"); // invalid year value: drop it
            }
        }
    }
    // force boolean fields
    if (normalized.containsKey("isPublic")) {
        Object isPublic = normalized.get("isPublic");
        if (isPublic instanceof String s) {
            normalized.put("isPublic", "true".equalsIgnoreCase(s));
        }
    }
    return normalized;
}
```

Pitfall 4: memory usage during migration
Loading large numbers of vectors at once during batch migration can blow up the heap. BATCH_SIZE = 500 proved a safe value: a 1536-dimensional float vector is about 6 KB, so 500 of them come to roughly 3 MB, staying under 30 MB with overhead included.
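The sizing arithmetic, as a quick sanity check (a throwaway helper; the 10x overhead multiplier is our rough assumption, not a measured constant):

```java
public class BatchSizing {

    /** Estimate a batch's raw vector payload and a padded in-memory footprint. */
    public static void main(String[] args) {
        int dims = 1536;
        int batchSize = 500;
        long bytesPerVector = (long) dims * Float.BYTES;   // 1536 * 4 = 6144 B ≈ 6 KB
        long rawBatchBytes = bytesPerVector * batchSize;   // ≈ 3 MB of raw floats
        long paddedBytes = rawBatchBytes * 10;             // ~10x for boxing, payload, buffers (assumption)
        System.out.printf("raw ≈ %.1f MB, padded ≈ %.1f MB%n",
            rawBatchBytes / 1024.0 / 1024.0, paddedBytes / 1024.0 / 1024.0);
    }
}
```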
The whole migration took two weeks: one week of dual writes to observe stability, one week of historical migration and validation, and two final days of gradual traffic cutover.
The end result: P99 dropped from 450ms to 65ms, essentially eliminating the "lag" users had been noticing. pgvector isn't bad; it just wasn't purpose-built for vector search. Once data volume and concurrency both climb, a dedicated tool shows a clear advantage.
