第2081篇:LangChain4j的ContentRetriever扩展——自定义检索策略
2026/4/30大约 7 分钟
第2081篇:LangChain4j的ContentRetriever扩展——自定义检索策略
适读人群:正在基于LangChain4j构建RAG系统的Java工程师 | 阅读时长:约19分钟 | 核心价值:深入理解ContentRetriever接口,实现混合检索、多数据源检索等高级场景
RAG系统的检索层是最需要定制化的地方。内置的EmbeddingStoreContentRetriever对于大多数场景够用,但一旦遇到特殊需求——多知识库融合、时效性过滤、实时数据源——就需要自己扩展。
这篇文章讲LangChain4j的ContentRetriever接口,从设计到实现,全面覆盖扩展场景。
ContentRetriever的设计
/**
* ContentRetriever接口——RAG检索层的核心抽象
*
* 一句话理解:给定用户查询,返回相关的上下文内容
*/
public interface ContentRetriever {
/**
* 检索与给定查询相关的内容
* @param query 用户的查询对象(包含查询文本和元数据)
* @return 检索到的内容列表
*/
List<Content> retrieve(Query query);
}
/**
* Query对象:不仅包含查询文本,还有元数据(用户信息、上下文等)
*/
public class Query {
private String text; // 查询文本
private Map<String, Object> metadata; // 元数据(可以包含用户ID、会话ID等)
// 方便的工厂方法
public static Query from(String text) { ... }
public static Query from(String text, Map<String, Object> metadata) { ... }
}
/**
* Content对象:检索到的内容
*/
public class Content {
private TextSegment textSegment; // 文本内容 + 元数据
public static Content from(String text) { ... }
public static Content from(TextSegment textSegment) { ... }
}实现一:多知识库联合检索
/**
* 从多个知识库并行检索,合并结果
* 适用场景:产品文档+FAQ+用户手册等多个独立知识库
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class MultiSourceContentRetriever implements ContentRetriever {
private final Map<String, EmbeddingStore<TextSegment>> knowledgeBases;
private final EmbeddingModel embeddingModel;
// 每个知识库的检索权重(sum=1)
private final Map<String, Double> sourceWeights;
@Override
public List<Content> retrieve(Query query) {
float[] queryEmbedding = embeddingModel.embed(query.text());
// 并行检索所有知识库
List<CompletableFuture<List<ScoredContent>>> futures = knowledgeBases.entrySet()
.stream()
.map(entry -> CompletableFuture.supplyAsync(() ->
searchInSource(entry.getKey(), entry.getValue(), queryEmbedding)))
.toList();
// 收集所有结果
List<ScoredContent> allResults = new ArrayList<>();
for (CompletableFuture<List<ScoredContent>> future : futures) {
try {
allResults.addAll(future.join());
} catch (Exception e) {
log.warn("知识库检索失败: {}", e.getMessage());
}
}
// 去重(相同内容只保留最高分)
Map<String, ScoredContent> deduped = new LinkedHashMap<>();
for (ScoredContent sc : allResults) {
String contentHash = hashContent(sc.content().textSegment().text());
deduped.merge(contentHash, sc, (existing, newItem) ->
newItem.score() > existing.score() ? newItem : existing);
}
// 按分数排序
return deduped.values().stream()
.sorted(Comparator.comparingDouble(ScoredContent::score).reversed())
.limit(5)
.map(ScoredContent::content)
.toList();
}
private List<ScoredContent> searchInSource(
String sourceName,
EmbeddingStore<TextSegment> store,
float[] queryEmbedding) {
double weight = sourceWeights.getOrDefault(sourceName, 1.0);
return store.search(EmbeddingSearchRequest.builder()
.queryEmbedding(Embedding.from(queryEmbedding))
.maxResults(3)
.minScore(0.65)
.build())
.matches().stream()
.map(match -> {
// 加权分数
double adjustedScore = match.score() * weight;
// 在元数据中记录来源
TextSegment enriched = TextSegment.from(
match.embedded().text(),
Metadata.from(Map.of(
"source", sourceName,
"originalScore", String.valueOf(match.score()),
"adjustedScore", String.valueOf(adjustedScore)
))
);
return new ScoredContent(Content.from(enriched), adjustedScore);
})
.toList();
}
private String hashContent(String text) {
return String.valueOf(text.hashCode());
}
record ScoredContent(Content content, double score) {}
/**
* 构建器:方便创建多源检索器
*/
public static Builder builder() {
return new Builder();
}
public static class Builder {
private final Map<String, EmbeddingStore<TextSegment>> stores = new LinkedHashMap<>();
private final Map<String, Double> weights = new LinkedHashMap<>();
private EmbeddingModel embeddingModel;
public Builder addSource(String name, EmbeddingStore<TextSegment> store) {
stores.put(name, store);
weights.put(name, 1.0);
return this;
}
public Builder addSource(String name, EmbeddingStore<TextSegment> store, double weight) {
stores.put(name, store);
weights.put(name, weight);
return this;
}
public Builder embeddingModel(EmbeddingModel model) {
this.embeddingModel = model;
return this;
}
public MultiSourceContentRetriever build() {
return new MultiSourceContentRetriever(stores, embeddingModel, weights);
}
}
}实现二:实时数据 + 向量检索混合
/**
* 混合检索:历史文档(向量)+ 实时数据(API)
* 适用场景:知识库 + 实时价格、实时库存、实时天气等
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class HybridRealTimeRetriever implements ContentRetriever {
private final EmbeddingStoreContentRetriever staticRetriever; // 静态知识库
private final RealTimeDataService realTimeService; // 实时数据服务
private final IntentClassifier intentClassifier; // 意图分类
@Override
public List<Content> retrieve(Query query) {
String queryText = query.text();
// 识别查询意图
QueryIntent intent = intentClassifier.classify(queryText);
List<Content> results = new ArrayList<>();
// 1. 静态知识库检索(总是执行)
List<Content> staticContent = staticRetriever.retrieve(query);
results.addAll(staticContent);
// 2. 根据意图查询实时数据(按需执行)
if (intent.needsRealTimeData()) {
try {
String realTimeContent = fetchRealTimeData(queryText, intent);
if (realTimeContent != null) {
// 实时数据插入到最前面(优先级最高)
results.add(0, Content.from(TextSegment.from(
realTimeContent,
Metadata.from(Map.of(
"source", "real_time",
"dataType", intent.getRealTimeDataType(),
"timestamp", LocalDateTime.now().toString()
))
)));
}
} catch (Exception e) {
log.warn("实时数据获取失败,使用静态数据: {}", e.getMessage());
}
}
return results;
}
private String fetchRealTimeData(String query, QueryIntent intent) {
return switch (intent.getRealTimeDataType()) {
case "stock_price" -> realTimeService.getStockPrice(extractSymbol(query));
case "inventory" -> realTimeService.getInventory(extractProductId(query));
case "order_status" -> realTimeService.getOrderStatus(extractOrderId(query));
default -> null;
};
}
private String extractSymbol(String query) {
// 从查询中提取股票代码(简化实现)
Pattern pattern = Pattern.compile("[A-Z]{2,5}");
Matcher matcher = pattern.matcher(query.toUpperCase());
return matcher.find() ? matcher.group() : "";
}
private String extractProductId(String query) { return ""; }
private String extractOrderId(String query) { return ""; }
@Data
public static class QueryIntent {
private String intentType;
private String realTimeDataType;
private boolean needsRealTimeData;
}
}实现三:带时效性过滤的检索器
/**
* 时效性感知的检索器
* 对于时效性强的查询(如新闻、政策),优先返回最新文档
*/
@Service
@RequiredArgsConstructor
public class TimeAwareContentRetriever implements ContentRetriever {
private final EmbeddingModel embeddingModel;
private final EmbeddingStore<TextSegment> vectorStore;
private final TemporalIntentDetector temporalDetector;
@Override
public List<Content> retrieve(Query query) {
TemporalRequirement temporal = temporalDetector.detect(query.text());
float[] embedding = embeddingModel.embed(query.text());
// 构建时间过滤条件
Filter timeFilter = buildTimeFilter(temporal);
EmbeddingSearchRequest.Builder requestBuilder = EmbeddingSearchRequest.builder()
.queryEmbedding(Embedding.from(embedding))
.maxResults(5)
.minScore(0.65);
if (timeFilter != null) {
requestBuilder.filter(timeFilter);
}
List<EmbeddingMatch<TextSegment>> matches =
vectorStore.search(requestBuilder.build()).matches();
// 如果时效性过滤后结果太少,放宽限制
if (matches.size() < 2 && timeFilter != null) {
log.debug("时效性过滤后结果不足,放宽时间限制");
matches = vectorStore.search(EmbeddingSearchRequest.builder()
.queryEmbedding(Embedding.from(embedding))
.maxResults(5)
.minScore(0.65)
.build())
.matches();
}
return matches.stream()
.map(m -> {
// 加上文档时间信息的注释
String content = addTemporalContext(m.embedded());
return Content.from(content);
})
.toList();
}
private Filter buildTimeFilter(TemporalRequirement temporal) {
if (temporal == null || !temporal.hasTimeConstraint()) return null;
String afterDate = temporal.getAfterDate().toString();
return metadataKey("updateDate").isGreaterThan(afterDate);
}
private String addTemporalContext(TextSegment segment) {
String updateDate = segment.metadata().getString("updateDate");
if (updateDate != null) {
return "【文档更新时间:" + updateDate + "】\n" + segment.text();
}
return segment.text();
}
@Data
public static class TemporalRequirement {
private LocalDate afterDate;
private boolean hasTimeConstraint;
}
}实现四:上下文感知检索
/**
* 上下文感知的检索器
* 利用对话历史来改善检索效果
*
* 问题:用户说"它还有什么功能?"——"它"指什么?
* 解决:结合对话历史,理解指代词
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ContextualContentRetriever implements ContentRetriever {
private final ChatLanguageModel llm;
private final EmbeddingStoreContentRetriever baseRetriever;
@Override
public List<Content> retrieve(Query query) {
// 获取对话历史(通过查询元数据传入)
List<String> conversationHistory = getConversationHistory(query);
if (conversationHistory.isEmpty()) {
return baseRetriever.retrieve(query);
}
// 使用LLM改写查询,解决指代歧义
String improvedQuery = improveQueryWithContext(query.text(), conversationHistory);
log.debug("查询改写: [{}] → [{}]", query.text(), improvedQuery);
// 用改写后的查询检索
return baseRetriever.retrieve(Query.from(improvedQuery, query.metadata()));
}
private String improveQueryWithContext(
String originalQuery,
List<String> history) {
String historyText = String.join("\n",
history.subList(Math.max(0, history.size() - 6), history.size()));
String prompt = String.format("""
给定以下对话历史,请将最后的查询改写为独立完整的查询(不依赖对话历史)。
如果查询已经完整,原样返回。
对话历史:
%s
当前查询:%s
改写后的独立查询(只输出查询本身):
""", historyText, originalQuery);
String improved = llm.generate(prompt).trim();
// 如果改写结果太长,可能出错了,回退到原始查询
if (improved.length() > originalQuery.length() * 3) {
return originalQuery;
}
return improved;
}
@SuppressWarnings("unchecked")
private List<String> getConversationHistory(Query query) {
Object history = query.metadata().get("conversationHistory");
if (history instanceof List<?>) {
return (List<String>) history;
}
return List.of();
}
}在@AiService中使用自定义检索器
/**
* 将自定义ContentRetriever集成到@AiService
*/
@Configuration
@RequiredArgsConstructor
public class CustomRetrieverAiConfig {
@Bean
public CustomerSupportAssistant customerSupportAssistant(
ChatLanguageModel llm,
MultiSourceContentRetriever multiSourceRetriever,
ChatMemory chatMemory) {
// 用自定义检索器构建RetrievalAugmentor
RetrievalAugmentor augmentor = DefaultRetrievalAugmentor.builder()
.contentRetriever(multiSourceRetriever)
// 可以加内容注入器,控制如何把检索内容注入到Prompt
.contentInjector(new DefaultContentInjector(
List.of("source", "updateDate"), // 在上下文中显示这些元数据
"以下是相关知识库内容:\n" // 上下文前缀
))
.build();
return AiServices.builder(CustomerSupportAssistant.class)
.chatLanguageModel(llm)
.chatMemory(chatMemory)
.retrievalAugmentor(augmentor)
.build();
}
}
/**
* 使用自定义检索器的AI服务接口
*/
@AiService
public interface CustomerSupportAssistant {
@SystemMessage("""
你是一个专业的客服助手,基于知识库内容回答客户问题。
知识库内容已在上下文中提供(标注了来源和更新时间)。
如果知识库没有相关内容,建议客户联系人工客服。
""")
String chat(@MemoryId String sessionId, @UserMessage String question);
}ContentRetriever的链式组合
/**
* 多个ContentRetriever的链式组合
* 先尝试主检索器,不够再补充
*/
@Service
@RequiredArgsConstructor
public class FallbackContentRetriever implements ContentRetriever {
private final ContentRetriever primaryRetriever;
private final ContentRetriever fallbackRetriever;
private final int minRequiredResults;
@Override
public List<Content> retrieve(Query query) {
// 先用主检索器
List<Content> primary = primaryRetriever.retrieve(query);
if (primary.size() >= minRequiredResults) {
return primary;
}
// 主检索器结果不足,补充fallback检索
log.debug("主检索器结果不足({}),启用Fallback", primary.size());
List<Content> fallback = fallbackRetriever.retrieve(query);
// 合并,去重
Set<String> seen = primary.stream()
.map(c -> c.textSegment().text())
.collect(Collectors.toSet());
List<Content> combined = new ArrayList<>(primary);
for (Content content : fallback) {
if (!seen.contains(content.textSegment().text())) {
combined.add(content);
seen.add(content.textSegment().text());
}
}
return combined;
}
}ContentRetriever是LangChain4j RAG系统的最灵活扩展点。通过自定义实现,几乎可以处理所有复杂的检索需求。
设计建议:把不同的检索策略写成独立的ContentRetriever实现,然后通过组合(FallbackContentRetriever、MultiSourceContentRetriever)来搭建复杂的检索流程——这比写一个大而全的检索器更容易维护和测试。
