第1951篇:AI系统的可观测性实践——从黑盒到白盒的全链路追踪
第1951篇:AI系统的可观测性实践——从黑盒到白盒的全链路追踪
上周有个团队找我咨询,他们上线了一套RAG问答系统,用户反馈答案质量忽高忽低,但开发团队完全不知道问题出在哪——是检索质量差?还是Prompt写得不好?还是模型本身的问题?他们看着监控大盘上那条绿色的"请求成功率99%"曲线,感觉一切正常,但用户投诉却越来越多。
这就是AI系统可观测性缺失的典型症状:系统在工程层面运转正常,但在业务语义层面已经悄悄烂掉了。
传统软件的可观测性三大支柱——Metrics、Logs、Traces——放到AI系统里都需要重新定义。因为传统系统的"正确"是确定性的,SQL要么执行成功要么失败,HTTP响应要么200要么500。但AI系统的"正确"是模糊的,一个回答在语义上是否准确,一个检索结果是否真正相关,这些根本不能用状态码来度量。
今天这篇,我来系统讲一讲如何把AI系统从黑盒变成白盒。
为什么AI系统天然是黑盒
先把问题说清楚。
一个典型的AI应用调用链是这样的:用户输入 → Prompt构建 → 向量检索 → LLM调用 → 后处理 → 用户输出。每一步都有可能出问题,而且问题之间会叠加放大。
问题在于,传统监控只能告诉你"这条链路执行了多少毫秒",却告诉不了你"这条链路每一步的质量如何"。
我之前踩过一个坑:向量检索明明召回了相关文档,但因为Prompt模板里有个排版问题,导致LLM把文档内容和系统指令混淆了,给出了一个看起来格式正确但内容完全胡说八道的答案。请求成功率100%,延迟也正常,但用户拿到的是垃圾。
这就是AI系统可观测性的核心挑战:你需要在语义层面观测,而不只是在工程层面。
可观测性体系设计
我把AI系统的可观测性分成四层:
大多数团队只做了前两层,就以为监控到位了。真正重要的是第三层,偶尔还需要拉通到第四层做业务闭环。
第一步:给每次AI调用打完整的Span
用OpenTelemetry做分布式追踪,但要针对AI场景做扩展。
@Component
public class AITraceInterceptor {
private final Tracer tracer;
public AITraceInterceptor(OpenTelemetry openTelemetry) {
this.tracer = openTelemetry.getTracer("ai-system");
}
public <T> T traceAICall(String spanName, AICallContext context,
Supplier<T> aiCall) {
Span span = tracer.spanBuilder(spanName)
.setAttribute("ai.model", context.getModel())
.setAttribute("ai.prompt_tokens_estimate", context.getEstimatedTokens())
.setAttribute("ai.session_id", context.getSessionId())
.setAttribute("ai.user_id", context.getUserId())
.startSpan();
long startTime = System.currentTimeMillis();
try (Scope scope = span.makeCurrent()) {
T result = aiCall.get();
// 成功后记录关键AI指标
if (result instanceof LLMResponse llmResponse) {
span.setAttribute("ai.prompt_tokens", llmResponse.getPromptTokens());
span.setAttribute("ai.completion_tokens", llmResponse.getCompletionTokens());
span.setAttribute("ai.finish_reason", llmResponse.getFinishReason());
span.setAttribute("ai.response_quality_score",
llmResponse.getQualityScore());
}
span.setStatus(StatusCode.OK);
return result;
} catch (Exception e) {
span.setStatus(StatusCode.ERROR, e.getMessage());
span.recordException(e);
throw e;
} finally {
long duration = System.currentTimeMillis() - startTime;
span.setAttribute("ai.duration_ms", duration);
span.end();
}
}
}注意这里我加了ai.前缀的自定义属性,这是OpenTelemetry社区正在推进的AI语义约定(AI Semantic Conventions)草案里的命名规范。虽然还没正式发布,但提前对齐,将来迁移成本低。
第二步:检索质量的实时评分
RAG系统最容易出问题的地方就是检索。我们要对每次检索结果做实时质量评估。
@Service
public class RetrievalQualityEvaluator {
private final EmbeddingService embeddingService;
private final MeterRegistry meterRegistry;
// 计算查询和文档之间的语义相关性分布
public RetrievalQualityReport evaluate(String query,
List<RetrievedDocument> docs) {
if (docs.isEmpty()) {
recordMetric("retrieval.empty", 1.0);
return RetrievalQualityReport.empty(query);
}
float[] queryEmbedding = embeddingService.embed(query);
List<Double> scores = docs.stream()
.map(doc -> cosineSimilarity(queryEmbedding, doc.getEmbedding()))
.collect(Collectors.toList());
double topScore = scores.stream().mapToDouble(d -> d).max().orElse(0);
double avgScore = scores.stream().mapToDouble(d -> d).average().orElse(0);
double scoreVariance = calculateVariance(scores);
// 关键:分数集中度。如果所有文档得分都差不多,说明没有真正相关的
double concentrationRatio = topScore > 0 ? topScore / avgScore : 0;
RetrievalQualityReport report = RetrievalQualityReport.builder()
.query(query)
.documentCount(docs.size())
.topScore(topScore)
.avgScore(avgScore)
.scoreVariance(scoreVariance)
.concentrationRatio(concentrationRatio)
.qualityLevel(classifyQuality(topScore, concentrationRatio))
.build();
// 打点到监控系统
recordRetrievalMetrics(report);
return report;
}
private QualityLevel classifyQuality(double topScore, double concentrationRatio) {
if (topScore > 0.85 && concentrationRatio > 1.2) {
return QualityLevel.HIGH; // 有明确相关文档
} else if (topScore > 0.70) {
return QualityLevel.MEDIUM; // 有一定相关性
} else if (topScore > 0.55) {
return QualityLevel.LOW; // 勉强相关,需要警惕
} else {
return QualityLevel.POOR; // 检索结果几乎不相关
}
}
private void recordRetrievalMetrics(RetrievalQualityReport report) {
meterRegistry.gauge("ai.retrieval.top_score",
Tags.of("quality", report.getQualityLevel().name()),
report.getTopScore());
meterRegistry.counter("ai.retrieval.quality_level",
Tags.of("level", report.getQualityLevel().name()))
.increment();
}
private double cosineSimilarity(float[] a, float[] b) {
double dotProduct = 0.0, normA = 0.0, normB = 0.0;
for (int i = 0; i < a.length; i++) {
dotProduct += a[i] * b[i];
normA += a[i] * a[i];
normB += b[i] * b[i];
}
return dotProduct / (Math.sqrt(normA) * Math.sqrt(normB));
}
}这个concentrationRatio是我自己发明的指标,专门用来检测"假召回"——就是虽然召回了K个文档,但其实没有一个真正相关的,只是凑数的。当这个比值接近1时,说明所有文档得分都差不多,没有明显的"最优解",这种情况下LLM通常会开始编造。
第三步:LLM输出的多维评估
光检索质量不够,还要评估LLM的输出质量。我们用几个维度来做实时评分:
@Service
public class LLMOutputEvaluator {
private final LLMClient llmClient;
// 用一个轻量级模型评估主模型的输出
public OutputQualityReport evaluate(LLMEvaluationContext ctx) {
// 1. 基础结构检查(不需要模型,规则就能做)
StructureCheckResult structureCheck = checkStructure(ctx.getOutput());
// 2. 幻觉检测:输出内容是否有明确来源支撑
HallucinationScore hallucinationScore =
detectHallucination(ctx.getOutput(), ctx.getRetrievedDocs());
// 3. 指令遵循度:是否按照Prompt要求的格式和约束来答
InstructionFollowingScore instructionScore =
checkInstructionFollowing(ctx.getPrompt(), ctx.getOutput());
return OutputQualityReport.builder()
.structureScore(structureCheck.getScore())
.hallucinationRisk(hallucinationScore.getRiskLevel())
.instructionFollowingScore(instructionScore.getScore())
.overallQuality(computeOverallQuality(
structureCheck, hallucinationScore, instructionScore))
.flags(collectFlags(structureCheck, hallucinationScore, instructionScore))
.build();
}
private HallucinationScore detectHallucination(String output,
List<RetrievedDocument> docs) {
// 提取输出中的关键声明(数字、专有名词、具体事实)
List<String> claims = extractFactualClaims(output);
int supportedClaims = 0;
int unsupportedClaims = 0;
List<String> unsupportedDetails = new ArrayList<>();
String combinedDocs = docs.stream()
.map(RetrievedDocument::getContent)
.collect(Collectors.joining("\n"));
for (String claim : claims) {
// 简单方案:检查关键词是否在原文中出现
// 生产环境里可以用embedding相似度做更精确的支撑检测
if (isSupportedByDocs(claim, combinedDocs)) {
supportedClaims++;
} else {
unsupportedClaims++;
unsupportedDetails.add(claim);
}
}
double riskRatio = claims.isEmpty() ? 0 :
(double) unsupportedClaims / claims.size();
return HallucinationScore.builder()
.totalClaims(claims.size())
.supportedClaims(supportedClaims)
.unsupportedClaims(unsupportedClaims)
.riskLevel(riskRatio > 0.3 ? RiskLevel.HIGH :
riskRatio > 0.1 ? RiskLevel.MEDIUM : RiskLevel.LOW)
.unsupportedDetails(unsupportedDetails)
.build();
}
private List<String> extractFactualClaims(String text) {
List<String> claims = new ArrayList<>();
// 提取数字相关的声明
Pattern numberPattern = Pattern.compile(
"[^。!?]*\\d+[^。!?]*[。!?]");
Matcher matcher = numberPattern.matcher(text);
while (matcher.find()) {
claims.add(matcher.group().trim());
}
// 提取"是"、"为"开头的定义性句子
Pattern definitionPattern = Pattern.compile(
"[^。!?]*(?:是|为|指)[^。!?]*[。!?]");
matcher = definitionPattern.matcher(text);
while (matcher.find()) {
String claim = matcher.group().trim();
if (claim.length() > 10) { // 过滤太短的
claims.add(claim);
}
}
return claims;
}
}这里幻觉检测用的是轻量级的关键词匹配方案,不是再调一次大模型来评估——那样成本太高,而且会引入新的延迟。在高吞吐场景下,这种规则+统计的方案性价比更好。
第四步:全链路Trace的结构化存储
把所有Span和评估结果存下来,才能做事后分析。
@Entity
@Table(name = "ai_trace_records")
public class AITraceRecord {
@Id
private String traceId;
@Column(name = "session_id")
private String sessionId;
@Column(name = "user_id")
private String userId;
@Column(name = "query_text", columnDefinition = "TEXT")
private String queryText;
// 各阶段耗时(毫秒)
@Column(name = "intent_parse_ms")
private Long intentParseMs;
@Column(name = "retrieval_ms")
private Long retrievalMs;
@Column(name = "prompt_build_ms")
private Long promptBuildMs;
@Column(name = "llm_call_ms")
private Long llmCallMs;
@Column(name = "postprocess_ms")
private Long postprocessMs;
// 质量指标
@Column(name = "retrieval_top_score")
private Double retrievalTopScore;
@Column(name = "retrieval_quality_level")
@Enumerated(EnumType.STRING)
private QualityLevel retrievalQualityLevel;
@Column(name = "hallucination_risk")
@Enumerated(EnumType.STRING)
private RiskLevel hallucinationRisk;
@Column(name = "instruction_following_score")
private Double instructionFollowingScore;
// Token消耗
@Column(name = "prompt_tokens")
private Integer promptTokens;
@Column(name = "completion_tokens")
private Integer completionTokens;
// 用于关联用户反馈
@Column(name = "user_feedback_score")
private Integer userFeedbackScore; // 1-5分,用户事后评分
@Column(name = "created_at")
private LocalDateTime createdAt;
// 标记是否需要人工审核
@Column(name = "needs_review")
private Boolean needsReview;
@Column(name = "review_flags", columnDefinition = "JSON")
private String reviewFlags; // 触发审核的具体原因
}这张表是整个可观测性体系的核心。有了它,你就可以做各种各样的分析:检索质量差的那些case里,用户最后给了几分?幻觉风险高的回答,和用户满意度是什么相关关系?
实时监控Dashboard设计
数据收集完了,要可视化。我用的是Grafana,定义几个关键面板:
@RestController
@RequestMapping("/api/observability")
public class ObservabilityDashboardController {
private final AITraceRepository traceRepository;
// 给Grafana提供数据的API
@GetMapping("/quality-distribution")
public QualityDistributionDTO getQualityDistribution(
@RequestParam @DateTimeFormat(iso = ISO.DATE_TIME) LocalDateTime from,
@RequestParam @DateTimeFormat(iso = ISO.DATE_TIME) LocalDateTime to) {
List<AITraceRecord> records = traceRepository
.findByCreatedAtBetween(from, to);
// 按检索质量分组统计
Map<QualityLevel, Long> retrievalDist = records.stream()
.collect(Collectors.groupingBy(
AITraceRecord::getRetrievalQualityLevel,
Collectors.counting()));
// 幻觉风险分布
Map<RiskLevel, Long> hallucinationDist = records.stream()
.collect(Collectors.groupingBy(
AITraceRecord::getHallucinationRisk,
Collectors.counting()));
// 质量与用户满意度的相关性
Map<QualityLevel, Double> qualityToSatisfaction = records.stream()
.filter(r -> r.getUserFeedbackScore() != null)
.collect(Collectors.groupingBy(
AITraceRecord::getRetrievalQualityLevel,
Collectors.averagingInt(AITraceRecord::getUserFeedbackScore)));
return QualityDistributionDTO.builder()
.retrievalDistribution(retrievalDist)
.hallucinationDistribution(hallucinationDist)
.qualityToSatisfactionCorrelation(qualityToSatisfaction)
.totalRequests(records.size())
.build();
}
// 找出最差的那些case,供人工审核
@GetMapping("/worst-cases")
public List<AITraceRecordDTO> getWorstCases(
@RequestParam(defaultValue = "20") int limit) {
return traceRepository.findWorstCases(limit).stream()
.map(this::toDTO)
.collect(Collectors.toList());
}
}// Repository层的复杂查询
public interface AITraceRepository extends JpaRepository<AITraceRecord, String> {
List<AITraceRecord> findByCreatedAtBetween(LocalDateTime from, LocalDateTime to);
@Query("""
SELECT r FROM AITraceRecord r
WHERE r.hallucinationRisk = 'HIGH'
OR r.retrievalQualityLevel = 'POOR'
OR (r.userFeedbackScore IS NOT NULL AND r.userFeedbackScore <= 2)
ORDER BY r.createdAt DESC
LIMIT :limit
""")
List<AITraceRecord> findWorstCases(@Param("limit") int limit);
@Query("""
SELECT AVG(r.llmCallMs) as avgLlmMs,
AVG(r.retrievalMs) as avgRetrievalMs,
COUNT(r) as total,
SUM(CASE WHEN r.hallucinationRisk = 'HIGH' THEN 1 ELSE 0 END) as highRiskCount
FROM AITraceRecord r
WHERE r.createdAt >= :since
""")
SystemHealthSnapshot getHealthSnapshot(@Param("since") LocalDateTime since);
}我在真实项目里遇到的坑
坑1:观测开销反客为主
第一版实现里,我对每个LLM响应都做了embedding计算来评估幻觉,结果评估本身比LLM调用还慢。后来改成异步评估——主链路只做轻量级规则检查,复杂的语义评估扔到消息队列里异步处理,不影响用户响应时间。
@Service
public class AsyncQualityEvaluationService {
private final BlockingQueue<EvaluationTask> taskQueue =
new LinkedBlockingQueue<>(10000);
@PostConstruct
public void startWorkers() {
// 3个worker线程处理异步评估
for (int i = 0; i < 3; i++) {
Thread worker = new Thread(this::processEvaluations);
worker.setDaemon(true);
worker.setName("ai-eval-worker-" + i);
worker.start();
}
}
public void submitForEvaluation(EvaluationTask task) {
// 队列满了就丢弃,宁可数据不完整也不影响主链路
boolean offered = taskQueue.offer(task);
if (!offered) {
log.warn("Evaluation queue full, dropping task for traceId: {}",
task.getTraceId());
meterRegistry.counter("ai.eval.dropped").increment();
}
}
private void processEvaluations() {
while (!Thread.currentThread().isInterrupted()) {
try {
EvaluationTask task = taskQueue.poll(1, TimeUnit.SECONDS);
if (task != null) {
performFullEvaluation(task);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Evaluation failed", e);
}
}
}
}坑2:相同问题相似度分不清
用户连续问了两个相似的问题,系统给出了截然不同的回答质量,但两次的检索top score都差不多。后来发现问题在于:第一次查询词恰好和文档标题高度匹配,第二次是语义匹配但关键词不同。于是我加了一个"关键词命中率"补充维度,和embedding相似度取加权平均。
坑3:幻觉误报太多导致告警疲劳
早期幻觉检测太敏感,什么都报,搞得值班人员每天收到几百条告警,完全不看了。后来加了白名单机制,对于某些确定没有知识库支撑但属于通用常识的回答(比如"请问有什么我可以帮助您的?"这类),跳过幻觉检测。
总结
AI系统的可观测性不是一个"做完就行"的事,而是一个随着业务深化不断演进的过程。
我见过太多团队把精力全放在模型选型和Prompt调优上,却忽视了观测体系建设。结果就是:优化了什么,坏了什么,完全靠运气。
从工程角度来说,这篇讲的四层体系是基础:基础设施层保证你系统跑着,工程质量层保证你接口正常,AI质量层告诉你模型答得好不好,业务价值层告诉你用户满不满意。
把这四层都做起来,你才算真正把AI系统从黑盒变成了白盒。
