首页 > 其他分享 >guava异步增强-ListenableFuture

guava异步增强-ListenableFuture

时间:2022-12-03 23:31:31浏览次数:56  
标签:info 异步 log Futures ListenableFuture result new guava


jdk原生的future已经提供了异步操作,但是不能直接回调。guava对future进行了增强,核心接口就是ListenableFuture。如果已经开始使用了jdk8,可以直接学习使用原生的CompletableFuture,这是jdk从guava中吸收了精华新增的类。

guava 对 jdk 的异步增强可以通过看 ​​MoreExecutors​​​ 和 ​​Futures​​ 两个类的源码入手

Guava 异步回调简单应用

private final Executor executor = Executors.newFixedThreadPool(20);

@GetMapping(value = "/doJob")
public Response doJob() {
// ListenableFutureTask 通过静态create方法返回实例, 还有一个重载方法, 不太常用
ListenableFutureTask<String> task = ListenableFutureTask.create(() -> {
log.info("模拟异步耗时任务, 休眠10秒");
Thread.sleep(10000);
return "Jaemon";
});

executor.execute(task);

// 增加回调方法, MoreExecutors.directExecutor()返回guava默认的Executor, 执行回调方法不会新开线程, 所有回调方法都在当前线程做(可能是主线程或者执行ListenableFutureTask的线程, 具体可以看最后面的代码)
// guava异步模块中参数有Executor的方法, 一般还会有一个没有Executor参数的重载方法, 使用的就是MoreExecutors.directExecutor()
task.addListener(() -> {
try {
String result = task.get();
log.info("获取异步结果, result=[{}]", result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, MoreExecutors.directExecutor());

return Response.success();
}

DirectExecutor 源码

public final class MoreExecutors {
// ...

public static Executor directExecutor() {
return DirectExecutor.INSTANCE;
}

// ...
}

enum DirectExecutor implements Executor {
INSTANCE;

@Override
public void execute(Runnable command) {
command.run();
}

@Override
public String toString() {
return "MoreExecutors.directExecutor()";
}
}

 

一般使用异步模式的时候,都会用一个线程池来提交任务,不会像上面那样简单的开一个线程去做,那样效率太低下了,所以需要说说guava对jdk原生线程池的封装。guava对原生线程池的增强都在MoreExecutor类中,guava对ExecutorService和ScheduledExecutorService的增强类似,这里只介绍ExecutorService的增强。

Guava的异步回调

private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10, 20, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new CustomizableThreadFactory("asyncThread-"),
new ThreadPoolExecutor.DiscardPolicy());

@GetMapping(value = "/doJob")
public Response doJob() throws Exception {
final int count = 10;
List<String> seqNos = new ArrayList<>();
for (int i = 0; i < count; i++) {
seqNos.add(String.valueOf(i));
}
ListenableFuture<String> listenableFuture;
// guava的接口ListeningExecutorService继承了jdk原生ExecutorService接口,重写了submit方法,修改返回值类型为ListenableFuture
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

// 获得一个随着jvm关闭而关闭的线程池,通过Runtime.getRuntime().addShutdownHook(hook)实现
// 修改ThreadFactory为创建守护线程,默认jvm关闭时最多等待120秒关闭线程池,重载方法可以设置时间
// ExecutorService newPoolExecutor = MoreExecutors.getExitingExecutorService(threadPoolExecutor);

// 只增加关闭线程池的钩子,不改变ThreadFactory
// MoreExecutors.addDelayedShutdownHook(threadPoolExecutor, 120, TimeUnit.SECONDS);

final CountDownLatch countDownLatch = new CountDownLatch(count);
long startTime = System.currentTimeMillis();

for (String seqNo : seqNos) {
listenableFuture = listeningExecutorService.submit(() -> {
log.info("seqNo=[{}]", seqNo);
if ("5".equals(seqNo)) {
Thread.sleep(10000);
throw new IllegalArgumentException("无效的参数异常");
}
return String.format("result seqNo=%s", seqNo);
});


/**
* 注册任务回调函数
* 底层其实就是对于 listenableFuture.addListener() 方法的封装
* 对于可以在任何线程中安全执行的快速、轻量级侦听器(不是很耗时和占用资源), 可以考虑使用 MoreExecutors.directExecutor(), 否则请避免使用
*/
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
log.info("获取的返回结果为=[{}]", result);
countDownLatch.countDown();
}
@Override
public void onFailure(Throwable throwable) {
log.error("抛异常啦=[{}]", throwable.getMessage());
countDownLatch.countDown();
}
}, MoreExecutors.directExecutor());
}

