前置推荐阅读:java并发之线程池使用-CSDN博客
自定义实现一个带监控的线程池
首先我们继承ThreadPoolExecutor,实现构造函数以及重写beforeExecute和afterExecute两个函数,具体调用我们会在代码实现层面进行详细的分析。
import java.util.concurrent.*;
public class AsyncThreadPool extends ThreadPoolExecutor {
/**
* 任务队列
*/
private BlockingQueue<Runnable> workerQueue;
public AsyncThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
this.workerQueue = workQueue;
}
/**
* 在任务执行之后
*
*
*
* @param r 执行任务
* @param t 异常信息
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("AsyncThreadPool afterExecute threadName:"+Thread.currentThread().getName()+", afterExecutor queueSize:"+workerQueue.size()+" !!!");
}
/**
* 在任务执行之前
*
*
*
* @param t 执行线程
* @param r 异常信息
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("AsyncThreadPool beforeExecute threadName:"+Thread.currentThread().getName()+", afterExecutor queueSize:"+workerQueue.size()+" !!!");
}
}
创建Util并重写ThreadFactory,代码如下:
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class AsyncThreadPoolUtil {
/**
* 默认线程数(当前cpu核心数量)
*/
private static final int DEFAULT_CORE_THREAD_SIZE = Runtime.getRuntime().availableProcessors() * 2 + 1;
/**
* 默认工作队列
*/
private static final LinkedBlockingDeque<Runnable> DEFAULT_WORKER_QUEUE = new LinkedBlockingDeque<>(20);
private ThreadPoolExecutor threadPoolExecutor;
public AsyncThreadPoolUtil(String threadName){
this(DEFAULT_CORE_THREAD_SIZE,DEFAULT_CORE_THREAD_SIZE,DEFAULT_WORKER_QUEUE,threadName);
}
public AsyncThreadPoolUtil(int coreThreadSize, int maxThreadSize, BlockingQueue<Runnable> workerQueue,String threadName){
this(coreThreadSize,maxThreadSize,0L,TimeUnit.SECONDS,workerQueue,threadName);
}
public AsyncThreadPoolUtil(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
String threadName){
this.threadPoolExecutor = new AsyncThreadPool(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
new DefaultThreadFactory(threadName),new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory(String threadName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = threadName +
poolNumber.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()){
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY){
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
/**
* 执行runnable 任务
* @param runnable 提交任务
*/
public void execute(Runnable runnable){
this.threadPoolExecutor.execute(runnable);
}
/**
* 提交异步任务
* @param task 异步任务
* @param <T> T
* @return Future
*/
public <T>Future<T> submit(Callable<T> task){
return this.threadPoolExecutor.submit(task);
}
}
编写Test进行验证
public class Test {
public static void main(String[] args) {
AsyncThreadPoolUtil pool = new AsyncThreadPoolUtil("demo-test-");
for (int i=0;i<200;i++){
pool.execute(()->{
try{
TimeUnit.MILLISECONDS.sleep(500);
}catch (Exception e){}
});
}
}
}
输出信息见截图,由此我们可以在任务执行前以及执行后进行任务的监控,同时可以队列情况。
源码分析
ThreadPoolExecutor类图:
我们从AsyncThreadPool 代码中调用super函数开始看起,该函数中传入:
1.核心线程数:默认情况下不会回收,可通过allowCoreThreadTimeOut函数设置回收,或者设置为0。若无需求,不建议进行核心线程回收。
2.最大线程数:该参数必须大于等于核心线程数,非核心线程数在队列中没有要继续执行任务时会进行回收。
3.非核心线程存活时间
4.非核心线程存活时间单位
5.任务存储队列:当无空闲线城时提交的任务会进入到队列进行等待执行。
6.创建线程工厂:用于创建初始化线程
7.拒绝策略:当无空闲线程且任务队列已满则执行决绝策略。
看完了构造函数创建之后,我们来看任务的提交。在Test中,我们通过pool.execute()函数来提交一个任务到线程池执行,在该函数我们看到线程池中的线程是在提交任务后才进行的初始化。
1. workerCountOf(c)统计当核心线程数量是否已经全部初始化了,如果没有,那么则直接通过addWorker()创建线程执行任务。
2.如果当前核心线程已经全部初始化了,那么则将任务快速添加到队列中,同时校验如果当前线程池已经关闭,那么则移除任务同时执行拒绝策略。如果当前线程池存活线程是0,那么添加工作线程进行任务执行。
3.如果在第2步中添加到任务队列时队列已满,则直接尝试创建非核心线程执行,如果非核心线程也无法创建,那么执行决绝策略。
接下来我们重点分析下 addWorker(Runnable firstTask, boolean core)的函数。
Runnable firstTask
:要执行的第一个任务,如果为null
,则表示新线程将从工作队列中获取任务。boolean core
:指示是否为核心线程,true
表示是核心线程,false
表示非核心线程。
-
循环尝试获取线程池状态(
runStateOf(ctl.get())
):- 如果线程池状态大于或等于
SHUTDOWN
(即线程池正在关闭或已关闭),并且不是在关闭状态下添加新任务到非空队列,那么返回false
,无法添加新工作线程。
- 如果线程池状态大于或等于
-
检查工作线程数量:
- 获取当前线程池的工作线程数量(
workerCountOf(c)
)。 - 如果线程数量已经达到最大容量(
CAPACITY
),或者对于核心线程来说达到了corePoolSize
,对于非核心线程来说达到了maximumPoolSize
,则返回false
,无法添加新工作线程。 - 如果当前线程数量小于上述限制,并且成功通过
compareAndIncrementWorkerCount(c)
方法增加工作线程计数,则跳出循环。
- 获取当前线程池的工作线程数量(
-
创建新工作线程:
- 尝试创建新的
Worker
对象,它是一个继承了Thread
的类,用于执行任务。 - 如果新线程
t
不为空,并且线程池状态允许新线程启动(即runStateOf(ctl.get())
小于SHUTDOWN
或者在关闭状态下且firstTask
为null
),则将新工作线程添加到线程池的workers
集合中,并标记为已添加(workerAdded = true
)。
- 尝试创建新的
-
启动新工作线程:
- 如果工作线程成功添加,调用
t.start()
启动新线程,并将workerStarted
标记为true
。
- 如果工作线程成功添加,调用
-
处理线程启动失败的情况:
- 如果新线程没有成功启动,调用
addWorkerFailed(w)
方法来处理失败情况,这可能包括移除工作线程计数和执行其他清理工作。
- 如果新线程没有成功启动,调用
-
返回结果:
- 返回
workerStarted
,表示新工作线程是否成功启动。
- 返回
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
我们接着分析创建执行任务Worker(),它继承自AbstractQueuedSynchronizer
并实现了Runnable
接口。Worker
类主要负责维护线程的中断状态和一些次要的记账工作,同时它也实现了任务的运行。
private static final long serialVersionUID
:序列化ID,用于序列化机制。final Thread thread
:当前Worker
线程运行的Thread
对象。如果线程工厂创建线程失败,则为null
。Runnable firstTask
:当前Worker
线程需要执行的第一个任务。如果没有初始任务,则为null
。volatile long completedTasks
:此线程完成的任务数量。
构造函数Worker(Runnable firstTask)
:
- 调用
setState(-1)
初始化锁状态为-1,表示在runWorker
方法执行之前禁止中断。 - 初始化
firstTask
为传入的第一个任务。 - 通过线程工厂创建新线程,并将其赋值给
thread
。
运行方法:
public void run()
:将控制权委托给外部的runWorker
方法,开始工作线程的主运行循环。
锁方法:
Worker
类继承自AbstractQueuedSynchronizer
,提供了锁的获取和释放方法。这些方法用于保护任务执行,防止在等待任务时被中断。
protected boolean isHeldExclusively()
:判断当前线程是否独占锁。protected boolean tryAcquire(int unused)
:尝试获取锁。protected boolean tryRelease(int unused)
:尝试释放锁。public void lock()
:获取锁。public boolean tryLock()
:尝试获取锁,如果锁被占用则立即返回false
。public void unlock()
:释放锁。public boolean isLocked()
:判断锁是否被占用。
中断方法:
void interruptIfStarted()
:如果线程已经开始运行并且尚未中断,则尝试中断该线程。这个方法用于在工作线程等待新任务时,如果线程池正在关闭,则中断工作线程。
Worker
类是ThreadPoolExecutor
线程池中每个工作线程的抽象表示。它负责维护线程的运行状态、锁状态和任务执行状态。通过继承AbstractQueuedSynchronizer
,Worker
类提供了一个简单的互斥锁,以确保在执行任务时不会被中断。此外,Worker
类还提供了中断控制,以确保在适当的时候中断
工作线程,特别是在线程池关闭时。
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
线程池的工作原理图解:
以下是对本篇文章的的总结:
主要功能和特点:
- 线程池管理:
ThreadPoolExecutor
允许你控制线程的创建和销毁,以及任务的执行和管理。 - 参数化配置:提供了多个参数来调整线程池的行为,包括核心线程数、最大线程数、线程存活时间、工作队列等。
- 任务执行:可以执行任何实现了
Runnable
接口的任务。 - 线程复用:通过重用线程来执行新任务,减少了线程创建和销毁的开销。
- 拒绝策略:当任务太多,无法被线程池及时处理时,可以定义拒绝策略来处理新提交的任务。
关键组件:
- 核心线程数:即使它们是空闲的,也会保持一定数量的线程。
- 最大线程数:线程池中允许的最大线程数量。
- 工作队列:用于存放待执行任务的队列。
- 线程工厂:用于创建新线程。
- 拒绝执行处理器:当任务太多,无法被线程池及时处理时,定义了如何处理新提交的任务。
方法概览:
execute(Runnable command)
:提交一个任务给线程池执行。shutdown()
:平滑地关闭线程池,不再接受新任务,但会处理完已提交的任务。shutdownNow()
:尝试立即停止所有正在执行的任务,并返回等待执行的任务列表。isShutdown()
、isTerminating()
、isTerminated()
:检查线程池的状态。awaitTermination(long timeout, TimeUnit unit)
:等待线程池终止。setCorePoolSize(int corePoolSize)
、setMaximumPoolSize(int maximumPoolSize)
:动态调整线程池的大小。getQueue()
:获取当前的任务队列。
拒绝策略:
AbortPolicy
:默认策略,当任务不能被接受时抛出异常。CallerRunsPolicy
:用调用者线程来运行任务。DiscardPolicy
:直接丢弃任务。DiscardOldestPolicy
:丢弃队列中最旧的任务,并尝试再次提交新任务。
扩展性:
ThreadPoolExecutor
提供了多个钩子方法,如beforeExecute(Thread t, Runnable r)
和afterExecute(Runnable r, Throwable t)
,允许在任务执行前后进行自定义操作。
这个类是Java并发包中的核心组件,为多线程编程提供了强大的工具,使得任务的并发执行更加高效和易于管理。
标签:Thread,int,剖析,任务,boolean,线程,原理,public From: https://blog.csdn.net/qq_36070104/article/details/142991659