【自定义线程池】超详细!一文轻松理解JDK线程池 - java
通过手敲一遍自定义线程池代码,轻松理解jdk中线程池的原理,可以放心告诉面试官 研究过jdk线程池源码!
本文参考 b站黑马程序员 满一航老师的JUC课程 p200-208
https://www.bilibili.com/video/BV16J411h7Rd?p=207&vd_source=d108b78e319643eb4bdacfd7b6fd3916
首先明确 自定义线程池 需要哪几个模块?
- 线程池
- 任务队列
- 拒绝策略(任务队列满时,采取的入队操作)
而当向任务队列中加任务,任务队列满时,应该如何操作?
-
死等
-
带超时等待
-
用者放弃任务执行
-
让调用者抛弃异常
-
让调用者自己执行任务
这里采用了设计模式中的 策略模式,听起来可能高大上,其实就是通过函数式接口,在构造时由 lambda 表达式 传入具体的实现。
拒绝策略
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
任务队列
-
传入泛型,已适应不同的任务
-
双向链表实现
-
任务的出队和入队需要线程安全,定义锁
-
设定锁的条件变量,队列空时,出队需要等待;队列满时,入队需要等待
-
出队方法(获取任务)
- 阻塞获取
public T take()
- 带超时的阻塞获取
public boolean offer(T task, long timeout, TimeUnit timeUnit)
- 先将timeout统一为纳秒
long nanos = unit.toNanos(timeout);
- 虚假唤醒问题,通过awaitNanos方法返回剩余时间 可以重新赋值给nanos 下次只需等待剩余的时间
nanos = emptyWaitSet.awaitNanos(nanos)
- 超时
if(nanos <= 0) return null;
- 先将timeout统一为纳秒
- 阻塞获取
-
入队方法(添加任务)
- 阻塞添加
public void put(T task)
- 带有超时的阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit)
- 带有拒绝策略的添加
public void tryPut(RejectPolicy<T> rejectPolicy, T task)
- 不再使用while循环阻塞添加
- if队列满,调用
rejectPolicy.reject(this, task)
;抽象方法,具体怎么实现,在创建线程池时,通过线程池的构造方法传入lambda表达式
- 阻塞添加
@Slf4j
class BlockingQueue<T>{
//1. 任务队列 双向链表Deque实现
private Deque<T> queue = new ArrayDeque<>();
//2. 锁
// 从任务队列头部获取任务时 只能由一个线程获取到 其他线程等待
// 可能会有多个线程向队列尾部添加任务
private ReentrantLock lock = new ReentrantLock();
//3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//5. 容量
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
//将 timeout 统一转换为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()){
try {
if(nanos <= 0) return null; //超时 返回null
nanos = emptyWaitSet.awaitNanos(nanos);//存在虚假唤醒问题 但awaitNanos方法返回剩余时间 可以重新赋值给nanos 下次只需等待剩余的时间
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
// 阻塞获取
public T take(){
lock.lock();
try {
while(queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
// 带有超时时间的阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while(queue.size() == capcity){
try {
log.debug("等待加入任务队列:{}", task);
if(nanos <= 0){
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("加入任务队列:{}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
// 阻塞添加
public void put(T task){
lock.lock();
try {
while(queue.size() == capcity){
try {
log.debug("等待加入任务队列:{}", task);
fullWaitSet.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("加入任务队列:{}", task);
queue.addLast(task);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
// 获取大小
public int size() {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
// 带有拒绝策略的 添加
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
//判断队列是否满
if(queue.size() == capcity){
//由rejectPolicy决定拒绝策略
rejectPolicy.reject(this, task);
} else{
log.debug("加入任务队列:{}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
线程池
- 成员变量
- 任务队列,泛型为 Runnable 作为任务
- 线程集合
- 线程数量
- 获取线程的超时时间
- 拒绝策略
- 执行任务的方法
public void execute(Runnable task)
- 对线程集合加锁
- 如果当前集合中线程数量小于线程池约定数量,直接创建线程
- 否则,将任务加入到任务队列
taskQueue.tryPut(rejectPolicy, task);
- 通过策略模式,决定如何加入到队列(队列满时怎么拒绝)
- 对线程集合加锁
- 线程内部类,对线程进行包装
- 重写run方法
- 执行任务
- 任务不为空,直接执行任务
- 任务执行完,将任务已经置空,从任务队列中取任务
(task = taskQueue.poll(timeout, timeUnit)) != null
- 注意:这里如果采用 无超时时间的取任务方法,task会一直不为空,阻塞在这里,就无法执行后面 移除当前任务 的代码
- 最后将当前线程移除,要对线程集合加锁
@Slf4j
class ThreadPool{
// 任务可以抽象为Runnable
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务的超时时间 如果超时就可以释放线程
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
// 执行任务
public void execute(Runnable task){
// 当任务数没有超过coreSize时 直接交给work对象执行
// 如果超过 加入任务队列暂存
synchronized (workers){
if(workers.size() < coreSize){
Worker worker = new Worker(task);
log.debug("新增worker{}, task{}", worker,task);
workers.add(worker);
worker.start();
}else{
//taskQueue.put(task);
// 当队列满时可以采取的操作:
// 1) 死等
// 2) 带超时等待
// 3) 让调用者放弃任务执行
// 4) 让调用者抛出异常
// 5) 让调用者自己执行任务
// 实现代码可以不写死在threadpool类中,采取策略模式 将其抽象为接口中的抽象方法
taskQueue.tryPut(rejectPolicy, task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
// 包装线程
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1. task不为空,直接执行任务 新建的
// 2. task执行完毕,接着从任务队列中取任务并执行 重用的
//while (task != null || (task = taskQueue.take()) != null) {
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
log.debug("正在执行...{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker 被移除{}", this);
workers.remove(this);
}
}
}
}
main方法测试-明确拒绝策略的调用过程
-
创建线程池,通过传入lambda表达式确定线程池的策略模式
-
RejectPolicy<T>
的reject(BlockingQueue<T> queue, T task)
方法接收两个参数 -
传入lambda表达式,实际上就传给了线程池的构造方法
this.rejectPolicy = rejectPolicy;
-
线程池在执行任务时
execute(Runnable task)
,队列满时就执行taskQueue.tryPut(rejectPolicy, task);
-
任务队列的
tryPut
方法在判断队列满时,就执行rejectPolicy.reject(this, task);
,即执行lambda表达式所写的函数方法
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10,
(queue, task) -> {
// 1.死等
// queue.put(task);
// 2.带超时等待
// queue.offer(task, 500, TimeUnit.MILLISECONDS);
// 3.让调用者放弃任务执行
// log.debug("放弃:{}", task);
// 4.让调用者抛弃异常
// throw new RuntimeException("任务执行失败" + task);
// 5.让调用者自己执行任务
task.run(); //主线程自己执行
});
for (int i = 0; i < 3; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("{}", j);
});
}
}
标签:task,java,自定义,队列,lock,任务,线程,public
From: https://blog.csdn.net/m0_60791400/article/details/139359800