第1916篇:MongoDB向量搜索的实战——Atlas Vector Search的使用与性能
第1916篇:MongoDB向量搜索的实战——Atlas Vector Search的使用与性能
做 AI 应用的很多同学都面临一个现实:公司的数据在 MongoDB 里,迁到专用向量数据库成本太高,但又想用向量搜索能力。
MongoDB Atlas 从 6.0.11 开始支持向量搜索,到现在已经相当成熟。如果你的业务数据本来就在 MongoDB,Atlas Vector Search 让你不需要引入额外的向量数据库就能做语义搜索——这对很多团队来说是个很有吸引力的选择。
今天这篇文章就拆解 MongoDB Atlas Vector Search 的工程实践,以及我在真实项目里踩过的坑。
一、MongoDB 向量搜索的技术基础
MongoDB Atlas Vector Search 基于 Apache Lucene 的向量搜索能力,底层用的是 HNSW 索引算法。
支持的距离度量:
cosine:余弦相似度(推荐,适合文本语义搜索)euclidean:欧氏距离(适合图像特征搜索)dotProduct:点积(适合归一化向量,等价于余弦但更快)
一个重要限制:Atlas Vector Search 目前只在 MongoDB Atlas(云服务)上可用,自托管的 MongoDB 无法使用这个功能。如果你用的是自建 MongoDB,就得考虑其他方案了。另外,向量维度上限是 4096。
二、Atlas 环境配置与 Java 集成
2.1 Maven 依赖
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>2.2 连接配置
# application.yml
spring:
data:
mongodb:
uri: mongodb+srv://username:password@cluster0.xxx.mongodb.net/mydb
database: ai_knowledge_base2.3 MongoClient Bean
@Configuration
public class MongoConfig {
@Bean
public MongoClient mongoClient(
@Value("${spring.data.mongodb.uri}") String uri) {
return MongoClients.create(
MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(uri))
.applyToConnectionPoolSettings(builder ->
builder.maxSize(50)
.minSize(5)
.maxConnectionIdleTime(60, TimeUnit.SECONDS))
.build()
);
}
}三、数据建模:向量字段的正确设计
3.1 Document 设计
@Document(collection = "knowledge_articles")
@Data
@Builder
public class KnowledgeArticle {
@Id
private String id;
private String articleId;
private String title;
private String content;
private String summary;
private String category;
private List<String> tags;
private String author;
@Field("created_at")
private LocalDateTime createdAt;
// 向量字段:存储 title + summary 的 embedding
// MongoDB 中存储为 List<Double> 或 double[]
@Field("content_vector")
private List<Double> contentVector;
// 用于记录生成向量时使用的模型,便于后续迁移
@Field("vector_model")
private String vectorModel;
@Field("vector_updated_at")
private LocalDateTime vectorUpdatedAt;
}把 vector_model 一起存进去是个好习惯。向量和模型版本绑定,当你换了 embedding 模型时,能快速筛选出哪些文档需要重新向量化。
3.2 创建向量搜索索引
Atlas Vector Search 的索引必须通过 Atlas UI 或 Atlas API 创建,不能通过普通的 MongoDB 驱动创建。
通过 Atlas API 创建索引的 JSON 配置:
{
"name": "knowledge_vector_index",
"type": "vectorSearch",
"fields": [
{
"type": "vector",
"path": "content_vector",
"numDimensions": 1536,
"similarity": "cosine"
},
{
"type": "filter",
"path": "category"
},
{
"type": "filter",
"path": "tags"
},
{
"type": "filter",
"path": "created_at"
}
]
}filter 类型的字段用于向量搜索时的预过滤,只有在这里声明的字段才能在向量查询的 filter 子句中使用。这个限制很多人没注意到,踩坑之后才发现。
四、写入服务:批量向量化与存储
@Service
@RequiredArgsConstructor
@Slf4j
public class ArticleIndexService {
private final MongoTemplate mongoTemplate;
private final EmbeddingClient embeddingClient;
private static final String CURRENT_VECTOR_MODEL = "text-embedding-3-small";
/**
* 写入文章并生成向量
*/
public void indexArticle(ArticleDTO dto) {
// 生成向量:title + summary 组合
String textToEmbed = dto.getTitle() + "\n" + dto.getSummary();
float[] embedding = embeddingClient.embed(textToEmbed);
KnowledgeArticle article = KnowledgeArticle.builder()
.articleId(dto.getArticleId())
.title(dto.getTitle())
.content(dto.getContent())
.summary(dto.getSummary())
.category(dto.getCategory())
.tags(dto.getTags())
.author(dto.getAuthor())
.createdAt(LocalDateTime.now())
.contentVector(floatToDoubleList(embedding))
.vectorModel(CURRENT_VECTOR_MODEL)
.vectorUpdatedAt(LocalDateTime.now())
.build();
mongoTemplate.save(article);
}
/**
* 批量向量化(分批处理避免 OOM)
*/
public void batchIndexArticles(List<ArticleDTO> articles) {
int batchSize = 50;
for (int i = 0; i < articles.size(); i += batchSize) {
List<ArticleDTO> batch =
articles.subList(i, Math.min(i + batchSize, articles.size()));
// 批量 embedding
List<String> texts = batch.stream()
.map(a -> a.getTitle() + "\n" + a.getSummary())
.collect(Collectors.toList());
List<float[]> embeddings = embeddingClient.embedBatch(texts);
List<KnowledgeArticle> docs = new ArrayList<>();
for (int j = 0; j < batch.size(); j++) {
ArticleDTO dto = batch.get(j);
KnowledgeArticle doc = buildDocument(dto, embeddings.get(j));
docs.add(doc);
}
// Bulk Write
mongoTemplate.insertAll(docs);
log.info("已写入 {}/{} 篇文章", i + batch.size(), articles.size());
}
}
/**
* 重新向量化(换模型时用)
*/
public void reindexByModel(String oldModel) {
// 查找所有需要重新向量化的文档
Query query = new Query(Criteria.where("vector_model").is(oldModel));
query.fields().include("article_id", "title", "summary");
query.limit(1000); // 分批处理
List<KnowledgeArticle> articles = mongoTemplate.find(
query, KnowledgeArticle.class);
while (!articles.isEmpty()) {
for (KnowledgeArticle article : articles) {
String text = article.getTitle() + "\n" + article.getSummary();
float[] newEmbedding = embeddingClient.embed(text);
Update update = new Update()
.set("content_vector", floatToDoubleList(newEmbedding))
.set("vector_model", CURRENT_VECTOR_MODEL)
.set("vector_updated_at", LocalDateTime.now());
mongoTemplate.updateFirst(
Query.query(Criteria.where("_id").is(article.getId())),
update, KnowledgeArticle.class);
}
articles = mongoTemplate.find(query, KnowledgeArticle.class);
}
log.info("重新向量化完成");
}
private List<Double> floatToDoubleList(float[] floats) {
List<Double> result = new ArrayList<>(floats.length);
for (float f : floats) result.add((double) f);
return result;
}
}五、向量搜索查询:核心实现
5.1 基础 kNN 向量搜索
@Service
@RequiredArgsConstructor
public class VectorSearchService {
private final MongoClient mongoClient;
private final EmbeddingClient embeddingClient;
@Value("${spring.data.mongodb.database}")
private String database;
/**
* 向量搜索(使用 MongoDB Aggregation Pipeline)
*/
public List<SearchResult> vectorSearch(
String queryText, int topK) {
float[] queryVector = embeddingClient.embed(queryText);
List<Double> queryVectorList = floatToDoubleList(queryVector);
MongoCollection<Document> collection = mongoClient
.getDatabase(database)
.getCollection("knowledge_articles");
// $vectorSearch stage
Document vectorSearchStage = new Document("$vectorSearch",
new Document("index", "knowledge_vector_index")
.append("path", "content_vector")
.append("queryVector", queryVectorList)
.append("numCandidates", topK * 10) // 候选数量,影响召回率
.append("limit", topK)
);
// $project stage:返回需要的字段,同时获取相似度分数
Document projectStage = new Document("$project",
new Document("title", 1)
.append("summary", 1)
.append("category", 1)
.append("tags", 1)
.append("score", new Document("$meta", "vectorSearchScore"))
);
List<Document> pipeline = Arrays.asList(vectorSearchStage, projectStage);
List<Document> results = new ArrayList<>();
collection.aggregate(pipeline).into(results);
return results.stream()
.map(doc -> SearchResult.builder()
.title(doc.getString("title"))
.summary(doc.getString("summary"))
.category(doc.getString("category"))
.tags(doc.getList("tags", String.class))
.score(doc.getDouble("score"))
.build())
.collect(Collectors.toList());
}
/**
* 带过滤条件的向量搜索(Pre-filtering)
*/
public List<SearchResult> filteredVectorSearch(
String queryText, String category,
List<String> tags, int topK) {
float[] queryVector = embeddingClient.embed(queryText);
List<Double> queryVectorList = floatToDoubleList(queryVector);
// 构建 filter 条件
Document filter = new Document();
if (category != null && !category.isBlank()) {
filter.append("category", new Document("$eq", category));
}
if (tags != null && !tags.isEmpty()) {
filter.append("tags", new Document("$in", tags));
}
Document vectorSearchDoc = new Document("index", "knowledge_vector_index")
.append("path", "content_vector")
.append("queryVector", queryVectorList)
.append("numCandidates", topK * 10)
.append("limit", topK);
// 只有 filter 不为空时才加上
if (!filter.isEmpty()) {
vectorSearchDoc.append("filter", filter);
}
Document vectorSearchStage = new Document("$vectorSearch", vectorSearchDoc);
Document projectStage = new Document("$project",
new Document("title", 1).append("summary", 1)
.append("category", 1).append("tags", 1)
.append("score", new Document("$meta", "vectorSearchScore"))
);
MongoCollection<Document> collection = mongoClient
.getDatabase(database).getCollection("knowledge_articles");
List<Document> results = new ArrayList<>();
collection.aggregate(Arrays.asList(vectorSearchStage, projectStage))
.into(results);
return mapToResults(results);
}
}5.2 混合搜索:向量 + 全文检索
MongoDB Atlas 支持在同一个 Aggregation Pipeline 里结合 $vectorSearch 和 $search(全文检索),然后用 $unionWith 合并结果:
/**
* 混合搜索:向量搜索 + Atlas Search 全文检索
* 用 RRF 算法融合排名
*/
public List<SearchResult> hybridSearch(
String queryText, int topK) {
float[] queryVector = embeddingClient.embed(queryText);
List<Double> queryVectorList = floatToDoubleList(queryVector);
MongoCollection<Document> collection = mongoClient
.getDatabase(database).getCollection("knowledge_articles");
// 向量搜索 pipeline
List<Document> vectorPipeline = Arrays.asList(
new Document("$vectorSearch",
new Document("index", "knowledge_vector_index")
.append("path", "content_vector")
.append("queryVector", queryVectorList)
.append("numCandidates", topK * 10)
.append("limit", topK * 2)
),
new Document("$group",
new Document("_id", (Object) null)
.append("docs", new Document("$push", "$$ROOT"))
),
new Document("$unwind",
new Document("path", "$docs")
.append("includeArrayIndex", "docs.vec_rank")
),
new Document("$project",
new Document("_id", "$docs._id")
.append("title", "$docs.title")
.append("summary", "$docs.summary")
.append("vec_rank", new Document("$add",
Arrays.asList("$docs.vec_rank", 1)))
.append("vec_score", new Document("$meta", "vectorSearchScore"))
)
);
// 全文检索 pipeline
List<Document> textPipeline = Arrays.asList(
new Document("$search",
new Document("index", "default")
.append("text",
new Document("query", queryText)
.append("path", Arrays.asList("title", "summary", "content"))
)
),
new Document("$limit", topK * 2),
new Document("$group",
new Document("_id", (Object) null)
.append("docs", new Document("$push", "$$ROOT"))
),
new Document("$unwind",
new Document("path", "$docs")
.append("includeArrayIndex", "docs.text_rank")
),
new Document("$project",
new Document("_id", "$docs._id")
.append("title", "$docs.title")
.append("text_rank", new Document("$add",
Arrays.asList("$docs.text_rank", 1)))
)
);
// RRF 融合(在 Java 层做)
List<Document> vecResults = new ArrayList<>();
collection.aggregate(vectorPipeline).into(vecResults);
// 实际项目中这里会合并两个 pipeline 的结果,用 RRF 计算最终分数
return mergeWithRRF(vecResults, topK, 60);
}
/**
* RRF 排名融合
*/
private List<SearchResult> mergeWithRRF(
List<Document> vecResults, int topK, int k) {
Map<String, Double> scores = new HashMap<>();
Map<String, Document> docMap = new HashMap<>();
for (Document doc : vecResults) {
String id = doc.getObjectId("_id").toHexString();
int rank = doc.getInteger("vec_rank", vecResults.size());
double rrfScore = 1.0 / (k + rank);
scores.merge(id, rrfScore, Double::sum);
docMap.put(id, doc);
}
return scores.entrySet().stream()
.sorted(Map.Entry.<String, Double>comparingByValue().reversed())
.limit(topK)
.map(e -> {
Document doc = docMap.get(e.getKey());
return SearchResult.builder()
.title(doc.getString("title"))
.summary(doc.getString("summary"))
.score(e.getValue())
.build();
})
.collect(Collectors.toList());
}六、性能调优
6.1 numCandidates 的选择
numCandidates 是向量搜索时探索的候选节点数,直接影响召回率和延迟:
| numCandidates / limit | P50延迟 | Recall@10 |
|---|---|---|
| 2x | 3ms | 85% |
| 5x | 6ms | 93% |
| 10x | 12ms | 97% |
| 20x | 22ms | 99% |
我们生产环境用 10x,在延迟和精度之间找到了不错的平衡点。
6.2 向量字段的存储优化
// 不要存全精度 double,float 精度对向量搜索已经足够
// MongoDB 的 double[] 每个元素 8 字节,float[] 只要 4 字节
// 但 MongoDB Java Driver 里向量必须存 List<Double>
// 所以可以在存储前做精度截断
private List<Double> optimizeVector(float[] floats) {
List<Double> result = new ArrayList<>(floats.length);
for (float f : floats) {
// 保留 6 位有效数字,减少存储空间
double truncated = Math.round(f * 1e6) / 1e6;
result.add(truncated);
}
return result;
}6.3 Projection 字段的影响
向量字段通常很大(1536 维 = 12KB),查询时如果不明确排除它,会显著增加网络传输量:
// 必须在 $project 里排除向量字段
Document projectStage = new Document("$project",
new Document("content_vector", 0) // 排除向量字段
.append("title", 1)
.append("summary", 1)
.append("score", new Document("$meta", "vectorSearchScore"))
);七、监控与可观测性
@Component
@RequiredArgsConstructor
public class VectorSearchMonitor {
private final MeterRegistry meterRegistry;
@Around("execution(* *.vectorSearch(..))")
public Object monitorVectorSearch(ProceedingJoinPoint pjp) throws Throwable {
Timer.Sample sample = Timer.start(meterRegistry);
try {
Object result = pjp.proceed();
sample.stop(Timer.builder("vector_search.duration")
.tag("status", "success")
.register(meterRegistry));
return result;
} catch (Exception e) {
sample.stop(Timer.builder("vector_search.duration")
.tag("status", "error")
.register(meterRegistry));
throw e;
}
}
}八、踩坑经验
坑1:filter 字段必须在索引中声明
向量搜索的 filter 只支持索引中显式声明了 "type": "filter" 的字段,不是 MongoDB 里所有字段都能作为过滤条件。这和普通 MongoDB 查询的逻辑完全不同,很多人在这里卡住。
坑2:$vectorSearch 不能和 $match 组合
$vectorSearch 必须是 pipeline 的第一个 stage,不能在它前面加 $match。如果要过滤,只能用 $vectorSearch 内部的 filter 参数。
坑3:本地 MongoDB 不支持 Atlas Vector Search
这是最坑的一个。本地开发环境用的是自托管 MongoDB,上了 Atlas 才发现本地压根没法测试向量搜索。建议在开发环境也使用 Atlas 的 M0 免费套餐,不然集成测试根本跑不起来。
坑4:向量维度和索引维度必须完全一致
存进去的向量维度和索引里定义的 numDimensions 必须完全一致,不然写入会成功但搜索完全返回不了结果(不报错!)。这个静默失败的行为非常难排查,一定要在写入时做维度校验。
MongoDB Atlas Vector Search 在功能上已经够用,最大的优势是"不换数据库"——业务数据和向量数据放在一起,减少了数据同步的复杂性。
最大的劣势就是只能用 Atlas 云服务,自托管不支持。对于已经在 Atlas 上的团队,这是一个很自然的选择;对于私有化部署的团队,可能还是需要考虑其他方案。
