第2312篇:Adaptive RAG——根据问题类型动态选择检索策略
大约 7 分钟
第2312篇:Adaptive RAG——根据问题类型动态选择检索策略
适读人群:RAG系统架构师、AI应用工程师 | 阅读时长:约17分钟 | 核心价值:掌握Adaptive RAG的设计思路,构建能根据问题复杂度自动选择最优检索策略的智能RAG系统
我们的RAG系统服务三类用户:普通员工查公司政策("年假多少天")、业务经理分析季度报表("Q3各区域销售数据对比")、研究人员做竞品分析("我们和竞争对手的技术差距在哪里")。
最初我们用一套统一策略:固定chunk,固定top-k,固定生成方式。问题很快就暴露出来:查政策太慢(简单问题用了复杂检索),做竞品分析太浅(复杂问题用了简单检索)。
Adaptive RAG的核心思想就是:不同类型的问题,最优的检索策略是不同的,要让系统能自适应地选择。
问题类型分类框架
Adaptive RAG的第一步是对问题进行分类:
简单事实查询:"我们公司的差旅报销上限是多少?" — 一次精准检索即可
多跳推理查询:"A客户的主要采购决策者是谁,他之前在哪些公司工作过?" — 需要先找A客户,再找决策者,再查背景,多步骤
聚合分析查询:"过去半年所有超过50万的合同,分布在哪些行业?" — 需要宽泛检索大量文档,聚合分析
比较对比查询:"我们的产品和B竞品在技术架构上有什么差异?" — 需要分别检索两个主题,对比分析
问题分类器实现
@Component
public class QueryClassifier {
private final ChatClient chatClient;
private static final String CLASSIFICATION_PROMPT = """
将用户问题分类为以下四种类型之一:
1. SIMPLE_FACTUAL:简单事实查询,一段文本就能回答
例如:"公司假期政策是什么"、"某个术语的定义"
2. MULTI_HOP:多跳推理,需要从多个相关实体逐步推理
例如:"A的负责人的上级是谁"、"这个项目用到的技术框架的最新版本"
3. AGGREGATION:聚合分析,需要从大量文档中统计汇总
例如:"所有满足X条件的记录有多少"、"过去一年的趋势分析"
4. COMPARATIVE:对比查询,需要并行检索多个主题进行比较
例如:"A和B有什么区别"、"哪个方案更好"
输出格式(JSON):
{
"type": "SIMPLE_FACTUAL/MULTI_HOP/AGGREGATION/COMPARATIVE",
"confidence": 0-100,
"reasoning": "分类理由",
"subTopics": ["如果是比较查询,列出需要分别检索的子主题"]
}
""";
public QueryClassification classify(String question) {
// 快速规则判断
QueryType ruleType = ruleBasedClassify(question);
if (ruleType != null) {
return new QueryClassification(ruleType, 90, "规则匹配", List.of());
}
String response = chatClient.prompt()
.system(CLASSIFICATION_PROMPT)
.user("问题:" + question)
.call()
.content();
return parseClassification(response);
}
private QueryType ruleBasedClassify(String question) {
// 明显的聚合词,快速判断
if (AGGREGATION_PATTERNS.stream().anyMatch(p -> question.contains(p))) {
return QueryType.AGGREGATION;
}
// 明显的比较词
if (COMPARISON_PATTERNS.stream()
.anyMatch(p -> question.contains(p))) {
return QueryType.COMPARATIVE;
}
return null;
}
private static final List<String> AGGREGATION_PATTERNS = List.of(
"有多少", "统计", "汇总", "总共", "所有", "列出所有", "哪些", "分布"
);
private static final List<String> COMPARISON_PATTERNS = List.of(
"有什么区别", "和...相比", "vs", "对比", "哪个更", "异同"
);
}各类型的检索策略实现
/**
* 简单事实查询的检索策略
* 精准、快速、单次检索
*/
@Component
public class SimpleFactualRetrievalStrategy implements RetrievalStrategy {
private final VectorSearchService vectorSearch;
@Override
public RetrievalResult retrieve(String question, QueryClassification classification) {
// top-3足够,要精准不要多
List<RetrievedDocument> docs = vectorSearch.search(question, 3);
return RetrievalResult.single(docs, "SIMPLE_FACTUAL");
}
}
/**
* 多跳推理的检索策略
* 迭代检索,每一跳基于上一跳的结果
*/
@Component
public class MultiHopRetrievalStrategy implements RetrievalStrategy {
private final VectorSearchService vectorSearch;
private final ChatClient chatClient;
private static final int MAX_HOPS = 3;
@Override
public RetrievalResult retrieve(String question, QueryClassification classification) {
List<RetrievedDocument> allDocs = new ArrayList<>();
String currentQuery = question;
List<String> hops = new ArrayList<>();
for (int hop = 0; hop < MAX_HOPS; hop++) {
List<RetrievedDocument> hopDocs = vectorSearch.search(currentQuery, 3);
allDocs.addAll(hopDocs);
hops.add(currentQuery);
// 判断是否还需要继续检索
String nextQuery = determineNextHop(question, hops, hopDocs);
if (nextQuery == null || nextQuery.equals("DONE")) {
break;
}
currentQuery = nextQuery;
log.info("多跳检索,第{}跳: {}", hop + 1, currentQuery);
}
return RetrievalResult.multiHop(allDocs, hops);
}
private String determineNextHop(String originalQuestion,
List<String> previousHops,
List<RetrievedDocument> lastHopDocs) {
String docsContent = lastHopDocs.stream()
.map(RetrievedDocument::content)
.collect(Collectors.joining("\n\n"));
String previousHopsText = String.join(" -> ", previousHops);
String response = chatClient.prompt()
.system("""
你在帮助回答一个需要多步推理的问题。
基于当前检索到的信息,判断是否还需要进一步检索,以及下一步检索什么。
如果已有足够信息回答问题,输出 DONE。
否则输出下一步的检索查询(一句话)。
""")
.user("""
原始问题:%s
已检索路径:%s
最新检索到的内容:%s
下一步检索什么(或DONE):
""".formatted(originalQuestion, previousHopsText, docsContent))
.call()
.content()
.trim();
return response;
}
}
/**
* 聚合分析的检索策略
* 宽泛检索大量文档,不要求每篇都精准相关
*/
@Component
public class AggregationRetrievalStrategy implements RetrievalStrategy {
private final VectorSearchService vectorSearch;
private final ChatClient chatClient;
@Override
public RetrievalResult retrieve(String question, QueryClassification classification) {
// 聚合查询需要更多文档,top-20
List<RetrievedDocument> broadDocs = vectorSearch.search(question, 20);
// 额外生成几个变体查询,捕捉更多相关文档
List<String> variantQueries = generateVariantQueries(question);
for (String variant : variantQueries) {
broadDocs.addAll(vectorSearch.search(variant, 10));
}
// 去重
List<RetrievedDocument> uniqueDocs = deduplicateByContent(broadDocs);
log.info("聚合检索:共获取{}个唯一文档", uniqueDocs.size());
return RetrievalResult.aggregation(uniqueDocs);
}
private List<String> generateVariantQueries(String question) {
String response = chatClient.prompt()
.system("生成3个与原始问题语义相关的变体查询,用于扩大检索覆盖面。输出JSON数组。")
.user(question)
.call()
.content();
return parseJsonArray(response);
}
}
/**
* 比较对比的检索策略
* 并行检索多个主题,保持各主题文档的标签
*/
@Component
public class ComparativeRetrievalStrategy implements RetrievalStrategy {
private final VectorSearchService vectorSearch;
@Override
public RetrievalResult retrieve(String question, QueryClassification classification) {
List<String> subTopics = classification.subTopics();
if (subTopics.isEmpty()) {
// 没有提取到子主题,回退到简单检索
return new SimpleFactualRetrievalStrategy(vectorSearch).retrieve(question, classification);
}
// 并行检索各个子主题
Map<String, List<RetrievedDocument>> topicDocs = new ConcurrentHashMap<>();
List<CompletableFuture<Void>> futures = subTopics.stream()
.map(topic -> CompletableFuture.runAsync(() -> {
List<RetrievedDocument> docs = vectorSearch.search(topic, 5);
topicDocs.put(topic, docs);
}))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return RetrievalResult.comparative(topicDocs);
}
}自适应检索调度器
把分类器和各策略组合起来:
@Service
public class AdaptiveRAGOrchestrator {
private final QueryClassifier queryClassifier;
private final Map<QueryType, RetrievalStrategy> strategyMap;
private final ChatClient generationClient;
public AdaptiveRAGOrchestrator(QueryClassifier queryClassifier,
SimpleFactualRetrievalStrategy simpleStrategy,
MultiHopRetrievalStrategy multiHopStrategy,
AggregationRetrievalStrategy aggregationStrategy,
ComparativeRetrievalStrategy comparativeStrategy,
ChatClient generationClient) {
this.queryClassifier = queryClassifier;
this.strategyMap = Map.of(
QueryType.SIMPLE_FACTUAL, simpleStrategy,
QueryType.MULTI_HOP, multiHopStrategy,
QueryType.AGGREGATION, aggregationStrategy,
QueryType.COMPARATIVE, comparativeStrategy
);
this.generationClient = generationClient;
}
public AdaptiveRAGResult answer(String question) {
// 1. 分类问题
QueryClassification classification = queryClassifier.classify(question);
log.info("问题分类: type={}, confidence={}",
classification.type(), classification.confidence());
// 2. 选择检索策略
RetrievalStrategy strategy = strategyMap.getOrDefault(
classification.type(), strategyMap.get(QueryType.SIMPLE_FACTUAL)
);
// 3. 执行检索
RetrievalResult retrievalResult = strategy.retrieve(question, classification);
// 4. 根据检索类型选择生成策略
String answer = switch (classification.type()) {
case SIMPLE_FACTUAL -> generateSimpleAnswer(question, retrievalResult);
case MULTI_HOP -> generateMultiHopAnswer(question, retrievalResult);
case AGGREGATION -> generateAggregationAnswer(question, retrievalResult);
case COMPARATIVE -> generateComparativeAnswer(question, retrievalResult);
};
return AdaptiveRAGResult.of(answer, classification, retrievalResult);
}
private String generateComparativeAnswer(String question, RetrievalResult result) {
// 对比查询:先分别总结各主题,再对比
Map<String, List<RetrievedDocument>> topicDocs = result.getTopicDocs();
Map<String, String> topicSummaries = topicDocs.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> summarizeTopic(e.getKey(), e.getValue())
));
String summariesContext = topicSummaries.entrySet().stream()
.map(e -> "【%s】\n%s".formatted(e.getKey(), e.getValue()))
.collect(Collectors.joining("\n\n"));
return generationClient.prompt()
.system("基于以下各主题的信息,回答用户的对比查询问题,要清晰地展示异同点。")
.user("各主题信息:\n%s\n\n对比问题:%s".formatted(summariesContext, question))
.call()
.content();
}
private String summarizeTopic(String topic, List<RetrievedDocument> docs) {
String docsText = docs.stream()
.map(RetrievedDocument::content)
.collect(Collectors.joining("\n"));
return generationClient.prompt()
.system("基于以下文档,简洁总结关于「" + topic + "」的核心信息。")
.user(docsText)
.call()
.content();
}
}反馈驱动的策略优化
Adaptive RAG的最大价值来自持续学习:记录每种策略的效果,持续优化分类规则:
@Component
public class StrategyPerformanceTracker {
private final MeterRegistry meterRegistry;
public void recordStrategyResult(QueryType type, boolean userSatisfied,
long latencyMs) {
// 记录各策略的用户满意度
Counter.builder("adaptive_rag.strategy.result")
.tag("type", type.name())
.tag("satisfied", String.valueOf(userSatisfied))
.register(meterRegistry)
.increment();
// 记录延迟
Timer.builder("adaptive_rag.strategy.latency")
.tag("type", type.name())
.register(meterRegistry)
.record(latencyMs, TimeUnit.MILLISECONDS);
}
/**
* 定期分析策略效果,生成优化建议
*/
@Scheduled(cron = "0 0 8 * * MON") // 每周一上午
public void analyzeAndReport() {
// 这里可以接入数据分析,看哪些分类频繁出错,对应调整规则
log.info("每周Adaptive RAG策略效果分析...");
}
}Adaptive RAG的本质是:一把锤子打不了所有钉子,不同的问题需要不同的工具。这个看起来简单的道理,在工程实现上需要仔细设计分类机制和各类策略的边界。实际上,我们在引入分类后,整体用户满意度提升了约22%,而且复杂查询的响应时间反而因为策略的针对性而有所降低。
