首页 > 其他分享 >开源异步并行框架,完成任意的多线程编排、阻塞、等待、串并行结合、强弱依赖

开源异步并行框架,完成任意的多线程编排、阻塞、等待、串并行结合、强弱依赖

时间:2023-02-21 16:07:53浏览次数:39  
标签:异步 执行 worker 并行 wrapper 线程 new 多线程 public


netty是一个经典的网络框架,提供了基于NIO、AIO的方式来完成少量线程支持海量用户请求连接的模型。netty里面充斥了大量的非阻塞回调模式,主要是靠Future/Promise异步模型来实现的。

Future是java.util.concurrent.Future,是Java提供的接口,可以用来做异步执行的状态获取,它避免了异步任务在调用者那里阻塞等待,而是让调用者可以迅速得到一个Future对象,后续可以通过Future的方法来获取执行结果。

Jdk的Future不便之处

Java的Future有一个比较尴尬的问题,就是当你想获取异步执行结果时,要通过future.get()方法,这一步还是阻塞的!而且我们无法确定到底异步任务何时执行完毕,提前get了,就还是阻塞,get晚了,可能会漏掉执行结果,写个死循环,不停去轮询是否执行完毕,又浪费资源。所以,这个Future并不好用。

先来看一下Java的future使用:

import java.util.concurrent.*;

/**
* @author wuweifeng wrote on 2019-12-10
* @version 1.0
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
Future future = executor.submit(new Task());
//这一步get会阻塞当前线程
System.out.println(future.get());

executor.shutdown();
}

private static class Task implements Callable<Integer> {

@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(2000);
return 1;
}

}

}

代码很简单,就是将一个Runnable、Callable的实例放到一个线程池里,就会返回一个Future对象。后续通过future.get()取得执行结果,但事实上代码并没有达到异步回调的结果,而是get时阻塞了。

Netty future无法单独抽出来使用

理想状态其实是netty的future,可以添加Listener,当异步任务执行完毕后,主动回调一下自己就可以了,不必在那苦等get()方法的执行结果。

开源异步并行框架,完成任意的多线程编排、阻塞、等待、串并行结合、强弱依赖_bootstrap

看一个netty的回调的小例子:

EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();

bootstrap.group(group).channel(NioSocketChannel.class)
.remoteAddress(host, port)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new StringDecoder())
.addLast(new StringEncoder())
//10秒没消息时,就发心跳包过去
.addLast(new IdleStateHandler(10, 0, 0), new NettyClientHandler())
;
}
});
ChannelFuture channelFuture = bootstrap.connect().sync().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
//do your job
}
});

我们可以理解为bootstrap.connect这一步是一个耗时操作,我不想在那等待它执行完毕,而是希望它执行完毕后主动给我一个回调即可。所以,在connect后面有个addListener,当connect完成后,会回调operationComplete方法。

可以看到netty的这种回调方式比较优雅,不像java的future那样需要阻塞get。整个netty里面大量充斥着类似的回调,但是如果我们要用,仅仅是针对一个或多个异步任务,希望能有个类似的回调,netty就帮不上忙了。

打开netty的源码,想搞明白future、promise的逻辑,一眼看去,心情是这样的:

开源异步并行框架,完成任意的多线程编排、阻塞、等待、串并行结合、强弱依赖_bootstrap_02

如何自己实现一个简单带回调的异步任务

netty是为特定的场景设计的,里面的各种逻辑也是为了服务于netty本身。当看不懂,或难以理解它的工作逻辑时,我们可以考虑自己实现一个对任意异步线程进行回调的框架。

首先我们来拆分一下需求,我有N个耗时任务,可能是一次网络请求,可能是一个耗时文件IO,可能是一堆复杂的逻辑,我在主线程里发起这个任务的调用,但不希望它阻塞主线程,而期望它执行完毕(成功\失败)后,来发起一次回调,最好还有超时、异常的回调控制。

据此,我们拆分出几个角色,master主线程,调度器(发起异步调用),worker(异步工作线程)。然后就是将他们组合起来,完成各种异步回调,以及每个worker的正常、异常、超时等的回调。

 

下面来看一下worker的定义:

/**
* @author wuweifeng wrote on 2019-12-13
* @version 1.0
*/
public interface Worker {
String action(Object object);
}

一个worker,它需要有个方法,来代表这个worker将来做什么,action就可以理解为一个耗时任务。action可以接收一个参数。

再看一下回调器的定义:

/**
* @author wuweifeng wrote on 2019-12-13
* @version 1.0
*/
public interface Listener {
void result(Object result);
}

这个listener用来做为回调,将worker的执行结果,放到result的参数里。

此外,我们还需要一个包装器Wrapper,来将worker和回调器包装一下。

public class Wrapper {
private Object param;
private Worker worker;
private Listener listener;

public Object getParam() {
return param;
}

public void setParam(Object param) {
this.param = param;
}

public Worker getWorker() {
return worker;
}

public void setWorker(Worker worker) {
this.worker = worker;
}

public Listener getListener() {
return listener;
}

public void addListener(Listener listener) {
this.listener = listener;
}
}

