Python 实现向量相似度搜索——从零理解 HNSW,不用黑盒向量库
Python 实现向量相似度搜索——从零理解 HNSW,不用黑盒向量库
适读人群:想深入理解向量搜索原理的工程师、对 RAG 系统底层感兴趣的开发者 | 阅读时长:约15分钟 | 核心价值:拆开 HNSW 的黑盒,用 Python 实现一个简化版,然后讲清楚生产环境怎么用
用 Qdrant 或者 FAISS 做向量搜索,把数据灌进去,搜一下,结果出来了——这个流程我用了大半年,一直没有深入想过里面到底发生了什么。
直到有一次,我们的向量搜索在某个数据分布下准确率只有67%,我想调优,才发现自己根本不知道应该调哪个参数,因为我完全不懂底层算法。
然后我花了一周时间认真读了 HNSW 的论文,手写了一个简化版,才真正搞懂这些参数的含义。
为什么需要近似最近邻搜索
先理解问题背景。
给定一个 1536 维的向量(OpenAI embedding 的维度),在 100 万个向量里找最相似的 Top-10——这个问题的暴力解法是:
import numpy as np
def brute_force_search(query: np.ndarray, vectors: np.ndarray, top_k: int = 10):
"""暴力搜索:计算与所有向量的相似度,取最大的 top_k 个"""
# 计算余弦相似度:query 已经归一化的情况下,点积 = 余弦相似度
similarities = np.dot(vectors, query) # shape: (n_vectors,)
top_indices = np.argsort(similarities)[-top_k:][::-1]
return top_indices, similarities[top_indices]100 万个向量,每次搜索需要计算 100 万次点积,在 Python 里大约需要 1~2 秒。生产环境无法接受。
我们需要近似最近邻(ANN, Approximate Nearest Neighbor)搜索:牺牲一点准确性(比如找到的结果是"几乎是最近邻"而不是"精确最近邻"),换来几个数量级的速度提升。
HNSW(Hierarchical Navigable Small World)是目前最好的 ANN 算法之一。
HNSW 核心思想
HNSW 的灵感来自于"六度分隔"理论:世界上任意两个人,通过最多6个中间人就能联系上。
Small World 图: 图里的每个节点不仅连接"邻居"(相近的节点),也有少量"长程连接"(跨越远距离的节点)。这样从任意节点出发,只需要少数几跳就能到达图里任意位置。
Hierarchical(层次化): HNSW 把这个图做成多层结构:
- 底层(第0层):所有节点,只有短程连接
- 中间层:一部分节点,连接更稀疏,覆盖范围更大
- 顶层:很少的节点,相当于"远程高速通道"
搜索时从顶层开始,快速定位到大致区域,然后逐层细化,最终在底层找到精确结果。
简化版 HNSW 实现(帮助理解原理)
# 这是一个教学用的简化版,不考虑性能优化
import numpy as np
import heapq
from typing import Optional
import math
import random
class HNSWSimple:
def __init__(
self,
dim: int,
M: int = 16, # 每个节点的最大连接数
ef_construction: int = 200, # 构建时的候选集大小,越大越准但越慢
max_layers: int = 4,
):
self.dim = dim
self.M = M
self.ef_construction = ef_construction
self.max_layers = max_layers
self.ml = 1 / math.log(M) # 层概率因子
self.vectors = [] # 所有向量
self.layers = [] # 每层的邻居关系,layers[node_id] = {layer: [neighbor_ids]}
self.entry_point = None # 顶层入口节点
self.max_layer = -1 # 当前最高层
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-8))
def _get_random_layer(self) -> int:
"""随机决定新节点插入到哪一层(概率随层数指数递减)"""
layer = 0
while random.random() < self.ml and layer < self.max_layers - 1:
layer += 1
return layer
def _search_layer(
self,
query: np.ndarray,
entry_points: list, # [(similarity, node_id)]
ef: int, # 候选集大小
layer: int,
) -> list:
"""在某一层里搜索最近邻"""
visited = set(id for _, id in entry_points)
candidates = [(-sim, id) for sim, id in entry_points] # 最小堆(用负相似度)
heapq.heapify(candidates)
W = list(entry_points) # 结果集(最大堆),保留最好的 ef 个
W.sort(reverse=True)
while candidates:
c_neg_sim, c = heapq.heappop(candidates)
c_sim = -c_neg_sim
worst_sim = W[-1][0] if len(W) >= ef else -1
if c_sim < worst_sim:
break
# 展开 c 的邻居
if c < len(self.layers) and layer in self.layers[c]:
for neighbor_id in self.layers[c][layer]:
if neighbor_id not in visited:
visited.add(neighbor_id)
neighbor_sim = self._cosine_similarity(query, self.vectors[neighbor_id])
if len(W) < ef or neighbor_sim > W[-1][0]:
heapq.heappush(candidates, (-neighbor_sim, neighbor_id))
W.append((neighbor_sim, neighbor_id))
W.sort(reverse=True)
if len(W) > ef:
W = W[:ef]
return W
def add(self, vector: np.ndarray) -> int:
"""插入一个向量"""
node_id = len(self.vectors)
self.vectors.append(vector.copy())
# 决定这个节点插入到哪一层
node_layer = self._get_random_layer()
# 初始化邻居关系
self.layers.append({l: [] for l in range(node_layer + 1)})
if self.entry_point is None:
self.entry_point = node_id
self.max_layer = node_layer
return node_id
entry_points = [(self._cosine_similarity(vector, self.vectors[self.entry_point]), self.entry_point)]
# 从顶层搜索到 node_layer+1,找到 entry point
for layer in range(self.max_layer, node_layer, -1):
entry_points = self._search_layer(vector, entry_points, ef=1, layer=layer)
entry_points = [entry_points[0]]
# 在 node_layer 到 0 层,连接邻居
for layer in range(min(node_layer, self.max_layer), -1, -1):
neighbors = self._search_layer(vector, entry_points, ef=self.ef_construction, layer=layer)
# 选择最多 M 个邻居
selected = neighbors[:self.M]
# 双向连接
for sim, neighbor_id in selected:
self.layers[node_id][layer].append(neighbor_id)
if len(self.layers[neighbor_id][layer]) < self.M * 2:
self.layers[neighbor_id][layer].append(node_id)
entry_points = selected
# 更新顶层
if node_layer > self.max_layer:
self.max_layer = node_layer
self.entry_point = node_id
return node_id
def search(self, query: np.ndarray, k: int = 10, ef: int = 50) -> list:
"""搜索最近邻,返回 [(similarity, node_id)]"""
if not self.vectors:
return []
entry_points = [(self._cosine_similarity(query, self.vectors[self.entry_point]), self.entry_point)]
# 从顶层往下搜
for layer in range(self.max_layer, 0, -1):
entry_points = self._search_layer(query, entry_points, ef=1, layer=layer)
entry_points = [entry_points[0]]
# 在底层精细搜索
results = self._search_layer(query, entry_points, ef=max(ef, k), layer=0)
return sorted(results, reverse=True)[:k]这个实现非常简化,不考虑性能,只是帮助理解算法流程。
踩坑实录一:M 和 ef_construction 对准确率的影响
理解了 HNSW 的结构之后,我终于明白了那个准确率只有 67% 的问题:
我们用 Qdrant 时,m=4(默认是16),ef_construct=50(默认是100)。为了节省内存把这两个参数调得很低,结果图的连接不够密集,搜索时容易"走错路",找到的不是真正最近邻。
参数含义:
M:每个节点的连接数,越大图越密,准确率越高,但内存更大ef_construction:构建索引时候选集大小,越大图质量越好,但构建越慢ef(或ef_search):搜索时候选集大小,越大越准,但越慢
调参方向:
# 准确率优先(召回率要求 > 95%):
m = 32
ef_construction = 200
ef_search = 100
# 性能优先(召回率可以接受 85%+):
m = 16
ef_construction = 100
ef_search = 50
# 验证召回率的方法:用暴力搜索作为 ground truth
def evaluate_recall(hnsw, vectors, query, k=10, ef=50):
# HNSW 结果
hnsw_results = set(id for _, id in hnsw.search(query, k=k, ef=ef))
# 暴力搜索结果(ground truth)
sims = np.dot(vectors, query)
true_results = set(np.argsort(sims)[-k:])
recall = len(hnsw_results & true_results) / k
return recall踩坑实录二:向量归一化
现象: 搜索结果不稳定,相似度分数有时候超过1,有时候为负,跟预期完全不符。
原因: 没有对向量做归一化。余弦相似度 = 点积 / (|A| × |B|),如果向量没有归一化,点积并不等于余弦相似度,搜索的实际上是欧氏距离而不是余弦相似度。
解法:
def normalize(v: np.ndarray) -> np.ndarray:
"""L2 归一化"""
norm = np.linalg.norm(v)
if norm == 0:
return v
return v / norm
# 插入前归一化
vector = normalize(embedding)
hnsw.add(vector)
# 查询前也要归一化
query_vector = normalize(query_embedding)
results = hnsw.search(query_vector, k=10)Qdrant 里要设置 distance=Distance.COSINE,它会自动处理归一化。FAISS 里用 IndexFlatIP(内积),配合手动归一化。
生产环境:用 Qdrant
教学版 HNSW 之后,生产环境当然要用 Qdrant 这类成熟的向量库:
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, VectorParams, PointStruct,
HnswConfigDiff, OptimizersConfigDiff,
)
client = QdrantClient(url="http://localhost:6333")
# 创建集合,调整 HNSW 参数
client.create_collection(
collection_name="my_vectors",
vectors_config=VectorParams(
size=1536,
distance=Distance.COSINE,
),
hnsw_config=HnswConfigDiff(
m=16, # 连接数
ef_construct=100, # 构建候选集大小
full_scan_threshold=10000, # 向量数少于此值时用暴力搜索(更准)
),
optimizers_config=OptimizersConfigDiff(
memmap_threshold=100000, # 超过10万向量时用内存映射,节省内存
),
)
# 批量插入
def insert_vectors(texts: list[str], embeddings: list[list[float]], metadata: list[dict]):
points = [
PointStruct(
id=i,
vector=embedding,
payload={"text": text, **meta},
)
for i, (text, embedding, meta) in enumerate(zip(texts, embeddings, metadata))
]
# 分批插入,避免单次请求太大
batch_size = 100
for i in range(0, len(points), batch_size):
client.upsert(
collection_name="my_vectors",
points=points[i:i+batch_size],
)
# 搜索
def search_similar(query_embedding: list[float], top_k: int = 10, score_threshold: float = 0.7):
results = client.search(
collection_name="my_vectors",
query_vector=query_embedding,
limit=top_k,
score_threshold=score_threshold, # 只返回相似度超过阈值的结果
with_payload=True,
)
return [
{
"id": r.id,
"score": r.score,
"text": r.payload.get("text"),
"metadata": {k: v for k, v in r.payload.items() if k != "text"},
}
for r in results
]理解原理带来的实际收益
理解了 HNSW 之后,我把那个准确率 67% 的问题通过调整参数修复到了 94%,没有增加任何额外的硬件资源,只是把 m 从 4 改到了 16,ef_construction 从 50 改到了 200。
很多"黑盒"的调参问题,本质上是缺少对原理的理解。 知道 M 控制图的连接密度,知道 ef_construction 影响图的构建质量,参数调优就变成了有理论依据的决策,而不是瞎试。
