首页 > 其他分享 >线程池

线程池

时间:2024-03-27 15:58:40浏览次数:12  
标签:队列 -- 任务 线程 new ThreadPoolExecutor

线程池

引入

一个线程完成一项任务所需时间为:

  1. 创建线程时间 - Time1

  2. 线程中执行任务的时间 - Time2

  3. 销毁线程时间 - Time3

为什么需要线程池

  1. 线程池技术正是关注如何缩短或调整Time1和Time3的时间,从而提高程序的性能。项目中可以把Time1,T3分别安排在项目的启动和结束的时间段或者一些空闲的时间段

  2. 线程池不仅调整Time1,Time3产生的时间段,而且它还显著减少了创建线程的数目,提高线程的复用率

  3. 系统启动一个新线程的成本是比较高的,因为涉及与操作系统的交互,在这种情形下,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存期很短暂的线程时,优先考虑使用线程池

Java提供的线程池

ExecutorService:线程池的接口

Executors:创建各种线程池的工具类

public class Test {
    
    public static void main(String[] args) {
        
        //创建单个线程的线程池
        //ExecutorService pool = Executors.newSingleThreadExecutor();
        //创建指定线程的线程池
        //ExecutorService pool = Executors.newFixedThreadPool(3);
        //创建可缓存线程的线程池,自动回收60s闲置线程
        ExecutorService pool = Executors.newCachedThreadPool();
        
        for (int i = 1; i <= 100; i++) {
            pool.execute(new Task(i));//提交任务,i为任务编号(方便理解)
        }
        pool.shutdown();//关闭线程池
    }
}
class Task implements Runnable{
    private int i;
    public Task(int i) {
        this.i = i;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "-->" + i);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

深入源码

ExecutorService pool = Executors.newSingleThreadExecutor();

ExecutorService pool = Executors.newFixedThreadPool(3);

ExecutorService pool = Executors.newCachedThreadPool();

三种线程池底层都是ThreadPoolExecutor类的对象

-- 分析ThreadPoolExecutor类的构造方法源码--------------------------------
public ThreadPoolExecutor(
    int corePoolSize,       ------------- 核心线程数量 
    int maximumPoolSize,    ------------- 最大线程数量 
    long keepAliveTime,     ------------- 闲置时间,作用于核心线程数与最大线程数之间的线程
    TimeUnit unit,          ------------- keepAliveTime的时间单位(可以是毫秒、秒....)
    BlockingQueue<Runnable> workQueue, -- 任务队列
    ThreadFactory threadFactory, -------- 线程工厂
    RejectedExecutionHandler handler ---- 达到了线程界限和队列容量时的处理方案(拒绝策略)
) {}
执行步骤:
    1.创建线程池后
    2.任务提交后,查看是否有核心线程:
        3.1 没有 -> 就创建核心线程 -> 执行任务 -> 执行完毕后又回到线程池中
        3.2 有 -> 查看是否有闲置核心线程:
            4.1 有 -> 执行任务 -> 执行完毕后又回到线程池
            4.2 没有 -> 查看当前核心线程数是否核心线程数量:
                5.1 否 -> 就创建核心线程 -> 执行任务 -> 执行完毕后又回到线程池中
                5.2 是 -> 查看任务列表是否装载满:
                    6.1 没有 -> 就放入列表中,等待出现闲置线程
                    6.2 装满 -> 查看是否有普通线程(核心线程数到最大线程数量之间的线程)
                        7.1 没有 -> 就创建普通线程 -> 执行任务 -> 执行完毕后又回到线程池中
                        7.2 有 -> 查看是否有闲置普通线程
                            7.1.1 有 -> 执行任务 -> 执行完毕后又回到线程池中
                            7.1.2 没有 -> 查看现在所有线程数量是否为最大线程数:
                                8.1 是 -> 执行处理方案(默认处理抛出异常)
                                8.2 否 ->就创建普通线程-> 执行任务 -> 执行完毕后又回到线程池中              
注:
    1.为了更好的理解,在这里区分核心线程和普通线程,实际上区分的这么清楚,都是线程
    2.默认的处理方案就是抛出RejectedExecutionException
总结:核心线程满载 -> 任务队列 -> 普通线程 -> 拒绝策略
​
-- 分析单个线程的线程池的源码 --------------------------------
ExecutorService pool = Executors.newSingleThreadExecutor();
new ThreadPoolExecutor(
    1, -- 核心线程数量 
    1, -- 最大线程数量 
    0L, -- 闲置时间
    TimeUnit.MILLISECONDS, -- 时间单位(毫秒)
    new LinkedBlockingQueue<Runnable>() -- 无界任务队列,可以无限添加任务
)  
-- 分析指定线程的线程池的源码 --------------------------------
ExecutorService pool = Executors.newFixedThreadPool(3);
new ThreadPoolExecutor(
    nThreads, -- 核心线程数量 
    nThreads, -- 最大线程数量 
    0L, -- 闲置时间
    TimeUnit.MILLISECONDS, -- 时间单位(毫秒)
    new LinkedBlockingQueue<Runnable>()-- 无界任务队列,可以无限添加任务
)
-- 创建可缓存线程的线程池 -----------------------------------
new ThreadPoolExecutor(
    0, -- 核心线程数量 
    Integer.MAX_VALUE,-- 最大线程数量 
    60L, -- 闲置时间
    TimeUnit.SECONDS, -- 时间单位(秒)
    new SynchronousQueue<Runnable>() -- 直接提交队列(同步队列):没有容量队列

任务队列详解

队列名称详解
LinkedBlockingQueue无界任务队列使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题
SynchronousQueue 同步任务队列 直接提交任务队列使用直接提交任务队列,队列没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。 任务队列为SynchronousQueue,创建的线程数大于maximumPoolSize时,直接执行了拒绝策略抛出异常。 使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的线程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;
ArrayBlockingQueue有界任务队列使用有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。
PriorityBlockingQueue优先任务队列使用优先任务队列,它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

对优先队列的使用说明:

public class Test {
    
    public static void main(String[] args) {
        
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        
        for (int i = 1; i <= 10; i++) {
            pool.execute(new Task(i));
        }
        
        pool.shutdown();
        
    }
}
class Task implements Runnable,Comparable<Task>{
    
    private int priority;
    
    public Task(int priority) {
        this.priority = priority;
    }
​
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " priority:" + this.priority);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
​
    //当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高
    @Override
    public int compareTo(Task o) {
        return (this.priority>o.priority)?-1:1;
    }
}
总结:除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。

拒绝策略

ThreadPoolExecutor自带的拒绝策略有四种,都实现了RejectedExecutionHandler接口

比如:new ThreadPoolExecutor.AbortPolicy()

拒绝策略解释
AbortPolicy当有任务添加到线程池被拒绝时,会抛出RejectedExecutionException异常 线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy当有任务添加到线程池被拒绝时,直接丢弃,其他啥都没有
CallerRunsPolicy当有任务添加到线程池被拒绝时,线程池会将被拒绝的任务添加到线程池正在运行的线程中去运行。 一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
DiscardOledestPolicy当有任务添加到线程池被拒绝时,线程池会丢弃阻塞队列中末尾的任务(最老的任务),然后将被拒绝的任务添加到末尾。 如果项目中有允许丢失任务的需求,可以使用

自定义拒绝策略

public class Test {
    