OK,下面就是主逻辑了。

/**
* @author wuweifeng wrote on 2019-12-13
* @version 1.0
*/
public class Bootstrap {

public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();

Worker worker = bootstrap.newWorker();

Wrapper wrapper = new Wrapper();
wrapper.setWorker(worker);
wrapper.setParam("hello");

bootstrap.doWork(wrapper).addListener(new Listener() {
@Override
public void result(Object result) {
System.out.println(Thread.currentThread().getName());
System.out.println(result);
}
});

System.out.println(Thread.currentThread().getName());

}

private Wrapper doWork(Wrapper wrapper) {
new Thread(() -> {
Worker worker = wrapper.getWorker();
String result = worker.action(wrapper.getParam());
wrapper.getListener().result(result);
}).start();

return wrapper;
}

private Worker newWorker() {
return new Worker() {
@Override
public String action(Object object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return object + " world";
}
};
}

}

执行结果如下:

开源异步并行框架,完成任意的多线程编排、阻塞、等待、串并行结合、强弱依赖_多线程任意编排_03

可以看到主线程没有被耗时的线程阻塞掉,耗时线程在执行完毕后,进行了回调。

这就是一个简单的设计模式——“监听器模式”,再来认识一下这种设计模式的三个要素:事件源(被监听的对象)、事件对象(事件完毕这个动作)、监听器(我们的Listener)。

完成了这样的小demo,立马从netty的复杂中恢复了过来,心情变成了这样:

开源异步并行框架,完成任意的多线程编排、阻塞、等待、串并行结合、强弱依赖_多线程任意编排_04

实现一个简单带回调、超时的异步任务

public class BootstrapNew {

public static void main(String[] args) {
BootstrapNew bootstrap = new BootstrapNew();

Worker worker = bootstrap.newWorker();

Wrapper wrapper = new Wrapper();
wrapper.setWorker(worker);
wrapper.setParam("hello");
//添加结果回调器
wrapper.addListener(new Listener() {
@Override
public void result(Object result) {
System.out.println(result);
}
});

CompletableFuture future = CompletableFuture.supplyAsync(() -> bootstrap.doWork(wrapper));
try {
future.get(800, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
//超时了
wrapper.getListener().result("time out exception");
}

}

private Wrapper doWork(Wrapper wrapper) {
Worker worker = wrapper.getWorker();
String result = worker.action(wrapper.getParam());
wrapper.getListener().result(result);

return wrapper;
}

private Worker newWorker() {
return new Worker() {
@Override
public String action(Object object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return object + " world";
}
};
}

}

执行结果如下:

开源异步并行框架,完成任意的多线程编排、阻塞、等待、串并行结合、强弱依赖_并行框架_05

这就是一个简单的带超时回调的小demo了。

更复杂的场景:要有任务的顺序编排,要有超时控制,要支持N个线程并行、串行、串并行结合

上面的demo过于简单,也不能实战于复杂的业务场景。那么需求来了,希望有这样一个并发框架:

以下的执行单元就是worker,可以理解为一个任务,一段耗时代码。

> 1 提供任何形式的串行、并行执行单元的组合。如a、b、c的串行,a、b的串行同时与c并行,a、b、c的并行

所以这一组执行单元可能长如下的样子:

开源异步并行框架,完成任意的多线程编排、阻塞、等待、串并行结合、强弱依赖_多线程任意组合_06

> 2 为每个执行单元提供执行成功、失败、超时、异常的回调

方便对整个流程的执行进行控制,当有很多个执行单元时,我们会非常关注每一个执行单元的执行状况,而不仅仅是全部执行完毕后的结果汇总。

> 3 支持为单个执行单元设置超时和失败后的默认值

有了默认值,可以进一步减少bug产生的概率

 

> 4 支持下一个执行单元获取上一个执行单元的返回值(计算结果),作为自己的入参

譬如a-b-c串行,可以在任务编排时,就让b的入参为a的执行结果,即便此时各任务都还未执行

 

> 5 支持为整个group(多个任意组合的执行单元)设置超时时间。单个执行单元失败,不影响其他单元的回调和最终结果获取

防止整个流程无限时的执行下去,要给它设置超时的阈值。

> 6 执行顺序的强依赖和弱依赖

如上图3,A和B并发执行,最后是C。

有些场景下,我们希望A和B都执行完毕后,才能执行C,CompletableFuture里有个allOf(futures...).then()方法可以做到。

有些场景下,我们希望A或者B任何一个执行完毕,就执行C,CompletableFuture里有个anyOf(futures...).then()方法可以做到。

那么,可以指定依赖的任务是否must执行完毕。如果依赖的是must要执行的,那么就一定会等待所有的must依赖项全执行完毕,才执行自己。

如果依赖的都不是must,那么就可以任意一个依赖项执行完毕,就可以执行自己了。

> 7 支持每个group独享线程池,或所有group共享线程池

交给调用者来决定将这组任务,放到共享线程池,还是独享线程池。如果你熟悉hystrix的话,应该明白线程池隔离的重要性。

 

> 8 更少的线程数,更高的性能表现

充分复用依赖的任务的线程,不为每个任务单元开辟新线程,而是复用依赖项的线程。从而减少线程数量,减少cpu轮转切换,细微之处,压榨性能。

 

整体上要实现以上所有还是有点麻烦的,这里我挑一个图3为例,简单描述一下实现方式。

开源异步并行框架,完成任意的多线程编排、阻塞、等待、串并行结合、强弱依赖_并发框架_07

执行A比较简单,直接在主线程里执行它,或者新开线程执行它都可以,主要是A执行完毕后,当发现自己的nextWrappers有多个(即自己后面有多个执行单元)时,该怎么办。

还好,CompleteableFuture提供了allOf这个方法,它可以让你传入多个future,并且能够等待这多个future都完成时再统一返回。见下图代码。

开源异步并行框架,完成任意的多线程编排、阻塞、等待、串并行结合、强弱依赖_多线程任意编排_08

其他的场景实现细节可以后续看代码或者联系作者。

 

框架具备了上面的功能后,既可以回调,也能同步返回结果,还能为一组任务配置超时时间。任务失败了、超时了,还会返回设定的默认值。

在业务中就可以将框架应用于如下的一些场景了:

1 客户端请求服务端接口,该接口需要调用其他N个微服务的接口。譬如 请求我的订单,那么就需要去调用用户的rpc、商品详情的rpc、库存rpc、优惠券等等好多个服务。同时,这些服务还有相互依赖关系,譬如必须先拿到用户的某个字段后,再去某rpc服务请求数据。 最终全部获取完毕后,或超时了,就汇总结果,返回给客户端。

2 任务是工作流性质的,希望一次编排后,就不用管它了,让它按照规则执行,直至成功或失败。譬如,数据清洗时经常有类似场景,从多个数据源拉取数据,各种合并组合,最后清洗完毕后结束。

3 爬虫的相关场景。

 

目前,基于线程池和CompletableFuture已经完成了这样的一个并发框架,由于代码较多,不便于贴在文章内,有需要的,或者有其他需求该并发框架不能满足的,可以联系 ​​[email protected]​​ .

后续会推出框架的测试数据、性能对比、使用场景指南等,敬请留意。

 


标签:异步,执行,worker,并行,wrapper,线程,new,多线程,public
From: https://blog.51cto.com/u_13706148/6076638

相关文章

