转载自:https://blog.csdn.net/weixin_48271092/article/details/124419062
线程池
一、线程存在的问题
之前我们已经学习过了线程,线程的创建方式我们讲了7种,感兴趣的小伙伴可以去看看前面的讲解。线程的创建
那么我们学习了那么多关于线程的知识,为什么还要再用到线程池呢?那么我们就要来思考一下,线程有什么问题吗?
- 首先,每次有任务来时就要创建线程,任务结束就要将这个线程销毁,频繁的创建和销毁需要一定的开销。
- 当任务数远远大于线程可以承载的数量之后,不能友好的进行任务拒绝。
因此引出了线程池~~
二、什么是线程池?
线程池是使用池化技术管理和使用线程的一种机制。
池化技术:提前准备一些资源,在需要时可以重复使用使用提前准备的资源。
池化技术的应用也比较多:
- 内存池:预先申请内存,在 使用时提升申请内存的速度,减少内存碎片。
- 数据库连接池:预先申请数据库连接,提升申请连接的速度,降低系统开销。
三、线程池的优点
线程池相比线程来说,它不需要频繁的创建和销毁线程,而是提前将线程创建好休眠在线程池中,有任务需要它时再唤醒它,线程一旦创建就不会销毁,会放入线程池中,如下图:
因此,他有以下几个优点:
- 复用线程,避免线程重复创建和销毁线程的性能开销。
- 控制线程的数量 ,从而避免了因创建线程过多而导致的内存溢出(OOM).
- 提供了任务管理的功能。从而可以实现任务缓存和任务拒绝的情况。
- 线程池提供了更多功能,比如延时任务。
四、线程池的使用
线程池的实现总的来说有两类:
- 通过ThreadPoolExecutor创建的线程池;
- 通过Executors创建的线程池。
线程池的创建方式总共包含以下7种(其中6种是通过Executors 创建的,1种是通过ThreadPoolExecutor创建的):
- Executors.newFixedThreadPool:创建一个固定大小的线程池,可以控制并发的线程数,超出的线程会在队列中等待;
- Executors.newCachedThreadPool:创建一个可缓存的线程池,若线程超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程;
- Executors.newScheduledThreadPool:创建一个可以执行延迟任务的线程池;
- Executors.newSingleThreadExecutor:创建单个线程的线程池,它可以保证先进先出的执行顺序
- Executors.newSingleThreadScheduledExecutor:创建一个可以执行延迟任务的单个线程的线程池;
- Executors.newWorkStealingPool:创建一个抢占式执行的线程池;
- ThreadPoolExecutor:手动创建线程池 的方式,它最多包含了7个参数可供设置,最少可设置5个参数。
4.1 固定数量的线程池
4.1.1 具体实现
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 创建一个固定大小的线程池
*/
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//1.创建了一个包含了5个线程的线程池
ExecutorService threadPool= Executors.newFixedThreadPool(5);
//2,使用线程池执行任务二(给线程池添加任务)
for (int i = 0; i < 10; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程名称:"+Thread.currentThread().getName());
}
});
}
}
}
1234567891011121314151617181920212223
public class ThreadPoolDemo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.创建了一个包含了5个线程的线程池
ExecutorService threadPool= Executors.newFixedThreadPool(5);
//2,使用线程池执行任务(给线程池添加任务)
for (int i = 0; i < 5; i++) {
Future<Integer> result = threadPool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int num=new Random().nextInt(9);
System.out.println("随机数:"+num);
return num;
}
});
System.out.println("线程池的返回结果:"+result.get());
}
}
}
12345678910111213141516171819
观察上面这两段代码,我们可以发现,向线程池中添加任务的方式有两种:
- execute:只能执行不带返回值的任务;
- submit:它可以执行有返回值的任务或者是没有返回值的任务 。
4.1.2 线程工厂
在上面的执行中,可以看到没有设置线程的名称时,它就会是一个默认名 ,线程工厂可以让我们自定义线程名称或优先级
下面来看一下线程工厂的实现吧~
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* 线程工厂的演示
*/
public class ThreadPoolDemo3 {
public static void main(String[] args) {
//1.创建线程工厂
ThreadFactory threadFactory=new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
//设置线程命名规则
thread.setName("我的线程-"+r.hashCode());
//设置线程优先级
thread.setPriority(Thread.MAX_PRIORITY);
return thread;
}
};
ExecutorService threadPool = Executors.newFixedThreadPool(5, threadFactory);
for(int i=0;i<5;i++){
threadPool.submit(()->{
//任务
Thread thread = Thread.currentThread();
System.out.println("线程池开始执行了:"+thread.getName()+" "+thread.getPriority());
});
}
}
}
1234567891011121314151617181920212223242526272829303132
在上面的代码中我们设置了线程的名称和优先级,看看他会生效吗?
答案是肯定的,但是要注意的是我们一定要将任务传入这个线程工厂,否则就会出错。
如果我们什么都不传入,就会是什么都不打印。
4.3 带缓存的线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 带缓存的线程池
*/
public class ThreadPoolDemo4 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool();
for(int i=0; i<1000; i++){
int finalI=i;
threadPool.submit(()->{
System.out.println("i:"+finalI+"|线程名称:"+Thread.currentThread().getName());
});
}
}
}
123456789101112131415161718
有1000个任务并不会创建1000个线程,通过执行可以发现最多会创建100多个线程,然后进行复用。
4.4 执行定时任务的线程池
4.4.1 延迟执行(一次)
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 创建执行定时任务的线程池
*/
public class ThreadPoolDemo5 {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
System.out.println("添加任务的时间:"+ LocalDateTime.now());
scheduleTest(service);//只执行一次的定时任务
/**
* 只执行一次的定时任务
* @param service
*/
private static void scheduleTest(ScheduledExecutorService service) {
//执行定时任务(延迟3秒执行)
service.schedule(new Runnable() {
@Override
public void run() {
System.out.println("执行了任务:"+LocalDateTime.now());
}
},3, TimeUnit.SECONDS);
}
}
1234567891011121314151617181920212223242526272829
这是延迟执行任务,他只能执行一次。
创建这个线程池时有3个参数:
- 执行任务
- 延迟 n秒执行
- 配合2执行的时间单位
4.4.2 固定频率执行
我们可以让线程以以固定频率间隔n秒执行,创建这个线程池有四个参数:
- 执行任务
- 延迟n秒执行
- 执行定时任务的频率
- 配合3执行的时间单位
4.4.2.1 scheduleAtFixedRate
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 创建执行定时任务的线程池
*/
public class ThreadPoolDemo5 {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
System.out.println("添加任务的时间:"+ LocalDateTime.now());
//2S之后开始执行定时任务,定时任务每4s执行一次
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("执行任务:"+LocalDateTime.now());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},2,4,TimeUnit.SECONDS);
}
}
12345678910111213141516171819202122232425262728
4.4.2.2 scheduleWithFixedDelay
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 创建执行定时任务的线程池
*/
public class ThreadPoolDemo5 {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
System.out.println("添加任务的时间:"+ LocalDateTime.now());
// //2S之后开始执行定时任务,定时任务每4s执行一次
// service.scheduleAtFixedRate(new Runnable() {
// @Override
// public void run() {
// System.out.println("执行任务:"+LocalDateTime.now());
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// },2,4,TimeUnit.SECONDS);
//2s之后开始执行,每次执行间隔4s
service.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("执行任务:"+LocalDateTime.now());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},2,4,TimeUnit.SECONDS);
}
}
123456789101112131415161718192021222324252627282930313233343536373839404142
4.4.2.3 scheduleAtFixedRate VS scheduleWithFixedDelay
看出这两个方法的区别了吗?
scheduleAtFixedRate是以上一次任务开始的时间,加上任务的执行周期,作为下一次定时任务的开始时间。
scheduleWithFixedDelay是以上一次任务的结束时间,加上任务的执行周期,作为下次定时任务的开始时间。
注意:
在scheduleAtFixedRate中,有时任务的执行时间大于延迟任务设定的时间间隔,那么当任务执行完之后才会开始执行下次任务,此时并不是以设定执行周期来执行任务。
ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
System.out.println("添加任务的时间:"+ LocalDateTime.now());
// scheduleTest(service);//只执行一次的定时任务
//2S之后开始执行定时任务,定时任务每4s执行一次
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("执行任务:"+LocalDateTime.now());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},2,2,TimeUnit.SECONDS);
12345678910111213141516
4.5 单线程的线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 单个线程的线程池
*/
public class ThreadPoolDemo7 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
int finalI=i;
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("任务名:"+finalI+"线程名:"+Thread.currentThread().getName());
}
});
}
}
}
123456789101112131415161718192021
那么肯定有同学有疑问了,单线程的线程池与单线程有什么区别呢?他有什么作用呢?
- 首先因为他是线程池,所有它能够提供任务队列和任务管理的功能,防止内存溢出。
- 其次它可以自定义拒绝策略(本文最后会讲到拒绝策略)
4.6 定时任务的单线程线程池
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 创建单线程的执行定时任务的线程池
*/
public class ThreadPoolDemo6 {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
System.out.println("执行任务的时间:"+LocalDateTime.now());
service.schedule(new Runnable() {
@Override
public void run() {
System.out.println("执行任务:"+ LocalDateTime.now());
}
},2, TimeUnit.SECONDS);
}
}
123456789101112131415161718192021
4.7 根据当前CPU生成线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 根据当前设备的配置自动生成线程池
*/
public class ThreadPoolDemo8 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newWorkStealingPool();
for (int i = 0; i < 100; i++) {
int finalI=i;
executorService.submit(()->{
System.out.println("任务:"+finalI+"线程名:"+Thread.currentThread().getName());
});
}
while (!executorService.isTerminated()){};
}
}
12345678910111213141516171819
4.8 ThreadPoolExecutor的使用
4.8.1 Executors自动创建线程池可能存在的问题
根据阿里巴巴Java开发手册规定:
4.8.2 ThreadPoolExecutor使用
参数说明:
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数(包含核心线程数)
- keepAliveTime:空闲线程的存活时间
- TimeUnit:时间单位
- BlockingQueue:任务队列,用于存储线程池的待执行任务的
- ThreadFactory:线程工厂,用于生成线程
- handler:拒绝策略
具体实现:
import java.util.concurrent.*;
/**
* 手动方式创建线程池
*/
public class ThreadPoolDemo10 {
public static void main(String[] args) {
ThreadFactory factory=new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
return thread;
}
};
ThreadPoolExecutor executor=new ThreadPoolExecutor(2,5,10,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(2),factory,new ThreadPoolExecutor.DiscardPolicy());
for (int i = 1; i < 6; i++) {
int finalI=i;
executor.submit(()->{
try {
Thread.sleep(10*finalI);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务:"+finalI);
});
}
}
}
123456789101112131415161718192021222324252627282930
4.8.3 线程池的执行流程
关键执行步骤:
- 当线程池中有任务时,先判断线程池是否在运行状态;
- 判断线程池中的线程数是否小于核心线程数,如果小于则添加工作线程并执行;
- 若线程池中的线程数不小于核心线程数,则判断阻塞队列是否已满,如果未满,则添加到阻塞队列,等待工作线程获取执行;
- 若阻塞队列已满,则判断线程数是否小于最大线程数,若小于,则添加工作线程并执行;
- 否则执行拒绝策略。
4.8.4 拒绝策略
线程池的拒绝策略共有5种(4种JDK提供的+1种自定义拒绝策略):
1.AbortPolicy()
提示异常拒绝执行,是默认的拒绝策略:
import java.util.concurrent.*;
/**
* 手动方式创建线程池
*/
public class ThreadPoolDemo10 {
public static void main(String[] args) {
ThreadFactory factory=new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
return thread;
}
};
ThreadPoolExecutor executor=new ThreadPoolExecutor(2,2,10,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(2),factory,new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 5; i++) {
int finalI=i;
executor.submit(()->{
try {
Thread.sleep(10*finalI);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务:"+finalI);
});
}
}
}
123456789101112131415161718192021222324252627282930
2.DiscardPolicy()
丢弃任务但不抛出异常。如果线程数大于最大核心线程数,则后续提及的任务都会被丢弃,且是静默丢弃。
import java.util.concurrent.*;
/**
* 手动方式创建线程池
*/
public class ThreadPoolDemo10 {
public static void main(String[] args) {
ThreadFactory factory=new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
return thread;
}
};
ThreadPoolExecutor executor=new ThreadPoolExecutor(2,2,10,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(2),factory,new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 5; i++) {
int finalI=i;
executor.submit(()->{
try {
Thread.sleep(10*finalI);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务:"+finalI);
});
}
}
}
123456789101112131415161718192021222324252627282930
3.DiscardOldestPolicy()
丢弃阻塞队列最前面的任务,然后重新提交被拒绝的任务。
import java.util.concurrent.*;
/**
* 手动方式创建线程池
*/
public class ThreadPoolDemo10 {
public static void main(String[] args) {
ThreadFactory factory=new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
return thread;
}
};
ThreadPoolExecutor executor=new ThreadPoolExecutor(2,2,10,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(2),factory,new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 5; i++) {
int finalI=i;
executor.submit(()->{
try {
Thread.sleep(10*finalI);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务:"+finalI);
});
}
}
}
123456789101112131415161718192021222324252627282930
4.CallerRunsPolicy()
如果任务被拒绝了,则由调用线程(提交任务的线程)直接执行此任务。
import java.util.concurrent.*;
/**
* 手动方式创建线程池
*/
public class ThreadPoolDemo10 {
public static void main(String[] args) {
ThreadFactory factory=new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
return thread;
}
};
ThreadPoolExecutor executor=new ThreadPoolExecutor(2,2,10,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(2),factory,new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 5; i++) {
int finalI=i;
executor.submit(()->{
try {
Thread.sleep(10*finalI);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务:"+finalI);
});
}
}
}
123456789101112131415161718192021222324252627282930
5.自定义拒绝策略
除了上面四种JDK提供的拒绝策略外,我们还可以自定义拒绝策略,具体实现:
import java.util.concurrent.*;
/**
* 手动方式创建线程池
*/
public class ThreadPoolDemo10 {
public static void main(String[] args) {
ThreadFactory factory=new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
return thread;
}
};
//自定义拒绝策略
ThreadPoolExecutor executor=new ThreadPoolExecutor(2, 2, 10,
TimeUnit.SECONDS, new LinkedBlockingDeque<>(2), factory, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("我执行了自定义拒绝策略");
}
});
for (int i = 0; i < 5; i++) {
int finalI1 = i;
executor.submit(()->{
try {
Thread.sleep(100*finalI1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务名:"+Thread.currentThread().getName());
});
}
}
}
123456789101112131415161718192021222324252627282930313233343536
五、线程池的状态
可以看到线程池的状态共有5种,分别为:
- RUNNING:线程池创建之后的状态,这种状态下可以执行任务;
- SHUTDOWN:该状态下线程池不再接受新任务,但是会将工作队列中的任务执行结束;
- STOP:该状态下线程池不再接受新任务,并且会中断线程;
- TIDYING:该状态下所有任务都已终止,将会执行terminated方法;
- TEIMINATED:执行完terminated方法之后。
5.1 Shutdown VS ShutdownNow
- shutdown执行时线程池终止接收新任务,并且会将任务队列中的任务处理完;
- shutdoNow执行时线程池终止接收新任务,并且会终止执行任务队列中的任务。