1 什么是线程池
线程池其实就是一种多线程处理形式,把一个或多个线程通过统一的方式进行调度和重复使用的技术,避免了因为线程过多而带来使用上的开销。
2 为什么使用线程池
2.1 场景
for (int i = 0; i < 100; i++) { new Thread(() -> { System.out.println("run thread->" + Thread.currentThread().getName()); userService.updateUserStatus(....) }).start(); }
业务场景就是我们想使用多线程异步的方式去处理某些事情,以提高处理效率。
这段代码是存在问题的
1)创建销毁线程资源消耗; 我们使用线程的目的本是出于效率考虑,可以为了创建这些线程却消耗了额外的时间,资源,对于线程的销毁同样需要系统资源。
2)cpu资源有限,上述代码创建线程过多,造成有的任务不能即时完成,响应时间过长。
3)线程无法管理,无节制地创建线程对于有限的资源来说似乎成了“得不偿失”的一种作用。
采用线程池来管理可以解决这些问题
2.2 线程池的作用
使用线程池最大的原因就是可以根据系统的需求和硬件环境灵活的控制线程的数量,且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行运行压力;
- 线程和任务分离,提升线程重用性(线程池分为线程集合和任务队列两部分)
- 控制线程并发数量,降低服务器压力,统一管理所有线程;
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
3 线程池的创建方式
在JDK中rt.jar包下JUC(java.util.concurrent)创建线程池有两种方式:ThreadPoolExecutor 和 Executors,其中 Executors又可以创建 6 种不同的线程池类型
4 ThreadPoolExecutor源码分析
我们要想自定义线程池,必须先了解线程池的工作原理,才能自己定义线程池;
这里我们通过观察java中ThreadPoolExecutor的源码来学习线程池的原理;
4.1 ThreadPoolExecutor部分源码
构造方法: public ThreadPoolExecutor(int corePoolSize, //核心线程数量 int maximumPoolSize,// 最大线程数 long keepAliveTime, // 最大空闲时间 TimeUnit unit, // 时间单位 BlockingQueue<Runnable> workQueue, // 任务队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler // 饱和处理机制,也叫拒绝策略 ) { ... }
4.2 饱和处理机制说明
ThreadPoolExecutor 也提供了 4 种默认的拒绝策略:
1)AbortPolicy():丢弃任务并抛出 RejectedExecutionException 异常(默认)。
2)DiscardPolicy():丢弃掉该任务但是不抛出异常,不推荐这种(导致使用者没觉察情况发生)
3)DiscardOldestPolicy():丢弃队列中等待最久的任务,然后把当前任务加入队列中。
4)CallerRunsPolicy():那个线程调用的添加任务,就由那个线程执行
4.3 线程池工作流程图解
5 自定义线程池
5.1 步骤
1)编写任务类(MyTask),实现Runnable接口;
2)编写线程类(MyWorker),用于执行任务,需要持有所有任务;
3)编写线程池类(MyThreadPool),包含提交任务,执行任务的能力;
4)编写测试类(MyTest),创建线程池对象,提交多个任务测试;
5.2 MyTask
package com.itheima.demo01; /* 需求: 自定义线程池练习,这是任务类,需要实现Runnable; 包含任务编号,每一个任务执行时间设计为0.2秒 */ public class MyTask implements Runnable{ private int id; //由于run方法是重写接口中的方法,因此id这个属性初始化可以利用构造方法完成 public MyTask(int id) { this.id = id; } @Override public void run() { String name = Thread.currentThread().getName(); System.out.println("线程:"+name+" 即将执行任务:"+id); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程:"+name+" 完成了任务:"+id); } @Override public String toString() { return "MyTask{" + "id=" + id + '}'; } }
5.3 MyWorker
package com.itheima.demo01; import java.util.List; /* 需求: 编写一个线程类,需要继承Thread类,设计一个属性,用于保存线程的名字; 设计一个集合,用于保存所有的任务; */ public class MyWorker extends Thread{ private String name;//保存线程的名字 private List<Runnable> tasks; //利用构造方法,给成员变量赋值 public MyWorker(String name, List<Runnable> tasks) { super(name); this.tasks = tasks; } @Override public void run() { //判断集合中是否有任务,只要有,就一直执行任务 while (tasks.size()>0){ Runnable r = tasks.remove(0); r.run(); } } }
5.4 MyThreadPool
package com.itheima.demo01; import java.util.Collections; import java.util.LinkedList; import java.util.List; /* 这是自定义的线程池类; 成员变量: 1:任务队列 集合 需要控制线程安全问题 2:当前线程数量 3:核心线程数量 4:最大线程数量 5:任务队列的长度 成员方法 1:提交任务; 将任务添加到集合中,需要判断是否超出了任务总长度 2:执行任务; 判断当前线程的数量,决定创建核心线程还是非核心线程 */ public class MyThreadPool { // 1:任务队列 集合 需要控制线程安全问题 private List<Runnable> tasks = Collections.synchronizedList(new LinkedList<>()); //2:当前线程数量 private int num; //3:核心线程数量 private int corePoolSize; //4:最大线程数量 private int maxSize; //5:任务队列的长度 private int workSize; public MyThreadPool(int corePoolSize, int maxSize, int workSize) { this.corePoolSize = corePoolSize; this.maxSize = maxSize; this.workSize = workSize; } //1:提交任务; public void submit(Runnable r){ //判断当前集合中任务的数量,是否超出了最大任务数量 if(tasks.size()>=workSize){ System.out.println("任务:"+r+"被丢弃了..."); }else { tasks.add(r); //执行任务 execTask(r); } } //2:执行任务; private void execTask(Runnable r) { //判断当前线程池中的线程总数量,是否超出了核心数, if(num < corePoolSize){ new MyWorker("核心线程:"+num,tasks).start(); num++; }else if(num < maxSize){ new MyWorker("非核心线程:"+num,tasks).start(); num++; }else { System.out.println("任务:"+r+" 被缓存了..."); } } }
1)核心线程数(corePoolSize)
核心线程数的设计需要依据任务的处理时间和每秒产生的任务数量来确定,例如:执行一个任务需要0.1秒,系统百分之80的时间每秒都会产生100个任务,那么要想在1秒内处理完这100个任务,就需要10个线程,此时我们就可以设计核心线程数为10;当然实际情况不可能这么平均,所以我们一般按照8020原则设计即可,既按照百分之80的情况设计核心线程数,剩下的百分之20可以利用最大线程数处理;
2)任务队列长度(workQueue)
任务队列长度一般设计为:核心线程数/单个任务执行时间*2即可;例如上面的场景中,核心线程数设计为10,单个任务执行时间为0.1秒,则队列长度可以设计为200;
3)最大线程数(maximumPoolSize)
最大线程数的设计除了需要参照核心线程数的条件外,还需要参照系统每秒产生的最大任务数决定:例如:上述环境中,如果系统每秒最大产生的任务是1000个,那么,最大线程数=(最大任务数-任务队列长度)*单个任务执行时间;既: 最大线程数=(1000-200)*0.1=80个;
4)最大空闲时间(keepAliveTime)
这个参数的设计完全参考系统运行环境和硬件压力设定,没有固定的参考值,用户可以根据经验和系统产生任务的时间间隔合理设置一个值即可;
5.5 MyTest
package com.itheima.demo01; /* 测试类: 1: 创建线程池类对象; 2: 提交多个任务 */ public class MyTest { public static void main(String[] args) { //1:创建线程池类对象; MyThreadPool pool = new MyThreadPool(2,4,20); //2: 提交多个任务 for (int i = 0; i <30 ; i++) { //3:创建任务对象,并提交给线程池 MyTask my = new MyTask(i); pool.submit(my); } } }
6 Java内置线程池-ExecutorService
6.1 ExecutorService的获取
获取ExecutorService可以利用JDK中的Executors 类中的静态方法,常用获取方式如下: static ExecutorService newCachedThreadPool() 创建一个默认的线程池对象,里面的线程可重用,且在第一次使用时才创建 static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) 线程池中的所有线程都使用ThreadFactory来创建,这样的线程无需手动启动,自动执行; static ExecutorService newFixedThreadPool(int nThreads) 创建一个可重用固定线程数的线程池 static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) 创建一个可重用固定线程数的线程池且线程池中的所有线程都使用ThreadFactory来创建。 static ExecutorService newSingleThreadExecutor() 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。 static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) 创建一个使用单个 worker 线程的 Executor,且线程池中的所有线程都使用ThreadFactory来创建。
6.2 ExecutorService的常用方法
ExecutorService接口是java内置的线程池接口,通过学习接口中的方法,可以快速的掌握java内置线程池的基本使用 常用方法: void shutdown() 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。 List<Runnable> shutdownNow() 停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表。 <T> Future<T> submit(Callable<T> task) 执行带返回值的任务,返回一个Future对象。 Future<?> submit(Runnable task) 执行 Runnable 任务,并返回一个表示该任务的 Future。 <T> Future<T> submit(Runnable task, T result) 执行 Runnable 任务,并返回一个表示该任务的 Future。
7 Java内置线程池ScheduledExecutorService
ScheduledExecutorService是ExecutorService的子接口,具备了延迟运行定期执行任务的能力
7.1 ScheduledExecutorService的获取
常用获取方式如下: static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 创建一个可重用固定线程数的线程池且允许延迟运行或定期执行任务; static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) 创建一个可重用固定线程数的线程池且线程池中的所有线程都使用ThreadFactory来创建,且允许延迟运行或定期执行任务; static ScheduledExecutorService newSingleThreadScheduledExecutor() 创建一个单线程执行程序,它允许在给定延迟后运行命令或者定期地执行。 static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
7.2 ScheduledExecutorService的常用方法
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 延迟时间单位是unit,数量是delay的时间后执行callable。 ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) 延迟时间单位是unit,数量是delay的时间后执行command。 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 延迟时间单位是unit,数量是initialDelay的时间后,每间隔period时间重复执行一次command。 ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
8 常见问题
1) ThreadPoolExecutor 有哪些常用的方法?
submit()/execute():执行线程池 shutdown()/shutdownNow():终止线程池 isShutdown():判断线程是否终止 getActiveCount():正在运行的线程数 getCorePoolSize():获取核心线程数 getMaximumPoolSize():获取最大线程数 getQueue():获取线程池中的任务队列 allowCoreThreadTimeOut(boolean):设置空闲时是否回收核心线程
2)submit和 execute的区别
submit() 和 execute() 都是用来执行线程池的,只不过使用 execute() 执行线程池不能有返回方法,而使用 submit() 可以使用 Future 接收线程池执行的返回值。
3)shutdownNow() 和 shutdown() 两个方法有什么区别?
shutdownNow() 和 shutdown() 都是用来终止线程池的 使用 shutdown() 程序不会报错,也不会立即终止线程,它会等待线程池中的缓存任务执行完之后再退出,执行了 shutdown() 之后就不能给线程池添加新任务了; shutdownNow() 会试图立马停止任务,如果线程池中还有缓存任务正在执行,则会抛出 java.lang.InterruptedException: sleep interrupted 异常。
4) 线程池中核心线程数量大小怎么设置?
CPU密集型任务:比如像加解密,压缩、计算等一系列需要大量耗费 CPU 资源的任务,大部分场景下都是纯 CPU 计算。 尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换。 IO密集型任务: 比如像 MySQL 数据库、文件的读写、网络通信等任务,这类任务不会特别消耗 CPU 资源,但是 IO 操作比较耗时,会占用比较多时间。 可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。 另外:线程的平均工作时间所占比例越高,就需要越少的线程;线程的平均等待时间所占比例越高,就需要越多的线程; 以上只是理论值,实际项目中建议在本地或者测试环境进行多次调优,找到相对理想的值大小。
5)线程池为什么需要使用阻塞队列
因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换。创建线程池的消耗较高。
6)线程池为什么要使用阻塞队列而不使用非阻塞队列
阻塞队列可以保证任务队列中没有任务时,阻塞获取任务的线程,线程进入wait状态,释放cpu资源。
当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。 使得在线程不至于一直占用cpu资源。 线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如下 while (task != null || (task = getTask()) != null) {}
7)了解线程池状态吗
通过获取线程池状态,可以判断线程池是否是运行状态、可否添加新的任务以及优雅地关闭线程池等。 RUNNING: 线程池的初始化状态,可以添加待执行的任务。 SHUTDOWN:线程池处于待关闭状态,不接收新任务仅处理已经接收的任务。 STOP:线程池立即关闭,不接收新的任务,放弃缓存队列中的任务并且中断正在处理的任务。 TIDYING:线程池自主整理状态,调用 terminated() 方法进行线程池整理。 TERMINATED:线程池终止状态。
8)知道线程池中线程复用原理吗?
线程池将线程和任务进行解耦,线程是线程,任务是任务,摆脱了之前通过 Thread 创建线程时的一个线程必须对应一个任务的限制。 在线程池中,同一个线程可以从阻塞队列中不断获取新任务来执行,其核心原理在于线程池对 Thread 进行了封装
不是每次执行任务都会调用 Thread.start()来创建新线程,而是让每个线程去执行一个“循环任务” 在这个“循环任务”中不停的检查是否有任务需要执行,如果有则直接执行,也就是调用任务中的 run 方法
将 run 方法当成一个普通的方法执行,通过这种方式只使用固定的线程就将所有任务的 run 方法串联起来。
9) 线程池参数设置
(1)corePoolSize
核心线程数 这个应该是最重要的参数,核心线程会一直存活,及时没有任务需要执行。 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理。 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭。
如何设置好需要根据项目业务是CPU密集型和IO密集型的区别
CPU密集型 CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading 很高。 在多重程序系统中,大部分时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部分时间用在三角函数和开根号的计算,便是属于CPU bound的程序。 CPU bound的程序一般而言CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间。
IO密集型 IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。 I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。
先看下机器的CPU核数,然后在设定具体参数: 自己测一下自己机器的核数 System.out.println(Runtime.getRuntime().availableProcessors()); 即CPU核数 = Runtime.getRuntime().availableProcessors()
分析下线程池处理的程序是CPU密集型还是IO密集型 CPU密集型:corePoolSize = CPU核数 + 1 IO密集型:corePoolSize = CPU核数 * 2
(2)maximumPoolSize
最大线程数 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务。 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常。
(3)keepAliveTime
线程空闲时间 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize。 如果allowCoreThreadTimeout=true,则会直到线程数量=0。
(4)queueCapacity
任务队列(阻塞队列) 当核心线程数达到最大时,新任务会放在队列中排队等待执行
无界队列; 队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列作为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。 阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而博主踩到的就是这个坑,当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,导致cpu和内存飙升服务器挂掉。 当然这种队列,maximumPoolSize 的值也就无效了。 当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。 这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
有界队列; 当使用有限的 maximumPoolSizes 时,有界队列有助于防止资源耗尽,但是可能较难调整和控制。 常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。 PriorityBlockingQueue中的优先级由任务的Comparator决定。 使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。
同步移交队列;如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。 SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。 只有在使用无界线程池或者有饱和策略时才建议使用该队列。
(5)allowCoreThreadTimeout
允许核心线程超时
(6)rejectedExecutionHandler
任务拒绝处理器
10)线程池默认值
corePoolSize = 1 maxPoolSize = Integer.MAX_VALUE queueCapacity = Integer.MAX_VALUE keepAliveTime = 60s allowCoreThreadTimeout = false rejectedExecutionHandler = AbortPolicy()
标签:队列,18,基础知识,任务,线程,创建,执行,CPU From: https://www.cnblogs.com/jthr/p/16735301.html