log.info("等待所有任务执行完毕");
countDownLatch.await(3, TimeUnit.SECONDS);
log.info("[{}]个异步任务共计耗时=[{}]ms", count, (System.currentTimeMillis() - startTime));
return Response.success();
}

 

Guava中Futures对于Future扩展

  • transform: 对于ListenableFuture的返回值进行转换
  • allAsList: 对多个ListenableFuture的合并,返回一个当所有Future成功时返回多个Future返回值组成的List对象。注:当其中一个Future失败或者取消的时候,将会进入失败或者取消。
  • successfulAsList: 和allAsList相似,唯一差别是对于失败或取消的Future返回值用null代替。不会进入失败或者取消流程。
  • immediateFuture/immediateCancelledFuture: 立即返回一个待返回值的ListenableFuture。
  • makeChecked: 将ListenableFuture 转换成CheckedFuture。CheckedFuture 是一个ListenableFuture ,其中包含了多个版本的get 方法,方法声明抛出检查异常.这样使得创建一个在执行逻辑中可以抛出异常的Future更加容易
  • JdkFutureAdapters.listenInPoolThread(future): guava同时提供了将JDK Future转换为ListenableFuture的接口函数。

 

Guava 异步链式执行

private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10, 20, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new CustomizableThreadFactory("asyncThread-"),
new ThreadPoolExecutor.DiscardPolicy());


@GetMapping(value = "/doJob")
public Response doJob() throws Exception {
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

ListenableFutureTask<String> task1 = ListenableFutureTask.create(() -> {
TimeUnit.SECONDS.sleep(5);
log.info("任务1执行中...");
return "1";
});

listeningExecutorService.execute(task1);

// 当task1执行完毕会回调执行Function的apply方法, 如果有task1有异常抛出, 则task2也抛出相同异常, 不执行apply
ListenableFuture<String> task2 = Futures.transform(task1, input -> {
log.info("任务2执行中, input=[{}]", input);
return input + "-Answer";
}, threadPoolExecutor);

ListenableFuture<String> task3 = Futures.transform(task2, input -> {
log.info("任务3执行中, input=[{}]", input);
return input + "-Jaemon";
}, threadPoolExecutor);

// 处理最终的异步任务
Futures.addCallback(task3, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
log.info("成功啦, result=[{}]", result);
}

@Override
public void onFailure(Throwable t) {
log.error("失败啦。", t);
}
}, threadPoolExecutor);


return Response.success();
}

运行结果

2019-12-09 19:56:19.735 [Answer-AI-L] [jaemon-service]  INFO 9388 -- [  asyncThread-1] c.j.controller.MyController  :[93] 任务1执行中...
2019-12-09 19:56:19.740 [Answer-AI-L] [jaemon-service] INFO 9388 -- [ asyncThread-2] c.j.controller.MyController :[101] 任务2执行中, input=[1]
2019-12-09 19:56:19.742 [Answer-AI-L] [jaemon-service] INFO 9388 -- [ asyncThread-3] c.j.controller.MyController :[106] 任务3执行中, input=[1-Answer]
2019-12-09 19:56:19.744 [Answer-AI-L] [jaemon-service] INFO 9388 -- [ asyncThread-4] c.j.controller.MyController :[114] 成功啦, result=[1-Answer-Jaemon]

并行编程 Futures

private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10, 20, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new CustomizableThreadFactory("asyncThread-"),
new ThreadPoolExecutor.DiscardPolicy());


@GetMapping(value = "/doJob")
public Response doJob() throws Exception {
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

ListenableFuture<Integer> future1 = listeningExecutorService.submit(() -> {
log.info("任务1执行中...");
Thread.sleep(2000);
return 1;
});

ListenableFuture<Integer> future2 = listeningExecutorService.submit(() -> {
log.info("任务2执行中...");
Thread.sleep(3000);
// 如果此处抛出异常?
// if (true)
// throw new RuntimeException("dddddddddd");
return 2;
});

// 对多个 ListenableFuture 的合并, 如果某一个 ListenableFuture 执行抛出异常, 则失败
final ListenableFuture<List<Integer>> allFutures = Futures.allAsList(future1, future2);
// 和 allAsList 相似, 但某一个 ListenableFuture 执行抛出异常, 则返回null, 不影响程序继续执行
// final ListenableFuture<List<Integer>> allFutures = Futures.successfulAsList(future1, future2);

// 对于ListenableFuture的返回值进行转换
final ListenableFuture<String> transform = Futures.transform(allFutures, results -> {
log.info("results=[{}]", results);
return JSON.toJSONString(results);
}, threadPoolExecutor);
// final ListenableFuture<String> transform = Futures.transformAsync(allFutures, results -> Futures.immediateFuture(JSON.toJSONString(results)), threadPoolExecutor);

// 处理最终的异步任务
Futures.addCallback(transform, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
log.info("成功啦, result=[{}]",result);
}

@Override
public void onFailure(Throwable thrown) {
log.error("失败啦.", thrown);
}
}, threadPoolExecutor);

