首页 > 编程语言 >JAVA线程池监控以及动态调整线程池

JAVA线程池监控以及动态调整线程池

时间:2024-09-07 10:25:51浏览次数:5  
标签:执行 JAVA private 任务 线程 long 监控 超时

1 背景

Java线程池源码分析 里虽然介绍了线程池的核心配置(核心线程数、最大线程数和队列大小)该如何配置,但是实际上业界也没有一个统一的标准。虽然有些所谓的"公式",但是不同的业务场景复杂多变,配置原则也不尽相同。从实际经验来看,IO密集型、CPU密集型应用在线程配置上就比较悬殊,因此没有一个通用的适合所有场景的公式。

那么我们换一种思路,就是既然不能明确配置,那么能不能支持动态配置呢?答案是肯定的,因为线程池本身就支持核心线程数和最大线程数的修改,而且是实时生效的。 通常在生产环境中,我们可以实时监控线程池的运行状态,随时掌握应用服务的性能状况,以便在系统资源紧张时及时告警,动态调整线程配置,必要时进行人工介入,排查问题,线上修复。

也就是说,通过实时监控,然后动态修改。

2 监控

我们知道,线程池使用不当也会使服务器资源枯竭,导致异常情况的发生,比如固定线程池的阻塞队列任务数量过多、缓存线程池创建的线程过多导致内存溢出、系统假死等问题。因此,我们需要一种简单的监控方案来监控线程池的使用情况,比如完成任务数量、未完成任务数量、线程大小等信息。

线程池的监控分为2种类型,一种是在执行任务前后全量统计任务排队时间和执行时间,另外一种是通过定时任务,定时获取活跃线程数,队列中的任务数,核心线程数,最大线程数等数据。

2.1 MonitoredThreadPoolStatisticsExecutor全量统计

参数名称

说明

poolName

线程池的名称

timeout

预设的任务超时时间阈值

taskTimeoutFlag

是否记录任务超时次数

execTimeout

任务执行超时时间阈值

taskExecTimeoutFlag

是否记录任务执行超时次数

waitInQueueTimeout

任务在队列中等待的时间阈值

taskWaitInQueueTimeoutFlag

是否记录任务等待时间超时次数

queueSizeWarningPercent

任务队列使用率告警阈值

queueSizeWarningFlag

是否进行队列容量告警

queueSizeHasWarningFlag

是否需要队列容量告警(队列是否曾经达到过预警值)

taskTotalTime

任务总时长,以任务提交时间进行计时,单位 ms

taskTotalExecTime

任务总执行时长,以任务开始执行进行计时,单位 ms

minTaskTime

最短任务时长,以提交时间计时,单位 ms

maxTaskTime

最长任务时长,以提交时间计时,单位 ms

taskTimeoutCount

任务超时次数,以任务提交进行计时

taskExecTimeoutCount

任务执行超时次数,以任务开始执行时间进行计时

taskWaitInQueueTimeoutCount

任务等待时间超过设定的阈值的次数

minTaskExecTime

最短任务时长,以执行时间计时,单位 ms

maxTaskExecTime

最长任务时长,以执行时间计时,单位 ms

activeCount

线程池中正在执行任务的线程数量

completedTaskCount

线程池已完成的任务数量,该值小于等于taskCount

corePoolSize

线程池的核心线程数量

largestPoolSize

线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize

maximumPoolSize

线程池的最大线程数量

poolSize

线程池当前的线程数量

taskCount

线程池已经执行的和未执行的任务总数

为了简化,代码中的监控数据都是通过日志打印,实际中是通过kafka收集,然后做出可视化监控。

