Python 向量数据库实战——Chroma、Milvus、Qdrant 接入与性能对比
Python 向量数据库实战——Chroma、Milvus、Qdrant 接入与性能对比
适读人群:要在 AI 应用中做语义搜索的工程师 | 阅读时长:约17分钟 | 核心价值:搞懂向量数据库选型,用真实性能数据帮你做决策,附完整接入代码
小孙在一家做招聘平台的公司,接到了一个需求:做简历智能匹配,把 JD(职位描述)和简历向量化,然后做语义匹配,找出最合适的候选人。
他用 Chroma 快速做了个原型,跑通了,效果不错。但压测一下,问题来了:10万份简历,每次搜索要400ms,随着数据量增加越来越慢。他想换一个更快的向量数据库,但不知道选哪个,网上的对比文章要么太老,要么都是厂商自己写的不可信。
他来问我:"老张,Milvus 和 Qdrant 网上都说快,但我应该选哪个?"
我说:我在几个实际项目里都用过,给你说真实感受,不是理论对比。
一、三个向量数据库的核心特点
Chroma:
- 定位:开发者友好的嵌入式向量数据库
- 特点:零配置,Python 一行代码启动,无需部署独立服务
- 适合:本地开发、原型验证、数据量 < 100万
- 局限:分布式能力弱,高并发下性能下降明显
Qdrant:
- 定位:高性能向量搜索引擎,Rust 编写
- 特点:部署简单(Docker),性能极高,过滤+向量搜索的组合效果最好
- 适合:中等规模(100万-1亿向量),需要复杂过滤条件的场景
Milvus:
- 定位:企业级分布式向量数据库
- 特点:水平扩展能力强,支持 PB 级数据
- 适合:超大规模(>1亿向量),需要高可用和水平扩展
- 局限:部署复杂,资源消耗大,中小规模有点"杀鸡用牛刀"
二、Chroma 完整接入实践
2.1 安装与基础使用
# pip install chromadb openai
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict
def create_chroma_client(persist_dir: str = "./chroma_data") -> chromadb.Client:
"""
创建持久化的 Chroma 客户端
数据保存到磁盘,重启后不丢失
"""
client = chromadb.PersistentClient(
path=persist_dir,
settings=Settings(
anonymized_telemetry=False, # 关闭遥测
allow_reset=True
)
)
return client
def setup_resume_collection(client: chromadb.Client) -> chromadb.Collection:
"""创建简历集合"""
collection = client.get_or_create_collection(
name="resumes",
metadata={
"hnsw:space": "cosine", # 使用余弦相似度
"hnsw:construction_ef": 200, # 构建时的搜索范围(越大越准但越慢)
"hnsw:M": 16 # 每个节点的最大连接数
}
)
return collection
# 批量添加简历
def add_resumes(collection, resumes: List[Dict], embeddings: List[List[float]]):
"""
批量添加简历到向量库
resumes: [{"id": "001", "name": "张三", "skills": ["Python", "Java"], ...}]
"""
ids = [r["id"] for r in resumes]
# Chroma 只支持字符串类型的 metadata,需要转换
metadatas = []
for r in resumes:
meta = {
"name": r.get("name", ""),
"skills": ",".join(r.get("skills", [])), # 列表转字符串
"experience_years": str(r.get("experience_years", 0)),
"education": r.get("education", ""),
"location": r.get("location", "")
}
metadatas.append(meta)
documents = [r.get("summary", "") for r in resumes] # 原始文本
# 分批插入(Chroma 建议每批不超过 5000 条)
batch_size = 1000
for i in range(0, len(ids), batch_size):
collection.add(
ids=ids[i:i+batch_size],
embeddings=embeddings[i:i+batch_size],
metadatas=metadatas[i:i+batch_size],
documents=documents[i:i+batch_size]
)
print(f"已插入 {min(i+batch_size, len(ids))}/{len(ids)} 条")
def search_resumes(collection, query_embedding: List[float], filters: Dict = None, top_k: int = 10):
"""
带过滤条件的向量搜索
filters: {"experience_years": {"$gte": "3"}, "location": "上海"}
"""
query_params = {
"query_embeddings": [query_embedding],
"n_results": top_k,
"include": ["metadatas", "documents", "distances"]
}
if filters:
query_params["where"] = filters
results = collection.query(**query_params)
# 格式化结果
formatted = []
for i in range(len(results["ids"][0])):
formatted.append({
"id": results["ids"][0][i],
"metadata": results["metadatas"][0][i],
"document": results["documents"][0][i],
"similarity": 1 - results["distances"][0][i] # 转为相似度(distance越小越相似)
})
return formatted
# 使用示例
client = create_chroma_client()
collection = setup_resume_collection(client)
# 搜索上海地区 3 年以上 Python 经验的候选人
results = search_resumes(
collection,
query_embedding=get_jd_embedding("招聘 Python 后端工程师"),
filters={
"$and": [
{"location": {"$eq": "上海"}},
{"experience_years": {"$gte": "3"}}
]
},
top_k=20
)三、Qdrant 完整接入实践
3.1 Docker 启动 Qdrant
# 启动 Qdrant 服务
docker run -d \
--name qdrant \
-p 6333:6333 \
-p 6334:6334 \
-v $(pwd)/qdrant_storage:/qdrant/storage \
qdrant/qdrant:latest3.2 Python 接入
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, VectorParams,
PointStruct, Filter, FieldCondition, Range, MatchValue,
UpdateStatus
)
from typing import List, Dict, Optional
import uuid
def create_qdrant_client(host: str = "localhost", port: int = 6333) -> QdrantClient:
"""创建 Qdrant 客户端"""
client = QdrantClient(host=host, port=port)
return client
def setup_collection(client: QdrantClient, collection_name: str, vector_size: int = 1536):
"""
创建向量集合
注意:Qdrant 支持原生的数值、字符串、布尔类型 metadata
"""
# 如果已存在则跳过
existing = [c.name for c in client.get_collections().collections]
if collection_name in existing:
print(f"集合 {collection_name} 已存在")
return
client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(
size=vector_size,
distance=Distance.COSINE
),
# 创建索引,加速过滤查询
# Qdrant 需要手动为经常过滤的字段创建 payload 索引
)
# 为常用过滤字段创建索引
client.create_payload_index(
collection_name=collection_name,
field_name="location",
field_schema="keyword"
)
client.create_payload_index(
collection_name=collection_name,
field_name="experience_years",
field_schema="integer"
)
client.create_payload_index(
collection_name=collection_name,
field_name="skills",
field_schema="keyword"
)
print(f"集合 {collection_name} 创建完成")
def upsert_resumes(client: QdrantClient, collection_name: str, resumes: List[Dict], embeddings: List[List[float]]):
"""
批量插入/更新简历
Qdrant 的 upsert:存在则更新,不存在则插入
"""
points = []
for resume, embedding in zip(resumes, embeddings):
points.append(
PointStruct(
id=resume.get("id", str(uuid.uuid4())), # Qdrant 支持 UUID 作为 ID
vector=embedding,
payload={ # Qdrant 支持原生类型,不需要转为字符串
"name": resume.get("name", ""),
"skills": resume.get("skills", []),
"experience_years": resume.get("experience_years", 0), # 整数
"education": resume.get("education", ""),
"location": resume.get("location", ""),
"salary_expectation": resume.get("salary_expectation", 0),
"summary": resume.get("summary", "")
}
)
)
# 批量 upsert
batch_size = 500
for i in range(0, len(points), batch_size):
result = client.upsert(
collection_name=collection_name,
points=points[i:i+batch_size],
wait=True # 等待操作完成
)
if result.status == UpdateStatus.COMPLETED:
print(f"已更新 {min(i+batch_size, len(points))}/{len(points)} 条")
def search_resumes_qdrant(
client: QdrantClient,
collection_name: str,
query_embedding: List[float],
experience_min: Optional[int] = None,
location: Optional[str] = None,
required_skills: Optional[List[str]] = None,
top_k: int = 10
) -> List[Dict]:
"""
带复杂过滤条件的向量搜索
Qdrant 的过滤和向量搜索是在同一层执行的(不是先过滤再搜索),性能极好
"""
# 构建过滤条件
filter_conditions = []
if experience_min is not None:
filter_conditions.append(
FieldCondition(
key="experience_years",
range=Range(gte=experience_min)
)
)
if location:
filter_conditions.append(
FieldCondition(key="location", match=MatchValue(value=location))
)
if required_skills:
for skill in required_skills:
filter_conditions.append(
FieldCondition(key="skills", match=MatchValue(value=skill))
)
query_filter = Filter(must=filter_conditions) if filter_conditions else None
# 执行搜索
results = client.search(
collection_name=collection_name,
query_vector=query_embedding,
query_filter=query_filter,
limit=top_k,
with_payload=True,
score_threshold=0.6 # 相似度阈值,过滤掉不相关结果
)
return [
{
"id": r.id,
"score": r.score,
**r.payload
}
for r in results
]
# 使用示例
qdrant = create_qdrant_client()
setup_collection(qdrant, "resumes")
# 搜索:上海、3年+经验、有 Python 技能
results = search_resumes_qdrant(
qdrant, "resumes",
query_embedding=jd_embedding,
experience_min=3,
location="上海",
required_skills=["Python"],
top_k=20
)四、踩坑实录一:Chroma 大数据量查询变慢
现象:数据量从 10 万增加到 50 万后,搜索时间从 200ms 暴增到 2 秒。
原因:Chroma 的 HNSW 索引在内存中,数据量大时内存占用高,同时 GC 压力增大。
解法:
# 1. 增大 ef 参数(准确性和速度的权衡)
collection = client.get_or_create_collection(
name="resumes",
metadata={
"hnsw:space": "cosine",
"hnsw:M": 32, # 增大 M(默认16),提高搜索准确性
"hnsw:construction_ef": 400,
"hnsw:search_ef": 100 # 查询时的搜索范围(默认10,可适当调大)
}
)
# 2. 真正的解法:数据超过 50 万,换 Qdrant 或 Milvus
# Chroma 的定位就是开发和中小规模,不要在上面硬扛五、踩坑实录二:向量维度不一致导致插入失败
现象:更换了 Embedding 模型(从 OpenAI ada-002 换到 bge-large-zh),插入数据时报错 Vector dimension mismatch。
原因:向量集合创建时指定了维度,换模型后维度变了(ada-002 是 1536 维,bge-large-zh 是 1024 维)。
解法:
# 检查模型的向量维度
EMBEDDING_DIMENSIONS = {
"text-embedding-ada-002": 1536,
"text-embedding-3-small": 1536,
"text-embedding-3-large": 3072,
"BAAI/bge-large-zh-v1.5": 1024,
"BAAI/bge-base-zh-v1.5": 768,
}
def create_collection_with_right_dim(client, model_name: str, collection_name: str):
"""根据模型自动使用正确的维度"""
dim = EMBEDDING_DIMENSIONS.get(model_name)
if dim is None:
# 测量一下
test_embedding = get_embedding("test", model=model_name)
dim = len(test_embedding)
setup_collection(client, collection_name, vector_size=dim)
print(f"集合维度:{dim}")六、踩坑实录三:Milvus 连接超时
现象:应用刚启动时第一个请求总是超时,报 Connection timeout。
原因:Milvus 有个"延迟加载"机制,集合默认不在内存中,第一次查询时才加载(可能需要几十秒)。
解法:
from pymilvus import connections, Collection, utility
def connect_and_load_milvus(host: str, collection_name: str):
"""连接 Milvus 并预加载集合到内存"""
connections.connect(
alias="default",
host=host,
port=19530,
timeout=30
)
collection = Collection(collection_name)
# 应用启动时主动加载集合到内存
print("加载集合到内存...")
collection.load() # 这个操作可能需要一定时间
# 等待加载完成
while True:
progress = utility.loading_progress(collection_name)
if progress["loading_progress"] == "100%":
break
print(f"加载进度:{progress['loading_progress']}")
print("集合加载完成,查询就绪")
return collection七、真实性能测试数据
基于我在简历匹配项目中的实际测试(100万向量,1536维,8核16G服务器):
| 数据库 | 单次查询延迟 | QPS(并发10) | 内存占用 | 过滤+向量 |
|---|---|---|---|---|
| Chroma(本地) | 85ms | 12 | ~8GB | 先过滤后搜索,慢 |
| Qdrant(Docker单节点) | 12ms | 180 | ~6GB | 原生联合,快 |
| Milvus(单节点) | 8ms | 250 | ~12GB | 原生联合,最快 |
我的选型建议:
- 开发/测试阶段 → Chroma(零配置,快速起步)
- 生产环境,数据量 < 5000万 → Qdrant(部署简单,性能够用)
- 生产环境,数据量 > 5000万,或需要水平扩展 → Milvus
绝大多数项目,Qdrant 是最佳性价比的选择。
