首页 > 其他分享 >自定义线程池实现(一)

自定义线程池实现(一)

时间:2024-07-31 19:53:15浏览次数:11  
标签:INFO 自定义 实现 队列 线程 main com define

预期目标

1.实现一个相对完备的线程池
2.自定义拒绝策略(下一节)

线程池的基本参数

1.核心线程数
2.超时时间
3.拒绝策略(在下一篇中添加)
4.工作队列
5.任务队列
在这里插入图片描述

工作机制

当添加一个任务到线程池中时,线程池会判断工作线程数量是否小于核心线程数,若小于创建工作线程,执行任务;反之将其添加到任务队列,若是当前任务队列已经满了,可以执行拒绝策略(拒绝策略有很多种,例如死等[会阻塞main线程],放弃任务,抛出异常等等)

工作线程执行过程

工作线程会先将手头上的任务干完,然后到工作队列当中取,如果工作队列中还有任务,取出来继续执行…(周而复始)
但是有可能在一段时间内,工作队列中没任务执行,这个时候我们可以选择让它死等,或者超出指定时间之后自己销毁。

了解这些之后,正式开始coding…

1.构建一个阻塞队列

在前面博客中已经实现过了,需要锁,两个条件变量[生产者,消费者],普通队列这三个参数。

@Slf4j
class BlockQueue<T> {
    //1.任务队列
    private Deque<T> tDeque = new ArrayDeque<>();
    //2.锁
    private ReentrantLock lock = new ReentrantLock();
    //3.两个条件变量(生产者消费者)
    private Condition notEmpty;
    private Condition notFull;
    private int capacity;