/**
 * 自定义可监控的线程池
 */
 public class MonitoredThreadPoolStatisticsExecutor extends ThreadPoolExecutor implements DisposableBean {
 /**

 * 线程池的名称
 */
 private String poolName;
 /**
 * 预设的任务超时时间阈值,用于统计功能。     * 以任务提交时间进行计时,单位 ms,大于0则记录超时次数。
 */
 private long timeout = 120000l;
 /**
 * 是否记录任务超时次数
 */
 private boolean taskTimeoutFlag = false;
 /**
 * 任务执行超时时间阈值,用于统计功能。     * 以任务开始执行进行计时,单位 ms,大于 0 则记录任务执行超时次数。
 */
 private long execTimeout = 120000l;
 /**
 * 是否记录任务执行超时次数
 */
 private boolean taskExecTimeoutFlag = false;
 /**
 * 任务在队列中等待的时间阈值,用于统计功能。     * 以任务提交时间开始计时到开始执行为止,单位 ms。
 */
 private long waitInQueueTimeout = 60000l;
 /**
 * 是否记录任务等待时间超时次数
 */
 private boolean taskWaitInQueueTimeoutFlag = false;
 /**
 * 任务队列使用率告警阈值
 */
 private int queueSizeWarningPercent = 80;
 /**
 * 是否进行队列容量告警
 */
 private boolean queueSizeWarningFlag = false;
 /**
 * 是否需要队列容量告警(队列是否曾经达到过预警值)
 */
 private AtomicBoolean queueSizeHasWarningFlag = new AtomicBoolean(false);
 /**
 * 任务总时长,用于统计功能。以任务提交时间进行计时,单位 ms
 */
 private AtomicLong taskTotalTime = new AtomicLong(0);
 /**
 * 任务总执行时长,用于统计功能。以任务开始执行进行计时,单位 ms
 */
 private AtomicLong taskTotalExecTime = new AtomicLong(0);
 /**
 * 最短任务时长,以提交时间计时,单位 ms
 */
 private long minTaskTime = Long.MAX_VALUE;
 /**
 * 最长任务时长,以提交时间计时,单位 ms
 */
 private long maxTaskTime = 0;
 /**
 * 任务超时次数,以任务提交进行计时
 */
 private AtomicLong taskTimeoutCount = new AtomicLong(0);
 /**
 * 任务执行超时次数,以任务开始执行时间进行计时
 */
 private AtomicLong taskExecTimeoutCount = new AtomicLong(0);
 /**
 * 任务等待时间超过设定的阈值的次数
 */
 private AtomicLong taskWaitInQueueTimeoutCount = new AtomicLong(0);
 /**
 * 最短任务时长,以执行时间计时,单位 ms
 */
 private long minTaskExecTime = Long.MAX_VALUE;
 /**
 * 最长任务时长,以执行时间计时,单位 ms
 */
 private long maxTaskExecTime = 0;
 /**
 * 保存任务信息
 */
 private Map<String, TaskStatistics> taskInfoMap = new ConcurrentHashMap<String, TaskStatistics>();

 public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) {
 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), handler);
 this.poolName = poolName;
 this.timeout = timeout;
 this.execTimeout = execTimeout;
 this.waitInQueueTimeout = waitInQueueTimeout;
 this.queueSizeWarningPercent = queueSizeWarningPercent;
 if (this.timeout > 0) {
 this.taskTimeoutFlag = true;
 }
 if (this.execTimeout > 0) {
 this.taskExecTimeoutFlag = true;
 }
 if (this.waitInQueueTimeout > 0) {
 this.taskWaitInQueueTimeoutFlag = true;
 }
 if (this.queueSizeWarningPercent > 0) {
 this.queueSizeWarningFlag = true;
 }

 ThreadPoolMonitor.monitor(this);
 }

 public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) {
 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName));
 this.poolName = poolName;
 this.timeout = timeout;
 this.execTimeout = execTimeout;
 this.waitInQueueTimeout = waitInQueueTimeout;
 this.queueSizeWarningPercent = queueSizeWarningPercent;
 if (this.timeout > 0) {
 this.taskTimeoutFlag = true;
 }
 if (this.execTimeout > 0) {
 this.taskExecTimeoutFlag = true;
 }
 if (this.waitInQueueTimeout > 0) {
 this.taskWaitInQueueTimeoutFlag = true;
 }
 if (this.queueSizeWarningPercent > 0) {
 this.queueSizeWarningFlag = true;
 }
 ThreadPoolMonitor.monitor(this);
 }

 @Override
 public void execute(Runnable command) {
 this.taskInfoMap.put(String.valueOf(command.hashCode()), new TaskStatistics());
 if (this.queueSizeWarningFlag) {
 float f = (float) getQueue().size() / (getQueue().size() + getQueue().remainingCapacity());
 BigDecimal bd = new BigDecimal(f).setScale(2, BigDecimal.ROUND_HALF_UP);
 int usedPercent = bd.multiply(new BigDecimal(100)).intValue();
 if (usedPercent > this.queueSizeWarningPercent) {
 this.queueSizeHasWarningFlag.set(true);
 System.out.println("queueSize percent Warning!used:" + usedPercent + "%,qSize:" + getQueue().size() + ",remaining:" + getQueue().remainingCapacity());
 }
 }
 super.execute(command);
 }

 @Override
 protected void beforeExecute(Thread t, Runnable r) {
 TaskStatistics taskStatistics = this.taskInfoMap.get(String.valueOf(r.hashCode()));
 if (null != taskStatistics) {
 taskStatistics.setStartExecTime(System.currentTimeMillis());
 }
 super.beforeExecute(t, r);
 }

 @Override
 protected void afterExecute(Runnable r, Throwable t) {
 //重写此方法做一些统计功能
 long endTime = System.currentTimeMillis();
 TaskStatistics taskStatistics = this.taskInfoMap.remove(String.valueOf(r.hashCode()));
 if (null != taskStatistics) {
 long taskTotalTime = endTime - taskStatistics.getCommitTime();
 long taskExecTime = endTime - taskStatistics.getStartExecTime();
 long taskWaitInQueueTime = taskStatistics.getStartExecTime() - taskStatistics.getCommitTime();
 this.taskTotalTime.addAndGet(taskTotalTime);
 this.taskTotalExecTime.addAndGet(taskExecTime);
 if (this.minTaskTime > taskTotalTime) {
 this.minTaskTime = taskTotalTime;
 }
 if (this.maxTaskTime < taskTotalTime) {
 this.maxTaskTime = taskTotalTime;
 }
 if (this.taskTimeoutFlag && taskTotalTime > this.timeout) {
 this.taskTimeoutCount.incrementAndGet();
 }
 if (this.minTaskExecTime > taskExecTime) {
 this.minTaskExecTime = taskExecTime;
 }
 if (this.maxTaskExecTime < taskExecTime) {
 this.maxTaskExecTime = taskExecTime;
 }
 if (this.taskExecTimeoutFlag && taskExecTime > this.execTimeout) {
 this.taskExecTimeoutCount.incrementAndGet();
 }
 if (this.taskWaitInQueueTimeoutFlag && taskWaitInQueueTime > this.waitInQueueTimeout) {
 this.taskWaitInQueueTimeoutCount.incrementAndGet();
 }
 System.out.println("task cost info[ taskTotalTime:" + taskTotalTime + ",taskExecTime:" + taskExecTime + ",taskWaitInQueueTime:" + taskWaitInQueueTime + " ]");

 // 初始线程数、核心线程数、正在执行的任务数量、
 // 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、
 // 最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
 LOGGER.info("{}-pool-monitor: " +
 " PoolSize: {}, CorePoolSize: {}, Active: {}, " +
 "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
 "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
 this.poolName,
 this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
 this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
 this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());

 }
 super.afterExecute(r, t);
 }

 /**

 * Spring容器管理线程池的生命周期,线程池Bean销毁之前先关闭     * @throws Exception
 */
 @Override
 public void destroy() throws Exception {
 shutdown();
 }

 /**
 * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
 */
 @Override
 public void shutdown() {
 // 统计已执行任务、正在执行任务、未执行任务数量
 LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
 this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
 super.shutdown();
 }

 /**
 * 线程池立即关闭时,统计线程池情况
 */
 @Override
 public List<Runnable> shutdownNow() {
 // 统计已执行任务、正在执行任务、未执行任务数量
 LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
 this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
 return super.shutdownNow();
 }


 /**

 * 任务平均时长,无已完成任务时,返回 0
 */
 public long getTaskAvgTime() {
 if (this.getCompletedTaskCount() > 0) {
 return this.getTaskTotalTime().get() / this.getCompletedTaskCount();
 }
 return 0;
 }

 /**

 * 任务平均执行时长,无已完成任务时,返回 0
 */
 public long getTaskAvgExecTime() {
 if (this.getCompletedTaskCount() > 0) {
 return this.getTaskTotalExecTime().get() / this.getCompletedTaskCount();
 }
 return 0;
 }
 //省略setter/getter方法
}

