专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

JMX可视化监控线程池

前两天阅读公司代码看到了用JMX监控定时任务信息和状态,JMX这个单词感觉很熟于是便去查阅了一下,并写了监控线程池的Demo

通过阅读本篇文章你将了解到:

  • JMX介绍
  • 线程池介绍
  • JMX监控线程池应用

什么是JMX

JMX简介

JMX(Java Management Extensions),监控管理框架,通过使用JMX可以监控和管理应用程序。JMX最常见的场景是监控Java程序的基本信息和运行情况,任何Java程序都可以开启JMX,然后使用JConsoleVisual VM进行预览

JMX架构

69_1.png总共分为三层,分发层、代理层、设备层
分发层:根据不同的协议定义了对代理层进行各种操作的管理接口,简单的来说是监控指标的查看方式,可以是 HTTP连接、 RMI连接、 SNMP连接
代理层:管理 MBean,通过将 MBean注册到代理层实现 MBean的管理,除了注册 MBean,还可以注册 Adapter,代理层在应用中一般都是 MBeanService
设备层:监控指标抽象出的类,可以分为以下几种:

  • Standard MBean
  • Dynamic MBean
  • Open MBean
  • Model MBean
  • MXBean

应用中一般使用Standard MBean比较多,所以这里只介绍Standard MBean,使用Standard MBean需要满足一定的规则,规则如下:

  • 定义一个接口,接口名必须为XXXXMBean的格式,必须MBean结尾
  • 如果接口为XXXXMBean,则接口实现类必须为MBean,否则程序将报错
  • 接口中通过getset方法表示监控指标是否可读、可写。比如getXXX()抽象方法,则XXX就是监控的指标,getXXX()表示XXX性能指标可读,setXXX()方法表示该监控指标可写
  • 参数和返回类型只能是简单的引用类型(如String)和基本数据类型,不可以是自定义类型,如果返回值为自定义类型可以选择MXBean

线程池简单介绍

线程池是线程的管理工具,通过使用线程池可以复用线程降低资源消耗、提高响应速度、提高线程的可管理性。如果在系统中大量使用线程池,就必须对线程池进行监控方便出错时定位问题。可以通过线程池提供的参数进行监控,线程池提供的参数如下:

方法 含义
getActiveCount 线程池中正在执行任务的线程数量
getCompletedTaskCount 线程池已完成的任务数量
getCorePoolSize 线程池的核心线程数量
getLargestPoolSize 线程池曾经创建过的最大线程数量
getMaximumPoolSize 线程池的最大线程数量
getPoolSize 线程池当前的线程数量
getTaskCount 线程池需要执行的任务数量

应用

