第1665篇:多跳推理RAG——跨文档的复杂问题回答系统
第1665篇:多跳推理RAG——跨文档的复杂问题回答系统
上一篇聊了自适应RAG,其中提到了"复杂推理"这个类别,把问题分解成子问题逐一回答。
今天我想更深入地讲多跳推理(Multi-hop Reasoning)——这是RAG系统里我觉得最有挑战性、也最有价值的一个方向。
先说个具体场景:用户问"负责我们数据库中台架构的工程师,之前在哪个团队积累了分布式存储经验?"
这个问题需要几步:
- 找到"数据库中台"对应的架构文档,确认负责架构的工程师是谁
- 查询这位工程师的历史经历,找到"分布式存储"相关的团队
- 把这两步的结果拼接起来回答
这就是典型的多跳推理:每一步的输入依赖上一步的输出。单次检索根本做不到,必须有推理链。
一、多跳推理的核心挑战
1.1 信息分散在多个文档
每个文档只包含部分信息,最终答案需要跨越至少2-3个文档才能拼完整。
这在传统向量检索里很难处理——你的查询向量只能找到和"查询语义最近"的文档,找不到"推理链上的中间节点"。
1.2 推理链的动态性
推理链不是预先固定的,而是根据每一步的检索结果动态确定的。第一步找到了甲,第二步才知道要去找甲的上级;如果第一步找到了乙,第二步就要找乙的团队。这种动态推理链设计起来很有挑战。
1.3 错误传播
在推理链中,第一步的错误会传播到所有后续步骤。如果第一跳就检索到了错误的实体,后面的推理全错。需要在每一跳都做质量检查。
二、多跳推理的三种架构模式
实践中,模式二(动态推理链)是最灵活也最常用的,下面着重讲它的实现。
三、基于ReAct框架的多跳推理实现
ReAct(Reason + Act)是多跳推理的主流框架,让LLM交替进行"推理"和"行动"(检索),直到得出答案。
3.1 ReAct Agent核心循环
@Service
public class MultiHopReasoningAgent {
@Autowired
private LLMClient llmClient;
@Autowired
private VectorSearchService vectorSearch;
@Autowired
private KnowledgeGraphService graphService;
private static final int MAX_HOPS = 5; // 最大跳数,防止无限循环
private static final String REACT_SYSTEM_PROMPT = """
你是一个多步推理助手。通过交替进行"推理"和"检索"来回答复杂问题。
可用工具:
1. search(query) - 在知识库中检索相关文档
2. graph_query(entity, relation) - 在知识图谱中查询实体关系
3. answer(response) - 当你已经有足够信息时,输出最终答案
格式规范:
Thought: 分析当前情况,决定下一步行动
Action: search("查询内容") 或 graph_query("实体", "关系") 或 answer("最终答案")
Observation: [工具返回结果]
... (重复以上步骤直到可以回答)
""";
/**
* 多跳推理主流程
*/
public MultiHopResult reason(String question) {
List<ReasoningStep> steps = new ArrayList<>();
StringBuilder conversationHistory = new StringBuilder();
conversationHistory.append("问题:").append(question).append("\n\n");
for (int hop = 0; hop < MAX_HOPS; hop++) {
// 让LLM决定下一步行动
String llmPrompt = buildReActPrompt(question, conversationHistory.toString());
String llmResponse = llmClient.chat(REACT_SYSTEM_PROMPT, llmPrompt);
conversationHistory.append(llmResponse).append("\n");
// 解析LLM的行动
ReActOutput output = parseReActOutput(llmResponse);
if (output.isAnswer()) {
// LLM认为可以回答了
return MultiHopResult.builder()
.finalAnswer(output.getAnswerContent())
.steps(steps)
.hopCount(hop + 1)
.build();
}
// 执行检索行动
String observation = executeAction(output);
ReasoningStep step = ReasoningStep.builder()
.hop(hop + 1)
.thought(output.getThought())
.action(output.getAction())
.observation(observation)
.build();
steps.add(step);
// 把观察结果加回对话历史
conversationHistory.append("Observation: ").append(observation).append("\n\n");
log.debug("第{}跳推理完成:action={}, observation片段={}",
hop + 1, output.getAction(),
observation.substring(0, Math.min(100, observation.length())));
}
// 达到最大跳数,强制生成答案
String forcedAnswer = generateForcedAnswer(question, conversationHistory.toString());
return MultiHopResult.builder()
.finalAnswer(forcedAnswer)
.steps(steps)
.hopCount(MAX_HOPS)
.reachedMaxHops(true)
.build();
}
/**
* 解析LLM输出的Thought/Action结构
*/
private ReActOutput parseReActOutput(String llmResponse) {
String thought = "";
String action = "";
String actionInput = "";
boolean isAnswer = false;
String answerContent = "";
String[] lines = llmResponse.split("\n");
for (String line : lines) {
if (line.startsWith("Thought:")) {
thought = line.substring("Thought:".length()).trim();
} else if (line.startsWith("Action:")) {
action = line.substring("Action:".length()).trim();
// 解析action类型和参数
if (action.startsWith("answer(")) {
isAnswer = true;
answerContent = extractActionParam(action, "answer");
}
}
}
return ReActOutput.builder()
.thought(thought)
.action(action)
.isAnswer(isAnswer)
.answerContent(answerContent)
.build();
}
/**
* 执行检索行动,返回观察结果
*/
private String executeAction(ReActOutput output) {
String action = output.getAction();
try {
if (action.startsWith("search(")) {
String query = extractActionParam(action, "search");
List<Document> docs = vectorSearch.search(query, 3);
if (docs.isEmpty()) {
return "未找到相关文档。";
}
return docs.stream()
.map(d -> "来源[" + d.getMetadata().get("source") + "]:" +
d.getContent().substring(0, Math.min(500, d.getContent().length())))
.collect(Collectors.joining("\n---\n"));
} else if (action.startsWith("graph_query(")) {
String params = extractActionParam(action, "graph_query");
String[] parts = params.split(",", 2);
if (parts.length == 2) {
String entity = parts[0].trim().replaceAll("\"", "");
String relation = parts[1].trim().replaceAll("\"", "");
return graphService.queryRelation(entity, relation);
}
}
} catch (Exception e) {
log.error("执行行动失败:{}", action, e);
return "执行检索时出现错误:" + e.getMessage();
}
return "无法识别的行动:" + action;
}
private String extractActionParam(String action, String actionName) {
String pattern = actionName + "\\(\"?([^\"\\)]+)\"?\\)";
java.util.regex.Matcher matcher =
java.util.regex.Pattern.compile(pattern).matcher(action);
return matcher.find() ? matcher.group(1) : "";
}
private String buildReActPrompt(String question, String history) {
return "请继续以下推理过程,解答问题。\n\n" + history;
}
private String generateForcedAnswer(String question, String history) {
return llmClient.chat(
"基于以下推理过程,尽力回答原始问题(即使信息不完整,也要给出最佳答案):\n\n" +
history + "\n原始问题:" + question
);
}
}四、推理链的可视化与调试
多跳推理的一大挑战是调试——当答案错误时,你需要知道是哪一跳出了问题。
@Service
public class ReasoningChainVisualizer {
/**
* 将推理链转换为可读的报告
*/
public ReasoningReport buildReport(MultiHopResult result) {
StringBuilder report = new StringBuilder();
report.append("=== 多跳推理过程报告 ===\n\n");
report.append("最终答案:").append(result.getFinalAnswer()).append("\n");
report.append("总跳数:").append(result.getHopCount()).append("\n");
if (result.isReachedMaxHops()) {
report.append("⚠️ 已达到最大跳数限制\n");
}
report.append("\n--- 推理步骤 ---\n\n");
for (ReasoningStep step : result.getSteps()) {
report.append(String.format("【第%d跳】\n", step.getHop()));
report.append("思考:").append(step.getThought()).append("\n");
report.append("行动:").append(step.getAction()).append("\n");
report.append("观察:").append(truncate(step.getObservation(), 200)).append("\n\n");
}
// 评估每一跳的质量
List<HopQualityScore> hopScores = evaluateHopQuality(result.getSteps());
report.append("--- 各跳质量评估 ---\n");
hopScores.forEach(score -> {
report.append(String.format("第%d跳:相关性=%.2f,信息增量=%.2f\n",
score.getHop(), score.getRelevance(), score.getInfoGain()));
});
return ReasoningReport.builder()
.summary(report.toString())
.hopScores(hopScores)
.bottleneckHop(findBottleneckHop(hopScores))
.build();
}
/**
* 找出推理链中质量最差的跳(瓶颈)
*/
private int findBottleneckHop(List<HopQualityScore> scores) {
return scores.stream()
.min(Comparator.comparingDouble(HopQualityScore::getRelevance))
.map(HopQualityScore::getHop)
.orElse(-1);
}
/**
* 评估每一跳的质量
*/
private List<HopQualityScore> evaluateHopQuality(List<ReasoningStep> steps) {
List<HopQualityScore> scores = new ArrayList<>();
for (int i = 0; i < steps.size(); i++) {
ReasoningStep step = steps.get(i);
// 相关性:行动与思考的语义一致性
double relevance = calculateRelevance(step.getThought(), step.getAction());
// 信息增量:观察结果是否带来新信息
double infoGain = i == 0 ? 1.0 :
calculateInfoGain(step.getObservation(), steps.get(i-1).getObservation());
scores.add(HopQualityScore.builder()
.hop(step.getHop())
.relevance(relevance)
.infoGain(infoGain)
.build());
}
return scores;
}
private double calculateRelevance(String thought, String action) {
// 简化实现:检查行动中是否包含思考的关键词
String[] thoughtWords = thought.split("\\s+");
long matchCount = Arrays.stream(thoughtWords)
.filter(w -> w.length() >= 2 && action.contains(w))
.count();
return Math.min(1.0, matchCount * 0.2);
}
private double calculateInfoGain(String current, String previous) {
// 简化实现:计算新内容占比
Set<String> currentWords = new HashSet<>(Arrays.asList(current.split("\\s+")));
Set<String> previousWords = new HashSet<>(Arrays.asList(previous.split("\\s+")));
long newWords = currentWords.stream()
.filter(w -> !previousWords.contains(w) && w.length() >= 2)
.count();
return currentWords.isEmpty() ? 0 : Math.min(1.0, (double) newWords / currentWords.size());
}
private String truncate(String text, int maxLen) {
return text.length() <= maxLen ? text : text.substring(0, maxLen) + "...";
}
}五、HotpotQA风格的双文档推理
有一类多跳问题专门需要两个文档的桥接信息——"A是什么的X,B是什么的Y,那X和Y的共同点是什么"这种结构。
@Service
public class BridgingReasoningService {
@Autowired
private VectorSearchService vectorSearch;
@Autowired
private LLMClient llmClient;
/**
* 桥接推理:找到连接两个概念的中间实体
*/
public BridgingResult bridgeQuery(String question) {
// 第一步:识别问题中的两个关键实体
EntityPair entities = extractEntityPair(question);
if (entities == null) {
return fallbackToGeneral(question);
}
// 第二步:分别检索两个实体的信息
List<Document> docsA = vectorSearch.search(entities.getEntityA(), 3);
List<Document> docsB = vectorSearch.search(entities.getEntityB(), 3);
// 第三步:寻找桥接信息(两个实体的共同关联)
String bridgingPrompt = String.format("""
我需要找到连接以下两个实体的桥梁信息。
实体A的相关信息:
%s
实体B的相关信息:
%s
问题:%s
请分析:
1. 实体A的关键属性/关联
2. 实体B的关键属性/关联
3. 两者之间的桥接信息
4. 基于以上,回答原始问题
""",
formatDocs(docsA),
formatDocs(docsB),
question);
String answer = llmClient.chat(bridgingPrompt);
return BridgingResult.builder()
.answer(answer)
.entityA(entities.getEntityA())
.entityB(entities.getEntityB())
.docsA(docsA)
.docsB(docsB)
.build();
}
private EntityPair extractEntityPair(String question) {
String extractPrompt = String.format("""
从以下问题中提取两个关键实体(人名、产品名、概念名等)。
如果找不到两个明确实体,返回null。
问题:%s
输出格式:{"entity_a": "实体1", "entity_b": "实体2"}
如果找不到,输出:null
""", question);
String response = llmClient.chat(extractPrompt).trim();
if ("null".equals(response)) return null;
try {
JsonNode node = objectMapper.readTree(response);
return EntityPair.of(
node.get("entity_a").asText(),
node.get("entity_b").asText()
);
} catch (Exception e) {
return null;
}
}
private String formatDocs(List<Document> docs) {
if (docs.isEmpty()) return "(未找到相关信息)";
return docs.stream()
.map(Document::getContent)
.collect(Collectors.joining("\n---\n"));
}
private BridgingResult fallbackToGeneral(String question) {
List<Document> docs = vectorSearch.search(question, 5);
String answer = llmClient.chat(
"基于以下文档回答问题:\n" + formatDocs(docs) + "\n\n问题:" + question
);
return BridgingResult.builder().answer(answer).build();
}
@Autowired
private ObjectMapper objectMapper;
}六、推理记忆管理——防止上下文爆炸
多跳推理最头疼的工程问题是:随着跳数增加,对话历史越来越长,最终超过LLM的上下文窗口限制,或者成本暴增。
@Service
public class ReasoningMemoryManager {
private static final int MAX_CONTEXT_TOKENS = 8000;
private static final int SUMMARY_THRESHOLD = 5000;
@Autowired
private LLMClient llmClient;
@Autowired
private TokenCounter tokenCounter;
/**
* 管理推理过程中的上下文,防止超长
*/
public String manageContext(String newStep, String currentContext, String originalQuestion) {
String updatedContext = currentContext + "\n" + newStep;
int tokenCount = tokenCounter.count(updatedContext);
if (tokenCount <= SUMMARY_THRESHOLD) {
// 上下文还在可控范围,直接使用
return updatedContext;
}
// 超过阈值,进行摘要压缩
return compressContext(updatedContext, originalQuestion);
}
/**
* 压缩上下文:保留关键信息,去除冗余
*/
private String compressContext(String context, String originalQuestion) {
String compressionPrompt = String.format("""
以下是一个多步推理过程的历史记录,为了节省空间需要压缩。
原始问题:%s
推理历史:
%s
请将上述推理历史压缩,保留:
1. 已确定的关键事实
2. 当前推理状态(知道了什么,还差什么)
3. 每一步的核心发现
去除:
1. 完整的检索结果文本(只保留从中提取的关键信息)
2. 重复的信息
压缩后的推理摘要:
""", originalQuestion, context);
String compressed = llmClient.chat(compressionPrompt);
log.info("上下文压缩:原始{}字符 -> 压缩后{}字符",
context.length(), compressed.length());
return "[压缩摘要]\n" + compressed;
}
/**
* 提取推理过程中积累的事实集合
*/
public FactAccumulator accumulateFacts(List<ReasoningStep> steps) {
FactAccumulator accumulator = new FactAccumulator();
for (ReasoningStep step : steps) {
// 从观察中提取事实
List<String> facts = extractFacts(step.getObservation());
facts.forEach(f -> accumulator.addFact(step.getHop(), f));
}
return accumulator;
}
private List<String> extractFacts(String observation) {
if (observation.length() < 20) return Collections.emptyList();
String extractPrompt = String.format("""
从以下文本中提取1-3条关键事实,每条事实用一句话表述。
只提取明确陈述的事实,不要推断。
文本:%s
事实列表(每行一条):
""", observation.substring(0, Math.min(500, observation.length())));
String response = llmClient.chat(extractPrompt);
return Arrays.stream(response.split("\n"))
.filter(l -> !l.isBlank())
.map(String::trim)
.collect(Collectors.toList());
}
}七、多路径推理与答案聚合
对于高置信度要求的场景,可以同时运行多条推理路径,对答案进行投票:
@Service
public class MultiPathReasoningService {
@Autowired
private MultiHopReasoningAgent singlePathAgent;
@Autowired
private LLMClient llmClient;
/**
* 多路径推理:并行执行多条推理链,对最终答案投票
*/
public MultiPathResult reasonMultiPath(String question, int pathCount) {
// 并行执行多条推理路径(实际中用线程池)
List<CompletableFuture<MultiHopResult>> futures = new ArrayList<>();
for (int i = 0; i < pathCount; i++) {
final int pathIndex = i;
CompletableFuture<MultiHopResult> future = CompletableFuture.supplyAsync(() -> {
// 每条路径加入轻微随机性,让推理探索不同方向
return singlePathAgent.reason(question);
});
futures.add(future);
}
// 等待所有路径完成
List<MultiHopResult> pathResults = futures.stream()
.map(f -> {
try {
return f.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("推理路径执行失败", e);
return null;
}
})
.filter(r -> r != null)
.collect(Collectors.toList());
if (pathResults.isEmpty()) {
throw new RuntimeException("所有推理路径均失败");
}
// 对答案进行聚合
String aggregatedAnswer = aggregateAnswers(question, pathResults);
double consistency = calculateConsistency(pathResults);
return MultiPathResult.builder()
.finalAnswer(aggregatedAnswer)
.pathResults(pathResults)
.consistencyScore(consistency)
.reliable(consistency >= 0.6)
.build();
}
/**
* 对多条路径的答案进行聚合
*/
private String aggregateAnswers(String question, List<MultiHopResult> pathResults) {
if (pathResults.size() == 1) {
return pathResults.get(0).getFinalAnswer();
}
// 收集所有路径的答案
String answersText = IntStream.range(0, pathResults.size())
.mapToObj(i -> String.format("路径%d的答案:%s",
i + 1, pathResults.get(i).getFinalAnswer()))
.collect(Collectors.joining("\n\n"));
String aggregationPrompt = String.format("""
以下是对同一问题的多条推理路径分别得出的答案。
请综合分析这些答案,给出最终可靠的答案。
如果答案一致,直接采用;如果有分歧,分析原因并给出最合理的版本。
问题:%s
各路径答案:
%s
综合后的最终答案:
""", question, answersText);
return llmClient.chat(aggregationPrompt);
}
/**
* 计算各路径答案的一致性分数
*/
private double calculateConsistency(List<MultiHopResult> pathResults) {
if (pathResults.size() <= 1) return 1.0;
List<String> answers = pathResults.stream()
.map(MultiHopResult::getFinalAnswer)
.collect(Collectors.toList());
// 简化:用词汇重叠度估算一致性
String firstAnswer = answers.get(0);
Set<String> firstWords = new HashSet<>(Arrays.asList(firstAnswer.split("\\s+")));
double totalOverlap = answers.subList(1, answers.size()).stream()
.mapToDouble(answer -> {
Set<String> words = new HashSet<>(Arrays.asList(answer.split("\\s+")));
long common = firstWords.stream().filter(words::contains).count();
return firstWords.isEmpty() ? 0 : (double) common / firstWords.size();
})
.average()
.orElse(0);
return totalOverlap;
}
}八、实际案例:企业知识库的多跳问答
给一个完整的端到端例子,展示多跳推理在实际问答中的表现:
用户提问:"我们产品团队用的那个分布式消息队列,是哪个开源项目的衍生版本?它的主要维护者现在在哪家公司?"
多跳推理过程:
第1跳:
Thought: 需要先找到产品团队使用的分布式消息队列是什么
Action: search("产品团队 分布式消息队列")
Observation: 找到文档:"我们内部使用的消息队列MQ-Pro,基于Apache Kafka 2.8定制开发..."
第2跳:
Thought: 消息队列是Kafka的衍生版,现在需要找Kafka的主要维护者
Action: search("Apache Kafka 主要维护者 核心开发者")
Observation: 找到文档:"Jay Kreps是Apache Kafka的联合创始人之一,目前担任Confluent CEO..."
第3跳:
Thought: 已知Jay Kreps是Kafka的主要维护者,他现在在Confluent。可以回答了。
Action: answer("产品团队使用的分布式消息队列是MQ-Pro,它基于Apache Kafka定制开发。
Apache Kafka的主要维护者是Jay Kreps,他目前在Confluent公司担任CEO。")最终答案:产品团队使用的分布式消息队列MQ-Pro是基于Apache Kafka定制开发的衍生版本。Apache Kafka的联合创始人和主要维护者之一Jay Kreps,目前是Confluent公司的CEO。
整个过程3跳,准确且有依据。
九、多跳推理的适用场景与局限
适用场景:
- 实体关系链查询(A的B的C是谁)
- 条件汇聚查询(同时满足X和Y条件的Z)
- 因果链分析(A导致了B,B又如何影响了C)
局限:
- 延迟高:每跳都需要一次LLM调用,3跳下来延迟可能超过5秒
- 成本高:Token消耗是单跳的3-5倍
- 错误传播:第一跳错了后面全错,需要回溯机制
对于延迟敏感的场景(聊天类产品),多跳推理往往不可接受。更适合用在异步场景,比如定期分析报告、离线知识图谱构建等。
下一篇讲纠错RAG(Corrective RAG),当检索质量差时,系统能自动检测并修正,这是多跳推理之外另一个提升RAG鲁棒性的重要思路。