  • springboot 使用@Async注解实现异步多线程
    1、在启动类中添加注解@SpringBootApplication@EnableAsync//@ImportResource(locations={"classpath:spring/my.xml"})publicclassDemoApplication{publi......
  • Vue 中 Promise 的then方法异步使用及async/await 异步使用总结
    转载请注明出处:1.Promise的then方法使用then方法是 Promise中处理的是异步调用,异步调用是非阻塞式的,在调用的时候并不知道它什么时候结束,也就不会等到他返回......
  • 为啥Python多线程爬虫跑的慢?
    单线程和多线程进行数据抓取结果还是大有不同的,但是要值得注意的事,如果多线程没调配好可能连单线程的效率都比不上。本次就和大家一起聊一聊单线程多线程的一些需要注意的......
  • 多线程并发(二):聊聊AQS中的共享锁实现原理
    在上一篇文章多线程并发(一)中我们通过acquire()详细地分析了AQS中的独占锁的获取流程,提到独占锁,自然少不了共享锁,所以我们这边文章就以AQS中的acquireShared()方法为例,......
  • Python多线程如何保证数据安全
    之前有一篇文章分享了有关Python多线程的一次基础语法以及GIL的相关概念,今天我们重点讲解多线程的数据安全问题。数据安全问题我们首先来举一个例子,这里定义两个函数,一个......
  • 读Java实战(第二版)笔记16_组合式异步编程
    1. 同步API1.1. 阻塞式调用1.2. 调用了某个方法,调用方在被调用方执行的过程中会等待,被调用方执行结束返回,调用方取得被调用方的返回值并继续运行2. 异步API2.1. ......
  • 111、商城业务---订单服务---Feign异步调用丢失请求头问题
    原先我们是在ThreadLocal共享我们的用户的登录信息,但是只能在一个线程内取到,就比如上一节所介绍的那样。但是我们为了提高效率,使用了线程池这就出现了不同线程。因此在......
  • 多线程频繁上下文切换
    什么是上下文切换在单核cpu中,多线程的执行是通过cpu的时间片分配,每个线程会分配到一个时间片,循环执行这些线程,线程时间片消耗完了就会进入等待状态,直到分配到新的时间......
  • C# TPL之Parallel 并行库解密
    Parallel.For、Parallel.Foreach的要求:同样的数据类型,例如:List<T>,Dictionary<T,F>,IEnumerable<T>,等等集合类的操作执行相同的函数:在Parallel.For或者Parallel.Forea......
  • Java多线程分块下载器
    '''javaimportjava.io.*;importjava.net.HttpURLConnection;importjava.net.URL;importjava.nio.file.Files;importjava.nio.file.Path;importjava.nio.file.S......