第2385篇:问题分解RAG——把复杂问题拆解后分别检索再合并答案
大约 7 分钟
第2385篇:问题分解RAG——把复杂问题拆解后分别检索再合并答案
适读人群:需要处理复杂综合性问题的RAG系统工程师 | 阅读时长:约18分钟 | 核心价值:掌握问题分解策略的工程实现,解决单次检索无法覆盖多维度问题的核心挑战
有段时间我们的RAG系统里有个让人头疼的问题类型:用户的综合性问题。
比如:"我们Q3的销售额、毛利率和新客户获取数,跟去年同期相比,有什么亮点和问题?"
这个问题实际上包含了多个子问题:Q3销售额是多少?Q3毛利率是多少?Q3新客户数是多少?去年同期各项数据是什么?对比分析。
单次向量检索只能拿到一两个相关文档,不可能同时把所有维度的数据都检索到。结果AI的回答要么不完整(只回答了部分维度),要么胡编(找不到的数据就编造)。
问题分解是解决这类问题的关键技术。
问题分解的原则
/**
* 什么样的问题需要分解?
*
* 需要分解的特征:
* 1. 问题包含多个明确的数据点或维度(如:销售额、毛利率、新客户数)
* 2. 问题需要对比(如:Q3 vs 去年同期)
* 3. 问题需要综合推理(如:找出亮点和问题)
* 4. 不同部分的信息来自不同文档
*
* 不需要分解的特征:
* 1. 单一的事实查询(如:某产品的价格是多少?)
* 2. 概念解释类(如:什么是毛利率?)
* 3. 操作步骤类(如:如何申请报销?)
*
* 关键原则:
* 分解不是目的,分解是为了让每个子问题都能被检索有效回答
* 如果一个问题一次检索能回答,不要为了分解而分解
*/核心实现:分解 + 并行检索 + 合并
@Service
public class QueryDecompositionRAG {
private final ChatClient chatClient;
private final VectorStore vectorStore;
private final ExecutorService parallelExecutor;
/**
* 问题分解RAG的完整流程
*/
public DecompositionResult process(String originalQuestion) {
long startTime = System.currentTimeMillis();
// 第一步:判断是否需要分解
DecompositionPlan plan = planDecomposition(originalQuestion);
if (!plan.isNeedsDecomposition()) {
// 简单问题直接检索
List<Document> docs = vectorStore.similaritySearch(
SearchRequest.query(originalQuestion).withTopK(5)
);
String answer = generateAnswer(originalQuestion, docs);
return DecompositionResult.simple(answer, System.currentTimeMillis() - startTime);
}
// 第二步:并行检索所有子问题
Map<String, SubQuestionResult> subResults =
retrieveSubQuestionsInParallel(plan.getSubQuestions());
// 第三步:合并生成最终答案
String finalAnswer = synthesizeAnswer(originalQuestion, plan, subResults);
return DecompositionResult.complex(
finalAnswer,
plan.getSubQuestions().size(),
System.currentTimeMillis() - startTime
);
}
/**
* 规划分解方案
*/
private DecompositionPlan planDecomposition(String question) {
String prompt = """
分析以下问题,判断是否需要分解为多个子问题来检索。
问题:%s
判断标准:
1. 问题是否包含多个独立的信息需求?
2. 这些信息是否可能在不同文档中?
3. 直接搜索是否足够?
如果需要分解,列出子问题(每个子问题应该能被单次检索有效回答)。
输出JSON:
{
"needs_decomposition": true/false,
"reason": "判断理由",
"sub_questions": [
{
"id": "q1",
"question": "子问题文本",
"search_query": "最优检索查询词(可以和问题不同)",
"data_type": "数值/文本/比较/分析"
}
]
}
""".formatted(question);
String response = chatClient.prompt(prompt).call().content();
return parseDecompositionPlan(response);
}
/**
* 并行检索所有子问题
* 子问题之间通常没有依赖关系,可以并行处理
*/
private Map<String, SubQuestionResult> retrieveSubQuestionsInParallel(
List<SubQuestion> subQuestions) {
Map<String, CompletableFuture<SubQuestionResult>> futures = new HashMap<>();
for (SubQuestion sq : subQuestions) {
CompletableFuture<SubQuestionResult> future = CompletableFuture.supplyAsync(() -> {
try {
// 使用子问题优化的搜索查询
String searchQuery = sq.getSearchQuery() != null
? sq.getSearchQuery()
: sq.getQuestion();
List<Document> docs = vectorStore.similaritySearch(
SearchRequest.query(searchQuery).withTopK(3)
);
// 为每个子问题单独生成回答
String subAnswer = generateSubAnswer(sq.getQuestion(), docs);
return SubQuestionResult.builder()
.questionId(sq.getId())
.question(sq.getQuestion())
.retrievedDocs(docs)
.answer(subAnswer)
.confidence(assessConfidence(docs, subAnswer))
.build();
} catch (Exception e) {
log.error("Failed to retrieve sub-question: {}", sq.getQuestion(), e);
return SubQuestionResult.failed(sq.getId(), sq.getQuestion(), e.getMessage());
}
}, parallelExecutor);
futures.put(sq.getId(), future);
}
// 等待所有子问题完成(最多10秒)
Map<String, SubQuestionResult> results = new HashMap<>();
for (Map.Entry<String, CompletableFuture<SubQuestionResult>> entry : futures.entrySet()) {
try {
SubQuestionResult result = entry.getValue().get(10, TimeUnit.SECONDS);
results.put(entry.getKey(), result);
} catch (TimeoutException e) {
results.put(entry.getKey(),
SubQuestionResult.failed(entry.getKey(), "未知", "检索超时"));
} catch (Exception e) {
results.put(entry.getKey(),
SubQuestionResult.failed(entry.getKey(), "未知", e.getMessage()));
}
}
return results;
}
/**
* 为单个子问题生成简洁的回答
* 注意:这里只生成子答案,不是最终答案
*/
private String generateSubAnswer(String subQuestion, List<Document> docs) {
if (docs.isEmpty()) {
return "未找到相关信息";
}
String context = docs.stream()
.map(Document::getContent)
.collect(Collectors.joining("\n\n"));
String prompt = """
基于以下信息,简洁地回答子问题。
只需要回答这个具体的子问题,不需要综合分析。
%s
子问题:%s
简洁回答(如果信息不足,说明"信息不足"):
""".formatted(context, subQuestion);
return chatClient.prompt(prompt).call().content().trim();
}
/**
* 综合所有子问题的答案,生成最终回答
*/
private String synthesizeAnswer(String originalQuestion, DecompositionPlan plan,
Map<String, SubQuestionResult> subResults) {
// 构建子问题答案的摘要
StringBuilder subAnswersSummary = new StringBuilder();
for (SubQuestion sq : plan.getSubQuestions()) {
SubQuestionResult result = subResults.get(sq.getId());
if (result != null) {
subAnswersSummary.append(String.format(
"关于「%s」:\n%s\n\n",
sq.getQuestion(),
result.isSuccess() ? result.getAnswer() : "(信息获取失败)"
));
}
}
String prompt = """
用户问了一个综合性问题,我已经分别查找了各个维度的信息。
请基于这些信息,给出完整、综合的回答。
用户的原始问题:%s
各维度的信息:
%s
请综合以上信息,给出完整的回答。
如果某些维度信息不足,请在回答中说明。
""".formatted(originalQuestion, subAnswersSummary.toString());
return chatClient.prompt(prompt).call().content();
}
}子问题的依赖处理
有些子问题有依赖关系:第二个子问题的内容依赖第一个子问题的结果。
@Service
public class DependencyAwareDecomposition {
/**
* 处理有依赖关系的子问题
*
* 场景:问题"销售额同比增长了多少"
* 子问题:q1=今年销售额,q2=去年销售额,q3=计算增长率(依赖q1和q2)
*
* q1和q2可以并行检索,q3必须等q1和q2都完成
*/
public Map<String, SubQuestionResult> executeWithDependencies(
List<SubQuestion> subQuestions) {
Map<String, SubQuestionResult> completedResults = new HashMap<>();
// 按依赖层级分组
List<List<SubQuestion>> executionLayers = buildExecutionLayers(subQuestions);
for (List<SubQuestion> layer : executionLayers) {
// 同一层的子问题并行执行
Map<String, SubQuestionResult> layerResults =
retrieveSubQuestionsInParallel(layer);
completedResults.putAll(layerResults);
}
return completedResults;
}
/**
* 构建执行层级(拓扑排序的分层版本)
* 没有依赖的问题在第一层,依赖第一层的在第二层,以此类推
*/
private List<List<SubQuestion>> buildExecutionLayers(List<SubQuestion> questions) {
Map<String, SubQuestion> questionMap = questions.stream()
.collect(Collectors.toMap(SubQuestion::getId, q -> q));
List<List<SubQuestion>> layers = new ArrayList<>();
Set<String> completed = new HashSet<>();
Set<String> remaining = new HashSet<>(questionMap.keySet());
while (!remaining.isEmpty()) {
// 找到当前所有依赖已完成的问题
List<SubQuestion> currentLayer = remaining.stream()
.filter(id -> completed.containsAll(questionMap.get(id).getDependsOn()))
.map(questionMap::get)
.collect(Collectors.toList());
if (currentLayer.isEmpty()) {
throw new CircularDependencyException("Sub-questions have circular dependencies");
}
layers.add(currentLayer);
currentLayer.forEach(q -> {
completed.add(q.getId());
remaining.remove(q.getId());
});
}
return layers;
}
}答案质量评估
@Service
public class DecompositionQualityEvaluator {
/**
* 评估分解后的答案质量
*
* 主要评估:
* 1. 覆盖率:原始问题的各个方面是否都被回答了
* 2. 一致性:各子答案之间是否有矛盾
* 3. 完整性:最终综合答案是否完整
*/
public QualityScore evaluate(String originalQuestion, DecompositionResult result) {
String prompt = """
请评估以下问答的质量:
原始问题:%s
AI的回答:%s
评估维度(每项0-10分):
1. 覆盖率:是否回答了问题的所有方面?
2. 准确性:答案内容是否准确?
3. 完整性:答案是否完整?
输出JSON:
{
"coverage_score": 8,
"accuracy_score": 7,
"completeness_score": 9,
"missing_aspects": ["未覆盖的方面"],
"overall_assessment": "总体评价"
}
""".formatted(originalQuestion, result.getFinalAnswer());
String response = chatClient.prompt(prompt).call().content();
return parseQualityScore(response);
}
}性能和成本权衡
问题分解会增加LLM调用次数,成本是普通RAG的3-5倍。必须做好路由:
@Service
public class DecompositionRouter {
/**
* 判断是否值得进行问题分解
*
* 判断标准:
* - 问题确实需要多维度信息
* - 用户对这类问题的体验反馈差
* - 问题的答案有实际价值(不是随意问的)
*/
public RoutingDecision route(String question) {
// 快速规则判断
if (containsMultipleDataDimensions(question)) {
return RoutingDecision.DECOMPOSE;
}
if (isSimpleFactualQuery(question)) {
return RoutingDecision.DIRECT;
}
// 用小模型快速判断(比大模型便宜很多)
boolean needsDecomposition = quickClassify(question);
return needsDecomposition ? RoutingDecision.DECOMPOSE : RoutingDecision.DIRECT;
}
private boolean containsMultipleDataDimensions(String question) {
// 包含顿号或"和"连接的多个数据项
long dimensionCount = Arrays.stream(question.split("[和、及]"))
.filter(s -> s.trim().length() > 2)
.count();
return dimensionCount >= 3;
}
}通过问题分解,我们把复杂综合性问题的回答完整度从不足50%提升到了85%以上。关键在于分解的粒度——分解得太细会增加太多开销,分解得太粗效果不好。通过大量实际测试找到适合自己业务场景的分解策略,是最重要的工程工作。
