Spring Cloud Sleuth分布式追踪:TraceId传递与自定义Span的实现
适读人群:有微服务实战经验的后端工程师 | 阅读时长:约24分钟 | Spring Boot 3.2 / Micrometer Tracing
开篇故事
微服务架构下排查问题是一件很痛苦的事。早期我们没有链路追踪,出了问题只能翻日志,一个请求可能经过五六个服务,每个服务的日志里找半天,还要靠时间戳来串联。效率极低,而且容易遗漏。
有次用户反馈下单失败,我们排查了两个多小时,翻遍五个服务的日志,最终定位到优惠券服务里的一段数据库查询超时。单次查询其实很快,只要50毫秒,但被一个N+1查询问题放大:循环执行了200次,累计耗时10秒。
如果有链路追踪,打开Zipkin或Jaeger一眼就能看到哪个Span耗时最长,问题五分钟就定位了。那次之后我们把Spring Cloud Sleuth(现在是Micrometer Tracing)接入了所有服务,今天把TraceId的传递机制和自定义Span的实现完整讲一遍。
一、核心问题分析
Spring Boot 3.x之后,Spring Cloud Sleuth被迁移到了Micrometer Tracing(1.x)项目中,API有所变化,但核心概念不变:
Trace(追踪):一次完整的用户请求,从入口到返回,由唯一的TraceId标识。
Span(跨度):链路中的一个独立操作单元,比如一次HTTP调用、一次数据库查询。同一个Trace下有多个Span,每个Span有自己的SpanId,并记录开始时间和结束时间。
TraceId传递:一个请求从服务A调用服务B时,TraceId需要通过HTTP Header(traceparent或X-B3-TraceId)传递下去,服务B创建子Span时使用同一个TraceId。
理解了这三个概念,链路追踪的工作方式就清楚了。接下来要解决的是几个实际问题:TraceId在各种传输方式(Feign、MQ、异步线程)中是否能正确传递,以及如何给关键业务操作加上自定义Span来获得更细粒度的追踪信息。
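以W3C的traceparent头为例,它把trace-id、parent span-id和采样标记编码在一个字符串里,格式为00-{32位hex trace-id}-{16位hex parent-id}-{2位hex flags}。下面用纯JDK写一个示意解析器帮助理解这个格式(仅用于演示,Micrometer/Brave内部有自己的实现,类名TraceparentParser为自拟):

```java
// 解析W3C traceparent头的示意(纯JDK,仅帮助理解格式)
public class TraceparentParser {

    public record TraceContext(String traceId, String parentSpanId, boolean sampled) {}

    // traceparent格式:version(2位hex)-trace-id(32位hex)-parent-id(16位hex)-flags(2位hex)
    public static TraceContext parse(String traceparent) {
        String[] parts = traceparent.split("-");
        if (parts.length != 4 || parts[1].length() != 32 || parts[2].length() != 16) {
            throw new IllegalArgumentException("非法的traceparent: " + traceparent);
        }
        // flags的最低位是采样标记:01表示上游已采样,下游应跟随
        boolean sampled = (Integer.parseInt(parts[3], 16) & 0x01) == 1;
        return new TraceContext(parts[1], parts[2], sampled);
    }
}
```

服务B收到这个头后,复用其中的trace-id创建子Span,并把收到的parent-id记为新Span的父Span,这就是"同一个TraceId贯穿全链路"的机制。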
二、原理深度解析
2.1 TraceId的传递链路
请求进入链路上的第一个服务时,Tracing组件(Brave)生成TraceId并创建根Span;之后每次跨服务调用,TraceContext都会通过HTTP Header传给下游:Spring Boot 3.x默认使用W3C标准的traceparent头,也可以配置成Zipkin的B3头(X-B3-TraceId、X-B3-SpanId等)。下游服务的拦截器从Header中提取TraceContext,创建使用同一TraceId的子Span,整条链路由此串联。
2.2 Span的层次结构
每个Span除了自己的SpanId,还记录parentSpanId,指向触发它的上级Span。同一个Trace内的所有Span因此构成一棵树:根Span是入口请求,子Span是下游HTTP调用、数据库查询等操作。Zipkin/Jaeger正是按这棵树和每个Span的起止时间渲染出瀑布图,耗时最长的Span一眼可见。
2.3 TraceId在异步场景下的传递
TraceContext通常保存在ThreadLocal中,天然只在当前线程内有效。一旦任务被提交到线程池,工作线程的ThreadLocal是空的,TraceId就断了。解决思路是"提交时捕获上下文快照、执行时在工作线程恢复",Micrometer的context-propagation库提供了这一能力,3.4节会给出具体配置。
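异步场景下的"丢失"与"恢复"可以用一个纯JDK小例子说明:TraceContext存在ThreadLocal里,直接把任务丢给线程池时工作线程读不到它;Micrometer context-propagation的做法本质上就是"提交时捕获快照、执行时恢复"。下面的类名与实现均为示意,并非库的真实代码:

```java
// 演示基于ThreadLocal的上下文为什么会在线程池中"丢失",
// 以及"快照捕获+执行时恢复"的传播思路(纯JDK示意)
public class ContextPropagationDemo {

    // 模拟Tracer内部保存当前TraceId的ThreadLocal
    public static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // 直接提交task时,工作线程的ThreadLocal是空的;
    // 包装一层:提交时捕获快照,执行时恢复,执行完还原
    public static Runnable wrap(Runnable task) {
        String snapshot = TRACE_ID.get();   // 在提交线程捕获快照
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(snapshot);         // 在执行线程恢复
            try {
                task.run();
            } finally {
                TRACE_ID.set(previous);     // 还原,避免线程复用导致串号
            }
        };
    }
}
```

ContextExecutorService.wrap做的就是对提交给线程池的每个任务自动套上这样一层包装,只是它捕获的是所有已注册的ThreadLocal上下文,而不只是TraceId。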
三、完整代码实现
3.1 项目依赖(Spring Boot 3.x)
<dependencies>
    <!-- Micrometer Tracing核心(Brave桥接) -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-tracing-bridge-brave</artifactId>
    </dependency>
    <!-- Zipkin上报 -->
    <dependency>
        <groupId>io.zipkin.reporter2</groupId>
        <artifactId>zipkin-reporter-brave</artifactId>
    </dependency>
    <!-- Spring Boot Actuator(Tracing依赖) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- Feign(自动传播TraceId) -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
</dependencies>
3.2 Tracing基础配置
management:
  tracing:
    sampling:
      probability: 1.0  # 采样率:1.0表示100%采样(生产环境建议0.1)
  zipkin:
    tracing:
      endpoint: http://zipkin-server:9411/api/v2/spans
spring:
  application:
    name: order-service
logging:
  pattern:
    # 在日志中输出traceId和spanId,方便日志与链路追踪关联
    level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"
3.3 自定义Span实现(业务操作追踪)
package com.laozhang.tracing.service;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.annotation.NewSpan;
import io.micrometer.tracing.annotation.SpanTag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OrderService {

    private final Tracer tracer;
    private final InventoryServiceClient inventoryClient;

    public OrderService(Tracer tracer, InventoryServiceClient inventoryClient) {
        this.tracer = tracer;
        this.inventoryClient = inventoryClient;
    }

    /**
     * 方式一:使用@NewSpan注解自动创建Span
     * SpanTag会把参数值作为Span的标签,方便在Zipkin中过滤
     * 注意:注解方式依赖AOP切面,classpath需要spring-boot-starter-aop;
     * Spring Boot 3.2起会自动配置这些切面,更早的3.x版本需手动注册SpanAspect等Bean
     */
    @NewSpan("order.create")
    public OrderResult createOrder(
            @SpanTag("orderId") String orderId,
            @SpanTag("userId") String userId,
            CreateOrderRequest request
    ) {
        log.info("开始创建订单,orderId={}", orderId);
        // 业务逻辑...
        return processOrder(request);
    }

    /**
     * 方式二:手动创建和管理Span(更灵活,可以添加事件和错误信息)
     */
    public OrderResult createOrderWithManualSpan(CreateOrderRequest request) {
        // 创建子Span(nextSpan会以当前Span为父Span)
        Span span = tracer.nextSpan().name("order.create.manual").start();
        // 在Span上添加标签(用于在Zipkin中过滤和搜索)
        span.tag("orderId", request.getOrderId());
        span.tag("userId", request.getUserId());
        span.tag("orderAmount", String.valueOf(request.getAmount()));
        try (Tracer.SpanInScope spanInScope = tracer.withSpan(span)) {
            // 在这个作用域内,TraceContext包含当前Span
            log.info("创建订单,traceId已自动注入到日志");
            // 检查库存(这次Feign调用会自动带上当前TraceId)
            boolean hasStock = inventoryClient.checkStock(request.getProductId(), request.getQuantity());
            if (!hasStock) {
                span.tag("result", "NO_STOCK");
                span.event("库存不足,订单创建失败");
                return OrderResult.fail("库存不足");
            }
            // 创建订单...
            OrderResult result = doCreateOrder(request);
            span.tag("result", "SUCCESS");
            return result;
        } catch (Exception e) {
            // 在Span上记录错误信息
            span.error(e);
            span.tag("result", "ERROR");
            throw e;
        } finally {
            // 必须手动结束Span,否则数据不会上报
            span.end();
        }
    }

    private OrderResult processOrder(CreateOrderRequest request) {
        return new OrderResult();
    }

    private OrderResult doCreateOrder(CreateOrderRequest request) {
        return new OrderResult();
    }
}
3.4 TraceId在异步线程池中的正确传递
这是链路追踪里最容易丢失TraceId的场景:
package com.laozhang.tracing.config;

import io.micrometer.context.ContextExecutorService;
import io.micrometer.context.ContextSnapshot;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@Configuration
@EnableAsync  // 使用@Async注解时必须开启异步支持
public class TracingAwareThreadPoolConfig {

    /**
     * 使用ContextExecutorService包装线程池,实现TraceId自动传播
     * Micrometer Context Propagation会自动把当前线程的TraceContext传递给子线程
     */
    @Bean("tracingAwareExecutor")
    public Executor tracingAwareExecutor() {
        ExecutorService rawExecutor = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors() * 2
        );
        // 使用ContextExecutorService包装,实现上下文自动传播
        return ContextExecutorService.wrap(rawExecutor, ContextSnapshot::captureAll);
    }
}
package com.laozhang.tracing.service;
import io.micrometer.tracing.Tracer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

@Slf4j
@Service
public class AsyncOrderService {

    private final Tracer tracer;
    private final Executor tracingAwareExecutor;

    public AsyncOrderService(
            Tracer tracer,
            @Qualifier("tracingAwareExecutor") Executor tracingAwareExecutor
    ) {
        this.tracer = tracer;
        this.tracingAwareExecutor = tracingAwareExecutor;
    }

    /**
     * 使用tracingAwareExecutor提交的异步任务,TraceId会自动传播
     */
    public CompletableFuture<NotifyResult> notifyUserAsync(String userId, String message) {
        return CompletableFuture.supplyAsync(() -> {
            // 这里打印的日志会包含正确的TraceId
            log.info("异步发送通知,userId={}", userId);
            return doNotify(userId, message);
        }, tracingAwareExecutor);
    }

    /**
     * 使用@Async注解时,需要配置支持TraceId传播的线程池
     */
    @Async("tracingAwareExecutor")
    public CompletableFuture<Void> asyncLogOperation(String orderId, String operation) {
        log.info("记录操作日志,orderId={},operation={}", orderId, operation);
        return CompletableFuture.completedFuture(null);
    }

    private NotifyResult doNotify(String userId, String message) {
        return new NotifyResult();
    }
}
3.5 TraceId在消息队列中的传递
package com.laozhang.tracing.mq;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * 带TraceId传播的Kafka消息发送者
 * 把当前TraceContext注入到Kafka消息的Header中
 * (Spring Kafka 3.x也可以通过spring.kafka.template.observation-enabled=true
 * 让模板自动完成注入,这里手动实现以展示机制)
 */
@Slf4j
@Component
public class TracingKafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final Tracer tracer;
    private final Propagator propagator;

    public TracingKafkaProducer(
            KafkaTemplate<String, String> kafkaTemplate,
            Tracer tracer,
            Propagator propagator
    ) {
        this.kafkaTemplate = kafkaTemplate;
        this.tracer = tracer;
        this.propagator = propagator;
    }

    public void sendWithTracing(String topic, String key, String payload) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, payload);
        // 把当前TraceContext注入到Kafka消息Header
        Span currentSpan = tracer.currentSpan();
        if (currentSpan != null) {
            Map<String, String> traceHeaders = new HashMap<>();
            propagator.inject(currentSpan.context(), traceHeaders, Map::put);
            // 将TraceContext作为Kafka消息Header发送
            traceHeaders.forEach((headerKey, value) ->
                    record.headers().add(headerKey, value.getBytes(StandardCharsets.UTF_8))
            );
        }
        kafkaTemplate.send(record);
        log.info("消息已发送,topic={},traceId已注入Header", topic);
    }
}
package com.laozhang.tracing.mq;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * 带TraceId提取的Kafka消息消费者
 * 从Kafka消息Header中提取TraceContext,实现链路跨MQ传播
 */
@Slf4j
@Component
public class TracingKafkaConsumer {

    private final Tracer tracer;
    private final Propagator propagator;

    public TracingKafkaConsumer(Tracer tracer, Propagator propagator) {
        this.tracer = tracer;
        this.propagator = propagator;
    }

    @KafkaListener(topics = "order-events")
    public void consume(ConsumerRecord<String, String> record) {
        // 从Kafka Header中提取TraceContext
        Map<String, String> traceHeaders = new HashMap<>();
        record.headers().forEach(header ->
                traceHeaders.put(header.key(), new String(header.value(), StandardCharsets.UTF_8))
        );
        // 提取TraceContext并创建子Span
        Propagator.Getter<Map<String, String>> getter = Map::get;
        Span span = propagator.extract(traceHeaders, getter)
                .name("kafka.consume.order-events")
                .start();
        try (Tracer.SpanInScope spanInScope = tracer.withSpan(span)) {
            // 这里的日志会包含从生产者传来的TraceId
            log.info("消费消息,key={},TraceId已从Header恢复", record.key());
            processMessage(record.value());
        } finally {
            span.end();
        }
    }

    private void processMessage(String message) {
        log.info("处理消息内容:{}", message);
    }
}
3.6 自定义TraceId过滤器(把TraceId回写到响应Header)
package com.laozhang.tracing.filter;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

import java.io.IOException;

/**
 * TraceId响应头过滤器
 * 把当前请求的TraceId添加到响应Header中
 * 方便前端在问题排查时提供TraceId
 */
@Slf4j
@Component
public class TraceIdResponseFilter extends OncePerRequestFilter {

    private static final String TRACE_ID_HEADER = "X-Trace-Id";

    private final Tracer tracer;

    public TraceIdResponseFilter(Tracer tracer) {
        this.tracer = tracer;
    }

    @Override
    protected void doFilterInternal(
            HttpServletRequest request,
            HttpServletResponse response,
            FilterChain filterChain
    ) throws ServletException, IOException {
        Span currentSpan = tracer.currentSpan();
        if (currentSpan != null) {
            String traceId = currentSpan.context().traceId();
            response.setHeader(TRACE_ID_HEADER, traceId);
        }
        filterChain.doFilter(request, response);
    }
}
四、生产配置与调优
4.1 采样率配置
management:
  tracing:
    sampling:
      # 生产环境不要设100%采样,会产生大量数据
      # 推荐1%-10%
      probability: 0.1
      # 如果需要对特定接口100%采样(比如支付接口)
      # 可以实现自定义Sampler
4.2 自定义采样器
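先补充概率采样的实现思路:Brave这类库并不是每个服务各自用随机数掷骰子(那样同一条链路在不同服务可能得到不一致的决定),而是对traceId做边界比较,同一个traceId在任何服务上结论都相同,即便上游的采样标记丢失,链路也不会被"采一半"。下面是纯JDK的示意实现(非Brave源码,类名BoundarySampler为自拟):

```java
// traceId边界采样的示意实现:决策只依赖traceId,因此是确定性的
public class BoundarySampler {

    private final long boundary;

    public BoundarySampler(float probability) {
        // probability=0.1时,约10%的traceId落在边界内
        this.boundary = (long) (Long.MAX_VALUE * probability);
    }

    public boolean isSampled(long traceId) {
        // 同一traceId在任何服务上计算结果相同
        long t = traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(traceId);
        return t <= boundary;
    }
}
```

理解这一点后也就明白:自定义采样逻辑里不应使用Math.random()之类的非确定性判断。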
需要注意:Brave的brave.sampler.Sampler只接收traceId一个long参数,拿不到span名称或请求路径,所以"支付接口100%采样"这类按业务维度的规则没法在这一层实现。全局采样率可以通过覆盖Sampler Bean控制:
@Bean
public Sampler customSampler() {
    // 等价于management.tracing.sampling.probability=0.1,
    // 但可以在代码中做动态调整(注意:这一层的决策依据只有traceId)
    return Sampler.create(0.1f);
}
按路径区分采样要放到HTTP层,用Brave自带的HttpRuleSampler(brave.http包)构造SamplerFunction<HttpRequest>;它如何接入Boot 3的自动配置随版本有所差异,需结合自己的版本验证:
@Bean
public SamplerFunction<HttpRequest> httpServerSampler() {
    // 支付相关路径100%采样,未命中规则的请求回退到全局采样率
    return HttpRuleSampler.newBuilder()
            .putRule(HttpRequestMatchers.pathStartsWith("/payment"), Sampler.ALWAYS_SAMPLE)
            .build();
}
五、踩坑实录
坑一:升级Spring Boot 3.x后TraceId消失了,原来Sleuth的@NewSpan不工作了。
Spring Boot 3.x移除了Spring Cloud Sleuth,换成了Micrometer Tracing,包名和API都变了:原来直接使用的brave.Span、brave.Tracer(或Sleuth自己的抽象)对应现在的io.micrometer.tracing.Span、io.micrometer.tracing.Tracer;@NewSpan注解也从spring-cloud-sleuth包移到了io.micrometer.tracing.annotation包。升级时要检查所有import语句。另外,注解方式依赖AOP:classpath需要spring-boot-starter-aop,Spring Boot 3.2起才会自动配置这些切面,更早的3.x版本需手动注册SpanAspect等Bean。
坑二:异步线程里TraceId消失,日志里看不到TraceId。
默认的线程池不会传播TraceContext,异步任务里的日志没有TraceId,和主链路断开了。解决方案就是用ContextExecutorService.wrap包装线程池,或者用支持上下文传播的@Async线程池。
坑三:Zipkin中看到的链路是断开的,生产者和消费者不在同一条链路里。
Kafka消息消费者没有从消息Header里提取TraceContext,而是新建了一个全新的Span。导致生产者和消费者在Zipkin里是两条独立的链路,无法关联。必须在消费者里显式调用propagator.extract从Header中恢复TraceContext。
坑四:自定义Span忘记调用span.end(),导致内存泄漏和数据不上报。
手动创建的Span如果不调用end(),Span数据不会被上报到Zipkin,也不会释放相关资源。必须用try-finally确保span.end()被调用。
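针对坑四,一个防御性写法是把end()托管给try-with-resources:用一个AutoCloseable包装Span,作用域结束时(包括异常退出)自动调用end()。下面是纯JDK示意,SpanLike是自拟接口,实际使用时可替换为io.micrometer.tracing.Span:

```java
// 用try-with-resources保证Span一定被end()的示意(SpanLike为自拟接口)
public class SpanGuardDemo {

    public interface SpanLike {
        void end();
    }

    public static final class SpanGuard implements AutoCloseable {
        private final SpanLike span;

        public SpanGuard(SpanLike span) {
            this.span = span;
        }

        @Override
        public void close() {
            span.end();  // 正常返回或抛异常都会执行,等价于finally里的span.end()
        }
    }
}
```

这样即使方法中间return或抛异常,end()也不会被漏掉,比在每处手写try-finally更不容易出错。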
六、总结
分布式链路追踪的核心是TraceId在各种传输边界(HTTP、MQ、异步线程)上的正确传递。Spring Boot 3.x用Micrometer Tracing替代了Sleuth,API有所变化,但概念一致。关键点:异步线程用ContextExecutorService传播TraceContext,MQ用Propagator注入和提取Header,自定义Span用try-finally确保end()被调用。
