消息队列监控体系:Lag积压、消费速率、Broker内存的完整指标
消息队列监控体系:Lag积压、消费速率、Broker内存的完整指标
适读人群:负责Kafka运维或需要建立MQ监控体系的工程师 | 阅读时长:约15分钟
开篇故事
"为什么订单系统昨晚3点开始处理变慢?"
这个问题我被问到过好几次,每次都是故障发生后才来追查。通常的结论是:Kafka某个消费者组Lag开始增长,但没有监控,没有告警,等到业务侧感知到延迟时,已经积压了几十万条消息。
没有监控的消息队列就是一个黑盒。问题发生了不知道,发生后找不到根因。
我花了两个月时间,给公司的Kafka集群建立了完整的监控告警体系,从那以后,90%的问题在业务感知之前就已经被我们发现并处理了。今天把这套监控体系分享出来。
一、监控体系全景图
二、核心监控指标清单
2.1 消费端:最重要的指标
Lag(消费积压)是最核心的告警指标。
| 指标名 | 含义 | 告警阈值建议 |
|---|---|---|
kafka_consumer_group_lag | 消费者组总积压 | > 10万 P2告警;> 100万 P1告警 |
kafka_consumer_group_lag_sum | 各Partition积压之和 | 同上 |
kafka_consumer_group_members | 消费者组成员数 | 突降触发告警 |
消费速率是判断积压是否在恢复的关键。
| 指标名 | 含义 | 告警条件 |
|---|---|---|
kafka_consumer_records_consumed_rate | 消费速率(条/秒) | 骤降50%触发告警 |
kafka_consumer_fetch_latency_avg | 平均fetch延迟 | > 500ms告警 |
kafka_consumer_last_heartbeat | 最后心跳时间 | 超过session.timeout告警 |
2.2 Broker端:稳定性指标
| 指标名 | 含义 | 告警阈值 |
|---|---|---|
kafka_server_BrokerTopicMetrics_MessagesInPerSec | 消息写入速率 | 突增5倍告警 |
kafka_server_ReplicaManager_UnderReplicatedPartitions | 未完全复制的Partition数 | > 0 告警 |
kafka_controller_KafkaController_ActiveControllerCount | 活跃Controller数 | != 1 立即告警 |
kafka_log_Log_Size | 日志文件大小 | 磁盘使用率 > 70% 告警 |
kafka_network_RequestMetrics_RequestsPerSec | 请求速率 | 接近容量上限告警 |
kafka_server_KafkaServer_BrokerState | Broker状态 | != 3(RUNNING) 告警 |
2.3 Producer端指标
| 指标名 | 含义 | 告警阈值 |
|---|---|---|
kafka_producer_record_error_rate | 发送错误率 | > 0.1% 告警 |
kafka_producer_record_send_rate | 发送速率 | 监控趋势 |
kafka_producer_request_latency_avg | 请求平均延迟 | > 100ms 告警 |
kafka_producer_buffer_available_bytes | 可用缓冲区 | < 10% 告警 |
三、Java集成Micrometer采集指标
3.1 Producer指标自动采集
/**
* Kafka监控配置
* 使用Micrometer自动采集Producer/Consumer指标到Prometheus
*/
@Configuration
public class KafkaMetricsConfig {
/**
* 配置KafkaTemplate,自动注册Producer指标
*/
@Bean
public KafkaTemplate<String, String> monitoredKafkaTemplate(
MeterRegistry meterRegistry) {
Map<String, Object> configs = buildProducerConfigs();
DefaultKafkaProducerFactory<String, String> factory =
new DefaultKafkaProducerFactory<>(configs);
KafkaTemplate<String, String> template = new KafkaTemplate<>(factory);
// 开启Micrometer观测(Spring Kafka 3.x)
template.setObservationEnabled(true);
// 注册Producer指标
// 自动采集:record-send-rate, record-error-rate, request-latency-avg等
KafkaClientMetrics metrics = new KafkaClientMetrics(factory.createProducer());
metrics.bindTo(meterRegistry);
return template;
}
/**
* 配置消费者工厂,自动注册Consumer指标
*/
@Bean
public ConsumerFactory<String, String> monitoredConsumerFactory(
MeterRegistry meterRegistry) {
Map<String, Object> props = buildConsumerConfigs();
DefaultKafkaConsumerFactory<String, String> factory =
new DefaultKafkaConsumerFactory<>(props);
// 注册Consumer指标监听器
factory.addListener(new MicrometerConsumerListener<>(meterRegistry));
return factory;
}
private Map<String, Object> buildProducerConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return configs;
}
private Map<String, Object> buildConsumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}3.2 自定义Lag监控(采集详细积压数据)
/**
* 自定义Kafka Lag监控
* 定期采集各消费者组各Partition的积压量
* 推送到Prometheus自定义Gauge
*/
@Component
@Slf4j
public class KafkaLagMonitor {
private final AdminClient adminClient;
private final MeterRegistry meterRegistry;
// 缓存每个消费者组+Topic+Partition的Gauge
private final Map<String, AtomicLong> lagGauges = new ConcurrentHashMap<>();
// 监控的消费者组列表
private final List<String> monitoredGroups = Arrays.asList(
"order-consumer-group",
"points-consumer-group",
"payment-consumer-group",
"log-consumer-group"
);
public KafkaLagMonitor(@Value("${kafka.bootstrap-servers}") String bootstrapServers,
MeterRegistry meterRegistry) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
this.adminClient = AdminClient.create(props);
this.meterRegistry = meterRegistry;
}
/**
* 每30秒采集一次所有消费者组的Lag
*/
@Scheduled(fixedDelay = 30000)
public void collectLagMetrics() {
for (String groupId : monitoredGroups) {
try {
collectGroupLag(groupId);
} catch (Exception e) {
log.warn("采集消费者组Lag失败: groupId={}", groupId, e);
}
}
}
private void collectGroupLag(String groupId)
throws ExecutionException, InterruptedException {
// 获取消费者组的offset
ListConsumerGroupOffsetsResult offsetsResult =
adminClient.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> groupOffsets =
offsetsResult.partitionsToOffsetAndMetadata().get();
if (groupOffsets.isEmpty()) {
return;
}
// 获取对应Partition的最新offset(LogEndOffset)
Map<TopicPartition, Long> endOffsets = getEndOffsets(groupOffsets.keySet());
long totalLag = 0;
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : groupOffsets.entrySet()) {
TopicPartition tp = entry.getKey();
long committedOffset = entry.getValue().offset();
long endOffset = endOffsets.getOrDefault(tp, committedOffset);
long lag = Math.max(0, endOffset - committedOffset);
totalLag += lag;
// 注册Partition级别Gauge
String gaugeKey = groupId + ":" + tp.topic() + ":" + tp.partition();
lagGauges.computeIfAbsent(gaugeKey, k -> {
AtomicLong gauge = new AtomicLong(0);
Gauge.builder("kafka.consumer.group.partition.lag", gauge, AtomicLong::get)
.tag("group", groupId)
.tag("topic", tp.topic())
.tag("partition", String.valueOf(tp.partition()))
.description("Kafka消费者组Partition积压量")
.register(meterRegistry);
return gauge;
}).set(lag);
}
// 注册Group总Lag Gauge
String totalKey = groupId + ":total";
lagGauges.computeIfAbsent(totalKey, k -> {
AtomicLong gauge = new AtomicLong(0);
Gauge.builder("kafka.consumer.group.lag.total", gauge, AtomicLong::get)
.tag("group", groupId)
.description("Kafka消费者组总积压量")
.register(meterRegistry);
return gauge;
}).set(totalLag);
log.debug("Lag采集完成: groupId={}, totalLag={}", groupId, totalLag);
}
private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> partitions)
throws ExecutionException, InterruptedException {
ListOffsetsResult offsetsResult = adminClient.listOffsets(
partitions.stream().collect(Collectors.toMap(
tp -> tp,
tp -> OffsetSpec.latest()
))
);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
offsetsResult.all().get().forEach((tp, offsetInfo) ->
endOffsets.put(tp, offsetInfo.offset())
);
return endOffsets;
}
}3.3 Prometheus告警规则配置
# prometheus/alerts/kafka.yml
groups:
- name: kafka_alerts
rules:
# 消费积压告警
- alert: KafkaConsumerLagHigh
expr: kafka_consumer_group_lag_total > 100000
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka消费积压过高"
description: "消费者组 {{ $labels.group }} 积压 {{ $value }} 条消息超过5分钟"
- alert: KafkaConsumerLagCritical
expr: kafka_consumer_group_lag_total > 1000000
for: 2m
labels:
severity: critical
annotations:
summary: "Kafka消费积压严重"
description: "消费者组 {{ $labels.group }} 积压超过100万条,需立即处理"
# 积压增长速率(每分钟增加超过1万条)
- alert: KafkaConsumerLagGrowing
expr: rate(kafka_consumer_group_lag_total[5m]) > 10000
for: 3m
labels:
severity: warning
annotations:
summary: "Kafka消费积压持续增长"
description: "{{ $labels.group }} 积压每分钟增加 {{ $value }} 条"
# Broker未复制Partition
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_ReplicaManager_UnderReplicatedPartitions > 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka存在未完全复制的Partition"
description: "Broker {{ $labels.instance }} 有 {{ $value }} 个Partition未完全复制"
# Controller数量异常
- alert: KafkaNoActiveController
expr: sum(kafka_controller_KafkaController_ActiveControllerCount) != 1
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka Controller状态异常"
description: "活跃Controller数量为 {{ $value }},应为1"
# 磁盘告警
- alert: KafkaDiskUsageHigh
expr: (kafka_log_Log_Size / node_filesystem_size_bytes) > 0.7
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka磁盘使用率过高"
description: "Broker {{ $labels.instance }} 磁盘使用率超过70%"
# Producer错误率
- alert: KafkaProducerErrorRateHigh
expr: kafka_producer_record_error_rate > 0.01
for: 2m
labels:
severity: warning
annotations:
summary: "Kafka Producer错误率过高"
description: "Producer错误率达到 {{ $value }}"四、踩坑实录
坑1:只监控总Lag,无法定位问题Partition
设置了总Lag告警(> 10万),收到告警后,需要人工排查是哪个Partition积压。生产环境有几十个消费者组、每组几十个Partition,人工排查需要30分钟以上。
解决方案:Lag监控细化到Partition级别,告警信息直接包含具体的Group、Topic、Partition,缩短定位时间。
坑2:Lag不增长但消费延迟很高
有一次,Lag监控一直是0(积压为零),但用户反映订单处理很慢。排查发现:消费者在消费,Producer在生产,两者速率相等,Lag没有增长。但消费者处理每条消息需要8秒(下游数据库慢查询),而不是正常的100ms。
解决方案:除了Lag,还要监控消费端处理耗时(kafka_consumer_fetch_latency_avg)和业务处理耗时(自定义Timer),发现业务层的性能问题。
坑3:Prometheus采集间隔过长导致监控盲区
Prometheus默认15秒采集一次,如果一个Kafka事故发生后在15秒内自动恢复,监控数据里可能完全看不到这次事故(Lag瞬间增长后立即下降)。
解决方案:对关键指标(Lag、UnderReplicatedPartitions)配置更短的采集间隔(5秒),或者在Kafka Client端采集(实时推送,不依赖pull间隔)。
坑4:告警阈值设置不合理导致告警疲劳
初期把Lag告警设成 > 1000,结果每天收到几十条告警,全部是正常的短暂积压(下游DB慢了几秒),运维人员开始忽略告警,直到某次真正的大积压没有被注意到。
解决方案:告警分级(P1/P2/P3),低优先级告警用邮件或群机器人,高优先级告警电话通知。阈值要经过压测确定,不要拍脑袋设置。
五、监控Dashboard设计建议
一个好的Kafka监控Dashboard应该包含以下面板(按重要程度排列):
- 告警汇总:当前触发的所有告警,一屏总览
- 消费者Lag趋势图:所有消费者组的Lag变化曲线(支持按Group筛选)
- 生产消费速率:各Topic的生产速率 vs 消费速率
- Broker健康:每个Broker的状态、磁盘使用率、网络IO
- ISR缩容事件:UnderReplicatedPartitions历史记录
- 延迟分位数:P50/P99/P999端到端延迟
用Grafana的Kafka JMX Dashboard模板(Grafana官方ID: 721)可以快速搭建一个基础Dashboard,再基于此定制。
下一篇(第448期)讲Kafka Streams流处理,窗口聚合与实时统计的实现,让Kafka从消息队列升级为流处理平台。
