juc基础之线程池实现原理
线程池概念
线程池和数据库连接池非常类似,可以统一管理和维护线程,减少没必要的开销
为什么使用使用线程池?
因为频繁的开启线程或者停止线程,线程需要重新被cpu从就绪到运行状态调度,需要发生cpu的上下文切换,效率非常低
线程池是复用机制,提前创建好一些固定的线程数一直在运行状态,实现复用,从而可以减少就绪到运行状态的切换
![](/Users/pxw/Desktop/截屏2022-10-30 上午11.11.06.png)
哪些地方会使用到线程池?
实际项目中,禁止自己new Thread(),必须使用线程池来维护和创建线程
线程池有哪些作用?
核心点:复用机制,提前创建好固定的线程一直在运行状态,实现复用,限制线程创建数量
- 降低资源消耗:通过池化 技术重复利用已创建的线程,降低线程创建和销毁造成的损耗
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行
- 线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性,使用线程池可以进行统一的分配,调优,监控
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能,如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行
线程池的创建方式?
- Executors.newCachedThreadPool() 不使用
可缓存线程池
-
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
} -
缺点:Integer.MAX_VALUE,不能指定线程数,相当于无限制创建线程,没法实现线程复用
-
Executors.newScheduledThreadPool(10) 不使用
延迟线程池
-
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
} -
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
} -
new DelayedWorkQueue()是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中没有什么意义(设置maximumPoolSize的大小没有什么效果)
-
-
Executors.newFixedThreadPool(10) 使用
定长线程池
-
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
-
-
Executors.newSingleThreadExecutor() 不使用
单例线程池
-
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
} -
核心线程数为1,不能实现线程的复用
-
-
底层都是基于ThreadPoolExecutor构造函数封装
线程池底层是如何实现线程复用?
本质思想:创建一个线程,不会立马停止或者销毁而是一直实现复用
- 原理图解
- ![](/Users/pxw/Desktop/截屏2022-10-30 下午1.46.04.png)
纯手写线程池?
-
class MyExecutor{
ArrayList<Runnable> threads;
BlockingDeque<Runnable> tasks;
public MyExecutor(int threadNumber,int dequeSize) {
//创建线程实例容器ArrayList
threads = new ArrayList<>(threadNumber);
//创建任务容器LinkedBlockingDeque,设置其容量
tasks = new LinkedBlockingDeque<>(dequeSize);
//创建两个线程,将其加入到线程实例容器
for (int i = 0; i < threadNumber; i++) {
Thread thread= new Thread(() -> {
while (true) {
//线程容器里的线程不间断地从任务容器中获取任务
Runnable poll = tasks.poll();
if(poll!=null){
poll.run();
}
}
});
thread.start();
threads.add(thread);
}
}
public boolean execute(Runnable runnable){
//将任务加入到任务容器
return tasks.offer(runnable);
};
}
TreadPoolExector的核心参数?
-
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
} -
核心参数
- corePoolSize:核心线程数,一直保持运行状态的线程数
- maximumPoolSize:最大线程数,线程池允许创建的最大线程数
- keepAliveTime:超过corePoolSize创建的线程的最大存活时间
- unit:keepAliveTime的时间单位
- workQueue:任务队列,用以保存待执行的任务
- threadFactory:线程池内部创建线程所用的工厂
- handler:任务无法执行时的处理器
线程池创建的线程会一直处于运行状态吗?
不会
例如:核心线程数为2,最大线程数为5,我们可以通过配置超出corePoolSize创建的线程的最大存活时间,例如为60s,在60s内没有核心线程之外的线程在执行任务,则会停止该线程
线程池TreadPoolExector底层原理实现?
-
提交的任务数<核心线程数
-
提交的任务数>核心线程数
- 任务队列没满,则缓存到任务队列中,等待执行
- 任务队列满了,最多创建maximumPoolSize个线程
- 缓存的任务数+maximumPoolSize>提交的任务数,则正常执行
- 缓存的任务数+maximumPoolSize<提交的任务数,抛出异常,拒绝任务
实际上线程池最多能执行多少个任务?
- 核心线程数+任务队列容量+额外线程数
任务队列满了,任务会丢失吗?
如果提交任务数>核心线程数+任务队列容量+额外线程数,则会触发拒绝策略,可以自定义拒绝异常,将该任务缓存到redis,本地文件,mysql中后期项目启动实现补偿
-
AbortPolicy:丢弃任务,运行时抛出异常
-
CallerRunsPolicy:执行任务
-
DiscardPolicy:忽视,什么都不会发生
-
DiscardOldestPolicy:从队列中踢出最先进入队列(最后执行的任务)的任务
-
代码实现
-
class MyExecutor{
public ThreadPoolExecutor getExecutor() {
return new ThreadPoolExecutor(
2,
5,
30,
TimeUnit.SECONDS,
//千万要定义容量,不然是无界队列
new LinkedBlockingDeque<>(10),
new ThreadPoolExecutor.AbortPolicy());
}
}
-
-
实现RejectedExecutionHandler,可定义处理器
-
代码实现
-
class MyExecutor{
public ThreadPoolExecutor getExecutor() {
return new ThreadPoolExecutor(
2,
5,
30,
TimeUnit.SECONDS,
//千万要定义容量,不然是无界队列
new LinkedBlockingDeque<>(10),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定义异常处理");
}
});
}
}
-