Qdrant 部署和使用实战——换向量数据库是否值得
Qdrant 部署和使用实战——换向量数据库是否值得
适读人群:正在使用 pgvector 但感觉不够用、考虑迁移到专用向量数据库的开发者 | 阅读时长:约 15 分钟 | 核心价值:一次真实迁移的完整记录,包括数据和踩坑
迁移这个决定不是一拍脑袋做的。
事情的起点是某个周五下午,线上系统的监控开始报警。P99 响应时间从平时的 80ms 一路涨到 1200ms,用户开始反馈"好慢"。
我打开监控面板,发现是向量查询把 PostgreSQL 的 CPU 打满了。那时候我们的向量数据量大概是 420 万条,HNSW 索引占了将近 60GB 内存,服务器内存已经吃到 85%。
我知道,pgvector 的边界到了。
为什么选 Qdrant
我调研了三个候选:Milvus、Weaviate、Qdrant。
Milvus:功能最全,支持分布式,但部署复杂,依赖多(需要 etcd 和 MinIO),维护成本高。对我们这种三人小团队不适合。
Weaviate:内置了一些智能特性(自动向量化),但配置比较复杂,文档里很多概念和普通向量数据库不一样,学习曲线陡。
Qdrant:Rust 写的,性能好,部署简单(单个二进制文件或 Docker 镜像),文档清晰,API 设计符合直觉。
选 Qdrant 的核心理由:简单。能跑起来、跑得稳、运维简单,对我们比功能全重要得多。
部署
用 Docker Compose 部署,生产环境加上持久化和资源限制:
# docker-compose.yml
version: '3.8'
services:
qdrant:
image: qdrant/qdrant:v1.9.0
restart: unless-stopped
ports:
- "6333:6333" # REST API
- "6334:6334" # gRPC
volumes:
- ./qdrant_storage:/qdrant/storage
- ./qdrant_config.yaml:/qdrant/config/production.yaml
environment:
- QDRANT__SERVICE__API_KEY=${QDRANT_API_KEY}
deploy:
resources:
limits:
memory: 24G # 根据你的机器配置
cpus: '8'
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:6333/health"]
interval: 30s
timeout: 10s
retries: 3Qdrant 的配置文件:
# qdrant_config.yaml
storage:
# 向量存储在内存还是磁盘
# on_disk: false 表示全内存,性能最好
# on_disk: true 表示使用磁盘存储,内存占用小但慢
on_disk_payload: true # payload(元数据)存磁盘,节省内存
hnsw_index:
m: 16
ef_construct: 100
full_scan_threshold: 10000 # 数据量小于这个时用全扫描而非索引
service:
max_request_size_mb: 32
# 开启 gRPC(比 REST 快约 30%)
service:
grpc_port: 6334
enable_cors: false数据迁移
从 pgvector 迁移到 Qdrant,核心是三步:导出 PG 数据、批量写入 Qdrant、验证数据完整性。
import psycopg2
from pgvector.psycopg2 import register_vector
from qdrant_client import QdrantClient
from qdrant_client.models import (
VectorParams, Distance, PointStruct,
HnswConfigDiff, OptimizersConfigDiff
)
import numpy as np
from tqdm import tqdm
import json
class PgvectorToQdrantMigrator:
def __init__(self, pg_config: dict, qdrant_url: str, qdrant_api_key: str):
self.pg_conn = psycopg2.connect(**pg_config)
register_vector(self.pg_conn)
self.qdrant = QdrantClient(
url=qdrant_url,
api_key=qdrant_api_key,
timeout=60
)
def create_collection(
self,
collection_name: str,
vector_size: int = 1536
):
"""创建 Qdrant collection"""
# 检查是否已存在
existing = [c.name for c in self.qdrant.get_collections().collections]
if collection_name in existing:
print(f"Collection {collection_name} 已存在,跳过创建")
return
self.qdrant.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(
size=vector_size,
distance=Distance.COSINE,
on_disk=False # 向量放内存,查询快
),
# HNSW 索引配置
hnsw_config=HnswConfigDiff(
m=16,
ef_construct=100,
full_scan_threshold=10000,
on_disk=False
),
# 优化器配置
optimizers_config=OptimizersConfigDiff(
indexing_threshold=20000, # 超过这个数量才建索引
memmap_threshold=50000
)
)
print(f"Collection {collection_name} 创建完成")
def migrate(
self,
pg_table: str,
collection_name: str,
batch_size: int = 500
):
"""执行迁移"""
# 获取总数
with self.pg_conn.cursor() as cur:
cur.execute(f"SELECT COUNT(*) FROM {pg_table}")
total = cur.fetchone()[0]
print(f"开始迁移,共 {total} 条记录")
migrated = 0
errors = 0
# 分批读取 PG 数据
with self.pg_conn.cursor("migrate_cursor") as cur:
cur.execute(f"""
SELECT id, content, metadata, embedding
FROM {pg_table}
ORDER BY id
""")
batch = []
with tqdm(total=total) as pbar:
for row in cur:
doc_id, content, metadata, embedding = row
if embedding is None:
errors += 1
pbar.update(1)
continue
# 构建 Qdrant Point
# Qdrant 的 ID 必须是 UUID 或正整数
point = PointStruct(
id=int(doc_id),
vector=embedding.tolist(),
payload={
"content": content,
**(metadata if isinstance(metadata, dict) else {})
}
)
batch.append(point)
if len(batch) >= batch_size:
try:
self.qdrant.upsert(
collection_name=collection_name,
points=batch,
wait=True # 等待写入确认
)
migrated += len(batch)
pbar.update(len(batch))
batch = []
except Exception as e:
print(f"\n批次写入失败: {e}")
errors += len(batch)
batch = []
# 处理最后一批
if batch:
self.qdrant.upsert(
collection_name=collection_name,
points=batch,
wait=True
)
migrated += len(batch)
pbar.update(len(batch))
print(f"\n迁移完成:成功 {migrated} 条,失败 {errors} 条")
return migrated, errors
def verify(self, pg_table: str, collection_name: str, sample_size: int = 100):
"""抽样验证迁移正确性"""
with self.pg_conn.cursor() as cur:
cur.execute(f"""
SELECT id, embedding
FROM {pg_table}
ORDER BY RANDOM()
LIMIT {sample_size}
""")
samples = cur.fetchall()
correct = 0
for doc_id, embedding in tqdm(samples, desc="验证中"):
# 用原始向量在 Qdrant 里查自己
results = self.qdrant.search(
collection_name=collection_name,
query_vector=embedding.tolist(),
limit=1,
with_payload=False
)
if results and results[0].id == int(doc_id):
correct += 1
accuracy = correct / len(samples) * 100
print(f"\n验证完成:{correct}/{len(samples)} 正确 ({accuracy:.1f}%)")
return accuracyQdrant 的查询 API
迁移完成后,日常查询的 Python 代码:
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue, Range
from openai import OpenAI
openai_client = OpenAI()
qdrant = QdrantClient(url="http://localhost:6333", api_key="your-api-key")
def search_documents(
query: str,
collection: str = "documents",
filters: dict = None,
top_k: int = 5,
score_threshold: float = 0.7
) -> list:
# 生成查询向量
response = openai_client.embeddings.create(
input=query,
model="text-embedding-3-small"
)
query_vector = response.data[0].embedding
# 构建过滤条件
qdrant_filter = None
if filters:
conditions = []
for key, value in filters.items():
if isinstance(value, (str, bool)):
conditions.append(
FieldCondition(key=key, match=MatchValue(value=value))
)
elif isinstance(value, dict) and ("gte" in value or "lte" in value):
conditions.append(
FieldCondition(
key=key,
range=Range(
gte=value.get("gte"),
lte=value.get("lte")
)
)
)
if conditions:
from qdrant_client.models import Filter, Must
qdrant_filter = Filter(must=conditions)
# 执行查询
results = qdrant.search(
collection_name=collection,
query_vector=query_vector,
query_filter=qdrant_filter,
limit=top_k,
score_threshold=score_threshold,
with_payload=True
)
return [
{
"id": r.id,
"content": r.payload.get("content", ""),
"metadata": {k: v for k, v in r.payload.items() if k != "content"},
"score": r.score
}
for r in results
]迁移前后的性能对比
迁移完成后,同一台服务器(8核16GB),420 万条向量,Top-5 查询:
| 指标 | pgvector (HNSW) | Qdrant | 提升 |
|---|---|---|---|
| 平均响应时间 | 143ms | 18ms | 7.9x |
| P99 响应时间 | 289ms | 41ms | 7.0x |
| 并发50 QPS 下响应时间 | 628ms | 52ms | 12.1x |
| 内存占用(向量索引) | 59GB | 21GB | 节省 64% |
| CPU 使用率(50 QPS) | 85% | 23% | 大幅降低 |
Qdrant 在内存占用和查询速度上的优势非常明显,主要原因:
- Rust 的内存管理比 PG 的 C 更高效
- Qdrant 专为向量搜索优化,没有关系型数据库的通用开销
- Qdrant 的 HNSW 实现对多核并发优化更好
迁移过程中的坑
坑一:ID 类型
pgvector 里的 ID 是 BIGINT,Qdrant 的 ID 只支持 UUID 或无符号 64 位整数。
如果你的 PG ID 都是正数,可以直接用。如果有负数 ID 或者 UUID 格式的,要做转换,并且要记录映射关系。
坑二:批量写入的并发限制
Qdrant 默认的 gRPC message size 是 4MB,如果 batch_size 太大(比如 2000 条),每条向量 1536 维 = 6KB,2000 条 = 12MB,会超出限制报错。
我用的 batch_size=500,每批约 3MB,比较安全。
坑三:索引构建时间
Qdrant 在数据量超过 indexing_threshold(我设的 20000)之前不建 HNSW 索引,用全扫描。刚导入数据时查询很快(数据少),数据多了之后触发索引构建,这段时间查询会变慢,CPU 飙升。
解决办法:先完成全量数据导入,再调用 API 触发优化:
qdrant.update_collection(
collection_name="documents",
optimizer_config=OptimizersConfigDiff(indexing_threshold=0)
)
# 等待优化完成(可以轮询状态)
import time
while True:
info = qdrant.get_collection("documents")
if info.status == "green":
break
print(f"等待优化完成... 状态: {info.status}")
time.sleep(10)值不值得换
如果你现在用 pgvector,我的建议是:先不要换,除非遇到了实际问题。
pgvector 在 100 万条以内、低并发场景下完全够用,而且维护成本低。
当你开始遇到以下情况,就该认真考虑迁移了:
- P99 响应时间持续 > 300ms
- 向量索引把服务器内存用到 > 60%
- 高峰期并发查询让 PG CPU 持续 > 70%
迁移 Qdrant 的收益是真实的,但迁移本身也有工程成本(迁移脚本、验证、切流量、双写期的代码维护)。只有当问题足够痛,迁移才值得。
