线程池有如下四个优点:
- 降低资源消耗: 重用已经创建的线程, 线程的创建和销毁需要消耗计算机资源,特别是在有大量创建线程请求且线程的处理过程是轻量级的,例如:大多数的服务器。
- 提高响应速度:重用已经创建的线程。
- 提高线程的稳定性:可创建的线程数量是由有限制的,限制值是有多个因素制约,例如:JVM启动参数,Thread构造参数的请求栈大小,底层操作系统对线程的限制。“ 为每一个任务分配一个线程”没有限制可创建线程的数量,可能会创建过多的线程,造成资源消耗,出现稳定性问题。
- 提高线程的可管理性:可以使用线程池统一分配、调优和监控。
1. 线程池如何处理一个任务
线程池如何处理任务可以使用如下一个流程图来描述。(该图选自 Java并发编程的艺术 / 方腾飞老师,魏鹏老师,魏晓明老师著,下同 )。
线程池处理一个接受的任务:
- 线程池先判断核心线程池是否已经已满(即是否全部都在执行任务),如果不是则创建工作线程执行该任务,否则进入下一步。
- 判断工作队列是否已经满了,如果没有满(即工作队列还有存储空间),则将该任务入队,否则进入下一步。
- 判断线程池中的线程是否都处于工作状态,如果不是则创建工作线程执行该任务,否则拒绝这个任务。
2. 使用 ThreadPoolExecutor 创建线程池
2.1 ThreadPoolExecutor的构造函数和参数
ThreadPoolExecutor的四个构造函数如下所示:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
对上面的参数进行讲解:
参数 | 描述 |
int corePoolSize | 核心线程数,设置核心池的大小。 (1)线程池中的线程小于corePoolSize当有任务来之后,就会创建一个线程去执行任务。 (2)当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。 |
int maximumPoolSize | 最大线程数,它表示在线程池中最多能创建多少个线程,超过最大线程数任务会被拒绝。 |
long keepAliveTime | 空闲线程存活时间,即线程没有任务执行时最多保持多久时间会终止。 |
TimeUnit unit | keepAliveTime的时间单位,单位有:纳秒、微秒、毫秒、秒、 分钟、小时、天。 |
BlockingQueue<Runnable> workQueue; | 阻塞队列,该接口的子类:ArrayBlockingQueue,DelayQueue,LinkedBlockingDeque,LinkedBlockingQueue,LinkedTransferQueue,PriorityBlockingQueue,SynchronousQueue。 |
ThreadFactory threadFactory | 创建线程的工厂,源码如下: public interface ThreadFactory { Thread newThread(Runnable r); } |
RejectedExecutionHandler handler | 饱和策略,队列和线程池都满了,执行饱和策略。 这是一个接口:
四个子类: (1)ThreadPoolExecutor.AbortPolicy : 直接抛出异常 (2)ThreadPoolExecutor.CallerRunsPolicy : 只用调用者所在线程来运行任务。 (3)ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列里最近的一个任务,执行当前任务 (4)ThreadPoolExecutor.DiscardPolic:不处理,丢弃掉。 |
2.2 阻塞队列(BlockingQueue)
这里简单的介绍一下阻塞队列(BlockingQueue),阻塞队列是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法。
阻塞的插入:队列满时,队列会阻塞插入元素的线程。
阻塞的移除:队列为空时,获取线程会阻塞,等待队列中不为空。
阻塞队列有如下几种:ArrayBlockingQueue,DelayQueue,LinkedBlockingDeque,LinkedBlockingQueue,LinkedTransferQueue,PriorityBlockingQueue,SynchronousQueue。
2.3 execute(Runnable command) 方法
execute(Runnable command) 方法,执行给定的任务。
execute 方法执行的示意图如下所示,主要有以下四种情况:
(1)如果当前运行的线程小于 corePoolSize,则创建新线程执行任务(执行这一步需要获得全局锁)。
(2)如果运行的线程等于多于 corePoolSize,将任务加入到阻塞队列中。
(3)如果阻塞队列已满,则创建新的线程处理任务(执行这一步需要获得全局锁)。
(4)创建的线程数大于 maximumPoolSize,任务会被拒绝,并调用 RejectedExecutionHandler handler 进行拒绝。
2.4 关闭线程池
可以通过 shutdown 和 shutdownNow 两个方法关闭线程池。他们的工作原理都是遍历线程池中的工作线程,然后调用 interrupt 去中终端。
两者的区别:shutdownNow 把线程池状态设置为stop(即去尝试停止所有线程),shutdown 把线程池状态设置为 SHOWDOWN , 只去中断没有执行任务的线程。
调用了上面两个方法的其中一个,isShuttdown 返回 true 。所有任务都已经关闭 isTerminaed 返回 true。
3. 代码示例
核心线程数为1,最大线程数为2,最长存活时间2毫秒,阻塞队列长度为1。
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 2,
2, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
执行1个任务,测试代码如下所示。当前运行的线程小于 corePoolSize,则创建新线程执行任务。
package threadPool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPool implements Runnable{
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 2,
2, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
ThreadPool threadPool01= new ThreadPool("thread01");
poolExecutor.execute(threadPool01);
}
private String name;
ThreadPool(String name) {
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println( this.getName()+ "执行完毕");
}
}
运行截图如下所示:
执行2个任务,测试代码如下所示。运行的线程等于多于 corePoolSize,将任务加入到阻塞队列中。
public class ThreadPool implements Runnable{
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 2,
2, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
ThreadPool threadPool01= new ThreadPool("thread01");
ThreadPool threadPool02 = new ThreadPool("thread02");
poolExecutor.execute(threadPool01);
poolExecutor.execute(threadPool02);
}
}
运行截图如下所示:
执行3个任务,测试代码如下所示。如果阻塞队列已满,则创建新的线程处理任务(执行这一步需要获得全局锁)。
public class ThreadPool implements Runnable{
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 2,
2, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
ThreadPool threadPool01= new ThreadPool("thread01");
ThreadPool threadPool02 = new ThreadPool("thread02");
ThreadPool threadPool03 = new ThreadPool("thread03");
poolExecutor.execute(threadPool01);
poolExecutor.execute(threadPool02);
poolExecutor.execute(threadPool03);
}
}
运行截图,如下所示:
执行4个任务,测试代码如下所示。创建的线程数大于 maximumPoolSize,任务会被拒绝,并调用 RejectedExecutionHandler handler 进行拒绝。
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 2,
2, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
ThreadPool threadPool01= new ThreadPool("thread01");
ThreadPool threadPool02 = new ThreadPool("thread02");
ThreadPool threadPool03 = new ThreadPool("thread03");
ThreadPool threadPool04 = new ThreadPool("thread04");
poolExecutor.execute(threadPool01);
poolExecutor.execute(threadPool02);
poolExecutor.execute(threadPool03);
poolExecutor.execute(threadPool04);
}
运行截图如下所示,第四个线程被拒绝了。
4. 线程池线程数量
线程数量计算公式:Nthread = Ncpu * Ucpu * (1+ W/C),
各字段含义:
Nthreads:线程数量
Ncpu:CPU的数量,Runtime.getRuntime().availableProcessors()
Ucpu:CPU使用率,范围在[0,1]
W/C:等待时间与计算时间的比率
公式解读:其实就是要分清是计算密集型还是IO密集型,从公式可以看到如果是 如果是C无限大也就是计算密集型的那么线程太多意义不大,因为需要CPU计算,起多了也没用。如果是IO密集型那么可以设置更多的线程,因为等待时间过多。
参考文献:
- Java并发编程的艺术 / 方腾飞,魏鹏,魏晓明著 . ——北京:机械工业出版社,2015.7
- Java并发编程实战/(美)盖茨等著;童玉兰等译.——北京:机械工业出版社,2012.2