微服务日志聚合:ELK+TraceId关联与结构化日志规范
微服务日志聚合:ELK+TraceId关联与结构化日志规范
适读人群:有微服务实战经验的后端工程师 | 阅读时长:约24分钟 | Spring Boot 3.2 / Elasticsearch 8.x
开篇故事
我们早期的微服务日志管理是一片混乱:每个服务用自己的日志格式,有的用logback默认格式,有的用log4j2,有的用System.out.println(是的,真实存在)。出了问题要在十几台服务器上分别查日志,有时候同一条请求链路,要在五个服务的机器上各自翻才能拼出完整的调用过程。
有次排查一个跨五个服务的数据问题,我和同事两个人花了整整一个下午,每人负责几个服务,用Slack互相发日志截图来拼图。最后问题找到了,但这种排查方式实在太原始。
那次之后我们专项推进了ELK日志聚合,制定了统一的结构化日志规范,把TraceId强制写入每条日志,让Kibana能按TraceId把所有服务的日志串起来。现在出了问题,在Kibana里一搜TraceId,整条链路所有服务的日志一览无余,排查时间从小时级降到了分钟级。
一、核心问题分析
ELK日志聚合涉及三个核心问题:
结构化日志格式:日志必须是JSON格式,包含固定字段(时间戳、服务名、日志级别、TraceId、SpanId、日志内容等),这样Elasticsearch才能按字段搜索和过滤。非结构化的日志(普通文本)只能全文搜索,效率低且容易误匹配。
TraceId的注入:每条日志必须包含TraceId,这是把不同服务的日志关联起来的关键。Micrometer Tracing会自动把TraceId写入SLF4J的MDC(Mapped Diagnostic Context),只需要在日志格式里输出MDC字段即可。
日志采集与传输:在K8s环境下,推荐用DaemonSet方式部署Filebeat,采集所有Pod的标准输出日志,发送到Logstash/Kafka,再写入Elasticsearch。应用不需要直接连接Elasticsearch,只需要把日志输出到stdout即可。
二、原理深度解析
2.1 ELK日志采集链路
2.2 TraceId在日志中的注入
2.3 结构化日志字段规范
三、完整代码实现
3.1 Logback结构化JSON配置
<!-- logback-spring.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<springProperty scope="context" name="appName" source="spring.application.name"/>
<springProperty scope="context" name="env" source="spring.profiles.active" defaultValue="unknown"/>
<!-- 结构化JSON Appender -->
<appender name="JSON_STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<!-- 自定义字段 -->
<customFields>{"service":"${appName}","env":"${env}"}</customFields>
<!-- 包含MDC字段(TraceId等由Micrometer自动注入) -->
<includeMdcKeyName>traceId</includeMdcKeyName>
<includeMdcKeyName>spanId</includeMdcKeyName>
<includeMdcKeyName>userId</includeMdcKeyName>
<!-- 时间格式 -->
<timeZone>Asia/Shanghai</timeZone>
<!-- 排除不必要的字段 -->
<fieldNames>
<timestamp>@timestamp</timestamp>
<message>message</message>
<levelValue>[ignore]</levelValue>
</fieldNames>
<!-- 敏感字段脱敏 -->
<provider class="net.logstash.logback.composite.loggingevent.MdcJsonProvider"/>
</encoder>
</appender>
<!-- 开发环境:普通格式,方便阅读 -->
<springProfile name="local,dev">
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level [%X{traceId:-}] %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
</springProfile>
<!-- 生产/测试环境:JSON格式 -->
<springProfile name="test,prod">
<root level="INFO">
<appender-ref ref="JSON_STDOUT"/>
</root>
</springProfile>
</configuration>需要引入logstash-logback-encoder依赖:
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.4</version>
</dependency>3.2 自定义MDC字段注入(用户信息)
Micrometer Tracing会自动注入traceId和spanId到MDC,但用户ID等业务字段需要手动注入:
package com.laozhang.log.filter;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;
import java.io.IOException;
import java.util.UUID;
/**
* MDC上下文注入过滤器
* 把请求相关的上下文信息注入MDC,让每条日志都携带这些信息
*/
@Component
public class MdcContextFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(
HttpServletRequest request,
HttpServletResponse response,
FilterChain filterChain
) throws ServletException, IOException {
try {
// 用户ID(从Gateway注入的Header获取)
String userId = request.getHeader("X-User-Id");
if (userId != null) {
MDC.put("userId", userId);
}
// 请求ID(如果没有就生成一个)
String requestId = request.getHeader("X-Request-Id");
if (requestId == null) {
requestId = UUID.randomUUID().toString().replace("-", "").substring(0, 16);
}
MDC.put("requestId", requestId);
// 接口路径(用于日志过滤)
MDC.put("path", request.getRequestURI());
// 把requestId写到响应Header,方便客户端排查
response.setHeader("X-Request-Id", requestId);
filterChain.doFilter(request, response);
} finally {
// 必须清理MDC,避免线程复用时数据污染
MDC.remove("userId");
MDC.remove("requestId");
MDC.remove("path");
}
}
}3.3 结构化日志工具类(业务日志规范)
package com.laozhang.log.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* 结构化业务日志工具
* 确保业务日志格式统一,包含必要的上下文字段
*/
@Slf4j
public class BizLog {
private static final ObjectMapper MAPPER = new ObjectMapper();
/**
* 记录业务操作日志
* 格式:{"event":"order.create","userId":"xxx","orderId":"yyy","result":"success","duration":50}
*/
public static void event(String eventName, Map<String, Object> params) {
Map<String, Object> logEntry = new LinkedHashMap<>();
logEntry.put("event", eventName);
logEntry.put("userId", MDC.get("userId"));
logEntry.put("traceId", MDC.get("traceId"));
logEntry.putAll(params);
try {
log.info(MAPPER.writeValueAsString(logEntry));
} catch (Exception e) {
log.info("event={} params={}", eventName, params);
}
}
/**
* 记录接口调用日志
*/
public static void apiCall(String service, String method, long durationMs, boolean success, String errorMsg) {
Map<String, Object> params = new LinkedHashMap<>();
params.put("service", service);
params.put("method", method);
params.put("duration", durationMs);
params.put("success", success);
if (errorMsg != null) {
params.put("error", errorMsg);
}
if (success) {
log.info("api_call {}", params);
} else {
log.error("api_call_error {}", params);
}
}
/**
* 敏感字段脱敏
*/
public static String maskPhone(String phone) {
if (phone == null || phone.length() < 7) return phone;
return phone.substring(0, 3) + "****" + phone.substring(phone.length() - 4);
}
public static String maskCardNo(String cardNo) {
if (cardNo == null || cardNo.length() < 8) return "****";
return cardNo.substring(0, 4) + "****" + cardNo.substring(cardNo.length() - 4);
}
}3.4 Filebeat采集配置(K8s)
# filebeat-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: filebeat-config
data:
filebeat.yml: |
filebeat.inputs:
- type: container
paths:
- /var/log/containers/*.log
# 过滤:只采集带特定annotation的Pod的日志
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
matchers:
- logs_path:
logs_path: "/var/log/containers/"
# 过滤不需要的日志(减少存储)
processors:
- drop_event:
when:
regexp:
kubernetes.namespace: "^kube-"
output.kafka:
hosts: ["kafka:9092"]
topic: "app-logs"
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip3.5 Logstash处理配置
# logstash.conf
input {
kafka {
bootstrap_servers => "kafka:9092"
topics => ["app-logs"]
group_id => "logstash-consumers"
codec => json
}
}
filter {
# 解析JSON日志
json {
source => "message"
target => "log"
}
# 提取关键字段到顶层
mutate {
rename => {
"[log][traceId]" => "traceId"
"[log][service]" => "service"
"[log][level]" => "level"
"[log][userId]" => "userId"
}
}
# 时间字段处理
date {
match => ["[log][@timestamp]", "ISO8601"]
target => "@timestamp"
}
# 删除不需要的原始字段(节省存储)
mutate {
remove_field => ["message", "agent", "ecs", "input"]
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
# 按服务名+日期分索引,便于管理和过期清理
index => "app-logs-%{[service]}-%{+YYYY.MM.dd}"
document_type => "_doc"
}
}四、生产配置与调优
4.1 Elasticsearch索引生命周期管理
PUT _ilm/policy/app-logs-policy
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "50GB",
"max_age": "1d"
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"forcemerge": {"max_num_segments": 1},
"allocate": {"number_of_replicas": 0}
}
},
"delete": {
"min_age": "30d",
"actions": {"delete": {}}
}
}
}
}4.2 Kibana查询技巧
# 按TraceId查询整条链路日志
traceId: "abc123def456"
# 查询某个服务的错误日志
service: "order-service" AND level: "ERROR"
# 查询某个用户最近1小时的操作
userId: "12345" AND @timestamp: [now-1h TO now]
# 查询慢接口(duration > 1000ms)
service: "order-service" AND duration: >1000五、踩坑实录
坑一:日志量爆炸,Elasticsearch磁盘撑不住。
某服务在循环里打了DEBUG日志,生产环境日志级别没有设对,DEBUG日志全部输出,一天产生了几百GB日志,把Elasticsearch磁盘打满了。
解决方案:生产环境日志级别严格控制为INFO,DEBUG级别只在本地开发使用;通过ILM自动删除老数据;对于高频接口,用采样率控制日志输出频率。
坑二:MDC在线程切换时丢失。
@Async方法里MDC是空的,导致异步日志里没有userId和requestId,排查时无法关联到对应的请求。
解决方案:自定义ThreadPoolTaskExecutor,包装任务时传递MDC上下文:
executor.setTaskDecorator(runnable -> {
Map<String, String> mdcCopy = MDC.getCopyOfContextMap();
return () -> {
if (mdcCopy != null) MDC.setContextMap(mdcCopy);
try {
runnable.run();
} finally {
MDC.clear();
}
};
});坑三:日志里输出了用户密码和Token,造成安全风险。
有个接口的请求参数日志里包含了password字段,直接把明文密码打到了日志里,被日志系统收集并存储到Elasticsearch,有安全风险。
必须建立日志安全规范:密码、Token、银行卡号等敏感字段一律脱敏后输出,使用BizLog工具类的脱敏方法。
坑四:Logback配置的JSON格式和Elasticsearch的mapping不兼容。
某个字段在不同日志里类型不一样(有时候是数字,有时候是字符串),Elasticsearch动态mapping会冲突,导致部分日志写入失败。
需要提前定义Elasticsearch的index template,明确所有字段的类型,避免动态mapping冲突。
六、总结
ELK日志聚合的核心是三个统一:统一JSON格式(logstash-logback-encoder),统一TraceId注入(Micrometer自动注入MDC),统一字段规范(避免高基数Tag和敏感信息)。K8s环境下推荐DaemonSet方式部署Filebeat,应用只需要把JSON日志输出到stdout,简单且高效。
