Part 1706: Integrating Spring Data with Vector Databases: Custom Repository Support for Vector Queries
When building RAG systems, choosing and integrating a vector database gives a lot of people headaches.
I've seen plenty of projects that call the vector database's REST API directly with an HTTP client and scatter that code across the service layer. That's fine while the codebase is small, but once the project grows, vector queries mixed in with ordinary SQL queries become a maintenance mess.
Spring Data's design philosophy is a unified data-access abstraction: you can swap the underlying database with almost no changes to business code. The same idea applies to vector databases. This article shows how to wrap vector queries in Spring Data style, so that vector operations feel just like regular Repository operations.
1. First, Understand the Vector Database Access Pattern
Vector database operations are fundamentally different from relational ones:
Relational database (exact query):
SELECT * FROM documents WHERE category = 'tech' AND created_at > '2024-01-01'
Vector database (similarity query):
"Find the 10 documents most similar to this vector, with an extra filter: category = 'tech'"
The core parameters of a vector query:
- Query vector: typically the user's question converted into an embedding
- Top-K: return the K most similar records
- Similarity threshold: drop results whose similarity is below the threshold
- Metadata filters: business-level filter conditions applied on top of vector similarity
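To make the similarity-query model concrete, here is a minimal, dependency-free sketch of what a vector store conceptually does: score every document vector against the query by cosine similarity, drop those below the threshold, and keep the top K. The class and method names are illustrative only; real vector databases replace this linear scan with ANN indexes such as HNSW.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.IntStream;

// Illustrative only: brute-force "top-K by cosine similarity with a threshold".
class NaiveTopK {

    // Cosine similarity between two equal-length vectors
    static double cosine(float[] a, float[] b) {
        double dot = 0, na = 0, nb = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            na += a[i] * a[i];
            nb += b[i] * b[i];
        }
        return dot / (Math.sqrt(na) * Math.sqrt(nb));
    }

    // Indices of the top-k vectors whose similarity to the query is >= threshold
    static List<Integer> topK(float[] query, List<float[]> docs, int k, double threshold) {
        record Scored(int index, double score) {}
        return IntStream.range(0, docs.size())
                .mapToObj(i -> new Scored(i, cosine(query, docs.get(i))))
                .filter(s -> s.score() >= threshold)
                .sorted(Comparator.comparingDouble(Scored::score).reversed())
                .limit(k)
                .map(Scored::index)
                .toList();
    }
}
```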
Spring AI ships with support for many vector databases (Chroma, Pinecone, Qdrant, Milvus, pgvector, and more) behind a unified VectorStore interface. But the native VectorStore interface is fairly bare-bones and not very flexible, so today we'll build our own layer on top of it.
2. The Baseline: Spring AI's VectorStore Interface
First, here's what Spring AI's native VectorStore interface looks like:
// Spring AI's VectorStore interface (simplified)
public interface VectorStore {
void add(List<Document> documents);
Optional<Boolean> delete(List<String> idList);
List<Document> similaritySearch(String query);
List<Document> similaritySearch(SearchRequest request);
}
// Usage
List<Document> results = vectorStore.similaritySearch(
SearchRequest.query("Java programming tips")
.withTopK(5)
.withSimilarityThreshold(0.7)
.withFilterExpression("category == 'java'")
);
This interface is usable, but it doesn't feel very "Spring Data". What I'd like is something closer to this:
// The usage I want
@Repository
public interface KnowledgeBaseRepository extends VectorRepository<KnowledgeDoc, String> {
List<KnowledgeDoc> findTop10BySimilarityTo(float[] queryVector);
List<KnowledgeDoc> findByCategoryAndSimilarityTo(String category, float[] queryVector);
Page<KnowledgeDoc> searchByKeyword(String keyword, float[] queryVector, Pageable pageable);
}
Getting there requires building some infrastructure ourselves, but it isn't complicated.
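Part of that infrastructure is parsing derived query method names such as `findTop10BySimilarityTo`. A toy sketch of the idea follows; Spring Data does this with its full PartTree parser, and the `findTop<K>By[<Field>And]SimilarityTo` convention and the class here are illustrative only, not a Spring Data standard.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy sketch of derived-query-name parsing for vector search methods.
class DerivedQueryNameParser {

    private static final Pattern NAME =
            Pattern.compile("findTop(\\d+)By(?:(\\w+?)And)?SimilarityTo");

    record ParsedQuery(int topK, String filterField) {}

    // Returns the parsed topK and optional filter field, or empty if the name doesn't match
    static Optional<ParsedQuery> parse(String methodName) {
        Matcher m = NAME.matcher(methodName);
        if (!m.matches()) return Optional.empty();
        String field = m.group(2); // null when there is no And-filter part
        return Optional.of(new ParsedQuery(Integer.parseInt(m.group(1)),
                field == null ? "" : field));
    }
}
```

A real implementation would register a proxy per repository interface and translate each parsed name into a `VectorSearchRequest`.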
3. Designing a Custom VectorRepository Interface
First, define a base vector repository interface:
// Document entity
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class KnowledgeDoc {
private String id;
private String content;
private String category;
private String source;
private List<String> tags;
private float[] embedding; // the vector
private double similarityScore; // similarity score filled in at query time
private Instant createdAt;
private Map<String, Object> metadata;
}
// Vector search request (builder pattern)
public record VectorSearchRequest(
float[] queryVector,
int topK,
double similarityThreshold,
Map<String, Object> filters,
boolean includeMetadata
) {
public VectorSearchRequest {
Objects.requireNonNull(queryVector, "queryVector required");
if (topK <= 0) topK = 10;
if (similarityThreshold < 0 || similarityThreshold > 1) {
similarityThreshold = 0.7;
}
filters = filters != null ? Map.copyOf(filters) : Map.of();
}
public static Builder builder(float[] queryVector) {
return new Builder(queryVector);
}
public static class Builder {
private final float[] queryVector;
private int topK = 10;
private double similarityThreshold = 0.7;
private Map<String, Object> filters = new HashMap<>();
private boolean includeMetadata = true;
private Builder(float[] queryVector) {
this.queryVector = queryVector;
}
public Builder topK(int topK) { this.topK = topK; return this; }
public Builder threshold(double t) { this.similarityThreshold = t; return this; }
public Builder filter(String key, Object value) {
this.filters.put(key, value); return this;
}
public Builder excludeMetadata() { this.includeMetadata = false; return this; }
public VectorSearchRequest build() {
return new VectorSearchRequest(
queryVector, topK, similarityThreshold, filters, includeMetadata);
}
}
}
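The compact constructor above silently normalizes invalid input, so even a direct constructor call can't produce a nonsensical request. A quick standalone check, reproducing the record in simplified form (Builder omitted, renamed so it runs on its own):

```java
import java.util.Map;
import java.util.Objects;

// Simplified copy of the article's record to show the compact constructor's
// normalization behavior.
record SearchRequestDemo(
        float[] queryVector,
        int topK,
        double similarityThreshold,
        Map<String, Object> filters
) {
    SearchRequestDemo {
        Objects.requireNonNull(queryVector, "queryVector required");
        if (topK <= 0) topK = 10;                      // fall back to the default Top-K
        if (similarityThreshold < 0 || similarityThreshold > 1) {
            similarityThreshold = 0.7;                 // fall back to the default threshold
        }
        filters = filters != null ? Map.copyOf(filters) : Map.of(); // defensive copy
    }
}
```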
// Vector search result
public record VectorSearchResult<T>(
T document,
double score,
int rank
) {}
// Base vector repository interface
public interface VectorRepository<T, ID> {
// Store
void save(T document);
void saveAll(List<T> documents);
// Delete
void deleteById(ID id);
void deleteByIds(List<ID> ids);
// Vector similarity search
List<VectorSearchResult<T>> search(VectorSearchRequest request);
// Convenience method
default List<VectorSearchResult<T>> search(float[] queryVector, int topK) {
return search(VectorSearchRequest.builder(queryVector).topK(topK).build());
}
// Search with filters
default List<VectorSearchResult<T>> searchWithFilter(
float[] queryVector, int topK, Map<String, Object> filters) {
VectorSearchRequest.Builder builder = VectorSearchRequest.builder(queryVector).topK(topK);
filters.forEach(builder::filter);
return search(builder.build());
}
// Stats
long count();
boolean existsById(ID id);
}
4. Implementing It on Top of Spring AI's VectorStore
Now implement the interface, delegating to Spring AI's VectorStore underneath:
@Repository
public class KnowledgeDocRepository implements VectorRepository<KnowledgeDoc, String> {
private final VectorStore vectorStore;
private final EmbeddingModel embeddingModel;
private final ObjectMapper objectMapper;
// Metadata field-name constants
private static final String FIELD_CATEGORY = "category";
private static final String FIELD_SOURCE = "source";
private static final String FIELD_TAGS = "tags";
private static final String FIELD_CREATED_AT = "createdAt";
@Override
public void save(KnowledgeDoc doc) {
vectorStore.add(List.of(toDocument(doc)));
}
@Override
public void saveAll(List<KnowledgeDoc> docs) {
List<Document> documents = docs.stream()
.map(this::toDocument)
.toList();
vectorStore.add(documents);
}
@Override
public void deleteById(String id) {
vectorStore.delete(List.of(id));
}
@Override
public void deleteByIds(List<String> ids) {
vectorStore.delete(ids);
}
@Override
public List<VectorSearchResult<KnowledgeDoc>> search(VectorSearchRequest request) {
// Build Spring AI's SearchRequest (builder-style API; exact method names vary across Spring AI versions)
SearchRequest.Builder searchBuilder = SearchRequest.builder()
.topK(request.topK())
.similarityThreshold(request.similarityThreshold());
// Build the filter expression
if (!request.filters().isEmpty()) {
String filterExpr = buildFilterExpression(request.filters());
searchBuilder.filterExpression(filterExpr);
}
// Note: similaritySearch(SearchRequest) lets the VectorStore embed a text query.
// Whether the raw float[] vector can be passed directly depends on the underlying
// implementation; pgvector, for example, supports direct vector search.
List<Document> rawResults = vectorStore.similaritySearch(
searchBuilder.build()
);
return IntStream.range(0, rawResults.size())
.mapToObj(i -> new VectorSearchResult<>(
fromDocument(rawResults.get(i)),
extractScore(rawResults.get(i)),
i + 1
))
.toList();
}
// Convert: KnowledgeDoc -> Spring AI Document
private Document toDocument(KnowledgeDoc doc) {
Map<String, Object> metadata = new HashMap<>();
metadata.put(FIELD_CATEGORY, doc.getCategory());
metadata.put(FIELD_SOURCE, doc.getSource());
metadata.put(FIELD_CREATED_AT, doc.getCreatedAt().toString());
if (doc.getTags() != null) {
metadata.put(FIELD_TAGS, String.join(",", doc.getTags()));
}
if (doc.getMetadata() != null) {
metadata.putAll(doc.getMetadata());
}
return new Document(
doc.getId(),
doc.getContent(),
metadata
);
}
// Convert: Spring AI Document -> KnowledgeDoc
private KnowledgeDoc fromDocument(Document doc) {
String tagsStr = (String) doc.getMetadata().get(FIELD_TAGS);
return KnowledgeDoc.builder()
.id(doc.getId())
.content(doc.getContent())
.category((String) doc.getMetadata().get(FIELD_CATEGORY))
.source((String) doc.getMetadata().get(FIELD_SOURCE))
.tags(tagsStr != null ? Arrays.asList(tagsStr.split(",")) : List.of())
.createdAt(parseInstant((String) doc.getMetadata().get(FIELD_CREATED_AT)))
.build();
}
// Build a filter expression (pgvector/Qdrant style)
// NOTE: values are interpolated naively; escape or validate user-supplied input in production
private String buildFilterExpression(Map<String, Object> filters) {
return filters.entrySet().stream()
.map(entry -> {
Object value = entry.getValue();
if (value instanceof String s) {
return entry.getKey() + " == '" + s + "'";
} else if (value instanceof Number) {
return entry.getKey() + " == " + value;
} else if (value instanceof List<?> list) {
String inClause = list.stream()
.map(v -> "'" + v + "'")
.collect(Collectors.joining(", "));
return entry.getKey() + " in [" + inClause + "]";
}
return entry.getKey() + " == " + value;
})
.collect(Collectors.joining(" && "));
}
private double extractScore(Document doc) {
Object score = doc.getMetadata().get("distance");
if (score instanceof Number n) {
return 1.0 - n.doubleValue(); // convert distance to similarity
}
return 0.0;
}
private Instant parseInstant(String s) {
try {
return s != null ? Instant.parse(s) : Instant.now();
} catch (Exception e) {
return Instant.now();
}
}
@Override
public long count() {
// Depends on the underlying VectorStore implementation
throw new UnsupportedOperationException("count is not supported by every VectorStore");
}
@Override
public boolean existsById(String id) {
throw new UnsupportedOperationException("existsById is not supported by every VectorStore");
}
}
5. Advanced: Hybrid Search
Pure vector retrieval sometimes falls short. When a user searches for a proper noun (a product model number, a person's name), semantic similarity may miss it while keyword matching finds it. Hybrid search combines vector search with keyword search:
@Service
public class HybridSearchService {
private final KnowledgeDocRepository vectorRepo;
private final JdbcTemplate jdbcTemplate; // or any database with full-text search
// Hybrid search: vector + keyword, merged with RRF (Reciprocal Rank Fusion)
public List<KnowledgeDoc> hybridSearch(
float[] queryVector,
String keywords,
int topK) {
// Run the vector search and keyword search in parallel
CompletableFuture<List<VectorSearchResult<KnowledgeDoc>>> vectorFuture =
CompletableFuture.supplyAsync(() ->
vectorRepo.search(queryVector, topK * 2) // fetch 2x, trim after merging
);
CompletableFuture<List<KnowledgeDoc>> keywordFuture =
CompletableFuture.supplyAsync(() ->
keywordSearch(keywords, topK * 2)
);
CompletableFuture.allOf(vectorFuture, keywordFuture).join();
List<VectorSearchResult<KnowledgeDoc>> vectorResults = vectorFuture.join();
List<KnowledgeDoc> keywordResults = keywordFuture.join();
// RRF merge: compute a fused score for each document
return mergeWithRRF(vectorResults, keywordResults, topK);
}
private List<KnowledgeDoc> keywordSearch(String keywords, int topK) {
// Example: PostgreSQL full-text search alongside pgvector
// (the 'chinese' text-search configuration requires an extension such as zhparser)
String sql = """
SELECT id, content, category, source, tags, created_at
FROM knowledge_docs
WHERE to_tsvector('chinese', content) @@ plainto_tsquery('chinese', ?)
LIMIT ?
""";
return jdbcTemplate.query(sql, (rs, rowNum) -> {
KnowledgeDoc doc = new KnowledgeDoc();
doc.setId(rs.getString("id"));
doc.setContent(rs.getString("content"));
doc.setCategory(rs.getString("category"));
// ...
return doc;
}, keywords, topK);
}
// The Reciprocal Rank Fusion algorithm
// RRF score = sum(1 / (k + rank)) over each result list
private List<KnowledgeDoc> mergeWithRRF(
List<VectorSearchResult<KnowledgeDoc>> vectorResults,
List<KnowledgeDoc> keywordResults,
int topK) {
Map<String, Double> scoreMap = new HashMap<>();
final int k = 60; // RRF constant, conventionally 60
// Scores from the vector result list
for (int i = 0; i < vectorResults.size(); i++) {
String id = vectorResults.get(i).document().getId();
scoreMap.merge(id, 1.0 / (k + i + 1), Double::sum);
}
// Scores from the keyword result list
for (int i = 0; i < keywordResults.size(); i++) {
String id = keywordResults.get(i).getId();
scoreMap.merge(id, 1.0 / (k + i + 1), Double::sum);
}
// Collect all documents (deduplicated)
Map<String, KnowledgeDoc> allDocs = new HashMap<>();
vectorResults.forEach(r -> allDocs.put(r.document().getId(), r.document()));
keywordResults.forEach(d -> allDocs.putIfAbsent(d.getId(), d));
// Sort by RRF score and take the top K
return scoreMap.entrySet().stream()
.sorted(Map.Entry.<String, Double>comparingByValue().reversed())
.limit(topK)
.map(entry -> {
KnowledgeDoc doc = allDocs.get(entry.getKey());
doc.setSimilarityScore(entry.getValue());
return doc;
})
.toList();
}
}
6. Batch Processing for Vector Storage and Updates
When inserting large numbers of documents into the vector store, process them in batches:
@Service
public class DocumentIngestionService {
private final KnowledgeDocRepository repository;
private final EmbeddingModel embeddingModel;
private final TextSplitter textSplitter;
// Bulk-import documents
public IngestResult ingestDocuments(List<RawDocument> rawDocs) {
AtomicInteger success = new AtomicInteger(0);
AtomicInteger failed = new AtomicInteger(0);
List<String> failedIds = Collections.synchronizedList(new ArrayList<>());
// Process in batches of 100
List<List<RawDocument>> batches = partition(rawDocs, 100);
for (List<RawDocument> batch : batches) {
try {
List<KnowledgeDoc> docsToSave = batch.stream()
.flatMap(raw -> splitAndEmbed(raw).stream())
.toList();
repository.saveAll(docsToSave);
success.addAndGet(docsToSave.size());
log.info("Batch done: {} records in this batch", docsToSave.size());
// Brief pause between batches to avoid embedding-API rate limits
Thread.sleep(200);
} catch (Exception e) {
log.error("Batch failed", e);
batch.forEach(raw -> failedIds.add(raw.id()));
failed.addAndGet(batch.size());
}
}
return new IngestResult(success.get(), failed.get(), failedIds);
}
// Split a raw document into chunks and generate embeddings
private List<KnowledgeDoc> splitAndEmbed(RawDocument raw) {
// Text splitting (by paragraph, by length, etc.)
List<String> chunks = textSplitter.split(raw.content(), 1000); // 1000 chars per chunk
return IntStream.range(0, chunks.size())
.mapToObj(i -> {
String chunk = chunks.get(i);
// Generate the embedding (in Spring AI 1.x, embed(String) returns float[] directly)
float[] embedding = embeddingModel.embed(chunk);
return KnowledgeDoc.builder()
.id(raw.id() + "_chunk_" + i)
.content(chunk)
.category(raw.category())
.source(raw.source())
.tags(raw.tags())
.embedding(embedding)
.createdAt(Instant.now())
.metadata(Map.of(
"originalId", raw.id(),
"chunkIndex", i,
"totalChunks", chunks.size()
))
.build();
})
.toList();
}
private <T> List<List<T>> partition(List<T> list, int size) {
List<List<T>> partitions = new ArrayList<>();
for (int i = 0; i < list.size(); i += size) {
partitions.add(list.subList(i, Math.min(i + size, list.size())));
}
return partitions;
}
public record RawDocument(
String id, String content, String category,
String source, List<String> tags
) {}
public record IngestResult(int success, int failed, List<String> failedIds) {}
}
7. Multi-Tenant Vector Isolation
In SaaS scenarios, different tenants' data must be isolated. A multi-tenancy approach for vector databases:
@Service
public class MultiTenantVectorService {
private final VectorStore vectorStore;
private final TenantContext tenantContext; // resolves the tenant ID from the request context
// Every operation automatically injects a tenant filter
public List<VectorSearchResult<KnowledgeDoc>> searchForCurrentTenant(
float[] queryVector, int topK) {
String tenantId = tenantContext.getCurrentTenantId();
VectorSearchRequest request = VectorSearchRequest.builder(queryVector)
.topK(topK)
.filter("tenantId", tenantId)
.build();
// ... execute the query via the repository
return List.of();
}
// Tag the document with the tenant automatically on save
public void saveForCurrentTenant(KnowledgeDoc doc) {
String tenantId = tenantContext.getCurrentTenantId();
// Ensure the document carries the tenant tag
Map<String, Object> metadata = new HashMap<>(
doc.getMetadata() != null ? doc.getMetadata() : Map.of()
);
metadata.put("tenantId", tenantId);
// Create a copy carrying the tenant info
KnowledgeDoc tenantDoc = KnowledgeDoc.builder()
.id(tenantId + "_" + doc.getId())
.content(doc.getContent())
.category(doc.getCategory())
.source(doc.getSource())
.metadata(metadata)
.build();
// save via the repository
}
}
8. Overall Integration Architecture
(architecture diagram omitted)
9. Pitfalls I've Hit
Pitfall 1: inconsistent embedding dimensions
Different embedding models output different dimensions (1536, 768, 3072, ...). If you switch models without rebuilding the index, you'll hit dimension-mismatch errors. Store the model name and dimension together in the metadata, and do a full re-embedding before switching models.
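A cheap guard for this pitfall: record the model name and dimension alongside the index, and verify both before every write. A sketch with illustrative class and field names:

```java
// Illustrative guard: refuse to mix embeddings of different models or
// dimensions in one index.
class EmbeddingGuard {
    private final String expectedModel;
    private final int expectedDimension;

    EmbeddingGuard(String expectedModel, int expectedDimension) {
        this.expectedModel = expectedModel;
        this.expectedDimension = expectedDimension;
    }

    // Throws if the vector does not match the index's recorded model/dimension
    void check(String model, float[] embedding) {
        if (!expectedModel.equals(model)) {
            throw new IllegalStateException(
                "Embedding model mismatch: index built with " + expectedModel + ", got " + model);
        }
        if (embedding.length != expectedDimension) {
            throw new IllegalStateException(
                "Dimension mismatch: expected " + expectedDimension + ", got " + embedding.length);
        }
    }
}
```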
Pitfall 2: vector index degradation
When bulk-inserting into some databases (pgvector, for example), HNSW/IVFFlat indexes need rebuilding as data volume grows to keep queries fast. Monitor query latency regularly and schedule maintenance windows to rebuild indexes when necessary.
Pitfall 3: a poorly chosen similarity threshold
The default 0.7 threshold is too high in some scenarios (no results found) and too low in others (too many irrelevant results returned). Different business scenarios and different embedding models each need their own tuning. Run A/B tests with real query samples to determine the threshold.
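That testing can start very simply: collect (similarity score, relevant?) pairs from real queries, sweep candidate thresholds, and keep the one with the best precision. A toy sketch; the names and the precision-only criterion are illustrative, and a real evaluation would also track recall:

```java
import java.util.List;

// Toy threshold sweep over labeled (score, relevant) samples from real queries.
class ThresholdSweep {
    record Sample(double score, boolean relevant) {}

    // Returns the candidate threshold with the highest precision among those
    // that still return at least one result.
    static double bestThreshold(List<Sample> samples, double[] candidates) {
        double best = candidates[0];
        double bestPrecision = -1;
        for (double t : candidates) {
            long kept = samples.stream().filter(s -> s.score() >= t).count();
            if (kept == 0) continue; // a threshold that filters everything is useless
            long relevantKept = samples.stream()
                    .filter(s -> s.score() >= t && s.relevant()).count();
            double precision = (double) relevantKept / kept;
            if (precision > bestPrecision) {
                bestPrecision = precision;
                best = t;
            }
        }
        return best;
    }
}
```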
Pitfall 4: chunking strategy affects retrieval quality
Chunks that are too small (50 chars): incomplete semantics; retrieved fragments lack enough context.
Chunks that are too large (3,000 chars): one chunk spans multiple topics, so it gets retrieved even though little of it is actually relevant.
I usually start at 500-800 characters and adjust based on observed retrieval quality.
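As a starting point, a fixed-size splitter with a small overlap keeps text cut at a chunk boundary intact in at least one chunk. The parameters are illustrative; production splitters also respect paragraph and sentence boundaries:

```java
import java.util.ArrayList;
import java.util.List;

// Simple fixed-size splitter with overlap between consecutive chunks.
class SimpleChunker {
    static List<String> split(String text, int chunkSize, int overlap) {
        if (chunkSize <= 0 || overlap < 0 || overlap >= chunkSize) {
            throw new IllegalArgumentException("need 0 <= overlap < chunkSize");
        }
        List<String> chunks = new ArrayList<>();
        int step = chunkSize - overlap; // how far the window advances each time
        for (int start = 0; start < text.length(); start += step) {
            int end = Math.min(start + chunkSize, text.length());
            chunks.add(text.substring(start, end));
            if (end == text.length()) break; // last chunk reached
        }
        return chunks;
    }
}
```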
Summary
What a Spring Data style vector repository buys you:
- Unified abstraction: business code does not depend on a specific vector database implementation
- Type safety: a strongly typed VectorSearchRequest replaces string-concatenated filter conditions
- Easy switching: replace the underlying vector database and nothing above the Repository layer needs to change
- Hybrid search: RRF merging is encapsulated at the repository layer, invisible to the service layer
- Multi-tenant isolation: handled uniformly at the repository layer, so it cannot be forgotten
The access layer for a vector database is not complicated, but it deserves careful design. A good abstraction keeps upstream business code clean and leaves plenty of room for later database migrations and performance tuning.
