1、线程与线程池
我们可以使用new Thread() 的方式创建一个线程,一个线程的生命周期如下图所示,通过这种方式创建线程会造成消耗大量的资源,同时也无法进行线程的管理。所以在实际开发生产过程中,遇到异步或者并发场景多采用线程池的方案进行业务。
2、线程池的创建及工作流程
对比与new Thread()方式创建线程,线程池的优点主要有以下几点:
- 降低资源的消耗:线程可以重复使用,不需要在创建线程和消耗线程上浪费资源
- 提高响应速度:任务到达时,线程可以复用已有的线程,及时响应
- 可管理性:无限制的创建线程会降低系统效率,线程池可以对线程进行管理、监控、调优
使用ThreadPoolExecutor
创建一个线程池,ThreadPoolExecutor
有四个构造函数,下面是全参数的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数名 | 含义 | 解释 |
corePoolSize | 线程池核心线程数 | 默认情况下,线程池中是没有线程的,当还没有一次任务到达过时,初始化的线程数为0,当有任务初次来临,直接创建corePoolSize个线程;核心线程生命周期无限,即使空闲也不会死亡。 |
maximumPoolSize | 线程池能创建的最大线程数 | 当核心线程数已满,并且工作队列也已经存放满,才会去判断当前线程数是否小于maximumPoolSize,小于则继续创建线程处理任务,等于则执行拒绝策略。 |
keepAliveTime | 闲置超时时间 | 当线程池中的线程数大于corePoolSize时,此值才生效,即大于corePoolSize的线程在经过keepAliveTime的时间依然没有任务执行,则销毁线程。 |
unit | 超时时间单位 | TimeUnit常量 |
workQueue | 工作队列 | 当核心线程数已满时,新提交的任务放到任务队列中(前提是任务队列没满)。 |
threadFactory | 线程池创建新线程的工厂 | 创建线程,一般默认即可。 |
handler | 线程池达到饱和之后的拒绝策略 | 当线程数达到最大线程maximumPoolSize后(此时队列已经存满),再有新任务提交,执行的处理策略。 |
线程池的工作流程如下图
通过一个demo演示线程池的应用
package com.normal.notes.thead;
import java.util.concurrent.*;
/**
* 线程 在日常生产中不会以new Thread()的形式出现,这种显示表达引发的问题有以下三点
* 1、OOM OutOfMemery 内存溢出,频繁的创建销毁线程是
* 2、资源开销浪费耗时
* 3、不可管理性
* 所以生产过程中如果需要用到多线程,一般以线程池的方式出现。
*
*/
public class ThreadTest {
public static void main(String[] args) {
//初始化一个线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
3,
6,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5)
);
// 使用execute() 提交任务
poolExecutor.execute(()-> System.out.println("task1:"+Thread.currentThread().getName()));
poolExecutor.execute(()-> System.out.println("task2:"+Thread.currentThread().getName()));
poolExecutor.execute(()-> System.out.println("task3:"+Thread.currentThread().getName()));
poolExecutor.execute(()-> System.out.println("task4:"+Thread.currentThread().getName()));
poolExecutor.execute(()-> System.out.println("task5:"+Thread.currentThread().getName()));
poolExecutor.execute(()-> System.out.println("task6:"+Thread.currentThread().getName()));
poolExecutor.execute(()-> System.out.println("task7:"+Thread.currentThread().getName()));
// 关闭线程池
poolExecutor.shutdown();
}
}
执行结果为:核心线程数为3,且可以重复使用,在核心线程池有空闲的情况下,不会创建临时线程
3、工作队列(workQueue)参数详解
workQueque决定了缓存任务的排队策略,不同的]业务场景,我们可以选择不同的工作队列。类型为BlockingQueue。常用队列如下所示。
队列实现类 | 说明 |
ArrayBlockingQueue | 有界的队列,创建的时候必须要指定队列的大小 |
LinkedBlockingQueue | 默认情况下为无界的任务队列,默认值是Integer.MAX_VALUE,也可以指定队列的大小 |
SynchronousQueue | 无容量,直接提交队列,如果无可用线程会执行拒绝策略。使用SynchronousQueue阻塞队列一般要求maximumPoolSize为无界(无限大) |
DelayQueue | 延迟队列,无界队列中每个元素都有过期时间,当从队列获取元素时,只有过期的元素才会出队,而队列头部是最早过期的元素,若是没有过期,则进行等待。 典型场景:订单过期未支付自动取消 |
4、拒绝策略(handler)
触发任务拒接的条件:当前同时运行的线程数量达到最大线程数maximumPoolSize,并且队列也放满了任务,即触发饱和拒绝策略。ThreadPoolExecutor中定义了四个拒绝策略内部类。
拒绝策略 | 说明 |
DiscardPolicy | 当任务添加到线程池中被拒绝时,直接丢弃任务,不抛出异常 |
AbortPolicy | 当任务添加到线程池中被拒绝时,直接丢弃任务,并抛出RejectedExecutionException异常 |
DiscardOldestPolicy | 当任务添加到线程池中被拒绝时,判断如果线程池还在运行,则获取队列,让队首的元素出队直接抛弃,把当前任务添加执行。 |
CallerRunsPolicy | 当任务添加到线程池中被拒绝时,判断线程池是否还在运行,直接在主线程中运行此任务,即在调用execute或者submit的方法中执行,不再使用线程池来处理此任务。 |
demo演示CallerRunsPolicy策略
public static void main(String[] args) {
//初始化一个线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
2,//核心线程4个
2,//最大线程4个
10,//存活时间10s
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2),//队列长度2
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 创建8个任务,每个任务休眠十秒,两个占用核心线程,两个占用队列排队,必然出现一个拒绝
for (int i = 1; i <= 5; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10);
System.out.println("执行当前任务的线程:" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 使用execute() 提交任务
poolExecutor.execute(runnable);
}
}
执行结果:同时执行5个任务,两个占队列,两个占核心线程,一个被拒绝根据策略,直接调用住线程处理。
5、提交任务的两种方式:
由Executor提供的executor()和ExecutorService提供的submit()
//execute没有返回值
public void execute(Runnable command) {}
//submit使用泛型来接收返回值
<T> Future<T> submit(Callable<T> task);
日常生产中常用的submit提交任务,有三个方法:
方法名 | 返回值类型 | 描述 |
submit(Runnable) | Future<?> | 提交Runnable任务 |
submit(Runnable,T) | Future<T> | 提交Runnable任务并指定执行结果 |
submit(Callable<T>) | Future<T> | 提交Callable任务 |
submit提交任务demo
// submit(Callable<T>) 提交任务 ,返回值Future<T>
for (int i = 1; i <= 5; i++) {
Callable<String> call = new Callable() {
@Override
public String call() {
try {
Thread.sleep(10);
System.out.println("执行当前任务的线程:" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return "submit";
}
};
Future<String> submit = poolExecutor.submit(call);
}
6、线程池状态监控
ThreadPoolExecutor
提供个公共方法来获取线程池的指标信息,在生产过程中通常使用监控系统框架对线程池的状态进行监控,如springboot中的Spring Boot Actuator
这里我们通过日志打印的形式监控线程池的状态信息。
- 创建一个任务,用于线程池提交任务用
package com.normal.notes.thead;
import lombok.extern.slf4j.Slf4j;
/**
* 定义一个int类型的变量timeout,表示任务执行时长,重载构造方法用于初始化timeout,
* 任务内容是使当前线程休眠,以此来模拟任务执行时长。
*/
@Slf4j
public class Task implements Runnable{
/**
* 执行时间
*/
private int timeout;
public Task(int timeout) {
this.timeout = timeout;
}
@Override
public void run() {
try {
//打印自定义线程名
log.info("当前线程名称:{}",Thread.currentThread().getName());
//使当前线程休眠指定时间
Thread.sleep(timeout * 1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 创建一个线程池实现方法,继承
ThreadPoolExecutor
并实现一个monitor方法,用于监听线程池的各个状态
package com.normal.notes.thead;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* 创建一个有监听方法的 线程池,继承ThreadPoolExecutor
* beforeExecute\afterExecute\terminated 执行时,分别监听当前线程池状态
*/
@Slf4j
public class MonitorThreadPool extends ThreadPoolExecutor {
/**
* 自定义线程池
* @param corePoolSize
* @param maximumPoolSize
* @param keepAliveTime
* @param unit
* @param workQueue
*/
public MonitorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public MonitorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public MonitorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public MonitorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* 重写before executor方法,该方法每次任务执行前调用,在他内部调用一遍monitor方法,每当有任务执行的时候,输出一次线程池的情况,
* @param t
* @param r
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
log.info("beforeExecute");
monitor();
}
/**
* 重写afterexecutor方法,该方法每次任务完成后调用,在它内部也调用一遍monitor方法,每当有任务完成的时候,输出一次线程池的情况,
* @param r
* @param t
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
log.info("afterExecute");
monitor();
}
/**
* 最后重写terminated的方法。该方法在线程池关闭前调用,同样的,在它的内部也调用一遍monitor方法,
*/
@Override
protected void terminated() {
log.info("terminated");
monitor();
}
/**
* 监控线程池情况
*/
public void monitor(){
log.info("正在工作的线程数:{}",getActiveCount());
log.info("当前存在的线程数:{}",getPoolSize());
log.info("历史最大的线程数:{}",getLargestPoolSize());
log.info("已提交的任务总数:{}",getTaskCount());
log.info("已完成的任务数:{}",getCompletedTaskCount());
log.info("队列中的任务数:{}",getQueue().size());
log.info("================本次监听内容输出完成=====分割线================");
}
}
- 创建一个main方法类,使用自定义线程池执行任务,并打印线程池各个阶段的状态
package com.normal.notes.thead;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.*;
/**
* 线程 在日常生产中不会以new Thread()的形式出现,这种显示表达引发的问题有以下三点
* 1、OOM OutOfMemery 内存溢出,频繁的创建销毁线程是
* 2、资源开销浪费耗时
* 3、不可管理性
* 所以生产过程中如果需要用到多线程,一般以线程池的方式出现。
*
*/
@Slf4j
public class ThreadTest {
public static void main(String[] args) {
ThreadFactory springThreadFactory = new CustomizableThreadFactory("springThread-pool-");
// 创建带监控的线程池
MonitorThreadPool monitorThreadPool = new MonitorThreadPool(
1,//核心线程数1
3,//最大线程数3
0,//存留时间为0,临时线程结束后立即销毁
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(2),//任务队列长度2
springThreadFactory,//使用springboot提供的线程工厂
new ThreadPoolExecutor.AbortPolicy()//拒绝策略为,线程池满任务列表满 提交新的任务进行报错
);
try {
// 提交多个任务
for (int i = 10; i > 0; i--) {
Task task = new Task(i);
// 提交任务
monitorThreadPool.submit(task);
// 每隔500毫秒提交一个
Thread.sleep(500);
}
// 使主线程休眠6秒钟
Thread.sleep(6000);
// 关闭线程池之前获取一次线程池情况
monitorThreadPool.monitor();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭线程池
monitorThreadPool.shutdown();
}
}
}
执行结果:
结果分析:
标签:Java,Thread,队列,int,任务,线程,解析,public From: https://blog.51cto.com/u_15742546/7526479任务休眠时间为10、9、8、7、6秒,线程池最大线程数为3、队列长度为2,第6个任务提交过来之后线程状态为全部占用,且执行队列全部占用,此时会被执行拒绝策略,已提交任务总数为5,不会再增加。
干货长文,建议收藏