一、什么是线程池,为什么使用线程池?
线程池其实是一种池化的技术的实现,实现资源的一个复用,避免资源的重复创建和销毁带来的性能开销
在线程池中,线程池可以管理一堆线程,让线程执行完任务之后不会进行销毁,而是继续去处理其它线程已经提交的任务
线程池的优点:
-
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
-
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
-
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统 的稳定性,使用线程池可以进行统一的分配,调优和监控。
1、常见线程池
一般都是通过new Thread()来进行线程的创建,但是这样会有一些问题,如:
-
每次创建的new Thread()新建的对象性能差
-
线程缺乏统一的管理,可能无限制的创建新线程,相互之间竞争,极可能占用系统资源过低导致死机或OOM
-
缺乏功能,如定时执行、定期执行和线程中断
相比于new Thread(),Java提供的四种线程池的好处在于:
-
重用存在的线程,减少对象的创建、消亡的开销、性能佳
-
可有效地控制最大并发线程数,提高系统资源的使用率,同时避免过多的资源竞争,避免堵塞
-
提供定时执行、定期执行、单线程、并发数控制等功能
Java 定义了 Executor 接口并在该接口中定义了 execute() 用于执行一个线程任务,然后通过 ExecutorService 实现 Executor 接口并执行具体的线程操作。
ExecutorService 接口有多个实现类可用于创建不同的线程池,如下表:
(1)newCachedThreadPool
创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(index);
}
});
}
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程
(2) newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。 定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。可参考PreloadDataCache。
(3) newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
定期执行示例代码如下:
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
表示延迟1秒后每3秒执行一次。 ScheduledExecutorService比Timer更安全,功能更强大,后面会有一篇单独进行对比。
(4) newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
2、禁止使用 Executors 创建线程池
可以看到上述四个线程池都是用Executors来创建的,Executors 大大的简化了我们创建各种类型线程池的方式,为什么还不让使用呢?
阿里巴巴Java开发手册说明:
线程池不允许使用Executors,而是通过ThreadPoolExecutor的方式创建,这样的处理方式能更加明确线程池的运行规则,避免资源耗尽的风险。
那么先了解一下ThreadPoolExecutor吧
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor有7个核心参数,分别了解下:
序号 | 参数名称 | 参数解释 | |
---|---|---|---|
1 | corePoolSize | 表示常驻核心线程数,如果大于0,即使本地任务执行完也不会被销毁 | |
2 | maximumPoolSize | 表示线程池能够容纳可同时执行的最大线程数 | |
3 | keepAliveTime | 表示线程池中线程空闲的时间,当空闲时间达到该值时,线程会被销毁,只剩下 corePoolSize 个线程位置 | |
4 | unit | keepAliveTime 的时间单位,最终都会转换成【纳秒】,因为CPU的执行速度杠杠滴 | |
5 | workQueue | 当请求的线程数大于 maximumPoolSize 时,线程进入该阻塞队列 | |
6 | threadFactory | 顾名思义,线程工厂,用来生产一组相同任务的线程,同时也可以通过它增加前缀名,虚拟机栈分析时更清晰 | |
7 | handler | 执行拒绝策略,当 workQueue 达到上限,就要通过这个来处理,比如拒绝,丢弃等,这是一种限流的保护措施 |
ThreadPoolExecutor 提供了四种拒绝策略:
-
AbortPolicy:默认的拒绝策略,会 throw RejectedExecutionException 拒绝
-
CallerRunsPolicy:提交任务的线程自己去执行该任务
-
DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列
-
DiscardPolicy:相当大胆的策略,直接丢弃任务,没有任何异常抛出
不同的框架(Netty,Dubbo)都有不同的拒绝策略,我们也可以通过实现 RejectedExecutionHandler
自定义的拒绝策略
再来看一下newFixedThreadPool的静态方法参数
public static ExecutorService newFixedThreadPool(int nThreads,
ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(), threadFactory);
}
而 LinkedBlockingQueue传入的workQueue 是一个边界为 Integer.MAX_VALUE
队列,也就是无界队列了,那么等待队列也是非常消耗内存的,可能堆积大量的请求,从而导致 OOM
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
这里使用的默认拒绝策略
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}
那为什么不能使用默认拒绝策略呢?
因为对于不可预估的高并发量,比较重要的请求时直接拒绝肯定是不合理的,那么选择合理的拒绝策略是必不可少的步骤
对于采用何种策略,具体要看执行的任务重要程度。
如果是一些不重要任务,可以选择直接丢弃;
如果是重要任务,可以采用降级(所谓降级就是在服务无法正常提供功能的情况下,采取的补救措施)
例如将任务信息插入数据库或者消息队列,启用一个专门用作补偿的线程池去进行补偿
Executors
返回线程池对象也有OOM的风险:
-
FixedThreadPool
和SingleThreadExecutor
:使用的是无界的LinkedBlockingQueue
,任务队列最大长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。 -
CachedThreadPool
:使用的是同步队列SynchronousQueue
, 允许创建的线程数量为Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM。 -
ScheduledThreadPool
和SingleThreadScheduledExecutor
: 使用的无界的延迟阻塞队列DelayedWorkQueue
,任务队列最大长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM
从上可以看出:应使用有界队列,控制线程创建数量
因此在项目开发中,使用ThreadPoolExecutor创建线程池,禁止使用 Executors 创建线程池
二、总结
当需要频繁的创建线程时,通过线程池统一管理线程资源,避免不可控风险以及额外的开销
标签:Java,int,创建,队列,任务,线程,执行 From: https://blog.csdn.net/qq_36451127/article/details/140574107