AI 应用的可观测性体系——比普通应用多追踪的那些维度
AI 应用的可观测性体系——比普通应用多追踪的那些维度
去年我们的 RAG 客服系统上线三个月后,运营同学突然找过来,说用户投诉回答质量越来越差。
我当时第一反应是去看监控,CPU 正常、内存正常、接口响应时间 P99 在 180ms 以内。按普通 Web 应用的标准,这个系统完全健康。但用户投诉是真实的。
花了两天时间才搞清楚:问题出在 Embedding 模型版本悄悄升级了,导致新写入的向量和历史向量在语义空间里产生了偏移。召回的文档看起来相关,但实际上答非所问。整个过程里,我们没有任何一个指标在报警。
那次之后,我重新审视了整个可观测性体系,得出一个结论:用监控普通 Web 应用的思路去监控 AI 应用,本质上是在用错误的尺子量东西。
这篇文章不讲某个具体工具怎么配置,而是从头建立「AI 应用需要观测什么」这个认知框架。把框架搭清楚了,工具选哪个是次要问题。
一、为什么普通监控不够用
先说清楚普通应用监控解决了什么问题。
传统 Web 应用的核心可观测性指标是所谓的 RED 方法:Rate(请求速率)、Errors(错误率)、Duration(延迟)。这套框架在绝大多数场景下都够用,因为普通接口要么成功要么失败,响应时间直接代表服务质量。
但 AI 应用打破了这个假设:
第一,「成功」不等于「正确」。AI 接口返回 200,不代表答案是对的。一个语言模型可以流畅地输出一段充满错误事实的文字,HTTP 层面完全没有异常。
第二,响应时间的含义变了。普通接口响应时间是一个点,AI 流式接口的响应时间是一条线。用户等待第一个字和等待整段响应完成,体验差异巨大,但普通的延迟指标把这两个时间点混在一起了。
第三,消耗的资源维度变了。普通应用监控 CPU、内存、网络带宽。AI 应用还要监控 Token 消耗,因为 Token 直接对应成本,而且 Token 的分布模式会暴露 Prompt 设计问题。
第四,「质量」成为一个需要量化的维度。这是最难的部分,也是普通可观测性框架完全没有覆盖的领域。
理解了这四点差异,我们来看 AI 应用的可观测性分层架构。
二、AI 应用可观测性分层架构
这四层是层层依赖的关系。底层挂了上层必然出问题,但底层健康不代表上层健康。绝大多数团队在 C 层和 D 层是空白的——这就是盲区所在。
三、AI 特有指标详解
3.1 TTFT:首 Token 时间
TTFT(Time To First Token)是流式场景里最重要的用户体验指标。
普通接口用户等待的是「整个响应」,但 AI 流式接口用户等待的是「第一个字」。TTFT 代表的是模型开始「思考并输出」的时延,它主要由以下几个部分构成:
- 请求到达负载均衡 → 服务实例的网络时延
- 服务实例处理 Prompt(向量检索、Prompt 组装)的时间
- 模型服务收到请求到开始吐出第一个 Token 的时间(这部分由模型方控制)
在实践中,TTFT > 2 秒用户会明显感到等待;TTFT > 5 秒大概率会有用户关闭对话框。
另一个配套指标是 TPOT(Time Per Output Token),即生成每个 Token 的平均时间,反映的是流式输出的流畅度。TPOT 不稳定会导致输出「卡顿感」。
3.2 Token 利用率
Token 利用率 = 有效 Token 数 / 总 Token 数
什么是「有效 Token」?这里有个工程上的定义:真正对生成最终答案有贡献的 Token,通常包括用户查询、检索到的相关上下文、系统指令核心部分。
高 Token 消耗但低利用率,通常意味着:
- Prompt 模板里有大量冗余说明
- RAG 检索召回了大量无关文档塞进了 Context
- System Prompt 随着迭代越来越臃肿
我曾经在一个项目里发现,Prompt 的 40% 都是历史遗留的说明性文字,而且这些说明对输出质量几乎没有影响。删掉之后,Token 成本直接降了 35%。
3.3 语义相关性得分
在 RAG 系统里,需要度量「检索到的文档和用户问题的语义匹配程度」。这个指标可以通过以下方式获得:
- 余弦相似度:检索返回的文档向量和 Query 向量的相似度,向量数据库本身会返回这个值
- Cross-Encoder 重排分数:使用 Cross-Encoder 模型对检索结果重排时产生的分数
这个指标如果持续下降,往往意味着知识库的向量质量在退化,或者用户的查询模式发生了漂移(新的问题类型知识库里没有对应内容)。
3.4 幻觉率估算
这是最难量化的指标,但不代表不能做。
离线评估:准备一批带有标准答案的测试问题,定期用模型回答并与标准答案比对,用 ROUGE、BERTScore 等指标评估。这种方式精确但有延迟,适合版本发布前的质量门控。
在线轻量估算:用一个轻量的「验证模型」实时判断生成内容是否和给定的上下文一致。比如:把原始 Context 和生成的 Answer 喂给一个小型模型,让它判断 Answer 里的关键事实是否能从 Context 里找到支撑。这种方式有额外成本,但可以做到实时。
信号代理:无法直接量化时,可以监控一些代理信号,比如用户是否在同一个会话里对同一问题追问、是否点击了「这个回答没有帮助」按钮。
四、普通指标在 AI 场景的变体
4.1 错误率的扩展
普通应用的错误率 = HTTP 5xx / 总请求数。
AI 应用的错误需要分几类:
- 硬错误:HTTP 5xx、超时、网络中断
- 软错误:模型返回了内容过滤提示(触发了安全策略)、Token 超限导致响应截断
- 质量错误:回答明显不相关(需要业务逻辑判定)
这三类错误的处理策略完全不同,混在一起看错误率会掩盖问题。
4.2 延迟的多维度拆解
AI 请求的延迟不是一个数,而是几个阶段的叠加:
总延迟 = 前处理时间 + TTFT + (TPOT × 输出Token数) + 后处理时间其中前处理包括:身份验证、RAG 检索、Prompt 组装。后处理包括:内容过滤、结果结构化、日志写入。
把这几个阶段分开追踪,才能在延迟抖动时快速定位是哪个环节出了问题。
4.3 吞吐量的 Token 维度
对 AI 应用来说,「每秒请求数」不是一个好的吞吐量指标,因为处理一个 100 Token 的请求和一个 4000 Token 的请求消耗的资源差异巨大。
更有意义的指标是:每分钟处理的 Token 总量(TPM),分别统计 Input TPM 和 Output TPM。这个指标直接和成本挂钩,也能反映系统的实际负载。
五、基于 Micrometer 实现 AI 指标埋点
说完理论,来看具体实现。我们用 Micrometer(Spring Boot Actuator 默认集成)来实现自定义 AI 指标的埋点。
5.1 依赖配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>5.2 AI 指标注册器
@Component
public class AiMetricsRegistry {
private final MeterRegistry meterRegistry;
// 计数器:总请求数,按模型和状态分维度
private final Counter aiRequestTotal;
// 直方图:TTFT 分布
private final DistributionSummary ttftSummary;
// 直方图:总响应延迟分布
private final DistributionSummary totalLatencySummary;
// 计数器:Token 消耗(输入)
private final Counter inputTokenCounter;
// 计数器:Token 消耗(输出)
private final Counter outputTokenCounter;
// 摘要:语义相关性得分分布
private final DistributionSummary semanticScoreSummary;
public AiMetricsRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.aiRequestTotal = Counter.builder("ai.request.total")
.description("AI 请求总数")
.tag("model", "unknown")
.tag("status", "unknown")
.register(meterRegistry);
this.ttftSummary = DistributionSummary.builder("ai.ttft.milliseconds")
.description("首 Token 时间(毫秒)")
.publishPercentiles(0.5, 0.95, 0.99)
.publishPercentileHistogram()
.register(meterRegistry);
this.totalLatencySummary = DistributionSummary.builder("ai.latency.milliseconds")
.description("AI 请求总延迟(毫秒)")
.publishPercentiles(0.5, 0.95, 0.99)
.publishPercentileHistogram()
.register(meterRegistry);
this.inputTokenCounter = Counter.builder("ai.token.input.total")
.description("输入 Token 总消耗")
.register(meterRegistry);
this.outputTokenCounter = Counter.builder("ai.token.output.total")
.description("输出 Token 总消耗")
.register(meterRegistry);
this.semanticScoreSummary = DistributionSummary.builder("ai.semantic.score")
.description("RAG 检索语义相关性得分")
.publishPercentiles(0.1, 0.5, 0.9)
.register(meterRegistry);
}
/**
* 记录一次 AI 请求完成
*/
public void recordRequest(String model, String status, long ttftMs,
long totalLatencyMs, int inputTokens, int outputTokens) {
// 带维度的计数器需要重新绑定 tag
meterRegistry.counter("ai.request.total",
"model", model,
"status", status).increment();
ttftSummary.record(ttftMs);
totalLatencySummary.record(totalLatencyMs);
meterRegistry.counter("ai.token.input.total",
"model", model).increment(inputTokens);
meterRegistry.counter("ai.token.output.total",
"model", model).increment(outputTokens);
}
/**
* 记录 RAG 检索的语义相关性得分
*/
public void recordSemanticScore(String collectionName, double score) {
meterRegistry.summary("ai.semantic.score",
"collection", collectionName).record(score);
}
/**
* 记录一次幻觉检测结果
*/
public void recordHallucinationCheck(String model, boolean isHallucination) {
meterRegistry.counter("ai.hallucination.check.total",
"model", model,
"result", isHallucination ? "hallucination" : "grounded")
.increment();
}
}5.3 AI 调用拦截器(自动埋点)
为了不在每个业务方法里手动调用埋点,我们用 AOP 拦截 AI 调用:
@Aspect
@Component
@Slf4j
public class AiMetricsAspect {
@Autowired
private AiMetricsRegistry metricsRegistry;
/**
* 拦截所有标注了 @AiTracked 注解的方法
*/
@Around("@annotation(aiTracked)")
public Object trackAiCall(ProceedingJoinPoint pjp, AiTracked aiTracked) throws Throwable {
long startTime = System.currentTimeMillis();
String model = aiTracked.model();
AtomicLong ttft = new AtomicLong(-1);
try {
Object result = pjp.proceed();
long totalLatency = System.currentTimeMillis() - startTime;
// 如果返回的是流式响应,需要特殊处理
if (result instanceof AiStreamResult streamResult) {
// 包装流式结果,在第一个 Token 到来时记录 TTFT
return wrapStreamWithMetrics(streamResult, startTime, model);
}
// 非流式响应直接记录
if (result instanceof AiResponse aiResponse) {
metricsRegistry.recordRequest(
model, "success",
totalLatency, // 非流式场景 TTFT ≈ 总延迟
totalLatency,
aiResponse.getInputTokens(),
aiResponse.getOutputTokens()
);
}
return result;
} catch (Exception e) {
long totalLatency = System.currentTimeMillis() - startTime;
metricsRegistry.recordRequest(model, "error", totalLatency, totalLatency, 0, 0);
throw e;
}
}
private AiStreamResult wrapStreamWithMetrics(AiStreamResult original,
long startTime, String model) {
return new AiStreamResult() {
private boolean firstTokenRecorded = false;
private long ttftMs = -1;
@Override
public Flux<String> getTokenFlux() {
return original.getTokenFlux()
.doOnNext(token -> {
if (!firstTokenRecorded) {
ttftMs = System.currentTimeMillis() - startTime;
firstTokenRecorded = true;
log.debug("TTFT for model {}: {}ms", model, ttftMs);
}
})
.doOnComplete(() -> {
long totalLatency = System.currentTimeMillis() - startTime;
metricsRegistry.recordRequest(
model, "success",
ttftMs > 0 ? ttftMs : totalLatency,
totalLatency,
original.getInputTokens(),
original.getOutputTokens()
);
})
.doOnError(e -> {
long totalLatency = System.currentTimeMillis() - startTime;
metricsRegistry.recordRequest(model, "error", totalLatency, totalLatency, 0, 0);
});
}
@Override
public int getInputTokens() { return original.getInputTokens(); }
@Override
public int getOutputTokens() { return original.getOutputTokens(); }
};
}
}5.4 自定义注解定义
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AiTracked {
String model() default "gpt-4o";
String scene() default "default";
}5.5 业务使用示例
@Service
public class RagQueryService {
@AiTracked(model = "gpt-4o", scene = "customer_service")
public AiStreamResult query(String userQuestion) {
// 1. 向量检索
List<DocumentChunk> chunks = vectorStore.search(userQuestion, 5);
// 2. 记录语义相关性得分
double avgScore = chunks.stream()
.mapToDouble(DocumentChunk::getScore)
.average()
.orElse(0.0);
metricsRegistry.recordSemanticScore("customer_service_kb", avgScore);
// 3. 组装 Prompt
String prompt = promptBuilder.build(userQuestion, chunks);
// 4. 调用模型(AOP 会自动追踪)
return openAiClient.streamChat(prompt);
}
}六、指标的生命周期管理
埋点做好了,还有一个容易忽略的问题:指标数据的保留策略。
AI 应用的指标有明显的冷热特征:
- 最近 24 小时的数据需要高精度(用于实时告警和排查)
- 最近 30 天的数据需要中等精度(用于趋势分析)
- 超过 30 天的数据可以聚合降精度(用于成本分析)
在 Prometheus 里,可以用 Recording Rules 提前做聚合,减少长期存储的数据量:
groups:
- name: ai_recording_rules
interval: 5m
rules:
# 每5分钟记录一次各模型的 Token 消耗汇总
- record: ai:token_usage:rate5m
expr: rate(ai_token_input_total[5m]) + rate(ai_token_output_total[5m])
# 每5分钟记录一次请求成功率
- record: ai:request_success_rate:rate5m
expr: |
rate(ai_request_total{status="success"}[5m])
/ rate(ai_request_total[5m])七、建立可观测性的优先级
最后说一个实用建议:可观测性不是一步到位的,要按优先级建设。
第一优先级(上线必须有):
- 基础错误率监控(硬错误)
- TTFT 和总延迟 P99
- Token 消耗量(防止成本失控)
第二优先级(上线后一个月内):
- 语义相关性得分(RAG 系统专属)
- 分阶段延迟拆解
- 软错误分类统计
第三优先级(系统稳定后):
- 幻觉率估算
- 用户满意度信号关联
- 成本效益分析(Token 利用率)
不要一开始就想把所有指标全做了,重要的是先把框架搭好,高优先级的指标做扎实,再逐步补充。
总结
这篇文章建立了 AI 应用可观测性的四层框架:基础设施层、应用运行时层、AI 调用层、业务质量层。核心观点是:
- 普通监控的「成功即健康」假设在 AI 场景不成立
- TTFT、Token 利用率、语义相关性、幻觉率是 AI 特有的核心指标
- 用 Micrometer + AOP 可以做到低侵入的自动埋点
- 按优先级建设,不要追求一步到位
可观测性的本质是「在系统出问题之前就能察觉异常,出了问题能快速定位」。AI 应用的特殊性在于,出问题的方式更隐蔽——它不会崩溃,它只是悄悄变得不准确。