    public static void main(String[] args) {
        
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+"执行了拒绝策略");
            }
        });
        
        for (int i = 1; i <= 10; i++) {
            pool.execute(new Task());
        }
        
        pool.shutdown();
    }
}
class Task implements Runnable{
    
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

自定义线程池原因

在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面使线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活,而且有资源耗尽的风险(OOM - Out Of Memory )。

一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但这种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况

自定义线程池

前提学习线程池中如何创建线程:

线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory,线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等

public class Test {
​
    public static void main(String[] args) {
​
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 10, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), 
                new ThreadFactory() {
                    int threadNum = 1;
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r,"线程"+threadNum++);
                        return t;
                    }
                }, new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString()+"执行了拒绝策略");
                    }
            });
​
        for (int i = 1; i <= 10; i++) {
            pool.execute(new Task());
        }
​
        pool.shutdown();
​
    }
}
class Task implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个方法的重写, 通过这三个方法我们可以监控每个任务的开始和结束时间,或者其他一些功能。下面我们可以通过代码实现一下

方法解释
beforeExecute线程池中任务运行前执行
afterExecute线程池中任务运行完毕后执行
terminated线程池退出后执行
ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 10, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10), 
                new ThreadFactory() {
                    int threadNum = 1;
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r,"线程"+threadNum++);
                        return t;
                    }
                }, new ThreadPoolExecutor.AbortPolicy()
            ){
                @Override
                protected void beforeExecute(Thread t,Runnable r) {
                    System.out.println("准备执行:"+ Thread.currentThread().getName());
                }
                @Override
                protected void afterExecute(Runnable r,Throwable t) {
                    System.out.println("执行完毕:"+ Thread.currentThread().getName());
                }
                @Override
                protected void terminated() {
                    System.out.println("线程池退出");
                }
        };