介绍完JMX及线程池以后,写一个JMX监控线程池的Demo,总不能纸上谈兵吧

  • 定义线程池监控类:ThreadPoolMonitor.java
    public class ThreadPoolMonitor extends ThreadPoolExecutor {
        private final Logger logger = LoggerFactory.getLogger(getClass());

        /**
         * ActiveCount
         * */
        int ac = 0;

        /**
         * 当前所有线程消耗的时间
         * */
        private AtomicLong totalCostTime = new AtomicLong();

        /**
         * 当前执行的线程总数
         * */
        private AtomicLong totalTasks = new AtomicLong();

        /**
         * 线程池名称
         */
        private String poolName;

        /**
         * 最短 执行时间
         * */
        private long minCostTime;

        /**
         * 最长执行时间
         * */
        private long maxCostTime;

        /**
         * 保存任务开始执行的时间
         */
        private ThreadLocal<Long> startTime = new ThreadLocal<>();

        public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
              Executors.defaultThreadFactory(), poolName);
        }

        public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory, String poolName) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
            this.poolName = poolName;
        }

        public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
            return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
        }

        public static ExecutorService newCachedThreadPool(String poolName) {
            return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
        }

        public static ExecutorService newSingleThreadExecutor(String poolName) {
            return new ThreadPoolMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
        }

        /**
         * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
         */
        @Override
        public void shutdown() {
            // 统计已执行任务、正在执行任务、未执行任务数量
            logger.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
              this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
            super.shutdown();
        }

        @Override
        public List<Runnable> shutdownNow() {
            // 统计已执行任务、正在执行任务、未执行任务数量
            logger.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
              this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
            return super.shutdownNow();
        }

        /**
         * 任务执行之前,记录任务开始时间
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            startTime.set(System.currentTimeMillis());
        }

        /**
         * 任务执行之后,计算任务结束时间
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            long costTime = System.currentTimeMillis() - startTime.get();
            startTime.remove();  //删除,避免占用太多内存
            //设置最大最小执行时间
            maxCostTime = maxCostTime > costTime ? maxCostTime : costTime;
            if (totalTasks.get() == 0) {
                minCostTime = costTime;
            }
            minCostTime = minCostTime < costTime ? minCostTime : costTime;
            totalCostTime.addAndGet(costTime);
            totalTasks.incrementAndGet();

            logger.info("{}-pool-monitor: " +
                          "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, ActiveCount: {}, " +
                          "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
                          "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
                    this.poolName,
                    costTime, this.getPoolSize(), this.getCorePoolSize(), super.getActiveCount(),
                    this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                    this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
        }

        public int getAc() {
            return ac;
        }

        /**
         * 线程平均耗时
         *
         * @return
         * */
        public float getAverageCostTime() {
            return totalCostTime.get() / totalTasks.get();
        }

        /**
         * 线程最大耗时
         * */
        public long getMaxCostTime() {
            return maxCostTime;
        }

        /**
         * 线程最小耗时
         * */
        public long getMinCostTime() {
            return minCostTime;
        }

        /**
         * 生成线程池所用的线程,改写了线程池默认的线程工厂
         */
        static class EventThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;

            /**
             * 初始化线程工厂
             *
             * @param poolName 线程池名称
             */
            EventThreadFactory(String poolName) {
                SecurityManager s = System.getSecurityManager();
                group = Objects.nonNull(s) ? s.getThreadGroup() :   Thread.currentThread().getThreadGroup();
                namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
            }

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    }

通过继承线程池来自定义线程池,并在构造函数中加入了`poolName`标明是哪一个线程池,同时重写了`beforeExecute`、`afterExecute`、`terminated`等方法,在`beforeExecute`方法中记录线程池执行的时间,在`afterExecute`方法中计算线程执行的耗时、最大耗时、最小耗时、平均耗时。重写线程池生成线程的方法,指定了生成的线程名

