第1698篇:Spring AI的Observability集成——Micrometer指标与分布式追踪
第1698篇:Spring AI的Observability集成——Micrometer指标与分布式追踪
你们的AI服务出了问题,是怎么排查的?
如果答案是"加日志、看日志、猜",那说明可观测性这块还差得很远。
我在一个项目里曾经花了2天时间排查一个AI服务的偶发超时问题。最后发现原因是:某个RAG向量检索在特定查询条件下会突然变慢,而这个慢查询触发了整个请求链路的雪崩。但我们当时没有分布式追踪,只能靠日志一行一行对着看时间戳,这2天里有1.5天花在了这种低效的对比工作上。
加了分布式追踪之后,同类问题的排查时间从天级别降到了分钟级别。
今天这篇,我来讲Spring AI的可观测性集成,包括指标、追踪和日志的完整方案。
可观测性的三根柱子
可观测性(Observability)在AI服务里分三个层面:
指标(Metrics):量化的数值型数据。大模型调用次数、响应延迟分布、Token使用量、缓存命中率……这些数字告诉你系统的整体健康状况。
追踪(Tracing):单次请求的完整执行路径。一次AI对话请求,经过了哪些组件,每一步花了多少时间,哪里出了错。追踪能帮你快速定位慢请求的根因。
日志(Logging):文本形式的事件记录。代码里写的log.info/warn/error,记录关键事件和上下文。
三者各有侧重,但真正发挥威力是把它们关联起来——从告警指标开始,用追踪找到慢请求,用日志看具体错误详情。
Spring AI的内置可观测性支持
Spring AI 1.0开始对可观测性有了比较好的内置支持,主要基于Micrometer的Observation API(Spring Boot 3.x的标准)。
Spring AI会自动为以下操作生成Observation(观察点):
- ChatModel调用(记录延迟、Token数量)
- EmbeddingModel调用(记录延迟、维度)
- VectorStore操作(记录检索延迟、结果数量)
先加依赖:
<!-- pom.xml -->
<!-- Spring AI(已包含Micrometer观察) -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<!-- Micrometer Tracing(选择你的追踪后端)-->
<!-- 用Zipkin/Brave -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-brave</artifactId>
</dependency>
<!-- 或者用OpenTelemetry -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-otel</artifactId>
</dependency>
<!-- Prometheus指标导出 -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>基本配置:
# application.yml
management:
endpoints:
web:
exposure:
include: health, metrics, prometheus, info
metrics:
export:
prometheus:
enabled: true
tracing:
sampling:
probability: 1.0 # 开发环境100%采样,生产环境可以降低
spring:
ai:
openai:
api-key: ${OPENAI_API_KEY}
chat:
observations:
enabled: true # 开启Chat观察
log-prompt: true # 在DEBUG级别记录Prompt(生产环境注意敏感信息)
log-completion: true # 记录AI回复
# Zipkin配置
management:
zipkin:
tracing:
endpoint: http://zipkin:9411/api/v2/spans理解Spring AI生成的指标
开启后,Spring AI会自动生成以下Micrometer指标:
# Chat Model指标
gen_ai.client.operation.duration # 请求延迟(直方图)
gen_ai.client.token.usage # Token使用量(Counter)
# 指标标签(tags):
# gen_ai.operation.name = "chat"
# gen_ai.system = "openai"
# gen_ai.request.model = "gpt-4"
# gen_ai.response.finish_reasons = "stop"
# Embedding Model指标
gen_ai.client.operation.duration{gen_ai.operation.name="embeddings"}
# VectorStore指标(需要Spring AI Vector Store)
db.vector.client.operation.duration用Prometheus查询语言查看这些指标:
# 大模型调用的P99延迟(按模型分组)
histogram_quantile(0.99,
rate(gen_ai_client_operation_duration_seconds_bucket{gen_ai_operation_name="chat"}[5m])
) by (gen_ai_request_model)
# 每分钟Token消耗量
rate(gen_ai_client_token_usage_total[1m])
# 大模型调用错误率
rate(gen_ai_client_operation_duration_seconds_count{error="true"}[5m])
/
rate(gen_ai_client_operation_duration_seconds_count[5m])添加自定义业务指标
Spring AI内置的指标是基础,业务层面的指标需要自己加:
@Service
public class InstrumentedChatService {
private final ChatClient chatClient;
private final MeterRegistry meterRegistry;
private final ObservationRegistry observationRegistry;
// 自定义业务指标
private final Counter sessionCreatedCounter;
private final Counter cacheHitCounter;
private final Counter cacheMissCounter;
private final Timer ragRetrievalTimer;
public InstrumentedChatService(ChatClient chatClient,
MeterRegistry meterRegistry,
ObservationRegistry observationRegistry) {
this.chatClient = chatClient;
this.meterRegistry = meterRegistry;
this.observationRegistry = observationRegistry;
// 初始化计数器
this.sessionCreatedCounter = Counter.builder("ai.session.created")
.description("新建AI会话数")
.register(meterRegistry);
this.cacheHitCounter = Counter.builder("ai.embedding.cache")
.tag("result", "hit")
.description("Embedding缓存命中")
.register(meterRegistry);
this.cacheMissCounter = Counter.builder("ai.embedding.cache")
.tag("result", "miss")
.description("Embedding缓存未命中")
.register(meterRegistry);
this.ragRetrievalTimer = Timer.builder("ai.rag.retrieval.duration")
.description("RAG检索耗时")
.publishPercentiles(0.5, 0.90, 0.95, 0.99)
.publishPercentileHistogram()
.register(meterRegistry);
}
public ChatResponse chat(String sessionId, String message) {
// 使用Observation API包装整个业务流程
Observation observation = Observation.createNotStarted(
"ai.chat.request",
observationRegistry
)
.contextualName("AI对话请求")
.lowCardinalityKeyValue("session.type", getSessionType(sessionId))
.start();
try (Observation.Scope scope = observation.openScope()) {
// 1. RAG检索(记录耗时和结果数量)
List<String> context = ragRetrievalTimer.record(() -> {
List<String> results = vectorStore.search(message, 5);
observation.highCardinalityKeyValue("rag.results.count",
String.valueOf(results.size()));
return results;
});
// 2. 调用大模型
String response = chatClient.call(buildPrompt(sessionId, message, context));
// 3. 记录业务指标
observation.highCardinalityKeyValue("response.length",
String.valueOf(response.length()));
observation.event(Observation.Event.of("response.received"));
return new ChatResponse(response);
} catch (Exception e) {
observation.error(e);
throw e;
} finally {
observation.stop();
}
}
}分布式追踪的Span设计
在AI服务里,一个好的分布式追踪需要把AI调用的关键步骤都体现出来。
@Component
public class RAGChatPipeline {
private final Tracer tracer;
public String chat(String sessionId, String userMessage) {
// 根Span:整个AI对话请求
Span rootSpan = tracer.nextSpan()
.name("ai.chat.pipeline")
.tag("session.id", sessionId)
.start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(rootSpan)) {
// 子Span1:Prompt构建
String prompt = traceAndExecute("ai.prompt.build", () -> {
return buildPrompt(sessionId, userMessage);
});
// 子Span2:RAG检索
List<String> context = traceAndExecute("ai.rag.retrieve", span -> {
span.tag("query.length", String.valueOf(userMessage.length()));
List<String> results = vectorStore.search(userMessage, 5);
span.tag("results.count", String.valueOf(results.size()));
return results;
});
// 子Span3:大模型调用(Spring AI会自动创建子Span)
String response = chatClient.call(buildFinalPrompt(prompt, context));
// 子Span4:结果保存
traceAndExecute("ai.response.save", () -> {
conversationRepo.save(sessionId, userMessage, response);
return null;
});
rootSpan.tag("response.length", String.valueOf(response.length()));
return response;
} catch (Exception e) {
rootSpan.error(e);
throw e;
} finally {
rootSpan.end();
}
}
// 辅助方法:创建子Span并执行
private <T> T traceAndExecute(String spanName, Supplier<T> action) {
Span span = tracer.nextSpan().name(spanName).start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
return action.get();
} catch (Exception e) {
span.error(e);
throw e;
} finally {
span.end();
}
}
private <T> T traceAndExecute(String spanName, Function<Span, T> action) {
Span span = tracer.nextSpan().name(spanName).start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
return action.apply(span);
} catch (Exception e) {
span.error(e);
throw e;
} finally {
span.end();
}
}
}在Zipkin或Jaeger里,这条追踪链路会显示为:
ai.chat.pipeline (180ms)
├── ai.prompt.build (3ms)
├── ai.rag.retrieve (45ms)
├── gen_ai.chat (125ms) ← Spring AI自动生成
│ └── http.client.request (120ms) ← OkHttp自动生成
└── ai.response.save (7ms)一眼就能看出慢在哪里。
Token使用量的精细化监控
大模型调用是按Token计费的,Token监控对成本控制至关重要:
@Component
public class TokenUsageMonitor {
private final MeterRegistry meterRegistry;
// 记录Token使用(区分不同维度)
public void recordTokenUsage(TokenUsage usage, String model, String feature) {
// Prompt Token计数(输入,更贵)
Counter.builder("ai.token.usage")
.tag("model", model)
.tag("feature", feature) // 功能模块(chat/summary/classification)
.tag("type", "prompt")
.register(meterRegistry)
.increment(usage.getPromptTokens());
// Completion Token计数(输出)
Counter.builder("ai.token.usage")
.tag("model", model)
.tag("feature", feature)
.tag("type", "completion")
.register(meterRegistry)
.increment(usage.getCompletionTokens());
// 成本估算(按实际价格计算)
double estimatedCost = calculateCost(usage, model);
Counter.builder("ai.estimated.cost.usd")
.tag("model", model)
.tag("feature", feature)
.register(meterRegistry)
.increment(estimatedCost);
}
private double calculateCost(TokenUsage usage, String model) {
// 简化的价格计算(实际价格要看官网)
return switch (model) {
case "gpt-4" -> usage.getPromptTokens() * 0.00003 +
usage.getCompletionTokens() * 0.00006;
case "gpt-3.5-turbo" -> usage.getPromptTokens() * 0.0000015 +
usage.getCompletionTokens() * 0.000002;
default -> 0.0;
};
}
}Prometheus告警规则:
# 按功能模块的每日Token消耗超阈值告警
- alert: HighTokenUsage
expr: sum(increase(ai_token_usage_total[24h])) by (feature) > 1000000
for: 0m
annotations:
summary: "功能 {{ $labels.feature }} 24小时Token消耗超100万"
description: "当前消耗: {{ $value | humanize }} tokens"
# 估算成本超阈值告警
- alert: HighAICost
expr: sum(increase(ai_estimated_cost_usd_total[24h])) > 100
annotations:
summary: "AI服务24小时估算成本超100美元"日志与追踪的关联
单独的日志难以追踪跨服务的问题,要把traceId注入到日志里:
<!-- logback-spring.xml -->
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<property name="LOG_PATTERN"
value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{traceId},%X{spanId}] %logger{36} - %msg%n"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${LOG_PATTERN}</pattern>
</encoder>
</appender>
<!-- 生产环境输出JSON格式,方便ELK/Loki采集 -->
<appender name="JSON" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<includeMdcKeyName>traceId</includeMdcKeyName>
<includeMdcKeyName>spanId</includeMdcKeyName>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="JSON"/>
</root>
</configuration>Spring Boot Actuator + Micrometer Tracing会自动把traceId注入到MDC,所以每条日志都会带上traceId。这样当你在Zipkin里找到一条慢请求,复制它的traceId,直接去Kibana/Grafana Loki搜索这个traceId,就能找到这次请求的所有相关日志。
Grafana Dashboard实战
这是我们AI服务的Grafana Dashboard核心面板配置,分享出来直接用:
面板1:AI服务整体健康
# 请求成功率
1 - (
rate(gen_ai_client_operation_duration_seconds_count{error="true"}[5m])
/
rate(gen_ai_client_operation_duration_seconds_count[5m])
)
# 当前并发请求数
sum(ai_active_requests_gauge)面板2:延迟分布热力图
# P50/P90/P99延迟
histogram_quantile(0.50, rate(gen_ai_client_operation_duration_seconds_bucket[5m]))
histogram_quantile(0.90, rate(gen_ai_client_operation_duration_seconds_bucket[5m]))
histogram_quantile(0.99, rate(gen_ai_client_operation_duration_seconds_bucket[5m]))面板3:Token消耗趋势
# 每分钟Prompt Token消耗
rate(ai_token_usage_total{type="prompt"}[1m]) * 60
# 每分钟Completion Token消耗
rate(ai_token_usage_total{type="completion"}[1m]) * 60面板4:RAG系统性能
# RAG检索P99延迟
histogram_quantile(0.99, rate(ai_rag_retrieval_duration_seconds_bucket[5m]))
# 向量检索每秒操作数
rate(ai_rag_retrieval_duration_seconds_count[1m])
# 缓存命中率
rate(ai_embedding_cache_total{result="hit"}[5m])
/
(rate(ai_embedding_cache_total{result="hit"}[5m]) + rate(ai_embedding_cache_total{result="miss"}[5m]))采样率的策略设计
分布式追踪如果100%采样,存储和性能开销很大。需要合理的采样策略:
@Configuration
public class TracingConfig {
@Bean
public Sampler customSampler() {
return new Sampler() {
@Override
public SamplingFlags isSampled(long traceId) {
// 策略1:基于概率采样
// 普通请求10%采样,慢请求100%采样
return SamplingFlags.SAMPLED; // 这里简化,实际要更复杂
}
};
}
// 用自定义ObservationFilter实现动态采样
@Bean
public ObservationFilter samplingFilter(Tracer tracer) {
return (context, chain) -> {
Observation.Context ctx = chain.handle(context);
// 获取当前Span
Span span = tracer.currentSpan();
if (span != null) {
// 如果这次请求耗时超过2秒,强制记录(即使采样率低)
// 这需要在after-complete钩子里处理
}
return ctx;
};
}
}更实用的方案是用OpenTelemetry的Tail-Based Sampling(尾部采样):在请求完成后,根据请求的结果决定是否保留追踪数据。这样可以保留所有错误请求和慢请求的追踪,而过滤掉大部分正常请求。
一个完整的可观测性架构
这套架构的关键是三者的关联:
- Grafana里点击一个延迟高的时间段,可以跳转到Jaeger查看那段时间的慢请求追踪
- 在Jaeger里找到一个慢Span,复制traceId,跳转到Loki查看对应日志
- 在Loki里看到日志里的错误,能定位到是哪次具体的AI调用
总结
可观测性不是锦上添花,而是AI服务工程化的基础设施。没有可观测性,你的AI服务就是一个黑盒子。
从哪里开始:
- 先加Prometheus + Grafana:Spring AI内置的指标已经够用,先把基础监控建起来
- 然后加分布式追踪:用Zipkin或Jaeger,把关键链路的Span设计好
- 最后优化日志:结构化日志,注入traceId,建立关联
- 建立告警规则:不要靠人工盯,让系统自动告警
每一步都能独立产生价值,不需要一次性全做完。
