Article 2103: Observability for RAG Systems (Tracing the Full Path of Every Retrieval)
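Audience: engineers who maintain RAG systems, and teams stuck on "we don't know why quality got worse" | Reading time: about 20 minutes | Key takeaway: build end-to-end tracing for the RAG retrieval path so you can quickly find the root cause of retrieval quality problems
RAG systems have a distinctive debugging problem: a user complains that "the answer is wrong", but you cannot tell whether retrieval failed to recall the right content, whether the content was recalled but the LLM ignored it, or whether the LLM used it and misunderstood it.
Standard monitoring infrastructure for LLM systems (Prometheus/Grafana) can tell you that P99 latency went up, but it cannot tell you why a particular retrieval failed to find a particular document. That requires finer-grained tracing.
This article builds a complete tracing scheme for the RAG path: from the user's question to the final answer, the data at every step is recorded so that problems can be located quickly.
Layers of observability in the RAG path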
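/**
 * Data that RAG observability needs to capture
 *
 * Level 1: Metrics
 * - Average retrieval latency; retrieval hit rate (share of queries that return at
 *   least one document versus none; this is not recall in the strict IR sense)
 * - Number of LLM calls, token consumption
 * - User satisfaction with the final answer (thumbs up / thumbs down)
 *
 * Level 2: Tracing
 * - The full path of each query: original question -> rewrite -> retrieval -> ranking -> generation
 * - Input and output of every step
 * - Which documents retrieval hit, and with what scores
 *
 * Level 3: Debug logs
 * - The embedded query vector
 * - The filter conditions used for retrieval
 * - Detailed information for every document chunk
 *
 * In production: Level 1 + Level 2 (enable Level 3 only while debugging)
 */
RAG trace data model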
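import java.util.List;
import java.util.Map;
import lombok.Builder;
import lombok.Data;

/**
 * Complete trace data for a single RAG query
 */
@Data
@Builder
public class RagTrace {
    // Identifiers
    private String traceId;      // unique trace ID (correlated with the distributed trace)
    private String sessionId;    // conversation/session ID
    private String userId;
    private long startTimeMs;
    private long endTimeMs;
    // Input
    private String originalQuery;   // the user's original question
    private String rewrittenQuery;  // the rewritten query (if any)
    // Retrieval
    private List<RetrievalAttempt> retrievalAttempts;  // there may be several retrieval paths
    private List<String> selectedDocumentIds;          // documents that made the final cut
    // Generation
    private int contextTokenCount;  // tokens in the retrieved context
    private int totalInputTokens;   // total tokens actually sent to the LLM
    private int outputTokens;       // tokens produced by the LLM
    // Result
    private String answer;
    private float answerConfidence; // 0-1, the LLM's self-assessed confidence
    // Quality signals
    private Boolean userSatisfied;  // user feedback (null means no feedback yet)
    private String userFeedback;    // free-text user feedback
    private boolean emptyRetrieval; // true if retrieval found no relevant documents
    private boolean hallucination;  // suspected hallucination (claims not present in the retrieved content)

    @Data
    @Builder
    public static class RetrievalAttempt {
        private String strategyName;  // retrieval strategy (vector/bm25/hybrid, etc.)
        private int topK;
        private long durationMs;
        private List<RetrievedDocument> documents;
        private String error;         // set if the retrieval failed
    }

    @Data
    @Builder
    public static class RetrievedDocument {
        private String documentId;
        private String chunkId;
        private float relevanceScore;
        private String contentPreview;  // first 200 characters of the content
        private String sourceFile;
        private boolean usedInAnswer;   // whether the LLM actually used this chunk
        private Map<String, String> metadata;
    }

    public long totalDurationMs() { return endTimeMs - startTimeMs; }

    /** Mean relevance score over all retrieved documents; 0.0 when nothing was retrieved. */
    public double avgRetrievalScore() {
        // Null-safe: traces with no recorded attempts (or rebuilt from partial rows) score 0.0
        if (retrievalAttempts == null) return 0.0;
        return retrievalAttempts.stream()
            .filter(a -> a.getDocuments() != null)
            .flatMap(a -> a.getDocuments().stream())
            .mapToDouble(RetrievedDocument::getRelevanceScore)
            .average()
            .orElse(0.0);
    }
}
RAG tracing interceptor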
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

/**
 * RAG execution tracing aspect
 *
 * Wraps each stage of the RAG pipeline with AOP and collects trace data automatically
 */
@Aspect
@Component
@RequiredArgsConstructor
@Slf4j
public class RagTracingAspect {
    private final RagTraceRepository traceRepository;
    // Per-request state lives in ThreadLocals, so this only works while retrieval
    // runs on the same thread as the @RagQuery entry point.
    private final ThreadLocal<RagTrace.RagTraceBuilder> currentTrace = new ThreadLocal<>();
    // The attempts list is held separately because a Lombok builder exposes no getters;
    // calling build() just to reach the list would create a throwaway RagTrace each time.
    private final ThreadLocal<List<RagTrace.RetrievalAttempt>> currentAttempts = new ThreadLocal<>();

    /**
     * Intercept the RAG query entry point.
     * RagQuery is a custom marker annotation (not shown); if it lives in a different
     * package from this aspect, use its fully qualified name in the pointcut.
     */
    @Around("@annotation(RagQuery)")
    public Object traceRagQuery(ProceedingJoinPoint pjp) throws Throwable {
        String traceId = generateTraceId();
        MDC.put("ragTraceId", traceId);
        List<RagTrace.RetrievalAttempt> attempts = new ArrayList<>();
        RagTrace.RagTraceBuilder traceBuilder = RagTrace.builder()
            .traceId(traceId)
            .startTimeMs(System.currentTimeMillis())
            .retrievalAttempts(attempts);
        currentTrace.set(traceBuilder);
        currentAttempts.set(attempts);
        try {
            return pjp.proceed();
        } catch (Exception e) {
            traceBuilder.answer("ERROR: " + e.getMessage());
            throw e;
        } finally {
            currentTrace.remove();
            currentAttempts.remove();
            MDC.remove("ragTraceId");
            traceBuilder.endTimeMs(System.currentTimeMillis());
            // Save asynchronously so tracing never blocks the main flow
            RagTrace trace = traceBuilder.build();
            CompletableFuture.runAsync(() -> traceRepository.save(trace));
        }
    }

    /**
     * Intercept the retrieval step
     */
    @Around("execution(* *..*SearchService.search(..))")
    public Object traceRetrieval(ProceedingJoinPoint pjp) throws Throwable {
        List<RagTrace.RetrievalAttempt> attempts = currentAttempts.get();
        if (attempts == null) return pjp.proceed(); // not inside a traced @RagQuery call
        long start = System.currentTimeMillis();
        String strategyName = pjp.getSignature().getDeclaringType().getSimpleName();
        try {
            Object result = pjp.proceed();
            long elapsed = System.currentTimeMillis() - start;
            // Record the retrieval results on the current trace
            if (result instanceof List<?> documents) {
                attempts.add(RagTrace.RetrievalAttempt.builder()
                    .strategyName(strategyName)
                    .durationMs(elapsed)
                    .documents(convertToTracedDocs(documents))
                    .build());
            }
            return result;
        } catch (Exception e) {
            long elapsed = System.currentTimeMillis() - start;
            // Record the failed attempt too, so errors show up in the trace
            attempts.add(RagTrace.RetrievalAttempt.builder()
                .strategyName(strategyName)
                .durationMs(elapsed)
                .documents(List.of())
                .error(e.getMessage())
                .build());
            throw e;
        }
    }

    /**
     * Callers use this to fill in fields the aspect cannot see
     * (query, answer, token counts, emptyRetrieval, and so on).
     */
    public RagTrace.RagTraceBuilder getCurrentTrace() {
        return currentTrace.get();
    }
    @SuppressWarnings("unchecked")
    private List<RagTrace.RetrievedDocument> convertToTracedDocs(List<?> docs) {
        return docs.stream()
            .filter(d -> d instanceof EmbeddingMatch)
            .map(d -> {
                // Unchecked cast: the search service is expected to return EmbeddingMatch<TextSegment>
                EmbeddingMatch<TextSegment> match = (EmbeddingMatch<TextSegment>) d;
                TextSegment segment = match.embedded();
                String content = segment.text();
                return RagTrace.RetrievedDocument.builder()
                    .documentId(segment.metadata().getString("documentId"))
                    .chunkId(segment.metadata().getString("chunkId"))
                    // score() returns a boxed Double, which cannot be cast directly to float
                    .relevanceScore(match.score().floatValue())
                    .contentPreview(content.length() > 200 ? content.substring(0, 200) : content)
                    .sourceFile(segment.metadata().getString("sourceFile"))
                    .usedInAnswer(false) // defaults to false; update once the LLM has used it
                    .build();
            })
            .toList();
    }

    private String generateTraceId() {
        // A UUID avoids the collisions that timestamp + small random suffix can produce
        // under concurrency (a collision would be silently dropped by ON CONFLICT DO NOTHING)
        return "rag-" + java.util.UUID.randomUUID();
    }
}
Real-time retrieval quality diagnosis
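The diagnostic in this section checks `usedInAnswer`, which the interceptor initializes to `false` and which something has to update after generation. The article does not show that step. One possible approach, sketched here under the assumption that the prompt asks the LLM to cite chunk IDs in square brackets such as `[chunk-12]`, is to parse the citations out of the answer. The class name `CitationExtractor`, the method name, and the bracket convention are all hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: finds which retrieved chunks an answer cites as [chunkId]. */
final class CitationExtractor {
    // Assumed convention: chunk IDs consist of letters, digits, '_' and '-'
    private static final Pattern CITATION = Pattern.compile("\\[([A-Za-z0-9_-]+)\\]");

    static Set<String> citedChunkIds(String answer, Set<String> retrievedChunkIds) {
        Set<String> cited = new LinkedHashSet<>();
        Matcher m = CITATION.matcher(answer);
        while (m.find()) {
            String id = m.group(1);
            // Keep only IDs that were actually retrieved; ignore IDs the model made up
            if (retrievedChunkIds.contains(id)) {
                cited.add(id);
            }
        }
        return cited;
    }
}
```

After generation, each `RetrievedDocument` whose `chunkId` is in the returned set could have `usedInAnswer` set to `true` before the trace is saved. Citation parsing only shows what the model claims to have used, so treat it as a rough signal.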
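import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * Retrieval quality diagnostic
 *
 * Analyzes retrieval results and flags common problems
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class RetrievalQualityDiagnostic {
    /**
     * Diagnose quality problems in a single retrieval
     */
    public DiagnosticReport diagnose(RagTrace trace) {
        List<String> issues = new ArrayList<>();
        List<String> suggestions = new ArrayList<>();
        QualityLevel qualityLevel = QualityLevel.GOOD;
        // Issue 1: empty retrieval
        boolean empty = trace.isEmptyRetrieval() || allAttemptsEmpty(trace);
        if (empty) {
            issues.add("Empty retrieval: no relevant content found in the knowledge base");
            suggestions.add("Check whether the query is heavy on domain abbreviations; try synonym expansion");
            suggestions.add("Verify that the question is within the knowledge base's coverage");
            qualityLevel = qualityLevel.worsenTo(QualityLevel.CRITICAL);
        }
        // Issue 2: low relevance scores (0.6 is a starting threshold; tune it for your embedding model)
        double avgScore = trace.avgRetrievalScore();
        if (avgScore < 0.6 && !empty) {
            issues.add(String.format("Low retrieval relevance (mean %.2f); irrelevant content may have been recalled", avgScore));
            suggestions.add("Consider raising the similarity threshold (minScore) to filter out low-quality results");
            suggestions.add("Check whether query rewriting is sufficient and whether query expansion is needed");
            // worsenTo never downgrades a level that is already CRITICAL
            qualityLevel = qualityLevel.worsenTo(QualityLevel.WARNING);
        }
        // Issue 3: retrieved documents were not used (the LLM ignored the context)
        long totalDocs = countTotalDocuments(trace);
        long usedDocs = countUsedDocuments(trace);
        if (totalDocs > 0 && usedDocs == 0) {
            issues.add("Documents were retrieved but the LLM cited none of them (possible hallucination)");
            suggestions.add("Check that the system prompt explicitly requires using the provided context");
            suggestions.add("Consider a stricter prompt constraint: 'Answer only from the content below'");
            qualityLevel = qualityLevel.worsenTo(QualityLevel.WARNING);
        }
        // Issue 4: retrieval latency too high
        long maxRetrievalMs = attempts(trace).stream()
            .mapToLong(RagTrace.RetrievalAttempt::getDurationMs)
            .max().orElse(0);
        if (maxRetrievalMs > 2000) {
            issues.add(String.format("Retrieval latency too high (%dms); user experience suffers", maxRetrievalMs));
            suggestions.add("Check that the vector database index is healthy");
            suggestions.add("Consider reducing topK or raising the similarity threshold");
            qualityLevel = qualityLevel.worsenTo(QualityLevel.WARNING);
        }
        // Issue 5: too many context tokens (important information may be diluted)
        if (trace.getContextTokenCount() > 8000) {
            issues.add(String.format("Retrieved context too long (%d tokens); the LLM may attend to it less well",
                trace.getContextTokenCount()));
            suggestions.add("Reduce topK and raise the similarity threshold to keep only the most relevant documents");
            qualityLevel = qualityLevel.worsenTo(QualityLevel.WARNING);
        }
        return new DiagnosticReport(
            trace.getTraceId(), qualityLevel, issues, suggestions,
            buildQualityMetrics(trace)
        );
    }

    // Null-safe view of the attempts (a trace may have none recorded)
    private List<RagTrace.RetrievalAttempt> attempts(RagTrace trace) {
        return trace.getRetrievalAttempts() != null ? trace.getRetrievalAttempts() : List.of();
    }

    // True when no attempt returned any document (including when there were no attempts)
    private boolean allAttemptsEmpty(RagTrace trace) {
        return attempts(trace).stream()
            .allMatch(a -> a.getDocuments().isEmpty());
    }

    private long countTotalDocuments(RagTrace trace) {
        return attempts(trace).stream()
            .mapToLong(a -> a.getDocuments().size())
            .sum();
    }

    private long countUsedDocuments(RagTrace trace) {
        return attempts(trace).stream()
            .flatMap(a -> a.getDocuments().stream())
            .filter(RagTrace.RetrievedDocument::isUsedInAnswer)
            .count();
    }

    private Map<String, Object> buildQualityMetrics(RagTrace trace) {
        Map<String, Object> metrics = new LinkedHashMap<>();
        metrics.put("avgRelevanceScore", String.format("%.3f", trace.avgRetrievalScore()));
        metrics.put("totalDocumentsRetrieved", countTotalDocuments(trace));
        metrics.put("documentsUsedInAnswer", countUsedDocuments(trace));
        metrics.put("contextTokens", trace.getContextTokenCount());
        metrics.put("totalDurationMs", trace.totalDurationMs());
        return metrics;
    }

    public enum QualityLevel {
        GOOD(0), WARNING(1), CRITICAL(2);
        private final int severity;
        QualityLevel(int severity) { this.severity = severity; }
        /** Returns the more severe of this level and {@code other}. */
        public QualityLevel worsenTo(QualityLevel other) {
            return other.severity > this.severity ? other : this;
        }
    }

    public record DiagnosticReport(
        String traceId, QualityLevel qualityLevel,
        List<String> issues, List<String> suggestions,
        Map<String, Object> metrics
    ) {}
}
Storing and querying trace data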
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

/**
 * RAG trace repository
 *
 * Stores trace data and supports analysis along several dimensions.
 * The SQL below uses PostgreSQL syntax (FILTER, ::float, ON CONFLICT).
 */
@Repository
@RequiredArgsConstructor
@Slf4j
public class RagTraceRepository {
    private final JdbcTemplate jdbc;
    private final ObjectMapper objectMapper;

    /**
     * Save a trace record
     */
    public void save(RagTrace trace) {
        try {
            String sql = """
                INSERT INTO rag_traces
                (trace_id, session_id, user_id, start_time, end_time,
                original_query, rewritten_query, answer, context_token_count,
                avg_relevance_score, empty_retrieval, user_satisfied,
                retrieval_details)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (trace_id) DO NOTHING
                """;
            jdbc.update(sql,
                trace.getTraceId(),
                trace.getSessionId(),
                trace.getUserId(),
                new java.sql.Timestamp(trace.getStartTimeMs()),
                new java.sql.Timestamp(trace.getEndTimeMs()),
                trace.getOriginalQuery(),
                trace.getRewrittenQuery(),
                // Truncate the answer to at most 2000 characters
                trace.getAnswer() != null ?
                    trace.getAnswer().substring(0, Math.min(2000, trace.getAnswer().length())) : null,
                trace.getContextTokenCount(),
                trace.avgRetrievalScore(),
                trace.isEmptyRetrieval(),
                trace.getUserSatisfied(),
                // Retrieval details as a JSON string (a jsonb column would need ?::jsonb in the SQL)
                objectMapper.writeValueAsString(trace.getRetrievalAttempts())
            );
        } catch (Exception e) {
            // Tracing must never break the main flow, so failures are only logged
            log.error("Failed to save trace record: traceId={}", trace.getTraceId(), e);
        }
    }

    /**
     * Empty-retrieval rate over the past N hours
     */
    public double getEmptyRetrievalRate(int hours) {
        // hours is an int, so formatting it into the SQL cannot inject anything
        String sql = """
            SELECT
            COUNT(*) FILTER (WHERE empty_retrieval = true)::float /
            NULLIF(COUNT(*), 0) AS empty_rate
            FROM rag_traces
            WHERE start_time > NOW() - INTERVAL '%d hours'
            """.formatted(hours);
        Double rate = jdbc.queryForObject(sql, Double.class);
        return rate != null ? rate : 0.0; // NULL when there are no rows in the window
    }

    /**
     * Distribution of retrieval relevance scores
     */
    public Map<String, Long> getRelevanceDistribution(int hours) {
        String sql = """
            SELECT
            CASE
            WHEN avg_relevance_score >= 0.8 THEN 'HIGH'
            WHEN avg_relevance_score >= 0.6 THEN 'MEDIUM'
            ELSE 'LOW'
            END AS band,
            COUNT(*) AS cnt
            FROM rag_traces
            WHERE start_time > NOW() - INTERVAL '%d hours'
            AND avg_relevance_score > 0
            GROUP BY 1
            """.formatted(hours);
        Map<String, Long> distribution = new LinkedHashMap<>();
        jdbc.query(sql, rs -> {
            distribution.put(rs.getString("band"), rs.getLong("cnt"));
        });
        return distribution;
    }

    /**
     * Find problematic queries (empty retrieval or dissatisfied user).
     * Only the summary columns are loaded; retrieval details stay null.
     */
    public List<RagTrace> findProblematicTraces(int hours, int limit) {
        String sql = """
            SELECT trace_id, original_query, avg_relevance_score,
            empty_retrieval, user_satisfied, start_time
            FROM rag_traces
            WHERE start_time > NOW() - INTERVAL '%d hours'
            AND (empty_retrieval = true OR user_satisfied = false)
            ORDER BY start_time DESC
            LIMIT %d
            """.formatted(hours, limit);
        return jdbc.query(sql, (rs, rowNum) -> RagTrace.builder()
            .traceId(rs.getString("trace_id"))
            .originalQuery(rs.getString("original_query"))
            .emptyRetrieval(rs.getBoolean("empty_retrieval"))
            .userSatisfied(rs.getObject("user_satisfied", Boolean.class))
            .startTimeMs(rs.getTimestamp("start_time").getTime())
            .build());
    }
}
RAG debug dashboard endpoints
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * RAG debug dashboard API
 *
 * Lets engineers analyze the health of the RAG system
 */
@RestController
@RequestMapping("/api/rag/debug")
@RequiredArgsConstructor
@Slf4j
public class RagDebugController {
    private final RagTraceRepository traceRepo;
    private final RetrievalQualityDiagnostic diagnostic;

    /**
     * Get recent problematic queries
     * GET /api/rag/debug/problematic?hours=24&limit=20
     */
    @GetMapping("/problematic")
    public List<Map<String, Object>> getProblematicQueries(
            @RequestParam(defaultValue = "24") int hours,
            @RequestParam(defaultValue = "20") int limit) {
        return traceRepo.findProblematicTraces(hours, limit).stream()
            .map(trace -> {
                Map<String, Object> item = new LinkedHashMap<>();
                item.put("traceId", trace.getTraceId());
                item.put("query", trace.getOriginalQuery());
                item.put("emptyRetrieval", trace.isEmptyRetrieval());
                item.put("userSatisfied", trace.getUserSatisfied());
                return item;
            })
            .toList();
    }

    /**
     * Get the system health overview
     * GET /api/rag/debug/health
     */
    @GetMapping("/health")
    public Map<String, Object> getHealthOverview() {
        Map<String, Object> health = new LinkedHashMap<>();
        // Keep the 24h rate in a local variable instead of casting it back out of the map
        double emptyRate = traceRepo.getEmptyRetrievalRate(24);
        health.put("emptyRetrievalRate_1h", traceRepo.getEmptyRetrievalRate(1));
        health.put("emptyRetrievalRate_24h", emptyRate);
        health.put("relevanceDistribution_24h", traceRepo.getRelevanceDistribution(24));
        // Derive an overall health status from the 24h empty-retrieval rate
        String healthStatus;
        if (emptyRate > 0.2) healthStatus = "CRITICAL";
        else if (emptyRate > 0.1) healthStatus = "WARNING";
        else healthStatus = "HEALTHY";
        health.put("status", healthStatus);
        return health;
    }
}
Practical recommendations
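Trading off trace granularity against storage cost
Full trace data (including query vectors and the full content of every document) is large, and storing all of it for every query will make storage costs spiral. A workable approach: keep full traces for only 10-20% of queries (random sampling), but keep 100% of problem queries (empty retrievals, negative user feedback). This lets you analyze overall trends and still debug specific failures.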
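The sampling rule above can be sketched as a small policy object. This is a minimal illustration, not part of the article's codebase: the class name `TraceSamplingPolicy` and its method are hypothetical, and the sample rate is whatever value in the 10-20% range you choose.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical policy: always keep problem traces, randomly sample the rest. */
final class TraceSamplingPolicy {
    private final double sampleRate; // fraction of healthy traces to keep, e.g. 0.1 to 0.2

    TraceSamplingPolicy(double sampleRate) {
        if (sampleRate < 0.0 || sampleRate > 1.0) {
            throw new IllegalArgumentException("sampleRate must be in [0, 1]");
        }
        this.sampleRate = sampleRate;
    }

    /** @param userSatisfied null when the user gave no feedback */
    boolean shouldStoreFullTrace(boolean emptyRetrieval, Boolean userSatisfied) {
        return shouldStoreFullTrace(emptyRetrieval, userSatisfied,
            ThreadLocalRandom.current().nextDouble());
    }

    // Overload that takes the random draw explicitly, so the decision can be unit tested
    boolean shouldStoreFullTrace(boolean emptyRetrieval, Boolean userSatisfied, double draw) {
        boolean problematic = emptyRetrieval || Boolean.FALSE.equals(userSatisfied);
        return problematic || draw < sampleRate;
    }
}
```

One caveat with this sketch: user feedback usually arrives after the trace has been written, so a real system may need to hold full traces briefly, or upgrade a sampled-out trace when a thumbs-down comes in.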
"What was retrieved" matters more than "what was answered"
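Many teams look only at the quality of the final answer and neglect analysis of the retrieval stage. In what I have actually observed, about 80% of RAG quality problems originate in retrieval, not in generation. If the wrong documents are retrieved, the answer will be wrong no matter how well the LLM writes it. Regularly analyzing the distribution of retrieval results is the most effective way to find problems.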
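To check where failures originate in your own system, each failed query can be assigned to a stage once someone has labeled which document should have answered it. The sketch below assumes that label exists (it is manual work) and that the trace records which documents were retrieved and which were used. `FailureStageClassifier` and its `Stage` values are hypothetical names.

```java
import java.util.Set;

/** Hypothetical classifier: attributes a wrong answer to a stage of the RAG path. */
final class FailureStageClassifier {
    enum Stage {
        RETRIEVAL_MISS,      // the right document was never retrieved
        RETRIEVED_NOT_USED,  // retrieved, but the LLM did not use it
        GENERATION_ERROR     // retrieved and used, yet the answer was still wrong
    }

    static Stage classify(String expectedDocId,
                          Set<String> retrievedDocIds,
                          Set<String> usedDocIds) {
        if (!retrievedDocIds.contains(expectedDocId)) {
            return Stage.RETRIEVAL_MISS;
        }
        if (!usedDocIds.contains(expectedDocId)) {
            return Stage.RETRIEVED_NOT_USED;
        }
        return Stage.GENERATION_ERROR;
    }
}
```

Counting the stages over a batch of labeled failures shows how your own split between retrieval and generation problems compares with the figure above.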
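Build a habit of accumulating problem queries
Whenever a user reports a wrong answer, add that query to a "problem query library". The library serves two purposes: manual root-cause investigation, and a test set for RAG evaluation (in offline evaluation, every one of these queries should be answered correctly). Once it reaches a meaningful size, it makes a much better evaluation set than random sampling, because every case is a real failure.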
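As a rough illustration of using the problem query library as a regression set, the sketch below measures how often the expected document shows up in the retrieval results. It assumes each library entry has been labeled with the document that should answer it, and it models the retriever as a plain function from query to a list of document IDs. `ProblemQueryEval`, `ProblemCase`, and `hitRate` are hypothetical names.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical offline check: retrieval hit rate over the problem query library. */
final class ProblemQueryEval {
    /** One library entry: the failing query and the document that should answer it. */
    record ProblemCase(String query, String expectedDocId) {}

    /**
     * Fraction of cases whose expected document appears in the retriever's results.
     * Returns 0.0 for an empty library.
     */
    static double hitRate(List<ProblemCase> cases,
                          Function<String, List<String>> retriever) {
        if (cases.isEmpty()) {
            return 0.0;
        }
        long hits = cases.stream()
            .filter(c -> retriever.apply(c.query()).contains(c.expectedDocId()))
            .count();
        return (double) hits / cases.size();
    }
}
```

Running this before and after a change to chunking, embeddings, or query rewriting gives a quick signal on whether previously failing queries now retrieve the right document. It covers only retrieval, so answer correctness still needs a separate check.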
