LangChain4j 的 RAG 流水线定制——从默认实现到自定义各阶段
LangChain4j 的 RAG 流水线定制——从默认实现到自定义各阶段
有个同事跑来问我:他用 LangChain4j 做了一个内部知识库问答,嵌入和检索用的都是默认配置,但效果一直不太好——相关文档明明在库里,但就是检索不出来,或者检索出来但答案质量很低。
我问他:你知道 LangChain4j 的 RAG 流水线里,从用户输入到最终答案,经历了哪些阶段吗?每个阶段用的是什么策略?
他说不知道。
这就是问题所在。用默认配置能跑通 demo,但要把效果调好,你必须知道流水线里每个环节在做什么,能改什么。
这篇文章把 LangChain4j 的 RAG 管道从头到尾拆开讲,然后展示怎么把每个阶段换成自己的实现。
LangChain4j RAG 的整体架构
LangChain4j 从 0.27 版本开始,引入了 RetrievalAugmentor 接口体系,把 RAG 流水线拆成了可以单独替换的阶段。
默认的 RAG 流水线大概长这样:
四个核心接口:
- QueryTransformer:把用户的原始查询转换/扩展成更好的检索查询
- QueryRouter:决定把查询路由到哪个(或哪些)
ContentRetriever - ContentRetriever:实际执行检索,返回相关文档
- ContentInjector:把检索到的内容注入到 Prompt 中
每个接口都有默认实现,也可以完全自定义。
依赖配置
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-spring-boot-starter</artifactId>
<version>0.35.0</version>
</dependency>
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-pgvector</artifactId>
<version>0.35.0</version>
</dependency>
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-elasticsearch</artifactId>
<version>0.35.0</version>
</dependency>阶段一:QueryTransformer 定制
默认的 QueryTransformer 是 DefaultQueryTransformer,它什么都不做,直接把原始查询传下去。
这是第一个影响检索质量的地方。用户的提问往往很口语化,不适合直接用来做向量检索。
自定义 QueryTransformer:查询重写
@Component
@Slf4j
public class LlmQueryRewriter implements QueryTransformer {
private final ChatLanguageModel llm;
private static final String REWRITE_PROMPT = """
你是一个查询优化专家。将用户的口语化问题转换为更适合文档检索的查询语句。
规则:
1. 去掉口语化的表达("请问"、"能告诉我"等)
2. 提取核心关键词
3. 如果问题模糊,给出更精确的表述
4. 只输出重写后的查询,不要有任何解释
原始查询:%s
重写后的查询:
""";
@Override
public Collection<Query> transform(Query query) {
String originalQuery = query.text();
try {
String rewrittenText = llm.generate(
REWRITE_PROMPT.formatted(originalQuery));
log.debug("查询重写:'{}' -> '{}'", originalQuery, rewrittenText.trim());
// 返回重写后的查询(保留原始查询作为备选)
return Arrays.asList(
Query.from(rewrittenText.trim()),
query // 也保留原始查询,防止重写失去重要信息
);
} catch (Exception e) {
log.warn("查询重写失败,使用原始查询", e);
return Collections.singletonList(query);
}
}
}更进一步:多查询扩展
@Component
@Slf4j
public class MultiQueryExpander implements QueryTransformer {
private final ChatLanguageModel llm;
private static final String EXPAND_PROMPT = """
针对以下问题,生成 3 个不同角度的查询变体,以提高检索覆盖率。
每个变体一行,不要有序号或其他格式。
原始问题:%s
查询变体:
""";
@Override
public Collection<Query> transform(Query query) {
String originalQuery = query.text();
try {
String expandedText = llm.generate(EXPAND_PROMPT.formatted(originalQuery));
List<Query> queries = new ArrayList<>();
queries.add(query); // 保留原始查询
// 解析生成的查询变体
Arrays.stream(expandedText.split("\n"))
.map(String::trim)
.filter(line -> !line.isEmpty())
.limit(3)
.forEach(variant -> queries.add(Query.from(variant)));
log.debug("查询扩展:1 -> {} 个查询变体", queries.size());
return queries;
} catch (Exception e) {
log.warn("查询扩展失败", e);
return Collections.singletonList(query);
}
}
}阶段二:QueryRouter 定制
QueryRouter 决定把查询发给哪个 ContentRetriever。默认实现是把查询发给所有注册的 Retriever。
在有多个知识库(产品文档、技术规范、FAQ 等)的场景下,智能路由很有价值:
@Component
@Slf4j
public class IntelligentQueryRouter implements QueryRouter {
private final ChatLanguageModel llm;
private final Map<String, ContentRetriever> retrievers;
// 知识库描述(用于让 LLM 决定路由)
private static final Map<String, String> RETRIEVER_DESCRIPTIONS = Map.of(
"product_docs", "产品文档:包含产品功能说明、使用指南、API 文档",
"technical_specs", "技术规范:包含系统架构、接口规范、数据库设计文档",
"faq", "常见问题:包含用户常见问题和答案",
"changelog", "更新日志:包含版本更新记录和 breaking changes"
);
private static final String ROUTING_PROMPT = """
根据用户的问题,判断应该从哪些知识库检索信息。
可用知识库:
%s
用户问题:%s
请选择最相关的知识库(可以多选),只输出知识库名称,用逗号分隔,不要有其他解释。
""";
@Override
public Collection<ContentRetriever> route(Query query) {
String knowledgeBaseDescriptions = RETRIEVER_DESCRIPTIONS.entrySet().stream()
.map(e -> "- " + e.getKey() + ": " + e.getValue())
.collect(Collectors.joining("\n"));
try {
String routingDecision = llm.generate(
ROUTING_PROMPT.formatted(knowledgeBaseDescriptions, query.text()));
Set<String> selectedRetrievers = Arrays.stream(routingDecision.split(","))
.map(String::trim)
.filter(retrievers::containsKey)
.collect(Collectors.toSet());
if (selectedRetrievers.isEmpty()) {
log.warn("路由未选择任何知识库,回退到全量检索");
return retrievers.values();
}
log.info("查询路由:'{}' -> {}", query.text(), selectedRetrievers);
return selectedRetrievers.stream()
.map(retrievers::get)
.collect(Collectors.toList());
} catch (Exception e) {
log.error("查询路由失败,回退到全量检索", e);
return retrievers.values();
}
}
}阶段三:ContentRetriever 定制
这是 RAG 质量的核心所在。LangChain4j 提供了 EmbeddingStoreContentRetriever(向量检索),但纯向量检索有明显的局限性:对精确词汇匹配不友好(比如代码、专有名词、产品编号)。
生产级的检索通常需要混合检索(Hybrid Retrieval):向量检索 + BM25/全文检索,然后用 RRF(Reciprocal Rank Fusion)融合结果。
@Component
@Slf4j
public class HybridContentRetriever implements ContentRetriever {
private final EmbeddingStore<TextSegment> embeddingStore;
private final EmbeddingModel embeddingModel;
private final ElasticsearchClient esClient;
private static final int TOP_K = 5;
private static final double VECTOR_WEIGHT = 0.6;
private static final double BM25_WEIGHT = 0.4;
@Override
public List<Content> retrieve(Query query) {
String queryText = query.text();
// 1. 向量检索
List<ScoredResult> vectorResults = vectorSearch(queryText);
// 2. BM25 全文检索(基于 Elasticsearch)
List<ScoredResult> bm25Results = bm25Search(queryText);
// 3. RRF 融合
List<Content> mergedResults = rrfMerge(vectorResults, bm25Results, TOP_K);
log.debug("混合检索完成:向量 {} 条,BM25 {} 条,融合后 {} 条",
vectorResults.size(), bm25Results.size(), mergedResults.size());
return mergedResults;
}
private List<ScoredResult> vectorSearch(String query) {
Embedding queryEmbedding = embeddingModel.embed(query).content();
EmbeddingSearchRequest request = EmbeddingSearchRequest.builder()
.queryEmbedding(queryEmbedding)
.maxResults(TOP_K * 2) // 多检索一些,留给 RRF 选择
.minScore(0.5)
.build();
return embeddingStore.search(request).matches().stream()
.map(match -> new ScoredResult(
match.embedded().text(),
match.embedded().metadata(),
match.score()))
.collect(Collectors.toList());
}
private List<ScoredResult> bm25Search(String query) {
try {
SearchResponse<Map> response = esClient.search(s -> s
.index("knowledge_base")
.query(q -> q
.match(m -> m
.field("content")
.query(query)))
.size(TOP_K * 2),
Map.class
);
return response.hits().hits().stream()
.filter(hit -> hit.source() != null)
.map(hit -> new ScoredResult(
(String) hit.source().get("content"),
Map.of("source", hit.source().getOrDefault("source", "")),
hit.score() != null ? hit.score() : 0.0))
.collect(Collectors.toList());
} catch (Exception e) {
log.warn("BM25 检索失败,降级为纯向量检索", e);
return Collections.emptyList();
}
}
/**
* Reciprocal Rank Fusion 算法
* RRF(d) = sum(1 / (k + rank(d, list))) for each result list
* k 通常取 60
*/
private List<Content> rrfMerge(List<ScoredResult> vectorResults,
List<ScoredResult> bm25Results,
int topK) {
int k = 60;
Map<String, Double> rrfScores = new HashMap<>();
Map<String, String> contentMap = new HashMap<>();
// 计算向量检索的 RRF 分数
for (int rank = 0; rank < vectorResults.size(); rank++) {
ScoredResult result = vectorResults.get(rank);
String key = result.content();
contentMap.put(key, result.content());
rrfScores.merge(key, VECTOR_WEIGHT / (k + rank + 1), Double::sum);
}
// 计算 BM25 检索的 RRF 分数
for (int rank = 0; rank < bm25Results.size(); rank++) {
ScoredResult result = bm25Results.get(rank);
String key = result.content();
contentMap.put(key, result.content());
rrfScores.merge(key, BM25_WEIGHT / (k + rank + 1), Double::sum);
}
// 按 RRF 分数排序,取 Top-K
return rrfScores.entrySet().stream()
.sorted(Map.Entry.<String, Double>comparingByValue().reversed())
.limit(topK)
.map(entry -> Content.from(TextSegment.from(contentMap.get(entry.getKey()))))
.collect(Collectors.toList());
}
@Data
@AllArgsConstructor
private static class ScoredResult {
private String content;
private Map<String, Object> metadata;
private double score;
}
}检索后处理:重排序(Reranking)
检索出来的文档,还需要用一个 Reranker 模型重新打分,进一步提升相关性:
@Component
@Slf4j
public class RerankingContentRetriever implements ContentRetriever {
private final HybridContentRetriever hybridRetriever;
private final CrossEncoderModel rerankModel; // 交叉编码器,专门用于重排序
@Override
public List<Content> retrieve(Query query) {
// 1. 先用混合检索获取候选文档(多取一些)
List<Content> candidates = hybridRetriever.retrieve(query);
// 2. 用 Reranker 重新评分
List<RankedContent> rankedContents = candidates.stream()
.map(content -> {
double rerankScore = rerankModel.score(
query.text(),
content.textSegment().text());
return new RankedContent(content, rerankScore);
})
.collect(Collectors.toList());
// 3. 按 Reranker 分数重新排序
return rankedContents.stream()
.sorted(Comparator.comparingDouble(RankedContent::score).reversed())
.limit(5) // 最终只取 Top 5
.map(RankedContent::content)
.collect(Collectors.toList());
}
@Data
@AllArgsConstructor
private static class RankedContent {
private Content content;
private double score;
}
}阶段四:ContentInjector 定制
ContentInjector 决定如何把检索到的内容注入到 Prompt 中。默认实现把所有内容拼在一起,格式比较简单。
定制 ContentInjector 可以做更精细的格式控制:
@Component
public class StructuredContentInjector implements ContentInjector {
private static final String CONTEXT_TEMPLATE = """
以下是从知识库检索到的相关内容,请基于这些内容回答用户问题。
如果检索内容不足以回答问题,请明确说明。
===检索结果===
%s
===检索结果结束===
""";
private static final String SINGLE_CONTENT_TEMPLATE = """
【来源 %d】(相关性:%s)
%s
""";
@Override
public UserMessage inject(List<Content> contents, UserMessage userMessage) {
if (contents.isEmpty()) {
// 没有检索到内容,注入提示
String noContextMessage = "【注意:未检索到相关知识库内容,以下回答基于通用知识】\n\n" +
userMessage.singleText();
return UserMessage.from(noContextMessage);
}
// 格式化检索内容
StringBuilder contextBuilder = new StringBuilder();
for (int i = 0; i < contents.size(); i++) {
Content content = contents.get(i);
String relevanceLabel = getRelevanceLabel(i, contents.size());
contextBuilder.append(SINGLE_CONTENT_TEMPLATE.formatted(
i + 1,
relevanceLabel,
content.textSegment().text().trim()
));
// 如果有来源元数据,附加来源信息
String source = (String) content.textSegment().metadata()
.getString("source");
if (source != null) {
contextBuilder.append("(来源:").append(source).append(")\n\n");
}
}
String fullPrompt = CONTEXT_TEMPLATE.formatted(contextBuilder.toString()) +
"用户问题:" + userMessage.singleText();
return UserMessage.from(fullPrompt);
}
private String getRelevanceLabel(int rank, int total) {
double ratio = (double)(total - rank) / total;
if (ratio > 0.7) return "高";
if (ratio > 0.3) return "中";
return "低";
}
}组装完整的自定义 RAG 流水线
把所有自定义组件组装起来:
@Configuration
public class CustomRagConfiguration {
@Bean
public RetrievalAugmentor customRetrievalAugmentor(
LlmQueryRewriter queryRewriter,
IntelligentQueryRouter queryRouter,
RerankingContentRetriever rerankingRetriever,
StructuredContentInjector contentInjector) {
return DefaultRetrievalAugmentor.builder()
.queryTransformer(queryRewriter)
.queryRouter(queryRouter)
// 注意:当使用自定义 queryRouter 时,
// queryRouter 内部已经包含了 retriever 逻辑
// 这里的 contentInjector 作用于最终聚合的结果
.contentInjector(contentInjector)
.build();
}
@Bean
@AiService
public KnowledgeBaseAssistant knowledgeBaseAssistant(
ChatLanguageModel chatLanguageModel,
RetrievalAugmentor retrievalAugmentor) {
return AiServices.builder(KnowledgeBaseAssistant.class)
.chatLanguageModel(chatLanguageModel)
.retrievalAugmentor(retrievalAugmentor)
.chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10))
.build();
}
}
@AiService
public interface KnowledgeBaseAssistant {
@SystemMessage("""
你是一名专业的知识库问答助手。
请基于提供的知识库内容回答问题。
如果知识库中没有相关信息,请诚实说明,不要编造内容。
回答要简洁、准确,必要时引用来源。
""")
String answer(@MemoryId String userId, @UserMessage String question);
}评估 RAG 流水线的质量
自定义了这么多组件,怎么知道效果好不好?需要一套评估框架:
@Service
@Slf4j
public class RagEvaluationService {
private final KnowledgeBaseAssistant assistant;
private final ChatLanguageModel evaluatorLlm;
/**
* 评估检索质量(Context Recall + Context Precision)
*/
public RetrievalMetrics evaluateRetrieval(List<EvalCase> evalCases) {
double totalRecall = 0;
double totalPrecision = 0;
for (EvalCase evalCase : evalCases) {
// 获取检索到的文档(需要在 Retriever 层面加上日志收集)
List<String> retrievedDocs = getRetrievedDocs(evalCase.getQuestion());
// Context Recall: 正确答案需要的信息有多少被检索到了
double recall = calculateRecall(retrievedDocs, evalCase.getGroundTruth());
// Context Precision: 检索到的文档有多少是真正相关的
double precision = calculatePrecision(retrievedDocs, evalCase.getGroundTruth());
totalRecall += recall;
totalPrecision += precision;
}
return RetrievalMetrics.builder()
.averageRecall(totalRecall / evalCases.size())
.averagePrecision(totalPrecision / evalCases.size())
.build();
}
/**
* 用 LLM 作为 Judge 评估生成质量(Faithfulness + Answer Relevance)
*/
public GenerationMetrics evaluateGeneration(List<EvalCase> evalCases) {
double totalFaithfulness = 0;
double totalRelevance = 0;
for (EvalCase evalCase : evalCases) {
String generatedAnswer = assistant.answer("eval", evalCase.getQuestion());
// Faithfulness: 生成的答案是否基于检索内容,没有幻觉
double faithfulness = evaluateFaithfulness(
evalCase.getQuestion(),
generatedAnswer,
evalCase.getGroundTruth());
// Answer Relevance: 答案是否回答了用户的问题
double relevance = evaluateAnswerRelevance(
evalCase.getQuestion(),
generatedAnswer);
totalFaithfulness += faithfulness;
totalRelevance += relevance;
log.info("问题:{},忠实度:{:.2f},相关性:{:.2f}",
evalCase.getQuestion(), faithfulness, relevance);
}
return GenerationMetrics.builder()
.averageFaithfulness(totalFaithfulness / evalCases.size())
.averageAnswerRelevance(totalRelevance / evalCases.size())
.build();
}
private double evaluateFaithfulness(String question, String answer, String context) {
String evalPrompt = """
请评估以下回答是否忠实于给定的上下文(不编造信息)。
问题:%s
上下文:%s
回答:%s
请给出 0-1 之间的忠实度分数(1 = 完全基于上下文,0 = 严重偏离或编造)。
只输出一个数字,不要有任何解释。
""".formatted(question, context, answer);
try {
String scoreStr = evaluatorLlm.generate(evalPrompt).trim();
return Double.parseDouble(scoreStr);
} catch (Exception e) {
return 0.5;
}
}
private double evaluateAnswerRelevance(String question, String answer) {
String evalPrompt = """
请评估以下回答是否回答了用户的问题。
问题:%s
回答:%s
请给出 0-1 之间的相关性分数(1 = 完全回答了问题,0 = 完全没有回答)。
只输出一个数字,不要有任何解释。
""".formatted(question, answer);
try {
String scoreStr = evaluatorLlm.generate(evalPrompt).trim();
return Double.parseDouble(scoreStr);
} catch (Exception e) {
return 0.5;
}
}
}各阶段优化对效果的影响
根据我的实际测试(内部知识库,约 5000 篇文档),各阶段优化对最终答案质量的提升:
| 优化项 | 基线准确率 | 优化后准确率 | 提升 |
|---|---|---|---|
| 仅向量检索(基线) | 62% | - | - |
| + 查询重写 | 62% | 71% | +9% |
| + 混合检索 | 71% | 79% | +8% |
| + Reranker | 79% | 86% | +7% |
| + 智能路由 | 86% | 88% | +2% |
| + 结构化注入 | 88% | 89% | +1% |
查询重写和混合检索对效果提升最大,Reranker 也有明显效果。智能路由和结构化注入在这个数据集上提升有限,但在多知识库场景下效果会更明显。
总结
LangChain4j 的 RAG 流水线设计得很好——每个阶段都是可替换的接口,但大多数人只用了默认实现。
真正好的 RAG 系统需要对每个阶段都做定制:
- QueryTransformer:查询重写/扩展,对口语化输入效果最明显
- QueryRouter:多知识库场景下的智能路由
- ContentRetriever:混合检索 + Reranker,这是效果提升最大的地方
- ContentInjector:结构化注入,提升模型利用检索内容的能力
最后,一定要有评估框架——不做评估,你不知道优化是否有效,也不知道哪个环节是瓶颈。
