第2389篇:多知识库联合检索——跨多个RAG系统搜索和融合答案
大约 6 分钟
第2389篇:多知识库联合检索——跨多个RAG系统搜索和融合答案
适读人群:需要整合多个知识库的AI工程师 | 阅读时长:约18分钟 | 核心价值:掌握跨知识库检索的路由、融合和冲突处理工程方案
大型企业里,知识从来不是存在一个地方的。我们遇到的典型情况:
- 产品知识库:产品规格、说明书
- 运营知识库:SOP、流程规范
- 法规知识库:合规要求、政策文件
- 历史案例库:过往案例、解决方案
用户的问题可能跨越这几个领域:"这个产品能不能出口到欧盟?需要什么认证?"这个问题需要从产品知识库(产品规格)和法规知识库(欧盟认证要求)同时拿信息。
单知识库的RAG无法解决这个问题。需要多知识库联合检索。
多知识库的架构设计
/**
* 多知识库联合检索的架构层次
*
* 层次1:知识库路由层
* 接收用户查询,决定检索哪些知识库
* 策略:规则路由、分类器路由、全量路由
*
* 层次2:并行检索层
* 同时向多个知识库发起检索请求
* 优化:根据知识库的响应时间做超时控制
*
* 层次3:结果融合层
* 合并来自不同知识库的检索结果
* 策略:简单合并、相关度重排、冲突处理
*
* 层次4:答案生成层
* 基于融合后的上下文,生成最终答案
* 注意:要标注信息来自哪个知识库
*/知识库路由策略
@Service
public class KnowledgeBaseRouter {
private final Map<String, KnowledgeBase> knowledgeBases;
private final ChatClient chatClient;
/**
* 路由策略1:关键词规则路由(最快,成本低)
*/
public List<String> routeByKeywords(String query) {
List<String> targetKBs = new ArrayList<>();
// 产品相关
if (containsAny(query, "产品", "规格", "参数", "型号", "功能")) {
targetKBs.add("product-kb");
}
// 法规合规相关
if (containsAny(query, "合规", "法规", "认证", "出口", "许可", "标准")) {
targetKBs.add("regulation-kb");
}
// 操作流程相关
if (containsAny(query, "流程", "步骤", "如何操作", "怎么做", "SOP")) {
targetKBs.add("operations-kb");
}
// 如果没有匹配到,全部检索(保底)
if (targetKBs.isEmpty()) {
targetKBs.addAll(knowledgeBases.keySet());
}
return targetKBs;
}
/**
* 路由策略2:LLM分类路由(准确,但有延迟和成本)
*/
public List<String> routeByLLM(String query) {
String prompt = """
请分析以下问题,判断需要检索哪些知识库。
可用知识库:
- product-kb: 产品规格、参数、功能说明
- regulation-kb: 法规、合规要求、认证标准
- operations-kb: 操作流程、SOP、工作规范
- case-kb: 历史案例、解决方案、经验总结
问题:%s
输出JSON数组,列出需要检索的知识库ID:
["product-kb", "regulation-kb"]
只输出JSON数组,不要其他内容。
""".formatted(query);
String response = chatClient.prompt(prompt).call().content();
return parseJsonArray(response);
}
/**
* 路由策略3:向量相似度路由
*
* 每个知识库有一个"主题向量"(代表该知识库的主要内容)
* 用查询向量与主题向量的相似度来决定路由
*/
public List<String> routeByTopicSimilarity(String query) {
float[] queryVector = embeddingModel.embed(query);
Map<String, Double> similarities = new HashMap<>();
for (Map.Entry<String, KnowledgeBase> entry : knowledgeBases.entrySet()) {
float[] topicVector = entry.getValue().getTopicVector();
double similarity = cosineSimilarity(queryVector, topicVector);
similarities.put(entry.getKey(), similarity);
}
// 相似度超过阈值的知识库都检索
return similarities.entrySet().stream()
.filter(e -> e.getValue() > 0.6)
.sorted(Map.Entry.<String, Double>comparingByValue().reversed())
.limit(3) // 最多3个
.map(Map.Entry::getKey)
.collect(Collectors.toList());
}
/**
* 组合路由:快速规则 + 兜底向量相似度
*/
public List<String> route(String query) {
// 先用规则路由(快)
List<String> ruleResult = routeByKeywords(query);
// 规则只匹配到1个,用向量相似度补充
if (ruleResult.size() == 1) {
List<String> vectorResult = routeByTopicSimilarity(query);
Set<String> combined = new LinkedHashSet<>(ruleResult);
combined.addAll(vectorResult);
return new ArrayList<>(combined);
}
return ruleResult;
}
}并行检索和结果融合
@Service
public class MultiKBRetriever {
private final Map<String, VectorStore> vectorStores;
private final ExecutorService executor;
/**
* 并行检索多个知识库
*/
public MultiKBResult retrieve(String query, List<String> targetKBIds) {
Map<String, CompletableFuture<KBSearchResult>> futures = new HashMap<>();
for (String kbId : targetKBIds) {
VectorStore vectorStore = vectorStores.get(kbId);
if (vectorStore == null) {
log.warn("Knowledge base not found: {}", kbId);
continue;
}
CompletableFuture<KBSearchResult> future = CompletableFuture.supplyAsync(() -> {
try {
List<Document> docs = vectorStore.similaritySearch(
SearchRequest.query(query).withTopK(5)
);
return KBSearchResult.success(kbId, docs);
} catch (Exception e) {
log.error("Failed to search KB: {}", kbId, e);
return KBSearchResult.failed(kbId, e.getMessage());
}
}, executor);
futures.put(kbId, future);
}
// 等待所有检索完成(最多3秒)
Map<String, KBSearchResult> results = new HashMap<>();
for (Map.Entry<String, CompletableFuture<KBSearchResult>> entry : futures.entrySet()) {
try {
KBSearchResult result = entry.getValue().get(3, TimeUnit.SECONDS);
results.put(entry.getKey(), result);
} catch (TimeoutException e) {
results.put(entry.getKey(), KBSearchResult.failed(entry.getKey(), "Timeout"));
log.warn("KB search timeout: {}", entry.getKey());
} catch (Exception e) {
results.put(entry.getKey(), KBSearchResult.failed(entry.getKey(), e.getMessage()));
}
}
return MultiKBResult.of(results);
}
/**
* 多知识库结果融合
*
* 策略:
* 1. 保留每个文档的来源知识库信息
* 2. 按相关度重排(Cross-KB排序)
* 3. 去重(不同知识库可能有相同内容)
*/
public List<EnrichedDocument> mergeResults(MultiKBResult multiResult) {
List<EnrichedDocument> allDocs = new ArrayList<>();
for (KBSearchResult kbResult : multiResult.getResults().values()) {
if (!kbResult.isSuccess()) continue;
for (Document doc : kbResult.getDocs()) {
allDocs.add(EnrichedDocument.builder()
.document(doc)
.sourceKBId(kbResult.getKbId())
.sourceKBName(getKBName(kbResult.getKbId()))
.relevanceScore((Float) doc.getMetadata().getOrDefault("distance", 0.5f))
.build()
);
}
}
// 去重(基于内容hash)
List<EnrichedDocument> deduplicated = deduplicateByContent(allDocs);
// 按相关度排序
deduplicated.sort(Comparator.comparing(EnrichedDocument::getRelevanceScore).reversed());
// 保留Top-10
return deduplicated.stream().limit(10).collect(Collectors.toList());
}
}跨知识库冲突检测和处理
@Service
public class CrossKBConflictHandler {
/**
* 检测不同知识库的文档是否存在矛盾
*
* 常见的矛盾类型:
* - 数字不一致(A库说"不超过5个",B库说"最多10个")
* - 规则互斥(A库的规定和B库的规定不能同时满足)
* - 时效不同(A库是旧标准,B库是新标准)
*/
public ConflictAnalysis detectConflicts(List<EnrichedDocument> docs) {
if (docs.size() <= 1) {
return ConflictAnalysis.noConflict();
}
// 快速检查:来自不同知识库的文档才可能有冲突
List<EnrichedDocument> multiSourceDocs = filterMultiSourceDocs(docs);
if (multiSourceDocs.size() <= 1) {
return ConflictAnalysis.noConflict();
}
// 用LLM检测是否有实质性冲突
String conflictCheckPrompt = buildConflictCheckPrompt(docs);
String response = chatClient.prompt(conflictCheckPrompt).call().content();
return parseConflictAnalysis(response);
}
private String buildConflictCheckPrompt(List<EnrichedDocument> docs) {
StringBuilder docsText = new StringBuilder();
for (EnrichedDocument doc : docs) {
docsText.append(String.format(
"【来源:%s】\n%s\n\n",
doc.getSourceKBName(),
doc.getDocument().getContent()
));
}
return """
请分析以下来自不同知识库的文档,判断是否有相互矛盾的信息。
%s
分析要求:
1. 找出有实质性矛盾的地方(数字不同、规定相悖等)
2. 区分"表述不同但意思相同"和"真正矛盾"
输出JSON:
{
"has_conflict": true/false,
"conflicts": [
{
"description": "冲突描述",
"source1": "知识库A的说法",
"source2": "知识库B的说法",
"severity": "high/medium/low"
}
]
}
""".formatted(docsText.toString());
}
}跨知识库的答案生成
@Service
public class MultiKBAnswerGenerator {
/**
* 生成引用多个知识库的综合答案
*/
public String generateAnswer(String question, List<EnrichedDocument> docs,
ConflictAnalysis conflictAnalysis) {
String contextSection = buildMultiKBContext(docs);
String conflictGuidance = buildConflictGuidance(conflictAnalysis);
return chatClient.prompt("""
你的任务是基于多个知识库的内容,综合回答用户问题。
===参考内容===
%s
===参考内容结束===
%s
回答规则:
1. 综合不同知识库的信息,给出完整回答
2. 引用来源时注明知识库名称(如"根据产品手册..."、"根据合规法规...")
3. 如果不同知识库的信息互补,整合成完整答案
4. 如果不同知识库有冲突,明确指出并建议用户以权威来源为准
问题:%s
""".formatted(contextSection, conflictGuidance, question))
.call().content();
}
private String buildMultiKBContext(List<EnrichedDocument> docs) {
// 按知识库分组展示
Map<String, List<EnrichedDocument>> byKB = docs.stream()
.collect(Collectors.groupingBy(EnrichedDocument::getSourceKBId));
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, List<EnrichedDocument>> entry : byKB.entrySet()) {
sb.append(String.format("\n【%s 知识库】\n", getKBName(entry.getKey())));
entry.getValue().forEach(d -> sb.append(d.getDocument().getContent()).append("\n\n"));
}
return sb.toString();
}
}多知识库联合检索的工程复杂度确实更高,但对于信息分散在多个系统里的大型企业,这是绕不过去的。路由策略的准确性是整个系统的关键——路由错了,检索到无关知识库,整个答案就偏了。建议用一批有标注的测试问题来评估路由准确率,至少要达到85%以上再上线。
