1 背景
Java线程池源码分析 里虽然介绍了线程池的核心配置(核心线程数、最大线程数和队列大小)该如何配置,但是实际上业界也没有一个统一的标准。虽然有些所谓的"公式",但是不同的业务场景复杂多变,配置原则也不尽相同。从实际经验来看,IO密集型、CPU密集型应用在线程配置上就比较悬殊,因此没有一个通用的适合所有场景的公式。
那么我们换一种思路,就是既然不能明确配置,那么能不能支持动态配置呢?答案是肯定的,因为线程池本身就支持核心线程数和最大线程数的修改,而且是实时生效的。 通常在生产环境中,我们可以实时监控线程池的运行状态,随时掌握应用服务的性能状况,以便在系统资源紧张时及时告警,动态调整线程配置,必要时进行人工介入,排查问题,线上修复。
也就是说,通过实时监控,然后动态修改。
2 监控
我们知道,线程池使用不当也会使服务器资源枯竭,导致异常情况的发生,比如固定线程池的阻塞队列任务数量过多、缓存线程池创建的线程过多导致内存溢出、系统假死等问题。因此,我们需要一种简单的监控方案来监控线程池的使用情况,比如完成任务数量、未完成任务数量、线程大小等信息。
线程池的监控分为2种类型,一种是在执行任务前后全量统计任务排队时间和执行时间,另外一种是通过定时任务,定时获取活跃线程数,队列中的任务数,核心线程数,最大线程数等数据。
2.1 MonitoredThreadPoolStatisticsExecutor全量统计
参数名称 | 说明 |
poolName | 线程池的名称 |
timeout | 预设的任务超时时间阈值 |
taskTimeoutFlag | 是否记录任务超时次数 |
execTimeout | 任务执行超时时间阈值 |
taskExecTimeoutFlag | 是否记录任务执行超时次数 |
waitInQueueTimeout | 任务在队列中等待的时间阈值 |
taskWaitInQueueTimeoutFlag | 是否记录任务等待时间超时次数 |
queueSizeWarningPercent | 任务队列使用率告警阈值 |
queueSizeWarningFlag | 是否进行队列容量告警 |
queueSizeHasWarningFlag | 是否需要队列容量告警(队列是否曾经达到过预警值) |
taskTotalTime | 任务总时长,以任务提交时间进行计时,单位 ms |
taskTotalExecTime | 任务总执行时长,以任务开始执行进行计时,单位 ms |
minTaskTime | 最短任务时长,以提交时间计时,单位 ms |
maxTaskTime | 最长任务时长,以提交时间计时,单位 ms |
taskTimeoutCount | 任务超时次数,以任务提交进行计时 |
taskExecTimeoutCount | 任务执行超时次数,以任务开始执行时间进行计时 |
taskWaitInQueueTimeoutCount | 任务等待时间超过设定的阈值的次数 |
minTaskExecTime | 最短任务时长,以执行时间计时,单位 ms |
maxTaskExecTime | 最长任务时长,以执行时间计时,单位 ms |
activeCount | 线程池中正在执行任务的线程数量 |
completedTaskCount | 线程池已完成的任务数量,该值小于等于taskCount |
corePoolSize | 线程池的核心线程数量 |
largestPoolSize | 线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize |
maximumPoolSize | 线程池的最大线程数量 |
poolSize | 线程池当前的线程数量 |
taskCount | 线程池已经执行的和未执行的任务总数 |
为了简化,代码中的监控数据都是通过日志打印,实际中是通过kafka收集,然后做出可视化监控。
/**
* 自定义可监控的线程池
*/
public class MonitoredThreadPoolStatisticsExecutor extends ThreadPoolExecutor implements DisposableBean {
/**
* 线程池的名称
*/
private String poolName;
/**
* 预设的任务超时时间阈值,用于统计功能。 * 以任务提交时间进行计时,单位 ms,大于0则记录超时次数。
*/
private long timeout = 120000l;
/**
* 是否记录任务超时次数
*/
private boolean taskTimeoutFlag = false;
/**
* 任务执行超时时间阈值,用于统计功能。 * 以任务开始执行进行计时,单位 ms,大于 0 则记录任务执行超时次数。
*/
private long execTimeout = 120000l;
/**
* 是否记录任务执行超时次数
*/
private boolean taskExecTimeoutFlag = false;
/**
* 任务在队列中等待的时间阈值,用于统计功能。 * 以任务提交时间开始计时到开始执行为止,单位 ms。
*/
private long waitInQueueTimeout = 60000l;
/**
* 是否记录任务等待时间超时次数
*/
private boolean taskWaitInQueueTimeoutFlag = false;
/**
* 任务队列使用率告警阈值
*/
private int queueSizeWarningPercent = 80;
/**
* 是否进行队列容量告警
*/
private boolean queueSizeWarningFlag = false;
/**
* 是否需要队列容量告警(队列是否曾经达到过预警值)
*/
private AtomicBoolean queueSizeHasWarningFlag = new AtomicBoolean(false);
/**
* 任务总时长,用于统计功能。以任务提交时间进行计时,单位 ms
*/
private AtomicLong taskTotalTime = new AtomicLong(0);
/**
* 任务总执行时长,用于统计功能。以任务开始执行进行计时,单位 ms
*/
private AtomicLong taskTotalExecTime = new AtomicLong(0);
/**
* 最短任务时长,以提交时间计时,单位 ms
*/
private long minTaskTime = Long.MAX_VALUE;
/**
* 最长任务时长,以提交时间计时,单位 ms
*/
private long maxTaskTime = 0;
/**
* 任务超时次数,以任务提交进行计时
*/
private AtomicLong taskTimeoutCount = new AtomicLong(0);
/**
* 任务执行超时次数,以任务开始执行时间进行计时
*/
private AtomicLong taskExecTimeoutCount = new AtomicLong(0);
/**
* 任务等待时间超过设定的阈值的次数
*/
private AtomicLong taskWaitInQueueTimeoutCount = new AtomicLong(0);
/**
* 最短任务时长,以执行时间计时,单位 ms
*/
private long minTaskExecTime = Long.MAX_VALUE;
/**
* 最长任务时长,以执行时间计时,单位 ms
*/
private long maxTaskExecTime = 0;
/**
* 保存任务信息
*/
private Map<String, TaskStatistics> taskInfoMap = new ConcurrentHashMap<String, TaskStatistics>();
public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), handler);
this.poolName = poolName;
this.timeout = timeout;
this.execTimeout = execTimeout;
this.waitInQueueTimeout = waitInQueueTimeout;
this.queueSizeWarningPercent = queueSizeWarningPercent;
if (this.timeout > 0) {
this.taskTimeoutFlag = true;
}
if (this.execTimeout > 0) {
this.taskExecTimeoutFlag = true;
}
if (this.waitInQueueTimeout > 0) {
this.taskWaitInQueueTimeoutFlag = true;
}
if (this.queueSizeWarningPercent > 0) {
this.queueSizeWarningFlag = true;
}
ThreadPoolMonitor.monitor(this);
}
public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName));
this.poolName = poolName;
this.timeout = timeout;
this.execTimeout = execTimeout;
this.waitInQueueTimeout = waitInQueueTimeout;
this.queueSizeWarningPercent = queueSizeWarningPercent;
if (this.timeout > 0) {
this.taskTimeoutFlag = true;
}
if (this.execTimeout > 0) {
this.taskExecTimeoutFlag = true;
}
if (this.waitInQueueTimeout > 0) {
this.taskWaitInQueueTimeoutFlag = true;
}
if (this.queueSizeWarningPercent > 0) {
this.queueSizeWarningFlag = true;
}
ThreadPoolMonitor.monitor(this);
}
@Override
public void execute(Runnable command) {
this.taskInfoMap.put(String.valueOf(command.hashCode()), new TaskStatistics());
if (this.queueSizeWarningFlag) {
float f = (float) getQueue().size() / (getQueue().size() + getQueue().remainingCapacity());
BigDecimal bd = new BigDecimal(f).setScale(2, BigDecimal.ROUND_HALF_UP);
int usedPercent = bd.multiply(new BigDecimal(100)).intValue();
if (usedPercent > this.queueSizeWarningPercent) {
this.queueSizeHasWarningFlag.set(true);
System.out.println("queueSize percent Warning!used:" + usedPercent + "%,qSize:" + getQueue().size() + ",remaining:" + getQueue().remainingCapacity());
}
}
super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
TaskStatistics taskStatistics = this.taskInfoMap.get(String.valueOf(r.hashCode()));
if (null != taskStatistics) {
taskStatistics.setStartExecTime(System.currentTimeMillis());
}
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
//重写此方法做一些统计功能
long endTime = System.currentTimeMillis();
TaskStatistics taskStatistics = this.taskInfoMap.remove(String.valueOf(r.hashCode()));
if (null != taskStatistics) {
long taskTotalTime = endTime - taskStatistics.getCommitTime();
long taskExecTime = endTime - taskStatistics.getStartExecTime();
long taskWaitInQueueTime = taskStatistics.getStartExecTime() - taskStatistics.getCommitTime();
this.taskTotalTime.addAndGet(taskTotalTime);
this.taskTotalExecTime.addAndGet(taskExecTime);
if (this.minTaskTime > taskTotalTime) {
this.minTaskTime = taskTotalTime;
}
if (this.maxTaskTime < taskTotalTime) {
this.maxTaskTime = taskTotalTime;
}
if (this.taskTimeoutFlag && taskTotalTime > this.timeout) {
this.taskTimeoutCount.incrementAndGet();
}
if (this.minTaskExecTime > taskExecTime) {
this.minTaskExecTime = taskExecTime;
}
if (this.maxTaskExecTime < taskExecTime) {
this.maxTaskExecTime = taskExecTime;
}
if (this.taskExecTimeoutFlag && taskExecTime > this.execTimeout) {
this.taskExecTimeoutCount.incrementAndGet();
}
if (this.taskWaitInQueueTimeoutFlag && taskWaitInQueueTime > this.waitInQueueTimeout) {
this.taskWaitInQueueTimeoutCount.incrementAndGet();
}
System.out.println("task cost info[ taskTotalTime:" + taskTotalTime + ",taskExecTime:" + taskExecTime + ",taskWaitInQueueTime:" + taskWaitInQueueTime + " ]");
// 初始线程数、核心线程数、正在执行的任务数量、
// 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、
// 最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
LOGGER.info("{}-pool-monitor: " +
" PoolSize: {}, CorePoolSize: {}, Active: {}, " +
"Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
"MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
this.poolName,
this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
}
super.afterExecute(r, t);
}
/**
* Spring容器管理线程池的生命周期,线程池Bean销毁之前先关闭 * @throws Exception
*/
@Override
public void destroy() throws Exception {
shutdown();
}
/**
* 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
*/
@Override
public void shutdown() {
// 统计已执行任务、正在执行任务、未执行任务数量
LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
super.shutdown();
}
/**
* 线程池立即关闭时,统计线程池情况
*/
@Override
public List<Runnable> shutdownNow() {
// 统计已执行任务、正在执行任务、未执行任务数量
LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
return super.shutdownNow();
}
/**
* 任务平均时长,无已完成任务时,返回 0
*/
public long getTaskAvgTime() {
if (this.getCompletedTaskCount() > 0) {
return this.getTaskTotalTime().get() / this.getCompletedTaskCount();
}
return 0;
}
/**
* 任务平均执行时长,无已完成任务时,返回 0
*/
public long getTaskAvgExecTime() {
if (this.getCompletedTaskCount() > 0) {
return this.getTaskTotalExecTime().get() / this.getCompletedTaskCount();
}
return 0;
}
//省略setter/getter方法
}
标签:执行,JAVA,private,任务,线程,long,监控,超时
From: https://blog.51cto.com/u_13529088/11944364