* 定义一个MBeanThreadPoolParamMBean.java
MBean其实并不是一个实体类而是一个接口,里面定义了监控的指标

    public interface ThreadPoolParamMBean {
        /**
         * 线程池中正在执行任务的线程数量
         *
         * @return
         */
        int getActiveCount();

        /**
         * 线程池已完成的任务数量
         *
         * @return
         */
        long getCompletedTaskCount();

        /**
         * 线程池的核心线程数量
         *
         * @return
         */
        int getCorePoolSize();

        /**
         * 线程池曾经创建过的最大线程数量
         *
         * @return
         */
        int getLargestPoolSize();

        /**
         * 线程池的最大线程数量
         *
         * @return
         */
        int getMaximumPoolSize();

        /**
         * 线程池当前的线程数量
         *
         * @return
         */
        int getPoolSize();

        /**
         * 线程池需要执行的任务数量
         *
         * @return
         */
        long getTaskCount();

        /**
         * 线程最大耗时
         *
         * @return
         * */
        long getMaxCostTime();

        /**
         * 线程最小耗时
         *
         * @return
         * */
        long getMinCostTime();

        /**
         * 线程平均耗时
         *
         * @return
         * */
        float getAverageCostTime();
    }

  • 定义一个MBean实现类:ThreadPoolParam.java
    定义的是静态MBean,所以接口实现类必须满足规定,即xxxMBean,实现类为xxx
    public class ThreadPoolParam implements ThreadPoolParamMBean  {
        private ThreadPoolMonitor threadPoolMonitor;

        public ThreadPoolParam(ExecutorService es) {
            this.threadPoolMonitor = (ThreadPoolMonitor) es;
        }

        /**
         * 线程池中正在执行任务的线程数量
         *
         * @return
         */
        @Override
        public int getActiveCount() {
            return threadPoolMonitor.getAc();
        }

        /**
         * 线程池已完成的任务数量
         *
         * @return
         */
        @Override
        public long getCompletedTaskCount() {
            return threadPoolMonitor.getCompletedTaskCount();
        }

        /**
         * 线程池的核心线程数量
         *
         * @return
         */
        @Override
        public int getCorePoolSize() {
            return threadPoolMonitor.getCorePoolSize();
        }

        /**
         * 线程池曾经创建过的最大线程数量
         *
         * @return
         */
        @Override
        public int getLargestPoolSize() {
            return threadPoolMonitor.getLargestPoolSize();
        }

        /**
         * 线程池的最大线程数量
         *
         * @return
         */
        @Override
        public int getMaximumPoolSize() {
            return threadPoolMonitor.getMaximumPoolSize();
        }

        /**
         * 线程池当前的线程数量
         *
         * @return
         */
        @Override
        public int getPoolSize() {
            return threadPoolMonitor.getPoolSize();
        }

        /**
         * 线程池需要执行的任务数量
         *
         * @return
         */
        @Override
        public long getTaskCount() {
            return threadPoolMonitor.getTaskCount();
        }

        /**
         * 线程最大耗时
         *
         * @return
         * */
        @Override
        public long getMaxCostTime() {
            return threadPoolMonitor.getMaxCostTime();
        }

        /**
         * 线程最小耗时
         *
         * @return
         * */
        @Override
        public long getMinCostTime() {
            return threadPoolMonitor.getMinCostTime();
        }

        /**
         * 线程平均耗时
         *
         * @return
         * */
        @Override
        public float getAverageCostTime() {
            return threadPoolMonitor.getAverageCostTime();
        }
    }

监控的参数指标通过线程池得到

* 测试类:Test.java

    public class Test {
        private static Random random = new Random();
        public static void main(String[] args) throws MalformedObjectNameException,  InterruptedException {
            ExecutorService es1 = ThreadPoolMonitor.newCachedThreadPool("test-pool-1");
            ThreadPoolParam threadPoolParam1 = new ThreadPoolParam(es1);

            ExecutorService es2 = ThreadPoolMonitor.newCachedThreadPool("test-pool-2");
            ThreadPoolParam threadPoolParam2 = new ThreadPoolParam(es2);

            MBeanServerUtil.registerMBean(threadPoolParam1, new ObjectName("test-pool-1:type=threadPoolParam"));
            MBeanServerUtil.registerMBean(threadPoolParam2, new ObjectName("test-pool-2:type=threadPoolParam"));

            //http连接的方式查看监控任务
            HtmlAdaptor.start();

            executeTask(es1);
            executeTask(es2);
            Thread.sleep(1000 * 60 * 60);
        }

        private static void executeTask(ExecutorService es) {
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    int temp = i;
                    es.submit(() -> {
                        //随机睡眠时间
                        try {
                            Thread.sleep(random.nextInt(60) * 1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(temp);
                    });
                }
            }).start();
        }
    }

说明:

 *  `MBeanServerUtil.registerMBean()`注册监控的类
 *  `HtmlAdaptor.start()`开启`HTTP`连接的方式查看监控任务

启动程序后打开`http://localhost:8082/`如下图:

69_2.png

点击test-pool-1下的type=threadPoolParam

69_3.png

通过刷新获取线程池最新的监控指标 test-pool-1type=threadPoolParam这些属性是在ObjectName中定义的属性值

总结

使用JMX监控线程池只是JMX一个功能,本篇文章只是学以致用,更多有关JMX以及线程池的内容可以查阅其他资料。文章若有错误欢迎指正

69_4.png

最后附:项目代码,欢迎forkstar,【我都划重点了就star一下】

未经允许不得转载:搜云库技术团队 » JMX可视化监控线程池

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们