Part 1752: Distributed Tracing in Practice for AI Systems — Integrating OpenTelemetry with SkyWalking
Last month I helped a team debug an issue: users reported that AI conversations occasionally hit very long latencies, but not every time — the reproduction rate was around 30%.
A pass through the logs turned up nothing, and every service's monitoring dashboard was green. It ultimately took two days of stitching a timeline together from individual log lines to discover that an Embedding service was, under certain conditions, issuing a redundant database query; that query occasionally turned into a slow query and dragged the whole request chain down.
With proper distributed tracing in place, pinpointing it would have taken five minutes. That incident convinced me to close the tracing gap, and to integrate tracing deeply around the specifics of AI systems.
This article is the distilled hands-on experience.
1. How tracing an AI system differs from tracing a traditional one
Traditional distributed tracing answers one question: which services did a request pass through, and how long did each one take? For AI systems that question still matters, but several layers of complexity are added on top:
The semantics of AI calls matter. Knowing that an LLM call took 8 seconds tells you little; record alongside it "input 1200 tokens, output 800 tokens, model gpt-4" and the data becomes useful — you can compare latency distributions across models, estimate cost, and spot anomalous token consumption.
Tool calls form nested traces. Modern AI systems lean heavily on agents: a single user request may trigger LLM inference → tool call 1 → tool call 2 → a second LLM inference → response. That is a tree-shaped call chain, not a simple linear one. If the tracing system cannot represent that nesting, debugging stays a mess.
Vector retrieval is on the critical path. In a RAG system, vector search is a high-frequency hot path whose latency directly shapes the overall experience. Index hit quality and retrieval latency distributions all need to be traced.
Trace propagation across async tasks. AI work is frequently asynchronous — a user submits a document-processing job, a background worker runs for minutes, and a notification comes back later. Tracing that async chain is a problem in its own right.
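The async point above ultimately comes down to carrying a text header across process boundaries. W3C Trace Context — the format OpenTelemetry propagates — encodes the trace identity as a `traceparent` string. A minimal stdlib-only sketch of building and parsing that header (the field layout follows the W3C spec; the class and helper names here are hypothetical, for illustration only):

```java
// Minimal sketch of the W3C "traceparent" header that OpenTelemetry
// propagates across process boundaries: version-traceid-spanid-flags.
// Illustration only, not the OTel SDK implementation.
public class TraceParentDemo {

    // Build a traceparent header: 2-hex version, 32-hex trace id,
    // 16-hex span id, 2-hex flags ("01" = sampled).
    static String format(String traceId, String spanId, boolean sampled) {
        return "00-" + traceId + "-" + spanId + "-" + (sampled ? "01" : "00");
    }

    // Extract the 128-bit trace id back out of the header.
    static String parseTraceId(String traceparent) {
        String[] parts = traceparent.split("-");
        if (parts.length != 4 || parts[1].length() != 32) {
            throw new IllegalArgumentException("malformed traceparent: " + traceparent);
        }
        return parts[1];
    }

    public static void main(String[] args) {
        String header = format("4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", true);
        System.out.println(header);
        System.out.println(parseTraceId(header));
    }
}
```

Any transport that can carry a string key-value pair — HTTP headers, AMQP message headers, a task payload — can carry this, which is exactly what section 7 below does by hand.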
2. The choice: OpenTelemetry + SkyWalking
First, why this combination rather than Zipkin or Jaeger.
OpenTelemetry (OTel) is today's de facto standard; it unifies the collection of traces, metrics, and logs under one specification. It is backend-agnostic: use SkyWalking today, switch to Jaeger tomorrow, and only configuration changes.
SkyWalking won out for a few practical reasons:
- Mature Java ecosystem integration, with solid auto-instrumentation agents
- Plugins for AI frameworks (still incomplete, but the community is moving fast)
- Simpler to deploy and operate than Jaeger or Tempo
- An active community in China, so answers are easy to find
The overall pipeline: application services export via the OTel SDK to an OTel Collector, which forwards traces to the SkyWalking OAP backend.
3. Baseline integration
Maven dependencies:
<dependencies>
    <!-- OpenTelemetry SDK -->
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk</artifactId>
        <version>1.32.0</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-otlp</artifactId>
        <version>1.32.0</version>
    </dependency>
    <!-- Spring Boot auto-configuration -->
    <dependency>
        <groupId>io.opentelemetry.instrumentation</groupId>
        <artifactId>opentelemetry-spring-boot-starter</artifactId>
        <version>1.32.0-alpha</version>
    </dependency>
    <!-- Micrometer bridge -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-tracing-bridge-otel</artifactId>
    </dependency>
</dependencies>

Baseline application.yml:
management:
  tracing:
    sampling:
      probability: 1.0  # 100% sampling in development; tune down for production
  otlp:
    tracing:
      endpoint: http://otel-collector:4317
otel:
  service:
    name: ai-chat-service
  traces:
    exporter: otlp
  metrics:
    exporter: otlp
  exporter:
    otlp:
      endpoint: http://otel-collector:4317
      protocol: grpc
logging:
  pattern:
    level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"

The OTel Collector configuration:
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
processors:
  batch:
    timeout: 1s
    send_batch_size: 1024
  # Add AI-specific resource attributes
  resource:
    attributes:
      - key: deployment.environment
        value: production
        action: insert
  # Filter out noise such as health checks
  filter:
    traces:
      span:
        - 'attributes["http.route"] == "/actuator/health"'
exporters:
  # To SkyWalking
  otlp/skywalking:
    endpoint: http://skywalking-oap:11800
    tls:
      insecure: true
  # Also to Jaeger as a backup
  jaeger:
    endpoint: http://jaeger:14250
    tls:
      insecure: true
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, resource, filter]
      exporters: [otlp/skywalking, jaeger]

4. The core: semantic tracing of AI calls
This is where the approach earns its keep. AI calls need meaningful semantic attributes attached so the trace data becomes genuinely analyzable.
Start by defining constants for the AI span attributes, following the OpenTelemetry Semantic Conventions for AI/LLM (still in draft, but worth aligning with early):
public final class AISemanticAttributes {

    // LLM
    public static final AttributeKey<String> LLM_VENDOR =
            AttributeKey.stringKey("llm.vendor");
    public static final AttributeKey<String> LLM_REQUEST_MODEL =
            AttributeKey.stringKey("llm.request.model");
    public static final AttributeKey<String> LLM_RESPONSE_MODEL =
            AttributeKey.stringKey("llm.response.model");
    public static final AttributeKey<Long> LLM_USAGE_PROMPT_TOKENS =
            AttributeKey.longKey("llm.usage.prompt_tokens");
    public static final AttributeKey<Long> LLM_USAGE_COMPLETION_TOKENS =
            AttributeKey.longKey("llm.usage.completion_tokens");
    public static final AttributeKey<Long> LLM_USAGE_TOTAL_TOKENS =
            AttributeKey.longKey("llm.usage.total_tokens");
    public static final AttributeKey<String> LLM_FINISH_REASON =
            AttributeKey.stringKey("llm.finish_reason");
    public static final AttributeKey<Double> LLM_TEMPERATURE =
            AttributeKey.doubleKey("llm.request.temperature");

    // Vector retrieval
    public static final AttributeKey<String> VECTOR_DB_VENDOR =
            AttributeKey.stringKey("vector_db.vendor");
    public static final AttributeKey<String> VECTOR_DB_OPERATION =
            AttributeKey.stringKey("vector_db.operation");
    public static final AttributeKey<Long> VECTOR_DB_RESULTS_COUNT =
            AttributeKey.longKey("vector_db.results_count");
    public static final AttributeKey<Double> VECTOR_DB_TOP_SCORE =
            AttributeKey.doubleKey("vector_db.top_similarity_score");

    // RAG
    public static final AttributeKey<Long> RAG_CHUNKS_RETRIEVED =
            AttributeKey.longKey("rag.chunks_retrieved");
    public static final AttributeKey<String> RAG_RETRIEVAL_STRATEGY =
            AttributeKey.stringKey("rag.retrieval_strategy");

    private AISemanticAttributes() {}
}

A tracing wrapper around the LLM call:
@Component
@Slf4j
@RequiredArgsConstructor
public class TracedLLMClient {

    private final Tracer tracer;
    private final OpenAiChatClient openAiChatClient;
    private final MeterRegistry meterRegistry;

    public ChatResponse chat(ChatRequest request) {
        Span span = tracer.spanBuilder("llm.chat")
                .setSpanKind(SpanKind.CLIENT)
                .setAttribute(AISemanticAttributes.LLM_VENDOR, "openai")
                .setAttribute(AISemanticAttributes.LLM_REQUEST_MODEL, request.getModel())
                .setAttribute(AISemanticAttributes.LLM_TEMPERATURE, request.getTemperature())
                .startSpan();
        try (Scope scope = span.makeCurrent()) {
            ChatResponse response = openAiChatClient.chat(request);

            // Record the semantic information on the span
            if (response.getUsage() != null) {
                span.setAttribute(AISemanticAttributes.LLM_USAGE_PROMPT_TOKENS,
                        response.getUsage().getPromptTokens());
                span.setAttribute(AISemanticAttributes.LLM_USAGE_COMPLETION_TOKENS,
                        response.getUsage().getCompletionTokens());
                span.setAttribute(AISemanticAttributes.LLM_USAGE_TOTAL_TOKENS,
                        response.getUsage().getTotalTokens());

                // Mirror into metrics as well, for aggregate statistics
                meterRegistry.counter("llm.tokens.used",
                        "model", request.getModel(),
                        "type", "prompt"
                ).increment(response.getUsage().getPromptTokens());
                meterRegistry.counter("llm.tokens.used",
                        "model", request.getModel(),
                        "type", "completion"
                ).increment(response.getUsage().getCompletionTokens());
            }
            if (response.getChoices() != null && !response.getChoices().isEmpty()) {
                span.setAttribute(AISemanticAttributes.LLM_FINISH_REASON,
                        response.getChoices().get(0).getFinishReason());
            }
            span.setAttribute(AISemanticAttributes.LLM_RESPONSE_MODEL,
                    response.getModel() != null ? response.getModel() : request.getModel());
            span.setStatus(StatusCode.OK);

            // Record a cost-estimate event for later analysis
            span.addEvent("cost.estimated", Attributes.of(
                    AttributeKey.doubleKey("cost.usd"),
                    estimateCost(request.getModel(), response.getUsage())
            ));
            return response;
        } catch (Exception e) {
            span.setStatus(StatusCode.ERROR, e.getMessage());
            span.recordException(e);
            meterRegistry.counter("llm.errors",
                    "model", request.getModel(),
                    "error_type", e.getClass().getSimpleName()
            ).increment();
            throw e;
        } finally {
            span.end();
        }
    }

    // Tracing a streaming response is slightly trickier
    public Flux<ChatResponseChunk> chatStream(ChatRequest request) {
        Span span = tracer.spanBuilder("llm.chat.stream")
                .setSpanKind(SpanKind.CLIENT)
                .setAttribute(AISemanticAttributes.LLM_VENDOR, "openai")
                .setAttribute(AISemanticAttributes.LLM_REQUEST_MODEL, request.getModel())
                .startSpan();
        AtomicLong completionTokens = new AtomicLong(0);
        AtomicBoolean firstChunk = new AtomicBoolean(true);
        long startTime = System.currentTimeMillis();
        try (Scope scope = span.makeCurrent()) {
            return openAiChatClient.chatStream(request)
                    .doOnNext(chunk -> {
                        if (firstChunk.compareAndSet(true, false)) {
                            // Record time-to-first-token (TTFT), a key latency
                            // metric for AI responses
                            long ttft = System.currentTimeMillis() - startTime;
                            span.addEvent("first_token", Attributes.of(
                                    AttributeKey.longKey("ttft_ms"), ttft
                            ));
                            meterRegistry.timer("llm.ttft", "model", request.getModel())
                                    .record(ttft, TimeUnit.MILLISECONDS);
                        }
                        // Chunk count as a rough proxy for completion tokens
                        completionTokens.incrementAndGet();
                    })
                    .doOnComplete(() -> {
                        span.setAttribute(AISemanticAttributes.LLM_USAGE_COMPLETION_TOKENS,
                                completionTokens.get());
                        span.setStatus(StatusCode.OK);
                        span.end();
                    })
                    .doOnError(e -> {
                        span.setStatus(StatusCode.ERROR, e.getMessage());
                        span.recordException(e);
                        span.end();
                    })
                    // Key step: propagate the span context into the response stream
                    // (ContextPropagationOperator comes from the opentelemetry-reactor
                    // instrumentation library)
                    .contextWrite(ctx -> ContextPropagationOperator.storeOpenTelemetryContext(
                            ctx, Context.current().with(span)));
        }
    }

    private double estimateCost(String model, TokenUsage usage) {
        if (usage == null) return 0;
        // Simplified cost estimate; adjust against current pricing
        Map<String, double[]> pricing = Map.of(
                "gpt-4", new double[]{0.03, 0.06},          // [input/1k, output/1k]
                "gpt-4-turbo", new double[]{0.01, 0.03},
                "gpt-3.5-turbo", new double[]{0.001, 0.002}
        );
        double[] price = pricing.getOrDefault(model, new double[]{0.01, 0.01});
        return (usage.getPromptTokens() * price[0] +
                usage.getCompletionTokens() * price[1]) / 1000.0;
    }
}

5. Tracing vector retrieval
In a RAG system, vector retrieval is both a high-frequency path and a frequent source of trouble; it deserves dedicated tracing.
@Component
@RequiredArgsConstructor
public class TracedVectorStore {

    private final Tracer tracer;
    private final VectorStore vectorStore;

    public List<Document> similaritySearch(SearchRequest searchRequest) {
        Span span = tracer.spanBuilder("vector_db.similarity_search")
                .setSpanKind(SpanKind.CLIENT)
                .setAttribute(AISemanticAttributes.VECTOR_DB_VENDOR, "milvus")
                .setAttribute(AISemanticAttributes.VECTOR_DB_OPERATION, "similarity_search")
                .setAttribute(AttributeKey.longKey("vector_db.top_k"),
                        (long) searchRequest.getTopK())
                .setAttribute(AttributeKey.doubleKey("vector_db.similarity_threshold"),
                        searchRequest.getSimilarityThreshold())
                .startSpan();
        try (Scope scope = span.makeCurrent()) {
            List<Document> results = vectorStore.similaritySearch(searchRequest);
            span.setAttribute(AISemanticAttributes.VECTOR_DB_RESULTS_COUNT,
                    (long) results.size());
            if (!results.isEmpty()) {
                // Record the top similarity score
                double topScore = results.stream()
                        .mapToDouble(doc -> doc.getMetadata().containsKey("score") ?
                                (Double) doc.getMetadata().get("score") : 0.0)
                        .max()
                        .orElse(0.0);
                span.setAttribute(AISemanticAttributes.VECTOR_DB_TOP_SCORE, topScore);

                // Record where the retrieved documents came from
                // (useful for assessing knowledge-base quality)
                List<String> sources = results.stream()
                        .map(doc -> (String) doc.getMetadata().getOrDefault("source", "unknown"))
                        .distinct()
                        .limit(5)
                        .collect(Collectors.toList());
                span.setAttribute(AttributeKey.stringArrayKey("vector_db.sources"), sources);
            }
            span.setStatus(StatusCode.OK);
            return results;
        } catch (Exception e) {
            span.setStatus(StatusCode.ERROR, e.getMessage());
            span.recordException(e);
            throw e;
        } finally {
            span.end();
        }
    }
}

6. Tracing agent tool calls
In agent scenarios, tool calls form a nested span tree — exactly where tracing delivers the most value.
@Component
@Slf4j
@RequiredArgsConstructor
public class TracedAgentExecutor {

    private static final int MAX_ITERATIONS = 10;

    private final Tracer tracer;
    private final List<AgentTool> tools;
    private final TracedLLMClient llmClient;

    public AgentResult execute(String userQuery, String conversationId) {
        // Create the root span
        Span rootSpan = tracer.spanBuilder("agent.execute")
                .setAttribute(AttributeKey.stringKey("agent.conversation_id"), conversationId)
                .setAttribute(AttributeKey.stringKey("agent.query_preview"),
                        truncate(userQuery, 100))
                .startSpan();
        try (Scope scope = rootSpan.makeCurrent()) {
            int iterationCount = 0;
            List<Message> messages = new ArrayList<>();
            messages.add(Message.user(userQuery));
            while (iterationCount < MAX_ITERATIONS) {
                iterationCount++;
                // LLM inference — recorded as a child span of rootSpan
                ChatResponse response = llmClient.chat(
                        ChatRequest.builder()
                                .messages(messages)
                                .tools(tools.stream()
                                        .map(AgentTool::getDefinition)
                                        .collect(Collectors.toList()))
                                .build()
                );
                // Did the model ask for a tool call?
                if (response.getToolCalls() == null || response.getToolCalls().isEmpty()) {
                    rootSpan.setAttribute(AttributeKey.longKey("agent.iterations"), iterationCount);
                    rootSpan.setStatus(StatusCode.OK);
                    return AgentResult.success(response.getContent());
                }
                // Execute the requested tool calls
                for (ToolCall toolCall : response.getToolCalls()) {
                    executeToolWithTrace(toolCall, messages);
                }
            }
            rootSpan.setAttribute(AttributeKey.booleanKey("agent.max_iterations_reached"), true);
            return AgentResult.maxIterationsReached();
        } catch (Exception e) {
            rootSpan.setStatus(StatusCode.ERROR, e.getMessage());
            rootSpan.recordException(e);
            throw e;
        } finally {
            rootSpan.end();
        }
    }

    private void executeToolWithTrace(ToolCall toolCall, List<Message> messages) {
        // The tool span automatically becomes a child of the currently
        // active span (rootSpan)
        Span toolSpan = tracer.spanBuilder("agent.tool." + toolCall.getName())
                .setAttribute(AttributeKey.stringKey("tool.name"), toolCall.getName())
                .setAttribute(AttributeKey.stringKey("tool.arguments_preview"),
                        truncate(toolCall.getArguments(), 200))
                .startSpan();
        try (Scope scope = toolSpan.makeCurrent()) {
            AgentTool tool = findTool(toolCall.getName());
            if (tool == null) {
                toolSpan.setStatus(StatusCode.ERROR, "Tool not found: " + toolCall.getName());
                messages.add(Message.toolResult(toolCall.getId(), "Error: tool not found"));
                return;
            }
            String result = tool.execute(toolCall.getArguments());
            toolSpan.setAttribute(AttributeKey.longKey("tool.result_length"),
                    result.length());
            toolSpan.setStatus(StatusCode.OK);
            messages.add(Message.assistant(null, List.of(toolCall)));
            messages.add(Message.toolResult(toolCall.getId(), result));
        } catch (Exception e) {
            toolSpan.setStatus(StatusCode.ERROR, e.getMessage());
            toolSpan.recordException(e);
            messages.add(Message.toolResult(toolCall.getId(),
                    "Error: " + e.getMessage()));
        } finally {
            toolSpan.end();
        }
    }

    // Assumes AgentTool exposes getName(); adapt to your tool abstraction
    private AgentTool findTool(String name) {
        return tools.stream()
                .filter(t -> t.getName().equals(name))
                .findFirst()
                .orElse(null);
    }

    private static String truncate(String s, int maxLen) {
        if (s == null) return "";
        return s.length() <= maxLen ? s : s.substring(0, maxLen) + "...";
    }
}

7. Trace propagation across async tasks
This is the easiest part to get wrong. When a task crosses a message queue, the TraceContext does not propagate automatically; it has to be carried by hand.
@Service
@Slf4j
@RequiredArgsConstructor
public class AsyncAITaskService {

    private final RabbitTemplate rabbitTemplate;
    private final Tracer tracer;
    private final W3CTraceContextPropagator propagator;

    // Task submission: inject the TraceContext into the message
    public String submitTask(AITask task) {
        Span span = tracer.spanBuilder("ai.task.submit")
                .setAttribute(AttributeKey.stringKey("task.type"), task.getType())
                .setAttribute(AttributeKey.stringKey("task.id"), task.getId())
                .startSpan();
        try (Scope scope = span.makeCurrent()) {
            // Inject the TraceContext into the message headers
            Map<String, Object> headers = new HashMap<>();
            propagator.inject(
                    io.opentelemetry.context.Context.current(),
                    headers,
                    (carrier, key, value) -> carrier.put(key, value)
            );
            task.setTraceHeaders(headers);
            task.setParentTraceId(span.getSpanContext().getTraceId());
            rabbitTemplate.convertAndSend(
                    "ai.tasks.exchange",
                    "ai.task." + task.getType(),
                    task
            );
            span.setStatus(StatusCode.OK);
            return task.getId();
        } finally {
            span.end();
        }
    }
}

@Component
@RabbitListener(queues = "ai.tasks.document.processing")
@Slf4j
@RequiredArgsConstructor
public class DocumentProcessingWorker {

    private final Tracer tracer;
    private final W3CTraceContextPropagator propagator;

    @RabbitHandler
    public void processDocument(AITask task) {
        // Extract the TraceContext from the message to restore the chain.
        // (TextMapGetter has two abstract methods, so it cannot be a lambda.)
        io.opentelemetry.context.Context parentContext = propagator.extract(
                io.opentelemetry.context.Context.root(),
                task.getTraceHeaders(),
                new TextMapGetter<Map<String, Object>>() {
                    @Override
                    public Iterable<String> keys(Map<String, Object> carrier) {
                        return carrier.keySet();
                    }
                    @Override
                    public String get(Map<String, Object> carrier, String key) {
                        Object value = carrier.get(key);
                        return value != null ? value.toString() : null;
                    }
                }
        );
        // Create a new span parented on the propagated context
        Span span = tracer.spanBuilder("ai.task.process")
                .setParent(parentContext) // key: inherit the parent TraceContext
                .setAttribute(AttributeKey.stringKey("task.id"), task.getId())
                .setAttribute(AttributeKey.stringKey("task.type"), task.getType())
                .startSpan();
        try (Scope scope = span.makeCurrent()) {
            log.info("Processing task: {}, traceId: {}",
                    task.getId(), span.getSpanContext().getTraceId());
            // Actual processing logic...
            doProcessDocument(task);
            span.setStatus(StatusCode.OK);
        } catch (Exception e) {
            span.setStatus(StatusCode.ERROR, e.getMessage());
            span.recordException(e);
            throw e;
        } finally {
            span.end();
        }
    }
}

8. Custom SkyWalking dashboards
Collecting the data is not enough; you need targeted dashboards to make problems jump out.
SkyWalking supports OAL (Observability Analysis Language) for defining custom metrics:
// skywalking-custom.oal
// P99 latency of LLM calls, grouped by model
llm_call_p99 = from(Span.latency).filter(tags["llm.vendor"] != null).percentile(99);
// Token consumption rate
llm_tokens_per_minute = from(Span.*).filter(tags["llm.usage.total_tokens"] != null)
    .rate1m();
// Vector-search hit quality (average top similarity score)
vector_search_avg_score = from(Span.tag("vector_db.top_similarity_score"))
    .filter(tags["vector_db.operation"] == "similarity_search")
    .avg();
// Distribution of agent iteration counts
agent_iteration_count = from(Span.tag("agent.iterations"))
    .filter(tags["agent.iterations"] != null)
    .histogram(1, 10);

9. Sampling strategy: balancing completeness against overhead
Sampling at 100% is unrealistic in production, but sample too little and you miss the problems that matter. An AI system's sampling strategy needs deliberate design:
@Configuration
public class TracingSamplerConfig {

    @Bean
    public Sampler aiAwareSampler() {
        return new AIAwareSampler();
    }

    public static class AIAwareSampler implements Sampler {

        @Override
        public SamplingResult shouldSample(
                Context parentContext,
                String traceId,
                String name,
                SpanKind spanKind,
                Attributes attributes,
                List<LinkData> parentLinks) {
            // Rule 1: sample 100% of errored requests, whatever the operation.
            // Better done in a SpanProcessor; noted here for completeness.

            // Rule 2: sample 100% of LLM calls (the most valuable data)
            if (name.startsWith("llm.")) {
                return SamplingResult.recordAndSample();
            }
            // Rule 3: sample 100% of agent executions
            if (name.startsWith("agent.")) {
                return SamplingResult.recordAndSample();
            }
            // Rule 4: slow-request sampling (judged from parent-span attributes)
            // needs tail-based sampling, which is beyond this article's scope.

            // Rule 5: sample 10% of HTTP requests
            // (floorMod, because hashCode() can be negative)
            if (spanKind == SpanKind.SERVER || spanKind == SpanKind.CLIENT) {
                return Math.floorMod(traceId.hashCode(), 10) == 0 ?
                        SamplingResult.recordAndSample() :
                        SamplingResult.drop();
            }
            return SamplingResult.recordAndSample();
        }

        @Override
        public String getDescription() {
            return "AIAwareSampler";
        }
    }
}

10. Pitfalls
Pitfall 1: Reactor context propagation
With Spring WebFlux AI endpoints, Mono/Flux context propagation works nothing like classic ThreadLocal. Span state lives in a ThreadLocal and is lost the moment execution hops to an IO thread; it has to be carried through the Reactor Context:
// Wrong
return llmClient.chatAsync(request)
        .map(response -> {
            // the span context may already be lost here
            span.end();
            return response;
        });

// Correct: carry the OTel context through the Reactor Context.
// (ContextPropagationOperator is from the opentelemetry-reactor
// instrumentation library; contextWrite is placed downstream so the
// operators above it can see the context.)
return llmClient.chatAsync(request)
        .map(response -> {
            span.end();
            return response;
        })
        .contextWrite(ctx -> ContextPropagationOperator.storeOpenTelemetryContext(
                ctx, Context.current().with(span)));

Pitfall 2: Span lifecycle for streaming responses
With SSE streaming responses, you must not end the span when the request is issued; span.end() may only be called once the stream fully completes. End it early, and the writes made later in doOnComplete are silently ignored.
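The pitfall can be reproduced without any framework. The toy span below (a hypothetical class, mimicking the OTel contract that writes after end() are no-ops) shows why attributes set after an early end() never reach the trace:

```java
import java.util.HashMap;
import java.util.Map;

// Toy span mimicking one OTel contract: once end() has been called,
// further attribute writes are silently dropped. Illustration only.
public class ToySpan {
    private final Map<String, Object> attributes = new HashMap<>();
    private boolean ended = false;

    public void setAttribute(String key, Object value) {
        if (ended) return;           // silently ignored, like the real SDK
        attributes.put(key, value);
    }

    public void end() { ended = true; }

    public Map<String, Object> attributes() { return attributes; }

    public static void main(String[] args) {
        ToySpan span = new ToySpan();
        span.setAttribute("llm.request.model", "gpt-4");
        span.end();                                              // ended too early...
        span.setAttribute("llm.usage.completion_tokens", 800L);  // ...this is lost
        System.out.println(span.attributes().size());            // prints 1, not 2
    }
}
```

In the streaming code above, this is exactly why span.end() belongs in doOnComplete/doOnError, not at request time.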
Pitfall 3: Incompatible traceId formats
SkyWalking's native traceId looks like abc123def456.1.1, whereas an OTel traceId is 128-bit hexadecimal. When ingesting over the OTel protocol, SkyWalking converts the format — but if your code reads the traceId by hand and writes it into logs, the two sides won't match and can't be correlated. The fix: read the traceId exclusively through the OTel API; never mix in the SkyWalking native API.
Pitfall 4: Overhead on hot paths
Vector search may run hundreds of times per second, and creating a span for every call has measurable cost. In our load tests, 100% sampling added roughly 0.5-1 ms of latency per vector-search span. For high-frequency, low-latency paths, consider dropping the sampling rate to 10-20%, or recording only the key attributes rather than a full span.
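When dropping to 10-20%, the decision should still be deterministic per trace, so that every span of a given trace agrees on whether it is sampled. A stdlib-only sketch of hash-based ratio sampling (the class is hypothetical; note Math.floorMod, since hashCode() % n can be negative):

```java
// Deterministic trace-ratio sampling sketch: the same traceId always
// yields the same decision, so a trace is never half-sampled.
public class RatioSampler {
    private final int percent; // e.g. 10 for 10% sampling

    public RatioSampler(int percent) { this.percent = percent; }

    public boolean shouldSample(String traceId) {
        // floorMod keeps the bucket in [0, 99] even for negative hashCodes
        return Math.floorMod(traceId.hashCode(), 100) < percent;
    }

    public static void main(String[] args) {
        RatioSampler sampler = new RatioSampler(10);
        int sampled = 0;
        for (int i = 0; i < 10_000; i++) {
            if (sampler.shouldSample("trace-" + i)) sampled++;
        }
        // Roughly 10% of the synthetic trace ids should be kept
        System.out.println(sampled);
    }
}
```

This is the same idea the AIAwareSampler's HTTP rule uses; the OTel SDK's built-in TraceIdRatioBased sampler does it more carefully against the raw trace-id bits.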
11. Overall results
Adding tracing was a step change in our system's observability. The most visible difference:
Debugging used to mean reading logs, then dashboards, then attempting to reproduce — typically half a day. Now we search the offending traceId in SkyWalking, open the trace view, and within five minutes we can see which span is anomalous — whether the LLM call was slow or the vector search misbehaved — at a glance.
More importantly, with historical token-consumption data we can do precise cost analysis: which features are burning money, which users are heavy consumers. Fine-grained operations finally has data behind it.
The up-front investment is not small, but the payoff is continuous. If you are serious about building AI systems, put distributed tracing on the mandatory-infrastructure list.