log.info("transform result=[{}]", transform.get());

return Response.success();
}

运行结果

2019-12-10 11:27:23.479 [Answer-AI-L] [jaemon-service]  INFO 7104 -- [  asyncThread-1] c.j.controller.MyController  :[90] 任务1执行中...
2019-12-10 11:27:23.479 [Answer-AI-L] [jaemon-service] INFO 7104 -- [ asyncThread-2] c.j.controller.MyController :[96] 任务2执行中...
2019-12-10 11:27:26.480 [Answer-AI-L] [jaemon-service] INFO 7104 -- [ asyncThread-3] c.j.controller.MyController :[111] results=[[1, 2]]
2019-12-10 11:27:26.527 [Answer-AI-L] [jaemon-service] INFO 7104 -- [nio-8888-exec-1] c.j.controller.MyController :[128] transform result=[[1,2]]
2019-12-10 11:27:26.527 [Answer-AI-L] [jaemon-service] INFO 7104 -- [ asyncThread-4] c.j.controller.MyController :[119] 成功啦, result=[[1,2]]
  • Futures.transform() 和 Futures.addCallback() 都是对 addListener 做了封装,进行回调的设置,但是 ​​transform更适合用在链式处理的中间过程,addCallback更适合用在处理最终的结果上。​
  • Futures.transform()和Futures.transformAsync()的区别在于一个参数为Function,一个是AsyncFuntion,AsyncFuntion的apply方法返回值类型也是ListenableFuture,也就是回调方法也是异步的。

 

参考网址


标签:info,异步,log,Futures,ListenableFuture,result,new,guava
From: https://blog.51cto.com/u_15891990/5908835

相关文章

  • .NET Socket开发之异步Socket
    在基于.NET的网络服务端的开发中,我们用到和听到的最多的恐怕就是异步Socket了。异步Socket的性能比同步高出很多,但是编写代码比较复杂。因此异步Socket也是网络上讨论比......
  • 搞定Dart的异步
    一.Dart的异步模型我们先来搞清楚Dart是如何搞定异步操作的1.1.Dart是单线程的1.1.1.程序中的耗时操作开发中的耗时操作:在开发中,我们经常会遇到一些耗时的操作......
  • vue多个方法的异步请求
    1、async和awaitasync/await是一种建立在Promise之上的编写异步或非阻塞代码的新方法。async是异步的意思,而await是asyncwait的简写,即异步等待。1//假设这是......
  • 重试机制的实现(Guava Retry)
    重试机制的实现重试作用:对于重试是有场景限制的,参数校验不合法、写操作等(要考虑写是否幂等)都不适合重试。远程调用超时、网络突然中断可以重试。外部RPC调用,或者数据......
  • 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
    需求:创建一个Stream类型的消息队列,名为stream.orders修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、order......
  • 秒杀优化-异步秒杀思路
    当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤1、查询优惠卷2、判断秒杀库存是否足够3、查询订单4、校验是否是......
  • 异步UI runOnUiThread 在Flutter中等价于什么
    异步UIrunOnUiThread在Flutter中等价于什么Dart是单线程执行模型,支持Isolates(在另一个线程上运行Dart代码的方式)、事件循环和异步编程。除非您启动一个Isolate,否则您的Da......
  • react中setState为什么设计成异步更新
    1.可以显著提升性能:因为每次调用setState进行更新,都会调用render函数,导致界面也会频繁更新,因此最好是获取到多个更新后,再进行批量更新。2.可以使state和props保持同步......
  • 异步方法代码片段
    VS=>工具=>代码片段管理器=>下拉选择语言==>导入/添加创建名为method.snippet的文件,然后将其导入<?xmlversion="1.0"encoding="utf-8"?><CodeSnippetsxmln......
  • springboot任务之异步任务
    1-新建工程,只选web模块2-新增service包,AsyncService类packagecom.example.springboottask.service;importorg.springframework.stereotype.Service;@Servicepublicclas......