​
        for (int i = 1; i <= 10; i++) {
            pool.execute(new Task());
        }
​
        pool.shutdown();
​
    }
}
class Task implements Runnable{
    
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println("执行中:" + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

线程池线程数量

实际工作中使用 sysbench多线程性能测试工具

标签:队列,--,任务,线程,new,ThreadPoolExecutor
From: https://blog.csdn.net/secret010/article/details/137080301

相关文章

  • 创建与启动线程之二(继承Thread类)(实现Runnable接口)
    1.概述java的JVM允许程序运行多个线程.使用java.lang.Thread来表示线程.一个线程都直接或间接的继承于Thread类,即每个线程的对象要么是Thread的实例,要么是其子类的实例.2.Thread类的特性每个线程都是通过某个特定的Thread对象的run方法来完成操作的,run()被称为线程执行体.......
  • java声明不同线程池
    声明一个静态线程池publicstaticfinalExecutorServicePOOL;privatestaticThreadFactorynamedThreadFactory=newThreadFactoryBuilder().setNameFormat("COMMON-POOL-%d").build();static{POOL=newThreadPoolExecutor(20,40,300,......
  • spring+struts 配置和管理线程池
    <!--定义线程池--><beanid="taskExecutor"class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><propertyname="corePoolSize"value="5"/><propertyname="maxPoolSi......
  • linux 线程的一些简答概念
    基本概念1.临界资源:多线程执行流共享的资源2.临界区:访问临界资源的代码3.原子性:只有完成和未完成两种状态。4.互斥:同一时间只能允许一个线程访问临界资源,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。可以加锁实现。加锁可以保证单个线程对临界......
  • Java中创建线程的多种方式实例
    在Java中创建线程的方式有多种,以下是几种常见的创建线程的方式,选择哪种方式取决于具体的需求和设计。需要注意的是,在使用多线程时,要确保线程安全,避免出现并发问题。1、普通Thread创建//继承创建线程staticclassNewThreadextendsThread{@Override......
  • 异步VS多线程
    前段时间面试的时候被问到了异步和多线程的区别,回答的模棱两可,今天花时间系统的了解了下,如有错误,欢迎斧正~异步与多线程是在并发编程中两个比较关键的概念, 很容易弄混淆:异步编程(Asynchronous)概念:异步编程是并发编程的一种形式,即在程序运行逻辑中,一部分语句可以独立于主程序而......
  • C# 线程相关一点杂记
    相信很多人在实际开发中是不愿使用到多线程的,因为一旦引入多线程这个概念,对应功能就需要加很多关于线程的考虑措施,如锁,任务回调顺序等等。有事加了一些对应的措施,还是感觉程序出现偶发的不同问题,这里主要记录一下多线程任务时需要注意的一些毫秒相关的事情。for(inti=0......
  • 线程池的创建方式、七个参数、拒绝策略、阻塞队列、线程池大小设定
    线程池的创建方式1、通过Executors的静态方法可以创建四种类型的线程池。(不推荐)1)FixedThreadPool创建一个固定线程数的线程池,线程数量一直保持固定不变,如果任务提交时,没有空闲线程,那就进入任务队列,当有空闲时间时,再执行任务队列中的任务。2)SingleThreadExecutor线程池中......
  • Java面试必问题18:线程安全的集合类和有序的集合类
         精华篇:  极致精简解释有序的集合类包括:TreeMap-基于红黑树实现的有序Map。LinkedHashMap-基于哈希表和双向链表实现的有序Map。TreeSet-基于红黑树实现的有序Set。LinkedHashSet-基于哈希表和双向链表实现的有序Set。示例:有序Map:TreeMap有序Li......
  • 多任务之进程与线程
    多任务进程与线程一、多任务介绍​ 我们生活中有很多事情是同时进行的,比如开车的时候手和脚共同来驾驶汽车,再比如唱歌跳舞也是同时进行的;用程序来模拟:fromtimeimportsleepdefsing():foriinrange(3):print("正在唱歌...%d"%i)sleep(1)defda......