标签:执行,JAVA,private,任务,线程,long,监控,超时
From: https://blog.51cto.com/u_13529088/11944364

相关文章

  • 基于ssm+vue电商直播订单管理系统(开题报告+程序+论文+java)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展,电子商务行业迎来了前所未有的繁荣。近年来,电商直播作为一种新兴的购物模式,凭借其直观性、互动性和即时性,迅速成为电商领域......
  • 【Java基础】程序填空题
    这个系列主要是对历年的考试题目中容易模糊的点进行汇总,其中很多内容也附带的了解析。这个系列的所有内容应该是全网最详细的内容了,希望可以帮助大家考试顺利。2024-042023-102023-042022-102022-042021-102021-042020-102020-082019-102019-04求三连!!感谢~~......
  • 企业邮箱监控设置指南:企业邮箱怎么设置邮箱监控(企业邮箱外发邮件如何监控)?
    邮件监控成为了一项重要的信息安全措施。通过合理的邮箱监控设置,企业不仅能有效防止信息泄露,还能提升工作效率和管理透明度。本文将为您详细介绍企业邮箱如何设置邮箱监控,特别是针对外发邮件的监控方法。一、选择合适的邮件监控工具首先,企业需要根据自身需求和预算选择合......
  • 计算机毕业设计必看必学!! 10192 springboot巡更系统,原创定制程序, java、PHP、python
    摘 要目前,在日常生活中随处可见社区巡更人员对特定的区域进行定期或者不定期的安全巡查管理。包括勤前训示、必到点签到、巡更路线等,各项勤务工作均由巡更员本人在执勤日志本中手工填写,且工作点分散,不利于统一监管,存在信息化手段不足,勤务信息获取、输入复杂,监管信息不能......
  • java计算机毕业设计旅游服务管理系统(开题+程序+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着旅游业的蓬勃发展,旅游市场的竞争日益激烈,旅游服务管理面临着前所未有的挑战与机遇。传统的人工管理模式已难以满足旅游企业高效、精准、个性化的......
  • 计算机毕业设计必看必学!! 09446 Springboot基于小程序的校园招聘系统的设计与实现,原
    摘 要随着智能手机的普及和4G网络的发展,以O20为代表的互联网+服务模式从衣食住行等方方面面改变着我们的生活方式。基于小程序的校园招聘系统主要功能模块包括用户管理,招聘资讯、招聘职位、简历投递、面试邀请等,采取面对对象的开发模式进行软件的开发和硬体的架设,能很好的......
  • 计算机毕业设计必看必学!! 09259 ssm网上评教系统,原创定制程序, java、PHP、python、
    ssm网上评教系统摘 要随着互联网趋势的到来,各行各业都在考虑利用互联网将自己推广出去,最好方式就是建立自己的互联网系统,并对其进行维护和管理。在现实运用中,应用软件的工作规则和开发步骤,采用Java技术建设网上评教系统。本设计主要实现集人性化、高效率、便捷等优点于......
  • java计算机毕业设计美食商城(开题+程序+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着互联网的飞速发展,电子商务已成为现代生活不可或缺的一部分,尤其在餐饮领域,消费者对于便捷、多样化及个性化的美食需求日益增长。传统的美食消费模......
  • java计算机毕业设计留学服务管理平台(开题+程序+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景:在全球化日益加深的今天,留学已成为众多学生拓宽国际视野、提升综合素质的重要途径。然而,留学过程中的信息繁杂、流程复杂、服务需求多样化等问题,给广......
  • Java并发编程实战 06 | 为什么不建议使用线程优先级?
    什么是线程优先级?线程优先级是一个标识,用来提示操作系统或线程调度器哪个线程更重要、更需要优先执行。优先级通常是一个整数值。在Java中,线程优先级的设置范围从1到10,其中1是最低优先级,10是最高优先级。Java默认情况下,线程的优先级是5。优先级高的线程通常会......