Gulimall
一、异步&线程池【ExecutorService】
异步场景:为了节约时间和提高系统吞吐量 做一些异步任务。
异步在java中一般是使用 Thread 开启一个线程的方式;
一、线程池
线程池(Thread Pool),是一种基于池化思想的管理线程的工具,可以实现线程的复用,避免线程使用中频繁创建和销毁所带来的资源消耗。
- 使用线程池的优点
- 重用线程池中的线程,避免频繁地创建和销毁线程带来的性能消耗 (降低资源的消耗)
- 有效控制线程的最大并发数量,防止线程过大导致抢占资源造成系统阻塞 ( 提高响应速度 )
- 提高线程的可管理性,可以对线程进行一定的管理 (提高线程的可管理性)
1.1 初始化线程的 4 种方式
1)、继承 Thread
2)、实现 Runnable 接口
3)、实现 Callable 接口 + FutureTask (可以拿到返回结果,可以处理异常)
4)、线程池
方式 1 和 方式 2:主进程无法获取线程的运算结果。不适合当前场景
方式 3:主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。可以导致服务器资源耗尽。
方式 4:通过如下两种方式初始化线程池
Executors.newFiexedThreadPool(3);
或者
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit,
workQueue, threadFactory, handler);
区别:1)有无返回值:继承 Thread、实现 Runnable 接口不能得到返回值,实现 Callable 接口 + FutureTask可以获取 返回值。
2)是否能控制资源:继承 Thread、实现 Runnable 接口、实现 Callable 接口 + FutureTask都不能控制资源。但 是线程池能够控制资源,且性能稳定。
通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。
package com.thread;
import java.util.concurrent.*;
public class ThreadTest {
// 线程中10个空闲着
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start....");
/**
* 1.1 初始化线程的 4 种方式
* 1)、继承 Thread
* Thread01 thread = new Thread01();
* thread.start();//启动线程
*
* 2)、实现 Runnable 接口
* Runnable01 runnable01 = new Runnable01();
new Thread(runnable01).start();
*
*
* 3)、实现 Callable 接口 + FutureTask (可以拿到返回结果,可以处理异常)
*
* FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
* new Thread(futureTask).start();
*
* //阻塞等待我们整个线程执行完成,获取返回结果
* Integer integer = futureTask.get();
*
* 4)、线程池【ExecutorService】
* 给线程池直接提交任务。
* executorService.execute(new Runnable01());
*
*
* 区别:1)有无返回值:继承 Thread、实现 Runnable 接口不能得到返回值,
实现 Callable 接口 + FutureTask可以获取返回值。
2)是否能控制资源:继承 Thread、实现 Runnable 接口、实现 Callable 接口 + FutureTask
都不能控制资源。但是线程池能够控制资源,且性能稳定。
*/
//我们以后在业务代码中,以上三种启动线程的方式都不用。即不再new Thread().start();
//【 将所有的异步任务都交给线程池执行 】
// new Thread(()-> System.out.println("hello")).start();
//当前系统中线程池只有一两个,每个异步任务提交给线程池,让异步任务自己去执行
// execute方法为 void ,没有返回值
// 执行
executorService.execute(new Runnable01());
System.out.println("main....end....");
}
public static class Thread01 extends Thread{
@Override
public void run() {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i =10/2;
System.out.println("运行结果:"+ i);
}
}
public static class Runnable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i =10/2;
System.out.println("运行结果:"+ i);
}
}
public static class Callable01 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i =10/2;
System.out.println("运行结果:"+ i);
return i;
}
}
}
1.2 创建线程池(ExecutorService)
1. Executors 工具类创建
//当前系统中池只有一两个,每一个异步任务直接提交给线程池,让他自己去执行
ExecutorService service = Executors.newFixedThreadPool(10);
//执行
service.execute(new Runnable01());
2. 原生方法创建线程池
ThreadPoolExecutor需要传入七大参数
-
**corePoolSize **:
核心线程数【一直存在,除非设置了允许线程超时的设置:allowCoreThreadTimeOut】,保留在池中的线程数,线程池创建后好后就准备就绪的线程数,就等待异步任务去执行,new 好了 Thread,等待异步任务
-
maximumPoolSize
池中最大线程数量,控制资源并发
线程池允许创建的最大线程数量
当添加一个任务时,核心线程数已满,工作队列已满的情况下,线程池还没达到最大线程数,并且没有空闲线程,创建一个新线程并执行
最大线程数=核心线程+非核心线程。
-
keepAliveTime
空闲线程存活时间,当前正在运行的线程数量大于核心线程数,就会释放空闲的线程,只要线程空闲大于指定存活时间,释放的线程是指最大的线程数量减去核心线程数 (**maximumPoolSize **- corePoolSize)。
当一个可被回收的线程的空闲时间大于
keepAliveTime
,就会被回收可被回收的线程:
- 设置
allowCoreThreadTimeout = true
的核心线程 - 大于核心线程数的线程(非核心线程)
- 设置
-
unit
时间单位
keepAliveTime 的时间单位:
TimeUnit.NANOSECONDS // 纳秒
TimeUnit.MICROSECONDS // 微秒
TimeUnit.MILLISECONDS // 毫秒
TimeUnit.SECONDS // 秒
TimeUnit.MINUTES // 分钟
TimeUnit.HOURS // 小时
TimeUnit.DAYS // 天 -
BlockingQueue workQueue (工作队列)
线程阻塞队列,如果任务有很多,就会将目前多的队伍放在队列里面,只要有空闲的线程,就会去队列里面取出新的任务继续执行。
- new LinkedBlockingQueue<>() 默认值是Integer的最大值,会导致内存不够,一定要传入业务定制的大小,可以通过压测得出峰值
- 作用:存放待执行任务的队列。当提交的任务数超过核心线程数大小后,再提交的任务就存放在工作队列,任务调度时再从队列中取出任务。它仅仅用来存放被 execute() 方法提交的 Runnable 任务。工作队列实现了 BlockingQueue 接口
deque为双端队列
JDK默认的工作队列有五种:
- ArrayBlockingQueue
数组型阻塞队列。数组结构,初始化时传入大小(有界),FIFO(先进先出策略)。使用一个重入锁(ReentrantLock),默认使用非公平锁,入队和出队共用一个锁,互斥
final ReentrantLock lock = this.lock;
- LinkedBlockingQueue
链表型阻塞队列。链表结构,默认初始化大小为Integer.MAX_VALUE,有界(近似无界),FIFO(先进先出策略)。使用两个重入锁分别控制元素的入队和出队,用 Condition 进行线程间的唤醒和等待
// 初始化构造,也有自定义大小capacity参数 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 两把锁 /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
- SynchronousQueue
同步移交队列。容量为0,添加任务必须等待取出任务,这个队列相当于通道,不存储元素
- PriorityBlockingQueue
优先级阻塞队列。无界,在 put 的时候会tryGrow,要说它有界也没问题,因为界是 Integer.MAX_VALUE,但其实上这个队列应该是无界的。默认采用元素自然顺序升序排列(可以自定义Comparator)。使用一个重入锁分别控制元素的入队和出队
/** * Default array capacity. 默认初始化大小 11 */ private static final int DEFAULT_INITIAL_CAPACITY = 11; public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } /** * Lock used for all public operations */ private final ReentrantLock lock; /** * Condition for blocking when empty */ private final Condition notEmpty;
- DelayQueue
延时队列。无界,队列中的元素有过期时间,过期的元素才能被取出。使用一个重入锁分别控制元素的入队和出队,用 Condition 进行线程间的唤醒和等待。任务调度时候可以使用
private final transient ReentrantLock lock = new ReentrantLock(); private final Condition available = lock.newCondition();
-
threadFactory
线程的创建工厂,(所有的线程创建都是由指定的 factory 创建的),可以设定线程名、线程编号等
默认创建的线程工厂,通过
Executors.defaultThreadFactory()
获取 -
handler
拒绝策略;如果队列满了,按照我们指定的拒绝策略拒绝执行任务
当前线程数 大于 最大线程数( maximumPoolSize )执行拒绝策略
被线程池拒绝的任务将交由RejectedExecutionHandler类来处理。
RejectedExecutionHandler提供了四种任务拒绝策略:
- **AbortPolicy(默认):**当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
- **CallerRunsPolicy:**当任务添加到线程池中被拒绝时,会在线程池当前正在运行的调用线程中处理被拒绝的任 务。(即不用线程池中的线程执行,而是交给调用方的线程来执行)
- **DiscardOldestPolicy:**当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
- **DiscardPolicy:**当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
1.3 ThreadPoolExecutor的运行流程遵循这样的规则:
- 当线程池中的核心线程数量未达到最大线程数时,启动一个核心线程去执行任务;
- 如果线程池中的核心线程数量达到最大线程数时,那么任务会被插入到任务队列中排队等待执行;
- 如果在上一步骤中任务队列已满,但是线程池中线程数量未达到 (max最大线程数) 限定线程总数,那么启动一个非核心线程来处理任务;
- 如果上一步骤中线程数量达到了 (max最大线程数) 限定线程总量,那么线程池则拒绝执行该任务,且ThreadPoolExecutor会调用RejectedtionHandler 的 rejectedExecution方法来通知调用者。
简单地说,线程池运行调度的优先顺序为:核心线程—>阻塞队列—>非核心线程—>拒绝策略。下面这个流程图。
工作顺序:
以上通俗来讲:
1)、线程池创建,准备好core数量的核心线程数,准备接受任务
1.1 、core 满了,就将再进来的任务放入阻塞队列中,空闲的core就会自己去阻塞队列获取任务执行
1.2、阻塞队列满了,就直接开新线程执行,最大只能开到 max 指定的数量
1.3、max 满了就用 RejectedExecutionHandler 拒绝任务
1.4、max 都执行完成,有很多空闲线程,在指定的时间 keepAliveTime 以后,释放max-core这些线程
1.4 常见的 4 种线程池
-
Executors.newCacheThreadPool():
可缓存线程池,先查看线程池中有没有以前建立的线程,如果有直接使用,没有则新建一个新的线程加入池中,缓存线程池通常用于执行一些生存期很短的异步型任务。线程池无限大,当执行当前任务时上一个任务已经完成,会复用执行上一个任务的线程,而不是每一次新建
core 是 0 ,所有都可回收
-
Excetors.newFixedThreadPool(int n):
创建一个可重用的固定数的线程池,以共享无界队列方式来运行这些线程
固定大小、core = max ;都不可回收
-
Executors.newScheduledThreadPool(int n) :
创建一个定长线程池,支持定时及周期性任务执行
定时任务的线程池
-
Executors.newSingleThreadExecutor()
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证按照指定顺序(FIFO,LIFO,优先级)执行。
单线程的线程池,后台从队列里面中获取任务,挨个执行
面试题:
一个线程池 core :7 、max:20、queue:50,当100个并发进来怎么分配?
7 个会立即执行、50个进入阻塞队列、再开 13个 非核心线程进行执行。剩下的30个就使用拒绝策略。
如果不想抛弃还要执行。CallerRunsPolicy策略;
二、CompletableFuture (异步编排)
CompletableFuture是JDK8的新特性之一,是异步调用相关的API,用于简化异步调用,提高异步调用的效率
CompletableFuture 是一个在 [Java 8](https://m.baidu.com/s?word=Java 8&sa=re_dqa_zy) 中引入的新增异步编程工具类。
异步编排是一种编程模式,它允许程序在执行某些任务时,同时执行其他任务,从而提高程序的效率和响应速度。线程池是一种常见的实现异步编排的技术,它可以管理一组线程,将任务分配给这些线程来执行,从而避免了频繁创建和销毁线程的开销。
异步编排解决什么问题?
异步编排主要用于解决传统同步编程中遇到的问题。在传统的同步编程中,程序执行的顺序通常是固定的,即一旦开始执行某项操作,直到该操作完成之前,程序会一直等待并无法进行其他操作。这可能导致在处理大量并发请求时,程序容易陷入阻塞状态,影响效率并可能导致服务崩溃。
异步编排的核心思想是通过将耗时操作放入独立的线程中,使得主线程能够在这些操作完成后继续执行其他任务,从而允许并发处理和异步执行。这种方式可以在等待异步操作的同时处理其他请求,从而提高了程序的并发处理能力和响应速度。
异步编排的应用场景包括但不限于网络通讯、数据库操作以及图形用户界面(GUI)交互。在网络通信中,异步编排允许程序在等待数据传输的同时执行其他操作;在数据库操作中,它可以避免程序因为等待长时间的查询或事务处理而阻塞;对于GUI操作,异步编排有助于改善用户体验,如减少因等待对话框关闭或其他GUI更新导致的卡顿现象。
Java中的异步编程可以通过多种方式实现,包括利用内置的线程池功能和使用各种框架。此外,合理的线程设计和管理工作也是保证程序正确性和可维护性的重要环节。
总结来说,异步编排的主要优点是可以大幅提升程序的并发处理能力,特别是在处理大量并发请求时,能够使程序在等待某些操作的同时,继续处理其他请求,从而提高响应速度和吞吐量,适用于现代高并发和高性能的网络应用程序开发。
2.1 创建异步对象
CompletableFuture 提供了四个静态方法来创建一个异步操作。
- runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
- 可以传入自定义的线程池,否则就用默认的线程池;
2.2 计算完成时回调方法
方法完成后的感知
whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executorService).whenComplete((res,ex)->{
System.out.println("异步任务成功完成了...结果是:"+res+";异常是:"+ex);
}).exceptionally(throwable -> {
//感知异常,并且可以指定返回默认值;
return 10;
});
whenComplete 和 whenCompleteAsync 的区别:
-
whenComplete:让执行当前任务的线程继续执行 whenComplete 的任务。
-
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
2.3 handle 方法
和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。
/**
* 方法执行完成后的处理 handle(最终处理)
*/
System.out.println("main....start....");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executorService).handle((res,throwable)->{
if(res != null){
return res*2;
}
if(throwable != null){
return 0;
}
return 0;
});
Integer integer = future.get();
System.out.println("main....end...." +integer);
//int i = 10 / 2; 无异常时 输出结果如下
main....start....
当前线程:12
运行结果:5
main....end....10
// int i = 10 / 0; 出现异常时,输出结果如下
main....start....
当前线程:12
main....end....0
2.4 线程串行化方法
- thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
- thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
- thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun 的后续操作。没有返回值。
带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成
小结:
XXXApply 有返回值,XXXRun、 XXXAccept无返回值;
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
2.5 两任务组合 - 都要完成
两个任务必须都完成,触发该任务。
- thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值。
- thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
- runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务。没有返回值
thenCombineAsync
/** * 两个都完成 */ System.out.println("main....start...."); CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1线程:" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("任务1结束:" ); return i; }, executorService); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2线程:" + Thread.currentThread().getId()); System.out.println("任务2结束:" ); return "Hello"; }, executorService); CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> { return f1 + ":" + f2 + "->HAHA"; }, executorService); System.out.println("main....end...."+future.get() );
输出结果 main....start.... 任务1线程:12 任务1结束: 任务2线程:13 任务2结束: main....end....2:Hello->HAHA
thenAcceptBothAsync
/** * 两个都完成 */ CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1线程:" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("任务1结束:" ); return i; }, executorService); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2线程:" + Thread.currentThread().getId()); System.out.println("任务2结束:" ); return "Hello"; }, executorService); future01.thenAcceptBothAsync(future02,(f1,f2)->{ System.out.println("任务3开始...之前的结果:"+ f1 +"-->" + f2); },executorService); System.out.println("main....end...." );
输出结果: main....start.... 任务1线程:12 任务1结束: 任务2线程:13 任务2结束: main....end.... 任务3开始...之前的结果:2-->Hello
runAfterBothAsync
/** * 两个都完成 */ System.out.println("main....start...."); CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1线程:" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("任务1结束:" ); return i; }, executorService); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2线程:" + Thread.currentThread().getId()); System.out.println("任务2结束:" ); return "Hello"; }, executorService); future01.runAfterBothAsync(future02,()->{ System.out.println("任务3开始..."); },executorService); System.out.println("main....end...." );
输出结果: main....start.... 任务1线程:12 任务1结束: 任务2线程:13 任务2结束: main....end.... 任务3开始...
2.6 两任务组合 - 一个完成
当两个任务中,任意一个 future 任务完成的时候,执行任务。
- applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
- acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
- runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。
2.7 多任务组合
- allOf:等待所有任务完成。
- anyOf:只要有一个任务完成。
allOf : 等待所有future任务都完成,才可以做接下来的事。无返回值
System.out.println("main....start...."); CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品的图片信息"); return "hello.jpg"; },executorService); CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品的属性"); return "黑色+256G"; },executorService); CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> { // 模拟业务响应时间不一致 try { Thread.sleep(3000); System.out.println("查询商品的介绍"); } catch (InterruptedException e) { e.printStackTrace(); } return "华为"; },executorService); CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc); allOf.get();//等待所有结果完成 System.out.println("main....end...."+futureImg.get()+"->"+futureAttr.get()+"->"+futureDesc.get());
输出结果 main....start.... 查询商品的图片信息 查询商品的属性 查询商品的介绍 main....end....hello.jpg->黑色+256G->华为
anyOf : 任意一个任务完成,就可以做接下来的事。返回object
System.out.println("main....start...."); CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品的图片信息"); return "hello.jpg"; },executorService); CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品的属性"); return "黑色+256G"; },executorService); CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> { // 模拟业务响应时间不一致 try { Thread.sleep(3000); System.out.println("查询商品的介绍"); } catch (InterruptedException e) { e.printStackTrace(); } return "华为"; },executorService); CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc); anyOf.get(); System.out.println("main....end...."+anyOf.get());
main....start.... 查询商品的图片信息 查询商品的属性 main....end....hello.jpg 查询商品的介绍
futureAttr = CompletableFuture.supplyAsync(() -> {
标签:....,System,任务,CompletableFuture,println,线程,ExecutorService,out From: https://blog.csdn.net/ylm1205625299/article/details/143193225System.out.println("查询商品的属性"); return "黑色+256G"; },executorService); CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> { // 模拟业务响应时间不一致 try { Thread.sleep(3000); System.out.println("查询商品的介绍"); } catch (InterruptedException e) { e.printStackTrace(); } return "华为"; },executorService); CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc); anyOf.get(); System.out.println("main....end...."+anyOf.get());
main....start.... 查询商品的图片信息 查询商品的属性 main....end....hello.jpg 查询商品的介绍