第2370篇:多跳问答的工程解法——需要多步推理的复杂问题如何处理
第2370篇:多跳问答的工程解法——需要多步推理的复杂问题如何处理
适读人群:构建企业级RAG系统的AI工程师 | 阅读时长:约20分钟 | 核心价值:掌握多跳问答的核心工程方案,包括问题分解、迭代检索、答案聚合的完整实现
有一次做代码评审,同事问我:"你觉得我们的RAG系统现在最大的问题是什么?"
我当时的回答是:它只能回答"是什么",不能回答"为什么"和"怎么做到的"。
举个例子。用户问:"我们公司去年的退款率为什么比行业平均高了2个百分点,主要是哪些产品线拖累的?"
这个问题需要至少三步:
- 查出我们公司去年退款率
- 查出行业平均退款率
- 按产品线拆分退款数据,找出差距来源
每一步的输入依赖上一步的输出。普通RAG一次检索,根本处理不了这种多跳推理问题。
多跳问答的核心挑战
先把问题说清楚。多跳问答(Multi-hop QA)的定义:回答问题需要结合多份文档或多条信息,且信息之间有推理依赖关系。
/**
* 几种典型的多跳问题类型
*
* 类型1:桥接型(Bridge)
* 问:[实体A]的[关系X]是谁,这个人的[关系Y]是什么?
* 例:我们的CTO是谁,他之前在哪家公司工作?
* 推理路径:找CTO → 找CTO的工作经历
*
* 类型2:比较型(Comparison)
* 问:[实体A]和[实体B]相比,哪个的[属性X]更好?
* 例:产品A和产品B相比,哪个退款率更低?
* 推理路径:分别查A和B的退款率 → 比较
*
* 类型3:聚合型(Aggregation)
* 问:所有[条件X]的[实体]中,[属性Y]最高的是哪个?
* 例:所有上线超过一年的产品中,哪个复购率最高?
* 推理路径:找出所有满足条件的产品 → 查各自复购率 → 排序
*
* 类型4:验证型(Verification)
* 问:[某个说法]是否正确?
* 例:"我们Q3收入是Q2的两倍"这个说法对吗?
* 推理路径:查Q2收入 → 查Q3收入 → 计算比值 → 验证
*/方案一:问题分解(Query Decomposition)
最直接的思路:把复杂问题拆成多个简单子问题,分别检索,再聚合。
@Service
public class QueryDecompositionService {
private final ChatClient chatClient;
private final RAGRetriever baseRetriever;
/**
* 问题分解的核心Prompt
* 关键:让LLM输出结构化的子问题列表,以及子问题之间的依赖关系
*/
public DecompositionPlan decompose(String question) {
String prompt = """
请分析以下问题,判断是否需要多步推理,如果需要,将其分解为子问题。
问题:%s
分析要求:
1. 判断这个问题能否通过一次检索直接回答
2. 如果不能,列出需要哪些子问题,每个子问题应该是可以单独检索回答的
3. 标明子问题之间的依赖关系(哪个子问题的答案是另一个的输入)
输出格式(JSON):
{
"is_simple": false,
"sub_questions": [
{
"id": "q1",
"question": "子问题文本",
"depends_on": []
},
{
"id": "q2",
"question": "子问题文本(可引用{{q1.answer}})",
"depends_on": ["q1"]
}
]
}
只输出JSON,不要任何其他内容。
""".formatted(question);
String response = chatClient.prompt(prompt).call().content();
return parseDecompositionPlan(response);
}
/**
* 按照分解计划执行多跳检索
* 处理子问题之间的依赖关系
*/
public MultiHopResult execute(String originalQuestion, DecompositionPlan plan) {
if (plan.isSimple()) {
// 简单问题直接检索
return MultiHopResult.simple(baseRetriever.retrieve(originalQuestion));
}
Map<String, String> answerCache = new HashMap<>();
List<RetrievalStep> steps = new ArrayList<>();
// 拓扑排序执行子问题
for (SubQuestion sq : topologicalSort(plan.getSubQuestions())) {
// 替换依赖问题的答案
String resolvedQuestion = resolveDependencies(sq.getQuestion(), answerCache);
// 执行检索和生成
RetrievalResult retrieval = baseRetriever.retrieve(resolvedQuestion);
String subAnswer = generateSubAnswer(resolvedQuestion, retrieval);
answerCache.put(sq.getId(), subAnswer);
steps.add(new RetrievalStep(sq, retrieval, subAnswer));
}
// 最终聚合:把所有子问题的答案汇总,生成最终回答
String finalAnswer = synthesizeFinalAnswer(originalQuestion, steps);
return MultiHopResult.multiHop(steps, finalAnswer);
}
/**
* 替换子问题中的依赖占位符
* 例如:"{{q1.answer}}的CEO是谁" → "张三的CEO是谁"
*/
private String resolveDependencies(String question, Map<String, String> answerCache) {
String resolved = question;
for (Map.Entry<String, String> entry : answerCache.entrySet()) {
resolved = resolved.replace(
"{{" + entry.getKey() + ".answer}}",
entry.getValue()
);
}
return resolved;
}
/**
* 聚合所有子问题的答案,生成最终回答
*/
private String synthesizeFinalAnswer(String originalQuestion, List<RetrievalStep> steps) {
StringBuilder context = new StringBuilder();
context.append("以下是回答原始问题的各个子问题及其答案:\n\n");
for (int i = 0; i < steps.size(); i++) {
RetrievalStep step = steps.get(i);
context.append(String.format("子问题%d:%s\n", i + 1, step.getQuestion().getQuestion()));
context.append(String.format("答案%d:%s\n\n", i + 1, step.getSubAnswer()));
}
String prompt = """
基于以下子问题和答案,请综合回答原始问题。
%s
原始问题:%s
请给出完整、准确的综合回答:
""".formatted(context.toString(), originalQuestion);
return chatClient.prompt(prompt).call().content();
}
}方案二:迭代检索(Iterative Retrieval)
问题分解需要在开始时就规划好所有子问题,但有些问题在检索之前根本不知道需要几跳。迭代检索更灵活:每次检索后,由LLM判断是否已有足够信息,如果没有,继续检索。
@Service
public class IterativeRetriever {
private static final int MAX_ITERATIONS = 5;
private final ChatClient chatClient;
private final VectorStore vectorStore;
/**
* 迭代检索主流程
*
* ReAct框架(Reasoning + Acting):
* Thought → Action → Observation → Thought → ...
*/
public IterativeResult retrieve(String question) {
List<IterationRecord> history = new ArrayList<>();
List<Document> allCollectedDocs = new ArrayList<>();
for (int iteration = 0; iteration < MAX_ITERATIONS; iteration++) {
// Thought:分析当前状态,决定下一步行动
ThoughtAction thoughtAction = think(question, history, allCollectedDocs);
if (thoughtAction.isFinished()) {
// LLM认为已有足够信息
String finalAnswer = generateFinalAnswer(question, allCollectedDocs);
return IterativeResult.success(history, finalAnswer, iteration + 1);
}
// Action:执行检索
String searchQuery = thoughtAction.getSearchQuery();
List<Document> newDocs = vectorStore.similaritySearch(
SearchRequest.query(searchQuery).withTopK(3)
);
// Observation:记录检索结果
allCollectedDocs.addAll(newDocs);
history.add(new IterationRecord(
iteration + 1,
thoughtAction.getThought(),
searchQuery,
newDocs
));
}
// 超过最大迭代次数,用已收集的信息生成回答
log.warn("Max iterations reached for question: {}", question);
return IterativeResult.maxIterations(history,
generateFinalAnswer(question, allCollectedDocs));
}
/**
* Thought步骤:让LLM分析当前情况,决定下一步
*/
private ThoughtAction think(String question,
List<IterationRecord> history,
List<Document> collectedDocs) {
StringBuilder historyText = new StringBuilder();
for (IterationRecord record : history) {
historyText.append(String.format(
"第%d次检索:\n查询:%s\n找到:%s\n\n",
record.getIteration(),
record.getSearchQuery(),
summarizeDocs(record.getDocs())
));
}
String prompt = """
你正在回答一个问题,通过多轮检索收集必要信息。
原始问题:%s
%s
目前已收集到的信息:
%s
请分析:
1. 当前收集到的信息是否足以回答原始问题?
2. 如果不够,还需要查什么?
输出JSON:
{
"thought": "你的分析过程",
"finished": true/false,
"search_query": "如果需要继续检索,写出检索查询词;如果finished=true则留空"
}
""".formatted(
question,
history.isEmpty() ? "" : "历史检索记录:\n" + historyText,
collectedDocs.isEmpty() ? "(暂无)" : summarizeDocs(collectedDocs)
);
String response = chatClient.prompt(prompt).call().content();
return parseThoughtAction(response);
}
}方案三:自适应检索深度控制
实际系统里不能对所有问题都做多跳处理,太贵了(Token消耗和延迟都会增加3-5倍)。需要先判断问题是否真的需要多跳。
@Service
public class AdaptiveMultiHopRouter {
/**
* 问题复杂度评估
* 快速、低成本地判断是否需要多跳
*/
public QuestionComplexity assessComplexity(String question) {
// 规则快速判断(低成本)
QuestionComplexity ruleResult = ruleBasedAssessment(question);
if (ruleResult.isDefinitelySingle() || ruleResult.isDefinitelyMulti()) {
return ruleResult;
}
// 模糊情况用LLM判断(较高成本,但只有必要时才调用)
return llmBasedAssessment(question);
}
/**
* 基于规则的快速判断
*/
private QuestionComplexity ruleBasedAssessment(String question) {
// 多跳问题的语言特征
List<String> multiHopIndicators = Arrays.asList(
"为什么", "原因是", "对比", "相比", "哪个更",
"最高的", "最低的", "排名", "第一", "之间的关系",
"导致", "影响", "基于", "综合来看"
);
// 单跳问题的特征
List<String> singleHopIndicators = Arrays.asList(
"是什么", "定义", "介绍一下", "查一下"
);
long multiHopCount = multiHopIndicators.stream()
.filter(question::contains).count();
long singleHopCount = singleHopIndicators.stream()
.filter(question::contains).count();
if (multiHopCount >= 2 && singleHopCount == 0) {
return QuestionComplexity.MULTI_HOP_HIGH_CONFIDENCE;
}
if (singleHopCount >= 1 && multiHopCount == 0) {
return QuestionComplexity.SINGLE_HOP_HIGH_CONFIDENCE;
}
return QuestionComplexity.UNCERTAIN;
}
/**
* 路由决策:选择最合适的检索策略
*/
public RetrievalResult route(String question) {
QuestionComplexity complexity = assessComplexity(question);
return switch (complexity) {
case SINGLE_HOP_HIGH_CONFIDENCE -> simpleRAG.retrieve(question);
case MULTI_HOP_HIGH_CONFIDENCE -> multiHopRAG.retrieve(question);
case UNCERTAIN -> {
// 先尝试简单检索,如果置信度低,升级到多跳
RetrievalResult simpleResult = simpleRAG.retrieve(question);
if (simpleResult.getConfidence() < 0.7) {
yield multiHopRAG.retrieve(question);
}
yield simpleResult;
}
};
}
}工程实践的几个关键经验
1. 子问题的粒度控制
分解得太细,中间步骤太多,累积错误大。分解得太粗,每个子问题还是太复杂。经验是:每个子问题控制在"一次简单向量检索能回答"的粒度。
2. 答案传递的格式
子问题答案传给下个子问题时,不要传全文。让LLM先提取关键信息(比如:人名、日期、数字),再传递。不然上下文会膨胀得很快。
3. 循环依赖的检测
/**
* 拓扑排序前检测循环依赖
* 用Kahn算法,O(V+E)
*/
private List<SubQuestion> topologicalSort(List<SubQuestion> questions) {
Map<String, SubQuestion> idMap = questions.stream()
.collect(Collectors.toMap(SubQuestion::getId, q -> q));
Map<String, Integer> inDegree = new HashMap<>();
for (SubQuestion q : questions) {
inDegree.put(q.getId(), q.getDependsOn().size());
}
Queue<SubQuestion> queue = new LinkedList<>();
for (SubQuestion q : questions) {
if (inDegree.get(q.getId()) == 0) {
queue.offer(q);
}
}
List<SubQuestion> sorted = new ArrayList<>();
while (!queue.isEmpty()) {
SubQuestion current = queue.poll();
sorted.add(current);
for (SubQuestion q : questions) {
if (q.getDependsOn().contains(current.getId())) {
int newDegree = inDegree.get(q.getId()) - 1;
inDegree.put(q.getId(), newDegree);
if (newDegree == 0) {
queue.offer(q);
}
}
}
}
if (sorted.size() != questions.size()) {
throw new CyclicDependencyException("Sub-questions have cyclic dependencies");
}
return sorted;
}4. 失败降级
任何一个子问题检索失败,不要直接报错。降级策略:跳过这个子问题,用剩余信息尽力回答,并在回答里注明"由于无法获取XX信息,以下分析可能不完整"。
多跳问答的工程价值在复杂业务场景下非常显著,但它的代价是延迟和成本的增加。做好路由,让简单问题走简单路径,才是实际系统的正确姿势。
