分布式追踪在 AI 应用里的实践——Trace 一次完整的 RAG 调用
分布式追踪在 AI 应用里的实践——Trace 一次完整的 RAG 调用
适读人群:AI 后端工程师 / 对可观测性感兴趣 | 阅读时长:约16分钟 | 核心价值:用 OpenTelemetry 把 RAG 调用链路完全可见,发现你看不见的性能瓶颈
有一段时间我们的 AI 问答系统响应很慢,用户反馈说"有时候要等好久"。
我去看监控,看到的是 HTTP 总耗时 8-12 秒。但问题在哪?不知道。这 8 秒里,有多少花在了 Embedding?有多少花在了向量检索?有多少花在了 LLM 生成?中间有没有某个环节异常地慢?
在没有分布式追踪之前,这些问题我只能靠打日志猜。在每个关键步骤前后加 System.currentTimeMillis(),然后在日志里搜,手动算差值。这个方法太原始了,而且多线程场景下日志顺序乱了就完全没法看。
接入 OpenTelemetry 之后,情况完全不一样了。我在 Jaeger 里点开一个慢请求,整个 RAG 链路清清楚楚:
总耗时: 9.2s
├── 请求解析: 2ms
├── Query 改写: 680ms ← 这个有点慢
├── Embedding 生成: 320ms
├── 向量检索: 45ms
├── 文档重排序: 1.8s ← 这个是主要问题!
└── LLM 生成: 6.3s文档重排序用了 1.8 秒,这在我接入追踪之前完全不知道。查了一下,原来重排序用的是一个外部 API,请求没有复用连接,每次都新建 TCP 连接。修一行配置,1.8 秒变成 0.2 秒。
这就是分布式追踪在 AI 应用里的价值。
RAG 调用的完整链路
要追踪,先得清楚要追什么。一次典型的 RAG 调用链路:
用户请求
|
v
[1] 请求解析 & 权限验证
|
v
[2] Query 改写(可选,用 LLM 把用户问题改写成更适合检索的形式)
|
v
[3] Embedding 生成(把改写后的 query 转成向量)
|
v
[4] 向量检索(在知识库里找相似文档)
|
v
[5] 文档过滤 & 重排序(可选,二次排序提升相关性)
|
v
[6] Prompt 构建(把检索结果组装进提示词)
|
v
[7] LLM 生成
|
v
[8] 结果解析 & 返回每一步都需要独立的 Span,这样才能精确定位慢在哪。
依赖配置
<!-- Spring Boot 3 + OpenTelemetry -->
<dependencies>
<!-- Micrometer OTEL Bridge -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-otel</artifactId>
</dependency>
<!-- OTEL Exporter(导出到 Jaeger/Zipkin/OTLP)-->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
<!-- Spring Boot Actuator(自动配置 Tracing)-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- 支持异步方法的追踪传播 -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
</dependency>
</dependencies># application.yml
management:
tracing:
sampling:
probability: 1.0 # 开发环境全采样,生产环境可以调低到 0.1
otlp:
tracing:
endpoint: http://localhost:4318/v1/traces # OTLP HTTP 端点
spring:
application:
name: rag-service # 在 Jaeger 里显示的服务名核心追踪实现
1. 追踪器工具类
@Component
@Slf4j
public class AiTracer {
private final Tracer tracer;
public AiTracer(Tracer tracer) {
this.tracer = tracer;
}
/**
* 创建一个 RAG 操作的 Span
* 封装了常用的属性设置逻辑
*/
public Span startRagSpan(String operationName, String conversationId) {
return tracer.nextSpan()
.name(operationName)
.tag("rag.conversation_id", conversationId)
.tag("rag.service", "rag-service")
.start();
}
/**
* 执行一个被追踪的操作
* 自动处理 Span 的开始、结束和异常记录
*/
public <T> T traced(String spanName, String conversationId,
Map<String, String> tags, Callable<T> operation) throws Exception {
Span span = tracer.nextSpan().name(spanName).start();
// 设置自定义标签
if (tags != null) {
tags.forEach(span::tag);
}
span.tag("conversation.id", conversationId);
try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
T result = operation.call();
span.tag("status", "success");
return result;
} catch (Exception e) {
span.tag("status", "error");
span.tag("error.message", e.getMessage());
span.tag("error.type", e.getClass().getSimpleName());
throw e;
} finally {
span.end();
}
}
}2. 完整的 RAG Service 追踪
@Service
@Slf4j
public class RagService {
@Autowired
private AiTracer aiTracer;
@Autowired
private EmbeddingModel embeddingModel;
@Autowired
private VectorStore vectorStore;
@Autowired
private ChatClient chatClient;
@Autowired
private QueryRewriteService queryRewriteService;
@Autowired
private Tracer tracer;
/**
* 完整的 RAG 调用,每个步骤独立 Span
*/
public RagResponse query(RagRequest request) {
String conversationId = request.getConversationId();
// 在父 Span 里记录整体情况
Span parentSpan = tracer.currentSpan();
if (parentSpan != null) {
parentSpan.tag("rag.query_length", String.valueOf(request.getQuery().length()));
}
try {
// Step 1: Query 改写
String rewrittenQuery = traceQueryRewrite(request.getQuery(), conversationId);
// Step 2: Embedding 生成
float[] queryEmbedding = traceEmbedding(rewrittenQuery, conversationId);
// Step 3: 向量检索
List<Document> retrievedDocs = traceVectorSearch(queryEmbedding, conversationId);
// Step 4: 文档重排序
List<Document> rankedDocs = traceRerank(rewrittenQuery, retrievedDocs, conversationId);
// Step 5: LLM 生成
String answer = traceLlmGeneration(request.getQuery(), rankedDocs, conversationId);
return RagResponse.builder()
.answer(answer)
.sources(extractSources(rankedDocs))
.conversationId(conversationId)
.build();
} catch (Exception e) {
log.error("RAG query failed, conversationId={}", conversationId, e);
throw new RagException("RAG query failed", e);
}
}
private String traceQueryRewrite(String originalQuery, String conversationId) {
Span span = tracer.nextSpan()
.name("rag.query_rewrite")
.tag("conversation.id", conversationId)
.tag("query.original_length", String.valueOf(originalQuery.length()))
.start();
try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
long start = System.currentTimeMillis();
String rewritten = queryRewriteService.rewrite(originalQuery);
long elapsed = System.currentTimeMillis() - start;
span.tag("query.rewritten_length", String.valueOf(rewritten.length()));
span.tag("query.rewrite_changed", String.valueOf(!rewritten.equals(originalQuery)));
// 记录耗时(追踪框架会自动记录,这里手动记是为了方便聚合分析)
log.info("[TRACE] query_rewrite conversationId={} elapsed={}ms", conversationId, elapsed);
return rewritten;
} catch (Exception e) {
span.tag("error", e.getMessage());
throw e;
} finally {
span.end();
}
}
private float[] traceEmbedding(String query, String conversationId) {
Span span = tracer.nextSpan()
.name("rag.embedding")
.tag("conversation.id", conversationId)
.tag("embedding.model", embeddingModel.getClass().getSimpleName())
.start();
try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
EmbeddingResponse response = embeddingModel.embedForResponse(List.of(query));
float[] embedding = response.getResult().getOutput();
span.tag("embedding.dimensions", String.valueOf(embedding.length));
span.tag("embedding.usage.tokens",
String.valueOf(response.getMetadata().getUsage().getTotalTokens()));
return embedding;
} catch (Exception e) {
span.tag("error", e.getMessage());
throw e;
} finally {
span.end();
}
}
private List<Document> traceVectorSearch(float[] queryEmbedding, String conversationId) {
Span span = tracer.nextSpan()
.name("rag.vector_search")
.tag("conversation.id", conversationId)
.start();
try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
// 向量检索
SearchRequest searchRequest = SearchRequest.defaults()
.withTopK(10)
.withSimilarityThreshold(0.7);
List<Document> docs = vectorStore.similaritySearch(searchRequest);
span.tag("search.result_count", String.valueOf(docs.size()));
// 记录最高和最低相似度,方便分析检索质量
if (!docs.isEmpty()) {
double maxScore = docs.stream()
.mapToDouble(d -> (Double) d.getMetadata().getOrDefault("distance", 0.0))
.max().orElse(0.0);
span.tag("search.max_similarity", String.format("%.4f", maxScore));
}
return docs;
} catch (Exception e) {
span.tag("error", e.getMessage());
throw e;
} finally {
span.end();
}
}
private List<Document> traceRerank(String query, List<Document> docs, String conversationId) {
if (docs.isEmpty()) return docs;
Span span = tracer.nextSpan()
.name("rag.rerank")
.tag("conversation.id", conversationId)
.tag("rerank.input_count", String.valueOf(docs.size()))
.start();
try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
// 这里是之前发现的性能问题所在
// 重排序 API 调用
List<Document> reranked = rerankWithExternalApi(query, docs);
span.tag("rerank.output_count", String.valueOf(reranked.size()));
return reranked;
} catch (Exception e) {
span.tag("error", e.getMessage());
// 重排序失败时降级:直接用原始检索结果
log.warn("Rerank failed, using original order. conversationId={}", conversationId, e);
span.tag("rerank.fallback", "true");
return docs;
} finally {
span.end();
}
}
private String traceLlmGeneration(String originalQuery, List<Document> docs,
String conversationId) {
Span span = tracer.nextSpan()
.name("rag.llm_generation")
.tag("conversation.id", conversationId)
.start();
// 计算 context 长度
String context = docs.stream()
.map(Document::getContent)
.collect(Collectors.joining("\n\n"));
span.tag("llm.context_length", String.valueOf(context.length()));
span.tag("llm.doc_count", String.valueOf(docs.size()));
try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
String prompt = buildRagPrompt(originalQuery, context);
long start = System.currentTimeMillis();
String answer = chatClient.prompt()
.user(prompt)
.call()
.content();
long elapsed = System.currentTimeMillis() - start;
span.tag("llm.answer_length", String.valueOf(answer.length()));
span.tag("llm.latency_ms", String.valueOf(elapsed));
return answer;
} catch (Exception e) {
span.tag("error", e.getMessage());
throw e;
} finally {
span.end();
}
}
private String buildRagPrompt(String query, String context) {
return String.format("""
你是一个专业的问答助手。请根据以下参考资料回答问题。
如果参考资料中没有相关信息,请如实说明,不要编造。
参考资料:
%s
问题:%s
回答:
""", context, query);
}
}异步场景的追踪传播
RAG 里经常有并行检索(同时从多个知识库检索),这时候追踪上下文的传播要特别处理。
@Service
public class ParallelRagService {
@Autowired
private Tracer tracer;
/**
* 并行从多个知识库检索,追踪上下文需要手动传播
*/
public List<Document> parallelSearch(String query, List<VectorStore> stores) {
Span parentSpan = tracer.currentSpan();
List<CompletableFuture<List<Document>>> futures = stores.stream()
.map(store -> {
// 在异步任务里,必须手动恢复追踪上下文
// 否则子 Span 不会挂在父 Span 下面
return CompletableFuture.supplyAsync(() -> {
// 把父 Span 的上下文带进来
Span childSpan = tracer.nextSpan(parentSpan)
.name("rag.parallel_search")
.tag("store", store.getClass().getSimpleName())
.start();
try (Tracer.SpanInScope ws = tracer.withSpan(childSpan)) {
SearchRequest req = SearchRequest.defaults().withTopK(5);
return store.similaritySearch(req);
} catch (Exception e) {
childSpan.tag("error", e.getMessage());
throw e;
} finally {
childSpan.end();
}
}, embeddingTaskExecutor);
})
.collect(Collectors.toList());
// 等待所有检索完成,合并结果
return futures.stream()
.map(f -> {
try {
return f.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("Parallel search failed", e);
return Collections.<Document>emptyList();
}
})
.flatMap(List::stream)
.collect(Collectors.toList());
}
}接入追踪后发现的几个隐藏瓶颈
瓶颈1:重排序 API 连接复用问题
如上面说的,文档重排序 API 每次新建连接,1.8 秒降到 0.2 秒。
瓶颈2:Embedding 的批量化机会
接入追踪后,我发现文档导入时,每个文档独立调用一次 Embedding API:
embed_doc_1: 280ms
embed_doc_2: 260ms
embed_doc_3: 290ms
...(20 个文档)总耗时 5.5 秒。改成批量调用:
// 改成批量 Embedding
List<String> contents = docs.stream().map(Document::getContent).collect(Collectors.toList());
EmbeddingResponse batchResponse = embeddingModel.embedForResponse(contents);20 个文档批量调用耗时 380ms,减少了 93%。这个优化在没有追踪之前根本想不到,因为单个文档 280ms 不觉得慢。
瓶颈3:Query 改写的 LLM 调用是否必要
追踪数据显示,Query 改写平均耗时 680ms,占整个 RAG 响应的约 10%。我分析了一批改写前后的 query,发现约 60% 的 query 根本不需要改写——用户的提问本身已经很明确了。
改成:先用简单规则判断是否需要改写(问题长度 < 20 字、包含关键词等),只有需要时才调用 LLM 改写。整体平均延迟降了约 350ms。
瓶颈4:向量检索结果过多
原来 topK=20,检索出 20 个文档,后面的处理(重排序、Prompt 构建)都要处理 20 个。改成 topK=10,效果几乎没变,速度快了一些,Prompt 也短了,LLM 生成也快了一点。
这些优化加起来,平均响应时间从 9 秒降到了 4.5 秒。用户反馈"感觉快多了"。
在 Jaeger 里看什么
接入后,去 Jaeger UI 重点看这几件事:
- 找最慢的请求:按 Duration 排序,分析 P99 的慢在哪
- 找错误率高的 Span:点击 "Errors Only",看哪个步骤最容易出错
- 对比相同类型请求的耗时分布:看某个步骤是稳定慢还是偶发慢
- 看关键路径:找那条决定整体耗时的最长路径
追踪不是一次性的工作,是持续运营。我现在每周会花一个小时看上周的追踪数据,找新出现的慢点。
AI 应用的性能特点是:随着使用量增长、知识库扩大,很多地方会逐渐变慢。如果没有追踪,等用户投诉了你才能发现;有了追踪,你能在问题严重前就察觉到。
