线程池在开发中一定会用到,如果能像golang一样,java语言也有协程,也许java程序员就少了一种包袱。
回归正题,我们聊下到底有哪些线程池的使用方式,总结有以下几种。
- JDK 内置线程池
- Spring线程池
- 自己魔改封装
1、JDK 内置线程池
常用的有:
我们看下最全的线程池参数,探究为什么阿里规约不建议使用Executors创建默认个数的线程池。
/**
参数【7个】
corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut
maximumPoolSize - 池中允许的最大线程数
keepAliveTime - 当线程数大于内核时,这是多余的空闲线程在终止前等待新任务的最大时间。
unit - keepAliveTime参数的时间单位
workQueue - 用于在执行任务之前使用的队列。 这个队列将仅保存execute方法提交的Runnable任务。
threadFactory - 执行程序创建新线程时使用的工厂
handler - 执行被阻止时使用的处理程序,因为达到线程限制和队列容量
四个预定义的处理程序策略
在默认ThreadPoolExecutor.AbortPolicy ,处理程序会引发运行RejectedExecutionException后排斥反应。
在ThreadPoolExecutor.CallerRunsPolicy中,调用execute本身的线程运行任务。 这提供了一个简单的反馈控制机制,将降低新任务提交的速度。
在ThreadPoolExecutor.DiscardPolicy中 ,简单地删除无法执行的任务。
在ThreadPoolExecutor.DiscardOldestPolicy中 ,如果执行程序没有关闭,则工作队列头部的任务被删除,然后重试执行(可能会再次失败,导致重复)。
**/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1.1 阿里规约原文
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下:
-
1)FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
-
2)CachedThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
1.2 阿里规约是否是多余的?不然jdk为啥提供这个api?
答案可以看下ThreadPoolExecutor类的上的注释,已翻译成中文:
为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 然而,程序员被敦促使用更方便的Executors工厂方法Executors.newCachedThreadPool() (无限线程池,具有自动线程回收), Executors.newFixedThreadPool(int) (固定大小的线程池)和Executors.newSingleThreadExecutor() (单个后台线程),可以预先配置最常用的使用场景设置。 否则,手动配置和调优此类时,请使用以下指南...
其实这个类的作者"Doug Lea",已经说明Executors.newCachedThreadPool(),Executors.newFixedThreadPool(int),应该是在什么情况下使用?
只是一般程序员是不会去仔细看类说明文档,都是跟着百度复制过来的代码就干起活来了,
说起来也惭愧之前在2010年培训的时候,其实是会看这个jdk api文档的,但那个时候似乎很懵,也不会去详尽的看文档背后的一些知识点。
所以说看第一手材料多么重要,作者已经把该注意的点都写到类说明文档注释里了,吓得我赶紧用到一个类都去细看下文档注释,解决不了再找度娘。
1.3 线程池不为人知的几个冷门知识点
从几个面试题来一一道来。
1.核心和最大线程池是否都可以动态修改?
可以使用setCorePoolSize(int)和setMaximumPoolSize(int)进行动态 更改
2.核心线程最初创建并且只有在新任务到达时才启动?
可以使用方法prestartCoreThread()或prestartAllCoreThreads()动态地覆盖。如果您使用非空队列构建池,则可能需要预先提供线程。
3.核心线程是否一定不终止?
不一定,报错或者allowCoreThreadTimeOut(boolean)也可以用于将这个超时策略应用于核心线程,只要keepAliveTime值不为零
4.如何给线程池添加一个启动/暂停的功能?
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}}
2、Spring线程池
官方已经实现的全部7个TaskExecuter。Spring宣称对于任何场景,这些TaskExecuter完全够用了:
名字 | 特点 |
---|---|
SimpleAsyncTaskExecutor | 每次请求新开线程,没有最大线程数设置.不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。限流是通过 ConcurrencyThrottleSupport类的monitor的wait(),notify() 实现的 |
SyncTaskExecutor | 不是异步的线程.同步可以用SyncTaskExecutor,但这个可以说不算一个线程池,因为还在原线程执行。这个类没有实现异步调用,只是一个同步操作。 |
ConcurrentTaskExecutor | Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。 |
SimpleThreadPoolTaskExecutor | 监听Spring’s lifecycle callbacks,并且可以和Quartz的Component兼容.是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。 |
ThreadPoolTaskExecutor | 最常用。要求jdk版本大于等于5。可以在程序而不是xml里修改线程池的配置.其实质是对java.util.concurrent.ThreadPoolExecutor的包装。 |
TimerTaskExecutor | |
WorkManagerTaskExecutor |
- 使用ThreadPoolExecutorFactoryBean
package org.springframework.scheduling.concurrent;
public class ThreadPoolExecutorFactoryBean extends ExecutorConfigurationSupport implements FactoryBean<ExecutorService>, InitializingBean, DisposableBean {
private int corePoolSize = 1;
private int maxPoolSize = 2147483647;
private int keepAliveSeconds = 60;
private boolean allowCoreThreadTimeOut = false;
private int queueCapacity = 2147483647;
private boolean exposeUnconfigurableExecutor = false;
@Nullable
private ExecutorService exposedExecutor;
public ThreadPoolExecutorFactoryBean() {
}
...
protected ThreadPoolExecutor createExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
return new ThreadPoolExecutor(corePoolSize, maxPoolSize, (long)keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
}
- 使用ScheduledExecutorFactoryBean
package org.springframework.scheduling.concurrent;
public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport implements FactoryBean<ScheduledExecutorService> {
private int poolSize = 1;
@Nullable
private ScheduledExecutorTask[] scheduledExecutorTasks;
private boolean removeOnCancelPolicy = false;
private boolean continueScheduledExecutionAfterException = false;
private boolean exposeUnconfigurableExecutor = false;
@Nullable
private ScheduledExecutorService exposedExecutor;
public ScheduledExecutorFactoryBean() {
}
-
使用ThreadPoolTaskExecutor
这个类可以设置回调方法 -
使用ThreadPoolTaskScheduler
这个类可以设置回调方法,包装错误处理类,错误不会影响执行下一个任务@Scheduled就是使用这个类包装的,注意核心默认一个线程,会阻塞任务
@Override
public void run() {
try {
this.delegate.run();
}
catch (UndeclaredThrowableException ex) {
this.errorHandler.handleError(ex.getUndeclaredThrowable());
}
catch (Throwable ex) {
this.errorHandler.handleError(ex);
}
}