首页 > 编程语言 >【自定义线程池】超详细!一文轻松理解JDK线程池 - java

【自定义线程池】超详细!一文轻松理解JDK线程池 - java

时间:2024-05-31 20:01:21浏览次数:30  
标签:task java 自定义 队列 lock 任务 线程 public

【自定义线程池】超详细!一文轻松理解JDK线程池 - java

通过手敲一遍自定义线程池代码,轻松理解jdk中线程池的原理,可以放心告诉面试官 研究过jdk线程池源码!

本文参考 b站黑马程序员 满一航老师的JUC课程 p200-208

https://www.bilibili.com/video/BV16J411h7Rd?p=207&vd_source=d108b78e319643eb4bdacfd7b6fd3916

首先明确 自定义线程池 需要哪几个模块?

  1. 线程池
  2. 任务队列
  3. 拒绝策略(任务队列满时,采取的入队操作)

而当向任务队列中加任务,任务队列满时,应该如何操作?

  1. 死等

  2. 带超时等待

  3. 用者放弃任务执行

  4. 让调用者抛弃异常

  5. 让调用者自己执行任务

这里采用了设计模式中的 策略模式,听起来可能高大上,其实就是通过函数式接口,在构造时由 lambda 表达式 传入具体的实现。

拒绝策略

@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

任务队列

  • 传入泛型,已适应不同的任务

  • 双向链表实现

  • 任务的出队和入队需要线程安全,定义

  • 设定锁的条件变量,队列空时,出队需要等待;队列满时,入队需要等待

  • 出队方法(获取任务)

    • 阻塞获取 public T take()
    • 带超时的阻塞获取 public boolean offer(T task, long timeout, TimeUnit timeUnit)
      • 先将timeout统一为纳秒 long nanos = unit.toNanos(timeout);
      • 虚假唤醒问题,通过awaitNanos方法返回剩余时间 可以重新赋值给nanos 下次只需等待剩余的时间 nanos = emptyWaitSet.awaitNanos(nanos)
      • 超时 if(nanos <= 0) return null;
  • 入队方法(添加任务)

    • 阻塞添加 public void put(T task)
    • 带有超时的阻塞添加 public boolean offer(T task, long timeout, TimeUnit timeUnit)
    • 带有拒绝策略的添加 public void tryPut(RejectPolicy<T> rejectPolicy, T task)
      • 不再使用while循环阻塞添加
      • if队列满,调用rejectPolicy.reject(this, task);抽象方法,具体怎么实现,在创建线程池时,通过线程池的构造方法传入lambda表达式
@Slf4j
class BlockingQueue<T>{
    //1. 任务队列 双向链表Deque实现
    private Deque<T> queue = new ArrayDeque<>();

    //2. 锁
    // 从任务队列头部获取任务时 只能由一个线程获取到 其他线程等待
    // 可能会有多个线程向队列尾部添加任务
    private ReentrantLock lock = new ReentrantLock();

