设计一个日志系统:结构化日志、ELK采集、链路关联
设计一个日志系统:结构化日志、ELK采集、链路关联
适读人群:Java中高级工程师、需要做可观测性建设的技术人员 | 阅读时长:约18分钟 | 难度:★★★☆☆
开篇故事
出了线上问题,最怕的就是日志根本用不起来。我见过最惨的情况:系统有几十个服务,日志格式各不相同,有些用logback输出到文件,有些用System.out.println(真的),日志里没有traceId,不知道一个用户请求经过了哪些服务,出了问题只能靠猜。
有次一个支付失败问题,用户反馈在下午3点左右,我要从5个服务各几十GB的日志里人工搜索"3点左右"的相关日志,因为没有traceId关联,搜索到的日志不知道是不是同一个请求的。最终花了4个小时才定位到一个空指针异常。
那次之后我主导做了日志系统的规范化改造:统一结构化日志格式、引入traceId链路关联、部署ELK采集和搜索。后来同样类型的问题,5分钟内定位,30分钟修复。这篇文章把这套改造的核心方案写出来。
一、需求分析与规模估算
日志系统的核心能力
- 集中采集: 所有服务的日志统一汇聚到一个平台
- 结构化查询: 根据traceId、userId、错误类型快速检索
- 链路关联: 一个请求跨多个服务的完整调用链可视化
- 实时告警: ERROR级别日志触发告警
规模估算
以一个20个微服务的中台系统为例:
日志量:
- 每个服务每秒产生约100条日志
- 20个服务 × 100条/秒 = 2000条/秒(平时)
- 峰值:10倍 = 20000条/秒
- 每条日志约500字节
- 峰值带宽:20000 × 500字节 = 10MB/s
存储估算:
- 保留最近7天日志:10MB/s × 86400 × 7 ≈ 6TB
- 索引开销(Elasticsearch索引约为原始数据的2倍):12TB
- 7天总存储:约18TB
Elasticsearch集群规模:
- 18TB数据,每个ES节点存储约2TB
- 需要9个数据节点(3主+6数据)
二、系统架构设计
三、结构化日志规范
3.1 统一日志格式
所有服务的日志必须输出为JSON格式,字段统一命名:
{
"timestamp": "2026-04-21T14:23:01.123+08:00",
"level": "ERROR",
"service": "order-service",
"version": "1.2.3",
"host": "10.0.1.23",
"traceId": "abc123def456",
"spanId": "789xyz",
"userId": "10001",
"requestId": "req-20260421-001",
"thread": "http-nio-8080-exec-1",
"logger": "com.lz.order.service.OrderService",
"message": "订单创建失败",
"exception": "java.lang.NullPointerException: ...",
"duration": 234,
"extra": {
"orderId": "ORD123456",
"amount": 99.9
}
}3.2 Logback配置(JSON输出)
<!-- logback-spring.xml -->
<configuration>
<!-- 使用logstash-logback-encoder输出JSON格式 -->
<appender name="STDOUT_JSON" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<includeMdcKeyName>traceId</includeMdcKeyName>
<includeMdcKeyName>spanId</includeMdcKeyName>
<includeMdcKeyName>userId</includeMdcKeyName>
<customFields>{"service":"${spring.application.name}","version":"${app.version}"}</customFields>
</encoder>
</appender>
<appender name="FILE_JSON" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/logs/${spring.application.name}/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/logs/${spring.application.name}/app.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxHistory>7</maxHistory>
<totalSizeCap>10GB</totalSizeCap>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<includeMdcKeyName>traceId</includeMdcKeyName>
<includeMdcKeyName>spanId</includeMdcKeyName>
<includeMdcKeyName>userId</includeMdcKeyName>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT_JSON"/>
<appender-ref ref="FILE_JSON"/>
</root>
</configuration>四、关键代码实现
4.1 链路追踪(TraceId传递)
/**
* TraceId拦截器:在请求入口注入TraceId到MDC
* MDC(Mapped Diagnostic Context)是Logback提供的线程本地变量,
* 日志框架会自动把MDC中的值包含在每条日志里
*/
@Component
@Slf4j
public class TraceIdInterceptor implements HandlerInterceptor {
private static final String TRACE_ID_HEADER = "X-Trace-Id";
private static final String TRACE_ID_MDC_KEY = "traceId";
private static final String SPAN_ID_MDC_KEY = "spanId";
@Override
public boolean preHandle(
HttpServletRequest request,
HttpServletResponse response,
Object handler) {
// 优先从Header中取(微服务间调用时上游传递的)
String traceId = request.getHeader(TRACE_ID_HEADER);
if (traceId == null || traceId.isEmpty()) {
// 没有则生成新的(这是链路的起点)
traceId = generateTraceId();
}
String spanId = generateSpanId();
// 放入MDC,后续该线程的所有日志都会带上这个traceId
MDC.put(TRACE_ID_MDC_KEY, traceId);
MDC.put(SPAN_ID_MDC_KEY, spanId);
// 在响应头里也返回traceId,方便前端定位问题
response.setHeader(TRACE_ID_HEADER, traceId);
return true;
}
@Override
public void afterCompletion(
HttpServletRequest request,
HttpServletResponse response,
Object handler,
Exception ex) {
// 请求结束后清理MDC,防止线程池复用时traceId污染
MDC.clear();
}
private String generateTraceId() {
return UUID.randomUUID().toString().replace("-", "");
}
private String generateSpanId() {
return Long.toHexString(ThreadLocalRandom.current().nextLong());
}
}4.2 跨服务传递TraceId(Feign拦截器)
/**
* Feign请求拦截器:调用其他服务时,把当前线程的traceId传递过去
*/
@Component
public class FeignTraceInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate template) {
String traceId = MDC.get("traceId");
if (traceId != null) {
template.header("X-Trace-Id", traceId);
}
String spanId = MDC.get("spanId");
if (spanId != null) {
template.header("X-Parent-Span-Id", spanId);
}
}
}4.3 异步线程的TraceId传递
/**
* 异步任务执行时,MDC不会自动传递(因为换了线程)
* 需要在任务提交时捕获MDC快照,在任务执行时恢复
*/
public class MdcAwareRunnable implements Runnable {
private final Runnable delegate;
private final Map<String, String> mdcContext;
public MdcAwareRunnable(Runnable delegate) {
this.delegate = delegate;
// 捕获当前线程的MDC快照
this.mdcContext = MDC.getCopyOfContextMap();
}
@Override
public void run() {
// 在新线程中恢复MDC
if (mdcContext != null) {
MDC.setContextMap(mdcContext);
}
try {
delegate.run();
} finally {
MDC.clear();
}
}
}
/**
* MDC感知的线程池
*/
@Configuration
public class ThreadPoolConfig {
@Bean("asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(200);
executor.setTaskDecorator(runnable ->
new MdcAwareRunnable(runnable)); // 关键:用MDC装饰器包装任务
executor.initialize();
return executor;
}
}4.4 统一异常日志记录
@RestControllerAdvice
@Slf4j
public class GlobalExceptionHandler {
/**
* 业务异常:INFO级别(不是系统错误)
*/
@ExceptionHandler(BusinessException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
public ApiResponse<Void> handleBusinessException(
BusinessException e, HttpServletRequest request) {
log.info("业务异常, path={}, code={}, msg={}",
request.getRequestURI(), e.getCode(), e.getMessage());
return ApiResponse.fail(e.getCode(), e.getMessage());
}
/**
* 系统异常:ERROR级别(需要告警)
*/
@ExceptionHandler(Exception.class)
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public ApiResponse<Void> handleException(
Exception e, HttpServletRequest request) {
// 关键:把关键信息都放到日志里,方便ELK检索
log.error("系统异常, path={}, method={}, userId={}",
request.getRequestURI(),
request.getMethod(),
MDC.get("userId"),
e); // 异常栈自动附加到日志
return ApiResponse.fail(500, "系统异常,请联系管理员");
}
}4.5 Filebeat采集配置
# filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /logs/*/app.log
- /logs/*/app.*.log
# 多行日志合并(异常栈跨多行,合并成一条)
multiline.pattern: '^\{'
multiline.negate: true
multiline.match: after
# 附加元信息
fields:
env: production
fields_under_root: true
output.kafka:
hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
topic: "app-logs"
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
# 性能配置
queue.mem:
events: 4096
flush.min_events: 512
flush.timeout: 5s五、扩展性设计
ES索引分片策略
按日期分索引(每天一个索引),保留7天数据时只需要保留7个索引。删除历史数据只需要删除对应的索引,O(1)操作。
// ILM(Index Lifecycle Management)策略
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "50GB",
"max_age": "1d"
}
}
},
"delete": {
"min_age": "7d",
"actions": { "delete": {} }
}
}
}
}六、踩坑实录
坑1:Logback同步写日志阻塞业务线程
默认的Logback FileAppender是同步的,高QPS时日志写磁盘阻塞业务线程,导致P99延迟从20ms涨到200ms。
解决方案:使用AsyncAppender包装FileAppender,让日志写入异步化。设置discardingThreshold=0(不丢弃日志)和queueSize=2048(队列大小)。
坑2:MDC在Kafka消费者线程里丢失
Kafka消费者是独立的线程池,不经过HTTP请求拦截器,MDC里没有traceId。消费者处理消息时打的日志,在ELK里找不到关联的请求日志。
解决方案:消息里携带traceId字段,消费者线程在处理消息前,把消息中的traceId写入MDC,处理完后清除。
坑3:日志量暴增导致Kafka消费积压
某次业务故障,大量ERROR日志涌入,Kafka的app-logs topic积压了3亿条消息,Logstash追不上,整个日志系统延迟达到30分钟,告警也跟着延迟了,问题发生半小时后才收到通知。
解决方案:对ERROR级别日志走独立的Kafka topic(优先级高,单独的Logstash消费),INFO日志用另一个低优先级topic。
七、总结
日志系统的建设要点:
- 结构化是前提: JSON格式 + 统一字段命名,才能在ELK中有效检索
- TraceId是灵魂: 没有链路关联的日志系统只是日志收集,不是可观测性系统
- 异步化是性能保障: 同步写日志对高QPS系统有显著影响
- 分层存储是成本控制: 热数据(3天)在ES,冷数据(7-30天)在对象存储
ELK + TraceId的组合,把平均问题定位时间(MTTD)从小时级压缩到分钟级,这是日志系统最核心的价值。
