向量数据库生产实战——Milvus 集群部署、索引优化、性能调优
向量数据库生产实战——Milvus 集群部署、索引优化、性能调优
适读人群:AI 工程师、DBA、架构师 | 阅读时长:约20分钟 | 核心价值:Milvus 从单机到集群的完整生产部署方案,含真实性能数据
年初接了一个项目,客户要求向量检索 P99 延迟不超过 50ms,数据量 1亿条向量,并发 500 QPS。
开始用的 pgvector,512 维向量,1000 万条时 P99 就到了 120ms,完全不够用。换 Milvus 集群之后,1亿条数据 P99 延迟 28ms,轻松满足要求。
这篇文章把这次优化的完整过程记录下来。
向量数据库选型
先说为什么选 Milvus 而不是其他方案:
| 特点 | pgvector | Milvus | Qdrant | Weaviate |
|---|---|---|---|---|
| 数据规模 | <1000万 | 亿级 | 千万级 | 千万级 |
| 查询延迟 | 一般 | 优秀 | 优秀 | 良好 |
| 集群支持 | 有限 | 原生支持 | 支持 | 支持 |
| 运维复杂度 | 低(PostgreSQL) | 高 | 中 | 中 |
| 成熟度 | 高 | 高 | 中 | 中 |
如果数据量在 1000 万以内,pgvector 完全够用,不需要引入 Milvus。超过这个量级,Milvus 是更合适的选择。
Milvus 核心概念
- Collection:类似关系数据库的表
- Segment:数据分片,Milvus 的基本存储单元
- Index:向量索引,决定搜索性能
- Partition:Collection 内的分区,用于数据隔离和加速查询
- Shard:分片数,决定写入并发能力
单机部署(开发/测试)
# docker-compose.yml
version: '3.8'
services:
etcd:
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
- ETCD_SNAPSHOT_COUNT=50000
volumes:
- /data/milvus/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
minio:
image: minio/minio:RELEASE.2023-03-13T19-46-17Z
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
volumes:
- /data/milvus/minio:/minio_data
command: minio server /minio_data
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
milvus-standalone:
image: milvusdb/milvus:v2.4.0
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
volumes:
- /data/milvus/data:/var/lib/milvus
ports:
- "19530:19530"
- "9091:9091" # metrics
depends_on:
- etcd
- minio生产集群部署
生产环境推荐 Kubernetes 部署,用官方 Helm Chart:
# 添加 Milvus Helm 仓库
helm repo add milvus https://zilliztech.github.io/milvus-helm/
helm repo update
# 创建 values.yaml(生产配置)
cat > milvus-values.yaml << 'EOF'
cluster:
enabled: true
# 组件副本数
proxy:
replicas: 2
resources:
requests:
cpu: 2
memory: 4Gi
limits:
cpu: 4
memory: 8Gi
queryNode:
replicas: 4 # 查询节点,影响查询性能
resources:
requests:
cpu: 4
memory: 16Gi
limits:
cpu: 8
memory: 32Gi
dataNode:
replicas: 2
resources:
requests:
cpu: 2
memory: 8Gi
indexNode:
replicas: 2 # 索引构建节点
resources:
requests:
cpu: 8 # 索引构建 CPU 密集
memory: 16Gi
# 外部 etcd
externalEtcd:
enabled: true
endpoints:
- etcd-0.etcd:2379
- etcd-1.etcd:2379
- etcd-2.etcd:2379
# 外部 MinIO/S3
externalS3:
enabled: true
host: your-minio-host
port: 9000
accessKey: your-access-key
secretKey: your-secret-key
bucketName: milvus-data
EOF
# 部署
helm install milvus milvus/milvus \
--namespace milvus \
--create-namespace \
-f milvus-values.yamlCollection 和索引设计
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
# 连接
connections.connect("default", host="milvus-proxy.milvus.svc", port="19530")
# 定义 Schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="doc_id", dtype=DataType.VARCHAR, max_length=64),
FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=32),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=2000),
FieldSchema(
name="embedding",
dtype=DataType.FLOAT_VECTOR,
dim=1536 # text-embedding-3-small 维度
)
]
schema = CollectionSchema(
fields=fields,
description="知识库文档向量",
enable_dynamic_field=True # 允许动态字段,方便扩展元数据
)
# 创建 Collection
# num_shards:写入分片数,一般设为 CPU 核数或 dataNode 节点数
collection = Collection(
name="knowledge_base",
schema=schema,
num_shards=4
)
# 创建向量索引
# HNSW 是最常用的高性能索引,适合大多数场景
index_params = {
"index_type": "HNSW",
"metric_type": "COSINE", # 文本 Embedding 用余弦相似度
"params": {
"M": 16, # 每个节点的连接数,越大精度越高但内存和构建时间增加
"efConstruction": 200 # 索引构建时的探索范围,越大精度越高
}
}
collection.create_index("embedding", index_params)
# 创建标量索引(用于过滤)
collection.create_index("category", index_name="category_index")
collection.create_index("doc_id", index_name="doc_id_index")
# 加载到内存(查询前必须 load)
collection.load()
print("Collection 创建完成")
print(f"向量数量:{collection.num_entities}")批量插入和查询
import numpy as np
from pymilvus import Collection
collection = Collection("knowledge_base")
def batch_insert(documents: list[dict], batch_size: int = 1000):
"""批量插入,每批 1000 条"""
total = len(documents)
inserted = 0
for i in range(0, total, batch_size):
batch = documents[i:i+batch_size]
data = [
[doc["doc_id"] for doc in batch],
[doc["category"] for doc in batch],
[doc["content"] for doc in batch],
[doc["embedding"] for doc in batch], # list of float vectors
]
collection.insert(data)
inserted += len(batch)
# 每 10 批 flush 一次
if inserted % (batch_size * 10) == 0:
collection.flush()
print(f"已插入 {inserted}/{total}")
collection.flush() # 最终 flush
print(f"插入完成,总计 {inserted} 条")
def search(query_embedding: list[float],
category: str = None,
top_k: int = 5) -> list[dict]:
"""向量检索,支持标量过滤"""
search_params = {
"metric_type": "COSINE",
"params": {
"ef": 100 # 搜索时的探索范围,越大精度越高但越慢
}
}
# 构建过滤表达式
expr = f'category == "{category}"' if category else None
results = collection.search(
data=[query_embedding],
anns_field="embedding",
param=search_params,
limit=top_k,
expr=expr,
output_fields=["doc_id", "category", "content"]
)
hits = []
for hit in results[0]:
hits.append({
"id": hit.id,
"score": hit.score,
"doc_id": hit.entity.get("doc_id"),
"category": hit.entity.get("category"),
"content": hit.entity.get("content")
})
return hits性能调优
关键参数调优:
# 查询时的 ef 参数对性能影响最大
# ef=64:召回率约 95%,延迟 15ms
# ef=128:召回率约 98%,延迟 28ms
# ef=256:召回率约 99%,延迟 55ms
# 根据业务对准确率和延迟的要求选择
search_params_fast = {"metric_type": "COSINE", "params": {"ef": 64}} # 延迟优先
search_params_accurate = {"metric_type": "COSINE", "params": {"ef": 256}} # 精度优先Partition 优化:
# 对于有明显分类的数据,用 Partition 减少搜索范围
collection.create_partition("tech_docs")
collection.create_partition("product_docs")
collection.create_partition("faq_docs")
# 插入时指定分区
collection.insert(data, partition_name="tech_docs")
# 搜索时只搜指定分区,速度提升 3-5 倍(取决于分区数量和数据分布)
collection.search(
data=[query_embedding],
anns_field="embedding",
param=search_params,
limit=5,
partition_names=["tech_docs"] # 只搜这个分区
)踩坑实录
坑一:Collection 未 Load 就查询
现象:部署后查询接口一直报 collection not loaded。
原因:Milvus 的 Collection 需要显式加载到内存才能查询,服务重启后加载状态不保留。
解法:在应用启动时检查并加载:
@PostConstruct # Spring 应用启动时执行(Java)
def ensure_loaded():
collection = Collection("knowledge_base")
load_state = utility.load_state("knowledge_base")
if load_state != LoadState.Loaded:
collection.load()
# 等待加载完成
utility.wait_for_loading_complete("knowledge_base", timeout=300)
print("Collection 已加载")坑二:大批量插入后查询不到数据
现象:插入了 10 万条数据,查询时找不到刚插入的内容。
原因:没有调用 flush(),数据还在内存 buffer 里,没有持久化到 Segment,查询时不可见。
解法:大批量插入后必须 flush,或者定时 flush:
collection.insert(data)
collection.flush() # 强制持久化,插入后立即可查坑三:内存不足导致查询 OOM
现象:查询节点频繁 OOM 重启,同时在线查询失败率高。
原因:queryNode 内存配置不足,大量 Collection load 后内存耗尽。
解法:
- 增加 queryNode 内存配置(推荐向量数 × 维度 × 4字节 × 3 作为估算值)
- 不需要实时查询的 Collection 及时
release() - 开启 Memory Mapping(MMap)减少内存占用:
# 开启 MMap,用磁盘换内存
collection.set_properties({"mmap.enabled": True})性能基准(实测数据)
环境:4个 queryNode(各 8核32GB),HNSW 索引,1亿条 1536维向量
| 场景 | ef | QPS | P50 | P99 |
|---|---|---|---|---|
| 纯向量检索 | 64 | 850 | 12ms | 28ms |
| 向量 + 标量过滤 | 64 | 620 | 18ms | 42ms |
| 向量 + Partition | 64 | 1200 | 8ms | 19ms |
| 高精度模式 | 256 | 280 | 45ms | 95ms |
Partition 策略效果显著,强烈建议在数据有明显分类属性时使用。
数据删除与更新的正确姿势
向量数据库里的数据删除和更新,是很多工程师处理不好的地方。
删除操作:
from pymilvus import Collection
collection = Collection("knowledge_base")
# 方式一:按主键删除(需要知道 ID)
collection.delete(expr="id in [123456, 789012]")
# 方式二:按元数据条件删除(更常用)
# 删除某个文档的所有向量(文档更新时用)
collection.delete(expr=f'doc_id == "{doc_id}"')
# 删除某个分类的所有向量
collection.delete(expr='category == "outdated_products"')
# 删除后 flush,确保持久化
collection.flush()注意:Milvus 的删除操作是软删除,并不立刻释放磁盘空间。需要定期做 Compaction 来回收空间:
# 手动触发 Compaction(通常不需要,Milvus 会自动做)
collection.compact()
# 等待 Compaction 完成
import time
while collection.get_compaction_state().state != CompactionState.Completed:
time.sleep(1)"更新"操作:
Milvus 不支持直接更新向量,必须先删后插:
def update_document(doc_id: str, new_content: str, new_embedding: list, metadata: dict):
"""更新文档"""
collection = Collection("knowledge_base")
# 1. 删除旧向量
collection.delete(expr=f'doc_id == "{doc_id}"')
collection.flush()
# 2. 插入新向量
collection.insert([
[metadata.get("title", "")], # 标题
[doc_id], # doc_id
[new_content], # 内容
[new_embedding] # 向量
])
collection.flush()备份与容灾
生产环境的 Milvus 集群一定要有备份方案,向量数据构建成本很高(Embedding 计算费时费钱),丢失了重建代价很大。
MinIO 备份(Milvus 数据存在 MinIO/S3 中):
# 配置 MinIO Client
mc alias set myminio http://minio:9000 minioadmin minioadmin
# 创建备份桶
mc mb myminio/milvus-backup
# 定时备份(crontab,每天凌晨 3 点)
0 3 * * * mc cp --recursive myminio/milvus-data/ myminio/milvus-backup/$(date +%Y%m%d)/使用 Milvus Backup 工具(推荐):
Milvus 官方提供了 milvus-backup 工具,支持 Collection 级别的备份和恢复:
# 安装
wget https://github.com/zilliztech/milvus-backup/releases/latest/download/milvus-backup_linux_amd64.tar.gz
tar -xzf milvus-backup_linux_amd64.tar.gz
# 备份指定 Collection
./milvus-backup create --collections knowledge_base,product_vectors --storage s3 --bucket milvus-backup --backup-name daily_backup_$(date +%Y%m%d)
# 恢复
./milvus-backup restore --backup-name daily_backup_20250115 --collection knowledge_baseMilvus 与 pgvector 的迁移
如果你的系统从 pgvector 成长到需要迁移到 Milvus,迁移过程是这样的:
import psycopg2
from pymilvus import Collection, connections
# 1. 连接 PostgreSQL(pgvector)
pg_conn = psycopg2.connect(your_pg_dsn)
cursor = pg_conn.cursor()
# 2. 连接 Milvus
connections.connect("default", host="milvus-host", port="19530")
milvus_collection = Collection("knowledge_base")
# 3. 批量迁移
batch_size = 1000
offset = 0
while True:
cursor.execute(
"SELECT id, doc_id, content, embedding FROM documents LIMIT %s OFFSET %s",
(batch_size, offset)
)
rows = cursor.fetchall()
if not rows:
break
# 插入 Milvus
doc_ids = [row[1] for row in rows]
contents = [row[2] for row in rows]
embeddings = [row[3].tolist() for row in rows] # pgvector 格式转 list
milvus_collection.insert([doc_ids, contents, embeddings])
offset += batch_size
print(f"已迁移 {offset} 条")
milvus_collection.flush()
print("迁移完成")迁移建议:先并行运行两个系统,新数据同时写入 pgvector 和 Milvus,线上流量先切 10% 到 Milvus 观察效果,没问题再逐步切全量。不要一次性全切,风险太高。
