首页 > 其他分享 >线程池原理 自定义线程池

线程池原理 自定义线程池

时间:2022-12-22 07:33:14浏览次数:44  
标签:自定义 队列 lock private 任务 线程 原理 public

这一节来自定义一个简单的线程池。

一、自定义阻塞队列

生产者创建任务添加到线程池中,线程池中有若干线程来执行任务,如果任务数大于线程数,线程池中要有一个地方来存储多余的任务

线程池中需要一个存放任务的阻塞队列,所以需要先定义一个阻塞队列

class BlockingQueue<T> {

    static Logger LOG = LoggerFactory.getLogger(BlockingQueue.class);

    //队列
    private Deque<T> queue = new ArrayDeque<>();
    //队列的容量
    private int capcity;

    private ReentrantLock lock= new ReentrantLock();

    //获取元素时队列为空就到这个Condition中等待
    private Condition emptySet = lock.newCondition();
    // 添加元素时如果队列已到达最大容量就到这个condition等待
    private Condition fullSet = lock.newCondition();

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    //添加元素
    public void put(T t){
        //queue是共享变量,多线程操作要加锁
        try {
            lock.lock();
            while(queue.size()==capcity){
                //队列中元素已达到最大容量,添加元素的线程等待
                try {
                    LOG.info("队列元素已满,添加元素线程等待");
                    fullSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //走到这里表示队列中有空位了
            queue.addLast(t);
            LOG.info("元素添加成功");
            //唤醒等待的获取元素的线程
            emptySet.signalAll();
        } finally {
            lock.unlock();
        }
    }

    //获取元素的方法
    public T take(){
        try {
            lock.lock();
            while (queue.size()==0){
                //队列中没有元素
                try {
                    LOG.info("队列为空,获取元素线程等待");
                    emptySet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //走到这里表示队列中有元素了
            T t = queue.removeFirst();
            //叫醒添加元素的等待线程
            fullSet.signalAll();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间的获取元素的方法
    public T poll(long time, TimeUnit timeUnit){
        try {
            lock.lock();
            long nanos = timeUnit.toNanos(time);
            while (queue.size()==0){
                //队列中没有元素
                try {
                    if(nanos<=0){
                        LOG.info("等待超时时间到,返回null");
                        return null;
                    }
                    LOG.info("队列为空,获取元素线程等待");
                    //这个方法的返回值表示剩余的等待时间,例如本来等待5s,等了三秒被叫醒了,返回值就是2
                    nanos = emptySet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //走到这里表示队列中有元素了
            T t = queue.removeFirst();
            //叫醒添加元素的等待线程
            fullSet.signalAll();
            return t;
        } finally {
            lock.unlock();
        }
    }
}

我们的线程池中需要一个阻塞队列来存放任务,可以使用上边定义的这个

二、自定义线程池

线程池的代码

class ThreadPool {

    static Logger logger = LoggerFactory.getLogger(ThreadPool.class);

    //用来存储任务的队列
    private BlockingQueue<Runnable> taskQueue;

    //核心数,即线程中可以创建的最大线程数
    private int coreSize;

    //存放线程的集合
    private HashSet<Worker> workers = new HashSet<>();

    //线程的空闲时间,池中的一个线程如果在这段时间后还获取不到任务就会自动终止
    private long time;
    private TimeUnit timeUnit;

    /**
     *
     * @param coreSize
     * @param capacity 线程池中任务队列的容量
     * @param time
     * @param timeUnit
     */
    public ThreadPool(int coreSize,int capacity, long time, TimeUnit timeUnit) {
        this.coreSize = coreSize;
        this.time = time;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(capacity);
    }

    //提交任务的方法
    public void execute(Runnable task){
        synchronized (workers){
            if(workers.size()<coreSize){
                //线程数小于核心数,启动新线程来执行任务
                Worker worker = new Worker(task);
                workers.add(worker);
                logger.info("线程池新增线程执行任务");
                worker.start();

            } else{
                //线程数已达到核心数,把任务添加到队列中
                taskQueue.put(task);
                logger.info("线程池添加任务到队列中");
            }
        }
    }

    //用来描述线程池中工作线程的类
    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable task){
            this.task =task;
        }

        @Override
        public void run() {
            //执行任务的逻辑
            //一个任务执行完后继续从任务队列中获取任务来执行
            while (task!=null || (task=taskQueue.poll(time,timeUnit))!=null){
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //任务执行完后清空
                    task = null;
                }
            }

            //走到这里表示没有从任务队列中获取到任务,当前线程将要结束
            synchronized (workers){
                //从线程集合中删除当前线程
                workers.remove(this);
            }
        }
    }

}

三、测试

public class Test9 {

    private static Logger LOG = LoggerFactory.getLogger(Test9.class);


    public static void main(String[] args) {

        ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS);
        for (int i = 0; i < 4; i++) {
            int a=i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    LOG.info("第{}个任务",a);

                }
            });
        }
    }
}

四、拒绝策略

上边的线程池存在一个问题,当有大量任务提交到线程池超过了任务队列的容量时,提交任务的线程就会一直阻塞等待,

//核心1,队列容量1		
ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS);
        for (int i = 0; i < 4; i++) {
            int a=i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    LOG.info("第{}个任务",a);
                    try {
                        Thread.sleep(1000000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

像这样把任务时间延长,提交的任务就会超过队列容量,这时主线程就会阻塞住

实际上应该提供一种拒绝策略来让提交任务的线程自己决定是阻塞死等还是放弃执行任务,

为了实现这个功能,先抽象一个接口来封装拒绝策略

interface RejectPolicy<T> {
    // 把任务队列和当前要提交的任务作为参数
    public void applyPolicy(BlockingQueue<T> queue,T task);
}

然后为了应用拒绝策略需要在阻塞队列BlockingQueue中添加一个tryPut方法

public void tryPut(RejectPolicy<T> rejectPolicy,T t){
        try {
            lock.lock();
            if(queue.size()>=capacity){
                //应用拒绝测试
                rejectPolicy.applyPolicy(this,t);
            } else {
                //还有空间正常添加任务
                queue.addLast(t);
            }
        } finally {
            lock.unlock();
        }
    }

然后修改线程池,添加一个rejectPolicy属性,在构造方法中由任务提交者来赋值

线程池新增的属性

private RejectPolicy<Runnable> rejectPolicy;

线程池的构造方法

public ThreadPool(int coreSize,int capacity, long time, TimeUnit timeUnit,RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.time = time;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(capacity);
        this.rejectPolicy=rejectPolicy;
}

线程池的execute方法

public void execute(Runnable task){
        synchronized (workers){
            if(workers.size()<coreSize){
                //线程数小于核心数,启动新线程来执行任务
                Worker worker = new Worker(task);
                workers.add(worker);
                logger.info("线程池新增线程执行任务");
                worker.start();

            } else{
                //线程数已达到核心数,把任务添加到队列中,传递拒绝策略
                taskQueue.tryPut(rejectPolicy,task);
                logger.info("线程池添加任务到队列中");
            }
        }
}

可以看到,拒绝策略是有提交任务的线程指定,最终是由阻塞队列来执行,阻塞队列不知道拒绝策略具体是什么,这也是java多态的一种体现,面向抽象编程。

标签:自定义,队列,lock,private,任务,线程,原理,public
From: https://www.cnblogs.com/chengxuxiaoyuan/p/16997537.html

相关文章

  • forms组件渲染标签 form表单展示信息 forms组件校验方式 form组件源码 modelform组件
    目录forms组件渲染标签方式一:全自动渲染表单as_pas_ulas_table表单类的label标签方式二:手动渲染方式三:for循环表单对象(推荐)查看源码渲染标签的注意事项form表单展示信息表......
  • 数据库的运行原理及SQL注入
    数据库的运行原理简单了解:数据库是什么?定义:在软件系统中,用于管理和存放所有数据内容那我们就好奇了,我们需要怎么样去管理?应用:我输入账号和密码,然后点击登录按钮,实现一......
  • 谈谈项目中单点登录的实现原理?
    注:单点登录原理是一个重要知识点,也常被问及,很多童鞋照葫芦画瓢搭建过单点登录,但是被问到原理时可能说不出来,下面简单介绍,抛砖引玉,希望对大家有所帮助。单点登录在现在的系......
  • 通过surging的后台托管服务编写任务调度并支持规则引擎自定义脚本
    简介    过去,如果在业务中需要处理任务调度的时候,大家都会使用第三方的任务调度组件,而第三方组件有一套自己的规则,在微服务的中显得那么格格不入,这样就会造成代码臃......
  • 谈谈这几个常见的多线程面试题
    创建线程有几种不同的方式?你喜欢哪一种?为什么?有三种方式可以用来创建线程:继承Thread类实现Runnable接口应用程序可以使用Executor框架来创建线程池实现Runnab......
  • mybatis-plus 多租户 自定义sql
    Mapper中增加注解即可实现自定义sqlpublicinterfaceSysCustomerConfigMapperextendsBaseMapper<SysCustomerConfig>{/***根据条件查询客户设置信息......
  • Java并发原理
    //并发产生的底层原理,从三个维度进行说明://1.内存维度,cpu内存有主存,缓存,寄存器,一般我们操作数据在寄存器上操作是最快的,但是直接在寄存器上操作的话,会造成,每个线程自己的......
  • 内核多线程
    Linux内核多线程(一)Linux内核多线程(二)Linux内核多线程(三)Linux内核多线程(四)Linux内核多线程——补充(各种平台下的多线程) kernel_thread()和kthread_run()/kthread_cre......
  • 通过GitHub和阿里云自定义域名实现https认证
    在GitHub中的操作登录GitHub,点击“Yourrepositories”,进入个人仓库页面;点击“new“,进入新建仓库页面;仓库名称填写<username>.github.io,<username>就是GitHub的账......
  • 虚拟DOM的概念和原理
    概念在vue、react出来之前,都是直接操作DOM,这样会引起重排和重绘。虚拟DOM是简称就是DOM对象,在每次更改的时候,对比新旧DOM对象的差异,然后一次更改,减少了重排和重绘,所以节约......