    public BlockQueue(int capacity) {
        this.notEmpty = lock.newCondition();
        this.notFull = lock.newCondition();
        this.capacity = capacity;
    }
    //带超时的阻塞获取
    public T poll(long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            //将timeout转换
            long nanos = timeUnit.toNanos(timeout);
            while (tDeque.isEmpty()) {
                try {
                    //返回的是剩余的时间
                    if (nanos <= 0) return null;
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    log.error("error{}",e.getMessage());
                }
            }
            notFull.signal();
            return tDeque.removeFirst();
        } finally {
            lock.unlock();
        }
    }

    //消费者
    public T take() {
        lock.lock();
        try {
            while (tDeque.isEmpty()) {
                try {
                    notEmpty.await();
                } catch (InterruptedException e) {
                    log.error("error{}",e.getMessage());
                }
            }
            notFull.signal();
            return tDeque.removeFirst();//消费对头
        } finally {
            lock.unlock();
        }
    }

    //阻塞添加
    //生产者
    public void put(T ele) {
        lock.lock();
        try {
            while (tDeque.size() == capacity) {
                try {
                    log.info("等待加入任务队列......");
                    notFull.await();
                } catch (InterruptedException e) {
                    log.error("error{}",e.getMessage());
                }
            }
            log.info("已加入任务队列");
            tDeque.addLast(ele);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
    //非阻塞式添加
    //即使失败也不会阻塞住主线程
    public boolean offer(T ele, long timeout, TimeUnit timeUnit){
        lock.lock();
        try {
            long nanosTime = timeUnit.toNanos(timeout);
            while (tDeque.size() == capacity) {
                try {
                    if (nanosTime <= 0) return false;
                    nanosTime = notFull.awaitNanos(nanosTime);
                } catch (InterruptedException e) {
                    log.error("error{}",e.getMessage());
                }
            }
            log.info("已加入任务队列");
            tDeque.addLast(ele);
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }
    //获取大小
    public int size() {
        lock.lock();
        try {
            return tDeque.size();
        } finally {
            lock.unlock();
        }
    }
}

2.写线程池

@Slf4j
class ThreadPool {
    //任务队列
    private BlockQueue<Runnable> taskQueue;
    //线程集合 我们需要对线程做一个包装
    private HashSet<Worker> workers = new HashSet<>();
    //核心线程数量
    private long coreSize;
    //超时时间
    private long timeout;
    //时间单位
    private TimeUnit timeUnit;
    //自定义拒绝策略
    //private RejectPolicy rejectPolicy;
    public ThreadPool(int queueCapacity,long coreSize,long timeout,TimeUnit timeUnit){
        taskQueue = new BlockQueue<>(queueCapacity);
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }

    //执行任务
    public void execute(Runnable task){
        //当任务数量尚未超过coreSize
        synchronized (workers){
            if (workers.size() < coreSize){
                log.info("创建工作线程{}",task);
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            }else{
                log.info("加入到任务队列{}",task);
                //有可能会阻塞在这里 进而将主线程阻塞掉
                taskQueue.put(task);
                //这里会有很多种策略自定义策略
                //1.死等
                //2.带超时等待
                //3.让调用者放弃任务执行
                //4.让调用者抛出异常
                //5.让调用者自己执行任务
                //策略模式:操作抽象成接口实现代码是传过来不会写死
            }
        }
    }

    class Worker extends Thread{
        private Runnable task;
        public Worker(Runnable task){
            this.task = task;
        }

        @Override
        public void run() {
           while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
               try {
                   log.info("正在执行...{}",task);
                   //执行任务
                   task.run();
               }catch (Exception e){
                   System.out.println(e.getMessage());
               }finally {
                   //不要忘记这一步
                   task = null;
               }
           }
           synchronized (workers){
               log.info("worker被移除{}",this);
               workers.remove(this);
           }
        }
    }
}

测试:

[main] INFO com.define.ThreadPool - 创建工作线程com.define.TestPool$$Lambda$1/1880587981@65b3120a
[main] INFO com.define.ThreadPool - 创建工作线程com.define.TestPool$$Lambda$1/1880587981@4783da3f
[main] INFO com.define.ThreadPool - 加入到任务队列com.define.TestPool$$Lambda$1/1880587981@49097b5d
[Thread-0] INFO com.define.ThreadPool - 正在执行...com.define.TestPool$$Lambda$1/1880587981@65b3120a
[main] INFO com.define.BlockQueue - 已加入任务队列
[Thread-1] INFO com.define.ThreadPool - 正在执行...com.define.TestPool$$Lambda$1/1880587981@4783da3f
[main] INFO com.define.ThreadPool - 加入到任务队列com.define.TestPool$$Lambda$1/1880587981@6e2c634b
[main] INFO com.define.BlockQueue - 已加入任务队列
[main] INFO com.define.ThreadPool - 加入到任务队列com.define.TestPool$$Lambda$1/1880587981@37a71e93
[main] INFO com.define.BlockQueue - 已加入任务队列
[main] INFO com.define.ThreadPool - 加入到任务队列com.define.TestPool$$Lambda$1/1880587981@7e6cbb7a
[main] INFO com.define.BlockQueue - 已加入任务队列
[main] INFO com.define.ThreadPool - 加入到任务队列com.define.TestPool$$Lambda$1/1880587981@7c3df479
[main] INFO com.define.BlockQueue - 已加入任务队列
[main] INFO com.define.ThreadPool - 加入到任务队列com.define.TestPool$$Lambda$1/1880587981@7106e68e
[main] INFO com.define.BlockQueue - 已加入任务队列
[main] INFO com.define.ThreadPool - 加入到任务队列com.define.TestPool$$Lambda$1/1880587981@7eda2dbb
[main] INFO com.define.BlockQueue - 已加入任务队列
[main] INFO com.define.ThreadPool - 加入到任务队列com.define.TestPool$$Lambda$1/1880587981@6576fe71
[main] INFO com.define.BlockQueue - 已加入任务队列
[main] INFO com.define.ThreadPool - 加入到任务队列com.define.TestPool$$Lambda$1/1880587981@76fb509a
[main] INFO com.define.BlockQueue - 已加入任务队列
[main] INFO com.define.ThreadPool - 加入到任务队列com.define.TestPool$$Lambda$1/1880587981@300ffa5d
[main] INFO com.define.BlockQueue - 已加入任务队列
[main] INFO com.define.ThreadPool - 加入到任务队列com.define.TestPool$$Lambda$1/1880587981@1f17ae12
[main] INFO com.define.BlockQueue - 等待加入任务队列......

测试没什么问题,但是能发现如果当前工作线程都是busy,并且任务队列也满了,当执行put的时候,就会阻塞在这里,put阻塞—>execute阻塞---->main线程阻塞,当然阻塞也是一种方式那如果不想让它阻塞,比如我添加不进去想让他直接丢弃或者抛出异常应该怎么办,那就需要自定义一套拒绝策略,下一节继续。

标签:INFO,自定义,实现,队列,线程,main,com,define
From: https://blog.csdn.net/m0_59925573/article/details/140830835

相关文章

  • js中两种设置子类原型(prototype)来实现原型式继承的本质区别
    在JavaScript中,这两种设置子类(Child)原型(prototype)的方法虽然都能实现继承的基本功能,但它们之间存在一些重要的区别和潜在的陷阱。1. Child.prototype=Object.create(Parent.prototype);这个方法使用Object.create()来创建一个新对象,其原型(__proto__)被设置为Parent.prototy......
  • 懂个锤子Vue 自定义指定、插槽:
    Vue自定义指定、插槽......
  • 技术干货|如何轻松实现小型化超构表面天线的特征模式分析和设计?
    摘要无线通信技术的迅速发展对天线小型化提出了苛刻的要求。本文介绍了一种小型化超构表面天线的设计,该天线由四层等间距平行放置的电磁超构表面组成,每层超构表面由3×3的方形贴片组成的阵列组成。文章基于Feko对该天线进行了特征模式分析(CMA),并通过特征模式的近、远场和表面......
  • Quart自定义文件导出名
    直接上代码fromquartimportQuart,send_fileimportioimportxlwtapp=Quart(__name__)@app.route('/download-excel',methods=["POST"])asyncdefdownload_excel():#创建一个简单的Excel文件workbook=xlwt.Workbook()sheet=workb......
  • Java并发(十六)一文搞懂Java 线程池原理
    简介什么是线程池线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。为什么要用线程池如果并发请求数量很多,但每个线程执行的时间很短,就会出现频繁的创建和销毁线程。如此一来,会大大降低系统的效率,可能频繁创建和销毁线程的时间......
  • Javaweb项目|基于SpringBoot的企业客户管理系统的设计与实现【源码+论文+PPT+部署视频
    我们提供多元化的技术项目服务,涵盖Java、PHP、Python等编程语言,以及前端开发、人工智能、大数据、单片机开发、ASP.NET、物联网等领域。我们还提供简历模板、面试题库和学习资料,帮助用户提升技术能力和就业竞争力。我们的服务内容包括:免费功能设计、任务书和开题报告撰写、中......
  • Javaweb项目|springboot基于JavaWeb技术的在线考试系统设计与实现【源码+论文+PPT+部
    我们提供多元化的技术项目服务,涵盖Java、PHP、Python等编程语言,以及前端开发、人工智能、大数据、单片机开发、ASP.NET、物联网等领域。我们还提供简历模板、面试题库和学习资料,帮助用户提升技术能力和就业竞争力。我们的服务内容包括:免费功能设计、任务书和开题报告撰写、中......
  • Javaweb项目|基于SpringBoot的企业客户管理系统的设计与实现
    收藏点赞不迷路 关注作者有好处文末获取源码一、系统展示二、万字文档展示 基于基于SpringBoot的企业客户管理系统的设计与实现开发语言:Java数据库:MySQL技术:Spring+SpringMVC+MyBatis+Vue工具:IDEA/Ecilpse、Navicat、Maven 编号:springboot024一、系统展示二......
  • Javaweb项目|springboot基于JavaWeb技术的在线考试系统设计与实现
    收藏点赞不迷路 关注作者有好处文末获取源码一、系统展示二、万字文档展示 基于springboot基于JavaWeb技术的在线考试系统设计与实现开发语言:Java数据库:MySQL技术:Spring+SpringMVC+MyBatis+Vue工具:IDEA/Ecilpse、Navicat、Maven 编号:springboot072一、系统展......
  • python实现jenkins凭据录入
    #新增配置importjenkins.model.*importjenkins.plugins.publish_over_ssh.BapSshHostConfigurationdefinst=Jenkins.getInstance()defpublish_ssh=inst.getDescriptor("jenkins.plugins.publish_over_ssh.BapSshPublisherPlugin")defconfiguration=new......