混合检索实战——向量检索 + BM25 如何融合
混合检索实战——向量检索 + BM25 如何融合
适读人群:正在优化 RAG 检索质量的开发者 | 阅读时长:约 14 分钟 | 核心价值:用真实代码实现混合检索,讲清楚分数融合的坑
有一次客户给我发了一条反馈截图,用户问的是:"JIRA-2847 这个 bug 怎么解决的?"
系统给出的回答是一段关于通用 bug 处理流程的内容,完全没提 JIRA-2847。
问题显而易见:向量检索根本处理不好这种精确 ID 查询。"JIRA-2847"这个字符串,语义上跟什么都差不多,向量相似度匹配不到任何有用的东西。
但是知识库里确实有一篇文档,标题就是《JIRA-2847:支付回调超时问题修复记录》。如果用关键词检索,一秒就找到了。
这就是纯向量检索的硬伤,也是混合检索存在的理由。
向量检索和关键词检索,各自擅长什么
先把这两种方式的差异说清楚,不然后面的融合策略没法理解。
向量检索(Dense Retrieval)擅长:
- 语义相似:用户问"怎么付款",能找到"支付流程"
- 同义替换:用户说"报错",能匹配到"异常"、"error"
- 意图理解:问"系统变慢了",能关联到性能优化文档
BM25 关键词检索擅长:
- 精确匹配:JIRA ID、版本号、代码片段、人名
- 专有名词:不在向量模型训练集里的内部词汇
- 低频但关键的词:某个特定的函数名、配置项名称
两者的失败场景几乎互补。混合检索就是把两者结合起来,弥补各自的短板。
架构设计
一个完整的混合检索系统大概长这样:
用户查询同时走两条路:向量检索路径(Embedding + 向量数据库)和 BM25 路径(分词 + 倒排索引),两条路各自返回 Top-K 结果,然后做分数融合,最终输出合并排序后的结果。
BM25 实现
先把 BM25 这块搭起来。我用 rank_bm25 这个库,轻量、够用。
import jieba
from rank_bm25 import BM25Okapi
from typing import List, Dict, Any
import numpy as np
import pickle
import os
class BM25Retriever:
def __init__(self, use_chinese_tokenizer: bool = True):
self.bm25 = None
self.docs = []
self.use_chinese = use_chinese_tokenizer
def _tokenize(self, text: str) -> List[str]:
"""分词,中文用 jieba,英文直接按空格切"""
if self.use_chinese:
# jieba 分词 + 保留英文原词
tokens = list(jieba.cut(text))
# 过滤掉单个标点和空格
tokens = [t.strip() for t in tokens if t.strip() and len(t.strip()) > 0]
return tokens
else:
return text.lower().split()
def build_index(self, documents: List[Dict[str, Any]], text_field: str = "content"):
"""
documents: [{"id": "xxx", "content": "...", "metadata": {...}}, ...]
"""
self.docs = documents
tokenized_corpus = [self._tokenize(doc[text_field]) for doc in documents]
self.bm25 = BM25Okapi(tokenized_corpus)
print(f"BM25 索引构建完成,文档数: {len(documents)}")
def search(self, query: str, top_k: int = 10) -> List[Dict[str, Any]]:
if self.bm25 is None:
raise RuntimeError("索引未构建,请先调用 build_index")
tokenized_query = self._tokenize(query)
scores = self.bm25.get_scores(tokenized_query)
# 取 top_k 的索引
top_indices = np.argsort(scores)[::-1][:top_k]
results = []
for idx in top_indices:
if scores[idx] > 0: # 过滤掉分数为0的(完全不相关)
results.append({
"doc": self.docs[idx],
"score": float(scores[idx]),
"retriever": "bm25"
})
return results
def save(self, path: str):
"""持久化索引"""
with open(path, 'wb') as f:
pickle.dump({"bm25": self.bm25, "docs": self.docs,
"use_chinese": self.use_chinese}, f)
def load(self, path: str):
"""加载索引"""
with open(path, 'rb') as f:
data = pickle.load(f)
self.bm25 = data["bm25"]
self.docs = data["docs"]
self.use_chinese = data["use_chinese"]注意中文场景下分词质量很关键。jieba 默认词典对专业术语识别不好,如果你的文档里有大量专有词汇,要加自定义词典:
# 在 BM25Retriever 初始化前加载自定义词典
jieba.load_userdict("/path/to/custom_dict.txt")
# 自定义词典格式:每行一个词,可以加词频和词性
# 词典文件内容示例:
# 微服务架构 100 n
# JIRA-2847 100 n
# Spring AI 100 n向量检索实现
向量检索部分用 pgvector 或者 Qdrant 都行,这里用 pgvector 举例:
import psycopg2
from pgvector.psycopg2 import register_vector
import numpy as np
from openai import OpenAI
from typing import List, Dict, Any
class VectorRetriever:
def __init__(self, db_config: dict, table_name: str = "documents"):
self.db_config = db_config
self.table_name = table_name
self.client = OpenAI()
self._conn = None
def _get_conn(self):
if self._conn is None or self._conn.closed:
self._conn = psycopg2.connect(**self.db_config)
register_vector(self._conn)
return self._conn
def embed_query(self, text: str) -> np.ndarray:
response = self.client.embeddings.create(
input=text,
model="text-embedding-3-small"
)
return np.array(response.data[0].embedding)
def search(self, query: str, top_k: int = 10) -> List[Dict[str, Any]]:
query_vector = self.embed_query(query)
conn = self._get_conn()
with conn.cursor() as cur:
# 使用余弦相似度,返回相似度分数
cur.execute(f"""
SELECT
id,
content,
metadata,
1 - (embedding <=> %s::vector) AS similarity_score
FROM {self.table_name}
ORDER BY embedding <=> %s::vector
LIMIT %s
""", (query_vector.tolist(), query_vector.tolist(), top_k))
rows = cur.fetchall()
results = []
for row in rows:
results.append({
"doc": {
"id": row[0],
"content": row[1],
"metadata": row[2]
},
"score": float(row[3]),
"retriever": "vector"
})
return results分数融合——最关键的部分
这里有三种常见方案,我都实测过。
方案一:RRF(Reciprocal Rank Fusion)
RRF 不用管两个检索器返回的分数是什么量纲,只看排名位置。
def reciprocal_rank_fusion(
result_lists: List[List[Dict]],
k: int = 60,
weights: List[float] = None
) -> List[Dict]:
"""
result_lists: 多个检索器的结果列表,每个列表按相关性排序
k: RRF 参数,通常取 60,越大对低排名的惩罚越小
weights: 每个检索器的权重,None 表示等权
"""
if weights is None:
weights = [1.0] * len(result_lists)
# 用文档ID作为key,累积RRF分数
rrf_scores: Dict[str, float] = {}
doc_map: Dict[str, Dict] = {}
for list_idx, results in enumerate(result_lists):
w = weights[list_idx]
for rank, result in enumerate(results):
doc_id = result["doc"]["id"]
# RRF 公式:1 / (k + rank)
rrf_score = w * (1.0 / (k + rank + 1)) # rank 从0开始,+1避免除以k
if doc_id not in rrf_scores:
rrf_scores[doc_id] = 0.0
doc_map[doc_id] = result["doc"]
rrf_scores[doc_id] += rrf_score
# 按 RRF 分数排序
sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
return [
{
"doc": doc_map[doc_id],
"score": rrf_scores[doc_id],
"fusion_method": "rrf"
}
for doc_id in sorted_ids
]RRF 的优点是稳定、不受原始分数量纲影响。缺点是丢失了分数的强弱信息——向量相似度 0.98 的结果和 0.71 的结果,在 RRF 里只差一个排名位置。
方案二:归一化线性加权
把两个检索器的分数归一化到 [0,1],然后加权求和:
def normalized_linear_fusion(
vector_results: List[Dict],
bm25_results: List[Dict],
vector_weight: float = 0.7,
bm25_weight: float = 0.3
) -> List[Dict]:
"""
对分数做 min-max 归一化后加权融合
"""
def normalize_scores(results: List[Dict]) -> Dict[str, float]:
if not results:
return {}
scores = [r["score"] for r in results]
min_s, max_s = min(scores), max(scores)
if max_s == min_s:
return {r["doc"]["id"]: 0.5 for r in results}
return {
r["doc"]["id"]: (r["score"] - min_s) / (max_s - min_s)
for r in results
}
vector_norm = normalize_scores(vector_results)
bm25_norm = normalize_scores(bm25_results)
all_ids = set(vector_norm.keys()) | set(bm25_norm.keys())
doc_map = {}
for r in vector_results + bm25_results:
doc_map[r["doc"]["id"]] = r["doc"]
fused = []
for doc_id in all_ids:
v_score = vector_norm.get(doc_id, 0.0)
b_score = bm25_norm.get(doc_id, 0.0)
final_score = vector_weight * v_score + bm25_weight * b_score
fused.append({
"doc": doc_map[doc_id],
"score": final_score,
"vector_score": v_score,
"bm25_score": b_score,
"fusion_method": "weighted"
})
return sorted(fused, key=lambda x: x["score"], reverse=True)这个方案的问题是归一化会受极端值影响。如果 BM25 有一个分数特别高的结果(比如完全精确匹配),归一化后其他结果的分数就都被压低了。
方案三:我实际用的方案——动态权重 RRF
根据查询特征动态调整两个检索器的权重:
import re
def detect_query_type(query: str) -> str:
"""
判断查询类型,决定偏向哪种检索方式
"""
# 精确 ID 类:JIRA-xxx, PR-xxx, 版本号, 函数名等
exact_patterns = [
r'[A-Z]+-\d+', # JIRA-1234, TASK-567
r'v\d+\.\d+', # v1.0, v2.3.1
r'[a-z]+\(\)', # 函数调用
r'"[^"]{3,}"', # 带引号的精确词
r'\b[A-Z][A-Z0-9_]{4,}\b' # 全大写缩写词
]
for pattern in exact_patterns:
if re.search(pattern, query):
return "exact"
# 短查询通常是关键词查询
if len(query) < 15:
return "keyword"
# 疑问句通常是语义查询
if any(query.startswith(w) for w in ["怎么", "如何", "为什么", "什么是", "how", "why", "what"]):
return "semantic"
return "mixed"
def adaptive_hybrid_search(
query: str,
vector_retriever: VectorRetriever,
bm25_retriever: BM25Retriever,
top_k: int = 10
) -> List[Dict]:
"""自适应混合检索"""
query_type = detect_query_type(query)
# 根据查询类型设置权重
weight_config = {
"exact": {"vector": 0.3, "bm25": 0.7}, # 精确查询重BM25
"keyword": {"vector": 0.4, "bm25": 0.6},
"semantic": {"vector": 0.8, "bm25": 0.2}, # 语义查询重向量
"mixed": {"vector": 0.6, "bm25": 0.4},
}
weights = weight_config[query_type]
vector_results = vector_retriever.search(query, top_k=top_k * 2)
bm25_results = bm25_retriever.search(query, top_k=top_k * 2)
# 用 RRF 融合,传入权重
fused = reciprocal_rank_fusion(
[vector_results, bm25_results],
weights=[weights["vector"], weights["bm25"]]
)
return fused[:top_k]实测效果
在一个内部技术知识库项目(约 12000 个文档 chunk)上的测试数据:
| 检索方案 | 召回率@5 | 召回率@10 | NDCG@10 |
|---|---|---|---|
| 纯向量检索 | 71.2% | 78.6% | 0.68 |
| 纯 BM25 | 63.4% | 71.1% | 0.61 |
| 等权 RRF | 77.8% | 84.3% | 0.74 |
| 动态权重 RRF | 80.5% | 86.7% | 0.77 |
精确 ID 类查询(占总查询约 15%)的单独对比更明显:
| 方案 | 精确 ID 查询召回率@5 |
|---|---|
| 纯向量检索 | 42.1% |
| 纯 BM25 | 91.3% |
| 动态权重 RRF | 89.6% |
动态权重的方案在精确查询上接近纯 BM25 的效果,在语义查询上接近纯向量检索的效果,综合表现最好。
部署时要注意的一个坑
BM25 索引是内存里的数据结构,重启服务后要重建。如果你的文档量不大(几万个 chunk 以内),加载时间可以接受;如果文档量大,需要考虑持久化方案。
另外,文档增量更新时,BM25 索引要全量重建(rank_bm25 不支持增量更新)。我们的做法是:每晚凌晨全量重建一次,白天新增的文档先只走向量检索,第二天生效。这个时间差可以接受,但要看你的业务对实时性的要求。
向量检索这边就好处理很多,支持实时插入。