    //3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    //4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    //5. 容量
    private int capcity;

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 带超时的阻塞获取
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            //将 timeout 统一转换为纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()){
                try {
                    if(nanos <= 0) return null; //超时 返回null
                    nanos = emptyWaitSet.awaitNanos(nanos);//存在虚假唤醒问题 但awaitNanos方法返回剩余时间 可以重新赋值给nanos 下次只需等待剩余的时间
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

    // 阻塞获取
    public T take(){
        lock.lock();
        try {
            while(queue.isEmpty()){
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

    // 带有超时时间的阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while(queue.size() == capcity){
                try {
                    log.debug("等待加入任务队列:{}", task);
                    if(nanos <= 0){
                        return false;
                    }
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            log.debug("加入任务队列:{}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }


    // 阻塞添加
    public void put(T task){
        lock.lock();
        try {
            while(queue.size() == capcity){
                try {
                    log.debug("等待加入任务队列:{}", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            log.debug("加入任务队列:{}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }

    // 获取大小
    public int size() {
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock();
        }
    }

    // 带有拒绝策略的 添加
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            //判断队列是否满
            if(queue.size() == capcity){
                //由rejectPolicy决定拒绝策略
                rejectPolicy.reject(this, task);
            } else{
                log.debug("加入任务队列:{}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

线程池

  • 成员变量
    • 任务队列,泛型为 Runnable 作为任务
    • 线程集合
    • 线程数量
    • 获取线程的超时时间
    • 拒绝策略
  • 执行任务的方法 public void execute(Runnable task)
    • 对线程集合加锁
      • 如果当前集合中线程数量小于线程池约定数量,直接创建线程
      • 否则,将任务加入到任务队列 taskQueue.tryPut(rejectPolicy, task);
      • 通过策略模式,决定如何加入到队列(队列满时怎么拒绝)
  • 线程内部类,对线程进行包装
    • 重写run方法
    • 执行任务
      • 任务不为空,直接执行任务
      • 任务执行完,将任务已经置空,从任务队列中取任务 (task = taskQueue.poll(timeout, timeUnit)) != null
        • 注意:这里如果采用 无超时时间的取任务方法,task会一直不为空,阻塞在这里,就无法执行后面 移除当前任务 的代码
  • 最后将当前线程移除,要对线程集合加锁
@Slf4j
class ThreadPool{
    // 任务可以抽象为Runnable
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();

    // 核心线程数
    private int coreSize;

    // 获取任务的超时时间 如果超时就可以释放线程
    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;

    // 执行任务
    public void execute(Runnable task){
        // 当任务数没有超过coreSize时 直接交给work对象执行
        // 如果超过 加入任务队列暂存
        synchronized (workers){

            if(workers.size() < coreSize){

                Worker worker = new Worker(task);
                log.debug("新增worker{}, task{}", worker,task);
                workers.add(worker);
                worker.start();
            }else{
                //taskQueue.put(task);
                // 当队列满时可以采取的操作:
                // 1) 死等
                // 2) 带超时等待
                // 3) 让调用者放弃任务执行
                // 4) 让调用者抛出异常
                // 5) 让调用者自己执行任务
                // 实现代码可以不写死在threadpool类中,采取策略模式 将其抽象为接口中的抽象方法
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

    // 包装线程
    class Worker extends Thread{
        private Runnable task;
        public Worker(Runnable task){
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1. task不为空,直接执行任务 新建的
            // 2. task执行完毕,接着从任务队列中取任务并执行 重用的
            //while (task != null || (task = taskQueue.take()) != null) {
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除{}", this);
                workers.remove(this);
            }
        }
    }
}

main方法测试-明确拒绝策略的调用过程

  1. 创建线程池,通过传入lambda表达式确定线程池的策略模式

  2. RejectPolicy<T> reject(BlockingQueue<T> queue, T task)方法接收两个参数

  3. 传入lambda表达式,实际上就传给了线程池的构造方法this.rejectPolicy = rejectPolicy;

  4. 线程池在执行任务时execute(Runnable task),队列满时就执行taskQueue.tryPut(rejectPolicy, task);

  5. 任务队列的tryPut方法在判断队列满时,就执行rejectPolicy.reject(this, task);,即执行lambda表达式所写的函数方法

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10,
                (queue, task) -> {
            // 1.死等
//                    queue.put(task);
            // 2.带超时等待
//                    queue.offer(task, 500, TimeUnit.MILLISECONDS);
            // 3.让调用者放弃任务执行
//                    log.debug("放弃:{}", task);
            // 4.让调用者抛弃异常
//                    throw new RuntimeException("任务执行失败" + task);
            // 5.让调用者自己执行任务
                    task.run(); //主线程自己执行
                });
        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                log.debug("{}", j);
            });
        }
    }

标签:task,java,自定义,队列,lock,任务,线程,public
From: https://blog.csdn.net/m0_60791400/article/details/139359800

相关文章

  • Java中常用的几个时间类
    一、Date类    Date类,代表日期,注意:这里的类是java.util.Date的日期类,不要导错包了(Alt+Enter自动导包)//创建当前时间Datenow=newDate();System.out.println("now="+now);//以指定毫秒值创建时间Datethat=newDate(1);System.out.println("that=......
  • java期末练习题,设计教师类,完成教授,副教授,讲师三个类的定义,在完成相应的测试。
        教授的基本工资为5000元,每学时补贴70元;     副教授的基本工资为3500元,每学时补贴60元;     讲师的基本工资2600元,每学时补贴55元。     已知每个教师的学时数,计算每个教师的每月工资数。输入1对应教授。输入2对应副教......
  • 【备战蓝桥杯】蓝桥杯省一笔记:算法模板笔记(Java)
    蓝桥杯0、快读快写模板1、回文判定2、前缀和3、差分4、二分查找5、快速幂6、判断素数7、gcd&lcm8、进制转换9、位运算10、字符串常用API11、n的所有质因子12、n的质因子个数13、n的约数个数14、n阶乘的约数个数15、n的约数和16、阶乘&双阶乘17、自定义升序降序18、动态......
  • java.lang.UnsatisfiedLinkError: no taos in java.library.path, TDengine 访问数
     TDengine linux部署连接驱动问题: java.lang.UnsatisfiedLinkError:notaosinjava.library.path解决方案有有两种:方法一:使用原生的连接需要安装客户端,docker应用的话需要安装tdengine客户端到相应应用容器里面:windows端的需要安装tdengine客户端注意使用driver驱动......
  • java检测字符串是否包含数字和字母
    在Java中,要检测一个字符串是否同时包含数字和字母,我们可以使用正则表达式(regex)或者通过遍历字符串并检查每个字符来实现。以下是两种方法的详细代码示例:1.方法一:使用正则表达式importjava.util.regex.Matcher;importjava.util.regex.Pattern;publicclassStringChec......
  • Java的JDBC编程
     博主主页: 码农派大星.  数据结构专栏:Java数据结构 数据库专栏:MySQL数据库关注博主带你了解更多数据结构知识1.Java的数据库编程:JDBC数据库驱动包:不同的数据库,对应不同的编程语言提供了不同的数据库驱动包,如:MySQL提供了Java的驱动包mysql-connector-java,需......
  • java多态——向下转型
    引入前面我尝试了一下这个代码packageb;publicclassmain_{ publicstaticvoidmain(String[]args){ //向上转型,父类的引用转向了子类的 father_animal=newgraduate(); Objectobj=newgraduate(); System.out.println(animal.name); System.out.print......
  • JavaScript语法(二):你知道哪些JavaScript语句?
    我们在上一节课中已经讲过了JavaScript语法的顶层设计,接下来我们进入到更具体的内容。JavaScript遵循了一般编程语言的“语句-表达式”结构,多数编程语言都是这样设计的。我们在上节课讲的脚本,或者模块都是由语句列表构成的,这一节,我们就来一起了解一下语句。在JavaScrip......
  • JavaScript语法(四):新加入的**运算符,哪里有些不一样呢?
    上一节我们已经给你介绍了表达式的一些结构,其中关于赋值表达式,我们讲完了它的左边部分,而留下了它右边部分,那么,我们这节课一起来详细讲解。在一些通用的计算机语言设计理论中,能够出现在赋值表达式右边的叫做:右值表达式(RightHandSideExpression),而在JavaScript标准中,规定了在等......
  • JavaScript执行(四):try里面放return,finally还会执行吗?
    在前面几篇文章中,我们已经了解了关于执行上下文、作用域、闭包之间的关系。今天,我们则要说一说更为细节的部分:语句。语句是任何编程语言的基础结构,与JavaScript对象一样,JavaScript语句同样具有“看起来很像其它语言,但是其实一点都不一样”的特点。我们比较常见的语句包括......