系统启动一个线程的成本是比较高的,因为启动线程的操作要与操作系统交互。如果程序中需要创建大量生存期较短的线程,那么使用线程池将会大幅度提高程序的运行效率。线程池中保存了一定数目可重复使用的线程,因此可以在使用时直接从线程池中获得一个线程,用完之后还可以把线程放回线程池以便后面再次使用。
线程池在系统启动时就创建大量线程,程序将一个Runnable或Callable实现类对象传递给线程池,线程池就会启动一个空闲线程来执行它们的run()方法或call()方法。当run()方法或call()方法执行完之后,线程并不会死亡,而是再次返回线程池中等待执行下一个对象的run()方法或call()方法。
使用线程池还有一个好处就是能够控制系统中并发线程的数量。当系统包含大量并发线程时,会导致系统性能下降,而线程池的最大线程数参数控制系统中并发线程不会超过设定值。
14.8.1使用Executors产生线程池
Java语言使用ExecutorService和ScheduledExecutorService这两个接口来表示线程池,而使用Executors类生成线程池。Executors位于java.util.concurrent包下,这个类提供了一些静态方法来产生线程池,这些方法如表14-6所示。
表14-6 Executors类的方法
方法 | 功能 |
newCachedThreadPool() | 创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中 |
newFixedThreadPool(int nThreads) | 创建一个可重用的、具有固定线程数的线程池 |
newSingleThreadExecutor() | 创建一个只有单线程的线程池,它相当于调用newFixedThread Pool()方法时传入参数为1 |
newScheduledThreadPool(int corePoolSize) | 创建具有指定线程数的线程池,它可以在指定的时长后执行线程任务 |
newSingleThreadScheduledExecutor() | 创建只有一个线程的线程池,它可以在指定的时长后后执行线程任务 |
newWorkStealingPool(int parallelism) | 创建持有足够线程的线程池来支持给定的并行级别 |
newWorkStealingPool() | 该方法是前一个方法的简化版本,相当于为前一个方法传入4作为参数 |
以上这些方法中,newScheduledThreadPool()newSingleThreadScheduledExecutor()这两个方法的返回值是ScheduledExecutorService类型,其他方法的返回值是ExecutorService类型。
ExecutorService代表尽快执行的线程池,也就是说只要把代表任务的Runnable对象或Callable对象交给线程池,线程池就会尽快执行该任务。ExecutorService提供了三个版本的submit()方法用于把任务提交给线程池,如表14-7所示。
表14-7 ExecutorService提交任务的方法
编号 | 方法 |
1 | Future<?> submit(Runnable task) |
2 | <T> Future<T> submit(Runnable task, T result) |
3 | <T> Future<T> submit(Callable<T> task) |
1号版本的submit()方法作用是是将task提交给线程池,线程池会尽快执行任务。这个方法的返回值类型是Future,程序员可以用这个Future类型的返回值获得线程执行的返回值。由于run()方法没有返回值,所以用Future的get()方法所获得的返回值为null,但能用Future的isDone()和isCancelled()方法得到线程的执行状态。
2号版本的submit()方法与前一个版本作用一样,区别只在于程序员可以把Future的返回值显示的指定为result。
3号版本的submit()方法用于把一个Callable对象提交给线程池,线程池会尽快执行任务,而Future的get()方法能够获得call()方法的返回值。
与ExecutorService不同,ScheduledExecutorService表示可以在指定延迟后或周期性执行任务的线程池,它提供了如表14-8所示的4个方法用于向线程池提交任务。
表14-8 ScheduledExecutorService提交任务的方法
编号 | 方法 |
1 | ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit) |
2 | <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) |
3 | ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period, TimeUnit unit) |
4 | ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay, long delay, TimeUnit unit) |
以上这些方法中,1号方法是让command任务在指定的delay延迟后执行。2号方法是让callable任务在指定的delay延迟后执行。3号方法是让command任务在指定的delay延迟后执行,并且以period为周期重复执行。4号方法是让command任务在指定的delay延迟后执行,并且以period为周期重复执行,每两次执行任务之间也会有delay时间长度的间隔。
用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭操作。调用shutdown()方法后的线程池不再接收新任务,但会将以前所有已提交任务执行完成。当线程池中的所有任务都执行完成后,池中的所有线程都会死亡,另外也可以调用线程池的shutdownNow()方法来关闭线程池,该方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
使用线程池来执行线程任务的步骤如下。
- 调用Executors类的静态工厂方法创建一个ExecutorService对象,该对象代表一个线程池
- 创建Runnable实现类或Callable实现类的实例,作为线程执行任务
- 调用ExecutorService对象的submit()方法来提交Runnable实例或Callable实例
- 当不想提交任何任务时,调用ExecutorService对象的shutdown()方法来关闭线程池
下面的【例14_17】展示了如何使用线程池处理任务。
【例14_17 线程池】
Exam14_17.java
import java.util.concurrent.*;
class Task implements Runnable{
@Override
public void run() {
try {
for (int i=1;i<=5;i++){
System.out.println(Thread.currentThread().getName()+":"+i);
Thread.sleep(200);
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
public class Exam14_17 {
public static void main(String[] args){
ExecutorService pool = Executors.newFixedThreadPool(5);
Task task1 = new Task();
Task task2 = new Task();
pool.submit(task1);//把任务提交给线程池
pool.submit(task2);//把任务提交给线程池
pool.shutdown();//①关闭线程池
}
}
【例14_17】的main()方法中,把两个任务task1和task2提交给线程池,线程池就会立刻分配线程执行这两个任务,之后关闭线程池。【例14_17】的运行结果如图14-16所示。
图14-16【例14_17】运行结果
从图14-16可以看出:语句①在两个任务刚被提交给线程池的时候就调用shutdown()方法关闭线程池,但线程池中的线程仍然顺利的执行了两个任务。这是因为shutdown()方法的作用并不是停止线程的执行,而是“拒绝”新的任务进入池内。当线程池中的线程开始执行任务后,即使调用了shutdown()方法,线程池中的线程也不会中止执行,线程会全部执行完毕。
14.8.2 FolkJoinPool线程池
当今的计算机已经向着“多核化”的方向发展,很多计算机都有不止一个CPU。如果要以最快的速度执行完任务,最好的方式就是把一个任务分解成多个小任务分配给多个CPU执行,然后把每个小任务的执行结果组合成整个任务的执行结果。
Java语言中,ForkJoinPool就是一种能把一个任务分解成多个小任务的线程池。ForkJoinPool是ExecutorService接口的实现类,它有以下两个构造方法:
public ForkJoinPool(int parallelism)
public ForkJoinPool()
第一个构造方法创建一个包含parallelism个并行线程的线程池,而第二个构造方法也是创建一个线程池,它的并行线程数量是Runtime.getRuntime().availableProcessors()的返回值,这个返回值实际上就是系统逻辑CUP的数量。从JDK1.8开始,ForkJoinPool又增加了一个commonPool()静态方法来获得ForkJoinPool对象,通过这个方法获得的ForkJoinPool对象被称为通用池。通用池的运行状态不会受shutdown()或shutdownNow()方法的影响。但如果程序员调用System.exit(0)来终止虚拟机,通用池以及通用池中正在执行的任务都会被自动终止。
创建了ForkJoinPool 实例之后,就可调用ForkJoinPool 的submit()或 invoke()方法来执行指定的任务了。这两个方法的参数类型都是ForkJoinTask,ForkJoinTask代表一个可以并行、合并的任务,它的fork()方法用于将新创建的子任务放入当前线程的工作队列中。ForkJoinTask是一个抽象类,并且它还有两个抽象子类:RecursiveAction和RecursiveTask。其中RecursiveTask代表有返回值的任务,而RecursiveAction代表没有返回值的任务,下面的【例14_18】演示了如何把一个大任务拆分成小任务并交给ForkJoinPool线程池执行。
【例14_18 ForkJoinPool线程池1】
Exam14_18.java
import java.util.concurrent.*;
class PrintTask extends RecursiveAction{//可拆解的任务
private static int THRESHOLD = 50;//每个小任务最多打印50个数字
private int start;//要打印的第一个数字
private int end;
public PrintTask(int start,int end){
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if(end-start<THRESHOLD){//要打印的数字量小于等于50
for (int i=start;i<end;i++){
System.out.println(Thread.currentThread().getName()+":"+i);
}
}else{//要打印的数字量大于50
//拆解任务
int middle = (start+end)/2;
PrintTask left = new PrintTask(start,middle);
PrintTask right = new PrintTask(middle,end);
//把两个拆分出的小任务放入工作队列
left.fork();
right.fork();
}
}
}
public class Exam14_18 {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
PrintTask task = new PrintTask(0,300);
pool.submit(task);//把任务提交给线程池
try {
pool.awaitTermination(2, TimeUnit.SECONDS);//①等待2秒钟
}catch (InterruptedException e){
e.printStackTrace();
}
pool.shutdown();
}
}
【例14_18】中,PrintTask表示一个可拆分的任务,它负责打印[start,end)这个区间内的整数。如果要打印的数字超过50个,则把任务拆分成两小任务,每个小任务各打印一半,如果每个小任务要打印的数字仍然超过50个,则继续拆分,直到任务小到打印数字不超过50个为止。需要注意:ForkJoinPool表示的线程池在调用shutdown()方法后会立刻结束池中线程的运行,因此语句①调用sleep()方法等待了2秒钟以保证池中线程全部执行完毕。【例14_18】的运行结果图14-17所示。
图14-17【例14_18】运行结果
由于运行结果很长,图14-17仅展示了运行结果的一部分。从图14-7可以看出:线程池启动了4个线程来完成这个任务,线程数量之所以是4,是因为线程数量一般与逻辑CPU数量相等,而逻辑CPU又是物理CPU数量的2倍。此外还可以看出:虽然打印了0~299这300个数字,但并不是按顺序连续打印的,这恰好证明了多个线程是并行执行的。
【例14_18】中的任务没有返回值,如果一个任务是有返回值的,可以让任务类继承RecursiveTask,这个类是一个泛型类,类型参数就是返回值的类型。下面的【例14_19】展示了使用RecursiveTask类定义一个任务,这个任务要计算1~100累加和。
【例14_19 ForkJoinPool线程池2】
Exam14_19.java
import java.util.concurrent.*;
class AddTask extends RecursiveTask<Integer> {//可拆解的任务
private static int THRESHOLD = 20;//每个小任务最多累加20个数字
private int start;
private int end;
public AddTask(int start,int end){
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if(end-start<THRESHOLD){
for (int i=start;i<end;i++){
sum = sum + i;
}
return sum;
}else{
//拆解任务
int middle = (start+end)/2;
AddTask left = new AddTask(start,middle);
AddTask right = new AddTask(middle,end);
//把两个拆分出的小任务放入工作队列
left.fork();
right.fork();
return left.join()+right.join();//把两个任务的累加和加起来
}
}
}
public class Exam14_19 {
public static void main(String[] args) {
//创建通用池
ForkJoinPool pool = ForkJoinPool.commonPool();
AddTask task = new AddTask(1,101);//①
//把任务提交到线程池,并把运行结果保存到future中
Future<Integer> future = pool.submit(task);
try {
System.out.println("累加结果为:"+future.get());
} catch (Exception e) {
e.printStackTrace();
}
pool.shutdown();//关闭线程池
}
}
【例14_19】与【例14_18】很相似,任务只是把打印数字改成了累加数字。需要注意:compute()方法在进行累加时不会把end这个数字加进去,所以语句①在给构造方法传递参数时第二个参数的值是101而不是100。此外,本例中的pool对象是一个通用池,因此在线程运行结束前调用shutdown()方法并不会终止池中线程的运行。【例14_19】的运行结果如图14-18所示。
图14-18【例14_19】运行结果