摘要:使用CountDownLatch和FutureTask解决主线程需要拿到多个子线程任务的执行结果之后再进行执行的问题。
综述
我们在工作中,经常遇到有些业务场景需要使用多线程异步执行任务,从而加快任务执行速度。本文探讨的业务场景如下:某一个业务接口,需要处理几百个请求,之后再由一个线程汇总每个请求的执行结果。解决办法是基于并发包中的FutureTask,用轮询方式判断 Future.isDone 任务是否结束,再获取结果。
FutureTask是什么?FutureTask表示一个异步运算的任务,是Java 5 新增的Future接口的一个基础实现,我们可以将它同Executors一起使用处理异步任务。
FutureTask里面可以传入一个Callable的具体实现类,可以对这个异步运算任务的结果进行等待获取、判断是否已经完成、取消任务等操作。当然,由于FutureTask也是Runnable接口的实现类,所以FutureTask也可以放入线程池中。
FutureTask表示的计算是通过Callable来实现的,相当于一种可生产结果的Runnable,并且可以处于以下3种状态:等待运行,正在运行和运行完成。运行完成表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态后,它会永远停止在这个状态上。Future.get的行为取决于任务的状态,如果任务已经完成,那么get会立刻返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者异常。
CountDownLatch方法详解
CountDownLatch是一个同步工具类,它通过一个计数器来实现,初始值为线程的数量。每当一个线程完成了自己的任务,计数器的值就务必减1;当计数器递减至0时,表示所有的线程都已执行完毕,然后在等待的线程就被唤醒。
CountDownLatch(int count):count为计数器的初始值(一般初始化为线程个数)。
countDown():每调用一次计数器值-1,直到count被减为0,代表所有线程全部执行完毕。
getCount():获取当前计数器的值。
await(): 等待计数器变为0,即等待所有异步线程执行完毕。
boolean await(long timeout, TimeUnit unit):无论计数器的值是否递减到0,只等待timeout时间就唤醒。它与await()区别:
① 至多会等待指定的时间,超时后自动唤醒,若 timeout 小于等于零,则不会等待;
② boolean 类型返回值:若计数器变为零了,则返回 true;若指定的等待时间过去了且计数器的值大于零,则返回 false。
CountDownLatch应用场景
常见应用场景如下:
- 某个线程需要在其它N个线程执行完毕后再执行。
- 某个线程需要等待其它N个线程执行 T 毫秒后再执行。
- 多个线程并行执行同一个任务,提高并发量。
Future类
Future类提供了方法来检查异步调用是否完成、等待异步调用完成并获取异步调用返回结果。get()方法可以对线程进行阻塞,直到异步调用完成并返回结果。cancel()方法可以取消异步方法的执行。
Future是一个接口,定义了异步线程执行结果的获取方法,以及异步线程执行的取消方法。下面看源码:
boolean cancel(boolean mayInterruptIfRunning);
尝试取消此任务的执行。如果任务已完成、已取消或由于其他原因无法取消,则此尝试将失败。如果成功,并且在调用cancel时此任务尚未启动,则此任务不应运行。如果任务已经启动,则mayInterruptIfRunning参数确定执行此任务的线程是否应该中断以尝试停止任务。
此方法返回后,对isDone的后续调用将始终返回true。如果此方法返回true,则对isCancelled的后续调用将始终返回true。
V get() throws InterruptedException, ExecutionException;
阻塞调用线程直到任务执行完成或者取消,然后检索其结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
如果需要,最多等待给定时间以完成任务,然后检索其结果(如果可用),此时如果任务未结束,则抛出TimeoutException
异常。
isDone()
是否执行结束,true 已结束,false 未结束。
夯实基础
学习并发部分的Future与CountDownLatch时有个疑惑:既然Future对象的get方法会挂起等待到该线程执行完并返回结果时才执行,那么有时候为什么还需配合使用CountDownLatch呢?接下来通过一个例子来说明为什么需要,本例模拟并行获取接口(工人工作时长)数据,最后Boss线程统计优秀员工工作时间。
基本程序基于帖子 java latch闭锁基本使用(结合future) 做了改动。我们在该程序上做进一步的小测试——某个线程需要等待其它N个线程执行M 毫秒后再执行。定义worker线程,它会执行工作并返回工作时长:
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @Author Wiener
* @Date 2022-12-25
* @Description: 实现Callable的工人
*/
public class WorkerWithResult implements Callable<Worker> {
private CountDownLatch downLatch;
private int workerId;
public WorkerWithResult(CountDownLatch downLatch, int workerId) {
this.downLatch = downLatch;
this.workerId = workerId;
}
@Override
public Worker call() {
int workTime = -1;
try {
workTime = new Random().nextInt(3000);
// 降低4号员工的工作效率
if (workerId == 4) {
workTime = workTime + 3000;
}
//通过睡眠时长模拟在处理复杂业务,如果不睡,效率刚刚的
TimeUnit.MILLISECONDS.sleep(workTime);
System.out.println(workerId + " used time: " + workTime);
} finally {
Worker oneWorker = new Worker();
oneWorker.setWorkerId(this.workerId);
oneWorker.setWorkTime(workTime);
downLatch.countDown();
return oneWorker;
}
}
}
boss线程统计所有优秀worker的工作时长,并在控制台打印优秀员工的工时情况。接收时以list传入它的构造器:
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* Boss Bean
*
* @author Wiener
*/
@Slf4j
public class BossWithResult implements Runnable {
private static Long givenWorkTime = 2000L;
private CountDownLatch downLatch;
//保存每个工人的工作时间
private List<Future> taskList;
public BossWithResult(CountDownLatch downLatch, List<Future> workTimeUseList) {
this.downLatch = downLatch;
this.taskList = workTimeUseList;
}
@Override
public void run() {
Integer totalTime = 0;
// 任务开始前预创建变量
List<Future> doneTasks = new ArrayList<>();
try {
// 等待指定时间后,查看工作完成情况,这期间完成工作说明工作效率杠杠的,优秀员工
Boolean finished = downLatch.await(givenWorkTime, TimeUnit.MILLISECONDS);
if (!finished) {
// 响应超时
log.warn("响应超时: {} ms", givenWorkTime);
}
} catch (InterruptedException e) {
log.error("任务失败,", e);
}
// 保留已完成的任务
for (Future<Worker> workerTask : taskList) {
if (workerTask.isDone()) {
doneTasks.add(workerTask);
}
}
log.info("已完成task:{}", doneTasks.size());
// 分析已经执行完的任务
int workTime = 0;
for (Future<Worker> excellentWorker : doneTasks) {
try {
Worker oneWorker = excellentWorker.get(1, TimeUnit.MILLISECONDS);
workTime = oneWorker.getWorkTime();
if (workTime > givenWorkTime) {
log.info(" ****** Worker id {} uses {}", oneWorker.getWorkerId(), workTime);
} else {
log.info("The work time of worker id {} is {}", oneWorker.getWorkerId(), workTime);
}
totalTime = workTime + totalTime;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("-----------------", e);
}
}
log.info("工人总的工作时长是:" + totalTime);
}
}
Worker Bean定义如下:
/**
* @Author Wiener
* @Date 2022-12-25
* @Description: 工人 Bean
*/
@Getter
@Setter
@ToString
public class Worker implements Serializable {
private static final long serialVersionUID = 8461989391177768538L;
/**
* 工人ID
*/
private Integer workerId;
/**
* 工作时长
*/
private Integer workTime;
}
基于线程池、CountDownLatch和FutureTask,定义boss线程与多个worker线程,并且使它们并发执行:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class LatchDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 使用future统计每个worker工作的时长,最后通过latch的await函数通知boss统计工时。
* 温馨提示,在给boss传递参数的时候,可以让boss直接获得future中的值,但是如果使用
* 这种方式,就没有必要使用latch了,因为在获取每个worker的值时需要使用future.get(),能够创建完成
* 参数的时候,worker线程应该已经结束了。所以就没有必要使用latch了。
*
* 如果像下面程序传递的是future,然后在boss的线程中对future进行取值,就是需要latch的。因为在boss线程
* 开始的时候future没有执行完成,需要latch等待2000ms后,才能保证有的future已经执行结束。
*
* 定义工人个数,数量越大,模拟的越逼真
*/
int workerNum = 21;
CountDownLatch downLatch = new CountDownLatch(workerNum);
ExecutorService executor = Executors.newFixedThreadPool(workerNum);
List<Future> workTimeList = new ArrayList<>();
for (int i = 0; i < workerNum; i ++) {
FutureTask<Worker> loopTask = new FutureTask<>(new WorkerWithResult(downLatch, i));
workTimeList.add(loopTask);
executor.submit(loopTask);
}
executor.submit(new BossWithResult(downLatch, workTimeList));
// 关闭线程池
executor.shutdown();
}
}
结果分析
随机抽取执行一次的结果,控制台打印信息如下:
18 used time: 115
14 used time: 208
20 used time: 327
16 used time: 411
15 used time: 437
17 used time: 759
6 used time: 867
7 used time: 937
13 used time: 1025
5 used time: 1029
19 used time: 1113
10 used time: 1125
1 used time: 1313
8 used time: 1316
0 used time: 1390
9 used time: 1581
3 used time: 2082
11 used time: 2137
18:08:56.125 [pool-1-thread-19] WARN com.swagger.demo.service.latch.BossWithResult - 响应超时: 2000 ms
18:08:56.139 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - 已完成task:18
18:08:56.140 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 0 is 1390
18:08:56.141 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 1 is 1313
18:08:56.141 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - ****** Worker id 3 uses 2082
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 5 is 1029
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 6 is 867
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 7 is 937
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 8 is 1316
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 9 is 1581
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 10 is 1125
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - ****** Worker id 11 uses 2137
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 13 is 1025
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 14 is 208
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 15 is 437
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 16 is 411
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 17 is 759
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 18 is 115
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 19 is 1113
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 20 is 327
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - 工人总的工作时长是:18172
2 used time: 2491
12 used time: 2919
4 used time: 4190
通过分析执行结果发现,批量执行的时候,由于需要处理之前的task,导致通过isDone()判断是否已经完成是不准确的,一部分超过超时时间才完成工作的工人也被定义为优秀员工。这一点需要各位老铁注意。如果你有好的规避方法,请不吝赐教,留言评论!
结束语
我确实最近比较忙,每个月挤不出几篇文章,虽然有点手痒痒,涂了皮炎平也抑制不住。
挡风玻璃为什么比后视镜大?因为前面的路比过去的更重要。你可以回头看,但别忘了前行。
标签:BossWithResult,解析,19,18,CountDownLatch,线程,time,FutureTask,latch From: https://www.cnblogs.com/east7/p/17008754.html