生产环境线程池监控:如何用JMX和Micrometer观测线程池健康
生产环境线程池监控:如何用JMX和Micrometer观测线程池健康
适读人群:Java后端工程师、关注生产环境稳定性、有线程池使用经验的开发者 | 阅读时长:约16分钟
开篇故事
2021年年底,我们公司上了一个批量导出功能。产品说"用户可以导出最近三年的交易记录",我估算了一下数据量,每个用户最多几十万条,用一个线程池异步处理,感觉没什么问题。
上线一周后,问题来了。运维说服务器内存在涨,虽然不快,但持续往上走。GC日志里也没有明显异常,Full GC频率正常。
我去看监控面板,CPU、内存趋势都有,但就是没有线程池的监控。完全不知道线程池里积了多少任务,队列有多满,拒绝了多少请求。
靠感觉排查了两天,最后用jstack才发现线程池的任务队列里积了8000多个等待任务,每个任务都持有一个大对象(用来存导出的行数据),这些对象无法被GC回收,内存就慢慢泄漏了。
这件事让我下定决心:任何生产级别的线程池,必须有监控。
今天就把我们团队沉淀下来的线程池监控方案分享出来,从JMX到Micrometer,从自定义埋点到Spring Boot Actuator集成,完整讲一遍。
一、为什么要监控线程池
1.1 线程池的四个关键健康指标
一个ThreadPoolExecutor有这些核心状态需要关注:
| 指标 | 获取方式 | 健康状态 | 告警阈值参考 |
|---|---|---|---|
| 核心线程数 | getCorePoolSize() | 固定配置 | 配置变更时告警 |
| 活跃线程数 | getActiveCount() | 反映当前负载 | > 核心线程数的80% |
| 队列大小 | getQueue().size() | 反映积压程度 | > 队列容量的70% |
| 队列剩余容量 | getQueue().remainingCapacity() | 反映快满程度 | < 队列容量的30% |
| 完成任务总数 | getCompletedTaskCount() | 单调递增 | 速率突降告警 |
| 拒绝任务数 | 需要自定义 | 正常为0 | > 0立即告警 |
| 任务等待时间 | 需要自定义 | 反映队列延迟 | > 业务SLA |
ThreadPoolExecutor本身提供了前5个,后两个需要通过自定义RejectedExecutionHandler和任务包装来实现。
1.2 不监控的后果
不监控线程池,就像开车不看仪表盘。可能的后果:
- 任务积压无感知:队列满了还在疯狂往里塞任务,最终OOM或大量拒绝
- 延迟飙高不知情:任务在队列里等了10秒才被执行,用户早超时了
- 线程泄漏:某个任务异常导致线程退出,线程数越来越少,处理速度越来越慢
- 资源浪费:配了100个线程,实际只用了5个,白白占内存
二、线程池监控方案对比
三、完整代码实现
3.1 可监控的ThreadPoolExecutor包装类
package com.laozhang.concurrent.monitor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 可观测的线程池包装器
*
* 在标准ThreadPoolExecutor基础上增加:
* 1. 任务执行时间统计(P50/P99)
* 2. 任务等待时间统计(进队到出队的时间)
* 3. 拒绝任务计数
* 4. 异常任务计数
*
* 测试环境:JDK 11,Spring Boot 2.7
*/
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
private static final Logger logger = LoggerFactory.getLogger(MonitoredThreadPoolExecutor.class);
/** 线程池名称,用于区分多个线程池 */
private final String poolName;
/** 拒绝任务总数 */
private final LongAdder rejectedCount = new LongAdder();
/** 任务异常总数 */
private final LongAdder failedCount = new LongAdder();
/** 任务执行总耗时(纳秒),用于计算平均值 */
private final LongAdder totalExecutionNanos = new LongAdder();
/** 任务等待总耗时(纳秒),从提交到开始执行 */
private final LongAdder totalWaitNanos = new LongAdder();
/** 最大任务执行时间(纳秒) */
private final AtomicLong maxExecutionNanos = new AtomicLong(0);
/** 任务完成计数(包含成功和失败) */
private final LongAdder finishedCount = new LongAdder();
/**
* 带提交时间戳的任务包装器
*/
private static class TimestampedTask<T> implements Callable<T> {
private final Callable<T> delegate;
final long submitNanos;
TimestampedTask(Callable<T> delegate) {
this.delegate = delegate;
this.submitNanos = System.nanoTime();
}
@Override
public T call() throws Exception {
return delegate.call();
}
}
public MonitoredThreadPoolExecutor(
String poolName,
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
new MonitoredRejectedHandler(poolName));
this.poolName = poolName;
// 将自定义handler的引用传给外部的计数器
((MonitoredRejectedHandler) getRejectedExecutionHandler())
.setRejectedCounter(rejectedCount);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
// 记录任务开始执行时间(放入线程本地变量)
TASK_START_TIME.set(System.nanoTime());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
long endNanos = System.nanoTime();
Long startNanos = TASK_START_TIME.get();
TASK_START_TIME.remove();
if (startNanos != null) {
long execNanos = endNanos - startNanos;
totalExecutionNanos.add(execNanos);
finishedCount.increment();
// 更新最大执行时间
long currentMax;
do {
currentMax = maxExecutionNanos.get();
} while (execNanos > currentMax && !maxExecutionNanos.compareAndSet(currentMax, execNanos));
// 慢任务日志(执行超过1秒)
if (execNanos > 1_000_000_000L) {
logger.warn("[{}] 慢任务检测:执行耗时 {}ms",
poolName, execNanos / 1_000_000);
}
}
if (t != null) {
failedCount.increment();
logger.error("[{}] 任务执行异常", poolName, t);
}
}
private static final ThreadLocal<Long> TASK_START_TIME = new ThreadLocal<>();
/**
* 获取线程池健康快照
*/
public ThreadPoolSnapshot getSnapshot() {
long finished = finishedCount.longValue();
long totalExecNanos = totalExecutionNanos.longValue();
return new ThreadPoolSnapshot(
poolName,
getCorePoolSize(),
getMaximumPoolSize(),
getActiveCount(),
getPoolSize(),
getQueue().size(),
getQueue().remainingCapacity(),
getCompletedTaskCount(),
rejectedCount.longValue(),
failedCount.longValue(),
finished > 0 ? totalExecNanos / finished / 1_000_000 : 0, // 平均执行时间(ms)
maxExecutionNanos.get() / 1_000_000 // 最大执行时间(ms)
);
}
/**
* 打印快照到日志(可供定时任务调用)
*/
public void logSnapshot() {
ThreadPoolSnapshot s = getSnapshot();
logger.info("[ThreadPool:{}] core={}/{}, active={}, poolSize={}, " +
"queue={}/{}, completed={}, rejected={}, failed={}, " +
"avgExec={}ms, maxExec={}ms",
s.name(), s.coreSize(), s.maxSize(),
s.activeCount(), s.poolSize(),
s.queueSize(), s.queueSize() + s.queueRemainingCapacity(),
s.completedCount(), s.rejectedCount(), s.failedCount(),
s.avgExecMs(), s.maxExecMs());
}
/**
* 自定义拒绝策略:记录日志+计数,默认丢弃任务
*/
private static class MonitoredRejectedHandler implements RejectedExecutionHandler {
private final String poolName;
private LongAdder rejectedCounter;
MonitoredRejectedHandler(String poolName) {
this.poolName = poolName;
}
void setRejectedCounter(LongAdder counter) {
this.rejectedCounter = counter;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (rejectedCounter != null) rejectedCounter.increment();
LoggerFactory.getLogger(MonitoredRejectedHandler.class)
.error("[{}] 任务被拒绝!queue={}, active={}, poolSize={}",
poolName, executor.getQueue().size(),
executor.getActiveCount(), executor.getPoolSize());
// 这里可以根据业务选择:丢弃、调用方执行、写入DB队列等
// 默认丢弃,生产环境应该根据业务重要性选择合适策略
}
}
/**
* 线程池健康快照(JDK 16+ Record,JDK14以下用普通类)
*/
public record ThreadPoolSnapshot(
String name,
int coreSize, int maxSize,
int activeCount, int poolSize,
int queueSize, int queueRemainingCapacity,
long completedCount, long rejectedCount, long failedCount,
long avgExecMs, long maxExecMs
) {}
}3.2 基于Micrometer的线程池指标注册
package com.laozhang.concurrent.monitor;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.*;
/**
* 将线程池指标注册到Micrometer
* 支持Spring Boot Actuator + Prometheus + Grafana监控栈
*
* 依赖:
* implementation 'io.micrometer:micrometer-registry-prometheus'
* implementation 'org.springframework.boot:spring-boot-starter-actuator'
*
* 指标访问:
* http://localhost:8080/actuator/metrics/thread.pool.active
* http://localhost:8080/actuator/prometheus
*
* Prometheus查询示例:
* rate(thread_pool_completed_total[1m]) -- 每分钟完成任务数
* thread_pool_queue_size / thread_pool_queue_capacity -- 队列使用率
*/
@Component
public class ThreadPoolMetricsRegistrar implements MeterBinder {
private final List<ThreadPoolInfo> threadPools;
public ThreadPoolMetricsRegistrar(List<ThreadPoolInfo> threadPools) {
this.threadPools = threadPools;
}
@Override
public void bindTo(MeterRegistry registry) {
for (ThreadPoolInfo info : threadPools) {
String poolName = info.name();
ThreadPoolExecutor executor = info.executor();
Tags tags = Tags.of("pool", poolName);
// 核心线程数(gauge:当前值)
registry.gauge("thread.pool.core.size", tags, executor,
ThreadPoolExecutor::getCorePoolSize);
// 最大线程数
registry.gauge("thread.pool.max.size", tags, executor,
ThreadPoolExecutor::getMaximumPoolSize);
// 当前活跃线程数
registry.gauge("thread.pool.active", tags, executor,
ThreadPoolExecutor::getActiveCount);
// 当前线程池实际线程数
registry.gauge("thread.pool.pool.size", tags, executor,
ThreadPoolExecutor::getPoolSize);
// 队列中等待的任务数
registry.gauge("thread.pool.queue.size", tags, executor,
e -> e.getQueue().size());
// 队列剩余容量(LinkedBlockingQueue:Integer.MAX_VALUE,慎用)
registry.gauge("thread.pool.queue.remaining", tags, executor,
e -> {
int remaining = e.getQueue().remainingCapacity();
// 无界队列返回MAX_VALUE,替换为-1表示无界
return remaining == Integer.MAX_VALUE ? -1 : remaining;
});
// 完成任务总数(counter:只增不减,用于计算速率)
registry.gauge("thread.pool.completed.total", tags, executor,
ThreadPoolExecutor::getCompletedTaskCount);
// 如果是MonitoredThreadPoolExecutor,注册额外指标
if (executor instanceof MonitoredThreadPoolExecutor monitoredExecutor) {
registry.gauge("thread.pool.rejected.total", tags, monitoredExecutor,
e -> e.getSnapshot().rejectedCount());
registry.gauge("thread.pool.failed.total", tags, monitoredExecutor,
e -> e.getSnapshot().failedCount());
registry.gauge("thread.pool.exec.avg.ms", tags, monitoredExecutor,
e -> e.getSnapshot().avgExecMs());
}
}
}
public record ThreadPoolInfo(String name, ThreadPoolExecutor executor) {}
}3.3 Spring Boot自动配置与Grafana Dashboard配置
package com.laozhang.concurrent.monitor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.List;
import java.util.concurrent.*;
/**
* 线程池配置中心:统一管理所有业务线程池
*
* 线程池命名规范:
* {业务域}-{功能}-{类型}
* 如:order-export-io, risk-check-cpu, notify-push-io
*
* 配置原则:
* IO密集型:coreSize = CPU * 2,maxSize = CPU * 4,队列 = 200
* CPU密集型:coreSize = CPU,maxSize = CPU + 1,队列 = 50
*/
@Configuration
@EnableScheduling
public class ThreadPoolConfiguration {
private final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
/**
* 订单导出线程池(IO密集型)
* - 大量数据库查询,等待时间长
* - 适合较多线程
*/
@Bean(name = "orderExportExecutor")
public MonitoredThreadPoolExecutor orderExportExecutor() {
return new MonitoredThreadPoolExecutor(
"order-export",
CPU_COUNT * 2, // coreSize
CPU_COUNT * 4, // maxSize
60L, TimeUnit.SECONDS, // keepAlive
new ArrayBlockingQueue<>(200), // 有界队列,防止任务无限积压
new NamedThreadFactory("order-export")
);
}
/**
* 风控计算线程池(CPU密集型)
* - 纯内存计算,无IO等待
* - 线程数不超过CPU核心数
*/
@Bean(name = "riskCheckExecutor")
public MonitoredThreadPoolExecutor riskCheckExecutor() {
return new MonitoredThreadPoolExecutor(
"risk-check",
CPU_COUNT,
CPU_COUNT + 1,
30L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(50),
new NamedThreadFactory("risk-check")
);
}
/**
* 注册所有线程池到Micrometer监控
*/
@Bean
public ThreadPoolMetricsRegistrar threadPoolMetricsRegistrar(
MonitoredThreadPoolExecutor orderExportExecutor,
MonitoredThreadPoolExecutor riskCheckExecutor) {
return new ThreadPoolMetricsRegistrar(List.of(
new ThreadPoolMetricsRegistrar.ThreadPoolInfo("order-export", orderExportExecutor),
new ThreadPoolMetricsRegistrar.ThreadPoolInfo("risk-check", riskCheckExecutor)
));
}
/**
* 定期打印线程池快照到日志
* 即使没有Prometheus,日志也能看到线程池健康状态
*/
@Scheduled(fixedDelay = 30_000) // 每30秒
public void logThreadPoolStatus(
MonitoredThreadPoolExecutor orderExportExecutor,
MonitoredThreadPoolExecutor riskCheckExecutor) {
orderExportExecutor.logSnapshot();
riskCheckExecutor.logSnapshot();
}
/**
* 自定义ThreadFactory,给线程设置有意义的名称
* 方便jstack时识别线程归属
*/
static class NamedThreadFactory implements ThreadFactory {
private final String prefix;
private int counter = 0;
NamedThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + "-" + (++counter));
t.setDaemon(false); // 非守护线程,确保任务执行完才关闭
return t;
}
}
}四、踩坑实录
坑1:用LinkedBlockingQueue(无界)导致内存OOM
报错现象: 线程池配置了LinkedBlockingQueue(不传容量参数),大量任务积压,JVM内存持续增长,最终OOM。
原因分析: LinkedBlockingQueue不传容量参数,默认容量是Integer.MAX_VALUE(约21亿)。队列永远不会满,maximumPoolSize设置失效(只有队列满了才会创建超过coreSize的线程),拒绝策略也不会触发。任务对象堆积在队列里,无法被GC。
解法: 生产环境必须使用有界队列(ArrayBlockingQueue或指定了容量的LinkedBlockingQueue),并根据业务特性设置合理的容量。
// 错误:无界队列
new ThreadPoolExecutor(10, 100, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), // 容量=Integer.MAX_VALUE,危险!
handler);
// 正确:有界队列
new ThreadPoolExecutor(10, 100, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500), // 队列满了触发拒绝策略
handler);坑2:getActiveCount()的值不准确,不能用于精确控制
报错现象: 用getActiveCount() >= getMaximumPoolSize()判断线程池是否繁忙,用来做限流,但发现判断经常不准,明明线程池很忙但判断结果是false。
原因分析: ThreadPoolExecutor.getActiveCount()的源码:
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked()) ++n; // 通过Worker的锁状态判断是否在执行任务
return n;
} finally {
mainLock.unlock();
}
}isLocked()是AQS的锁状态,线程正在执行beforeExecute或afterExecute时也算active,可能产生偏差。另外,这个方法会加全局锁,高并发调用时本身就是性能瓶颈。
解法: 不要用getActiveCount()做精确控制,用队列大小和提交速率来判断线程池压力。如需精确的活跃计数,用AtomicInteger在beforeExecute/afterExecute中自己维护。
坑3:线程池关闭时未等待任务完成
报错现象: 服务重启时,部分任务执行到一半被中断,导致数据不一致(比如订单已扣款但未创建)。
原因分析: 直接调用executor.shutdownNow()会立即中断所有运行中的线程,正在执行的任务可能被强制停止。
// 错误:shutdownNow() 会中断正在执行的任务
@PreDestroy
public void destroy() {
executor.shutdownNow(); // 危险!
}解法: 使用shutdown()(等待任务完成,不接受新任务)加超时等待:
@PreDestroy
public void destroy() throws InterruptedException {
executor.shutdown(); // 不再接受新任务
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
// 30秒内没完成,强制中断
logger.warn("线程池 [{}] 30秒内未正常关闭,强制关闭", poolName);
executor.shutdownNow();
// 再等5秒
executor.awaitTermination(5, TimeUnit.SECONDS);
}
logger.info("线程池 [{}] 已正常关闭", poolName);
}坑4:Micrometer的Gauge使用弱引用,线程池被GC导致指标消失
报错现象: 某次版本发布后,Grafana上线程池的指标突然全部消失,重启服务后偶尔恢复。
原因分析: Micrometer的Gauge对被监控对象持有弱引用(WeakReference),当被监控的ThreadPoolExecutor被GC回收时,Gauge的value()函数返回NaN,Prometheus数据缺失。
常见原因:在@Bean方法里创建了局部变量,但实际注册到Gauge的对象(executor)被IoC容器正确管理了——但如果IoC容器创建了新的Bean实例替换旧的(比如某些AOP代理场景),旧的executor被GC,Gauge就失效了。
解法: 确保传入Gauge的对象生命周期与应用一致(Spring Bean就足够),不要传入局部变量或匿名对象。使用MeterBinder接口是推荐做法,Spring会在适当时机调用bindTo。
五、总结与延伸
生产环境线程池监控是系统可观测性的重要组成部分,核心是三件事:
1. 指标收集:
- 直接API:
getActiveCount(),getQueue().size(),getCompletedTaskCount() - 自定义埋点:拒绝计数、执行时间分布、任务等待时间
- 框架集成:Micrometer注册Gauge,对接Prometheus
2. 可视化告警:
- Grafana Dashboard展示趋势,设置阈值告警
- 关键指标:队列使用率(> 70%告警)、拒绝数(> 0告警)、活跃线程比例
- 日志兜底:没有监控系统时,定时
logSnapshot()也能保证有据可查
3. 线程命名规范:
{业务}-{功能}-{线程序号}
订单导出:order-export-1, order-export-2...
风控计算:risk-check-1, risk-check-2...规范命名后,jstack时能立刻知道每个线程在干什么。
推荐监控Grafana Query(Prometheus):
# 队列使用率
thread_pool_queue_size{pool="order-export"} /
(thread_pool_queue_size{pool="order-export"} +
thread_pool_queue_remaining{pool="order-export"})
# 每分钟完成任务数(速率)
rate(thread_pool_completed_total[1m])
# 拒绝任务累计数
thread_pool_rejected_total线程池监控做好了,99%的线程池问题能在用户感知之前发现并处理。
