Flink RPC 整体架构
Flink集群间组件的通信底层是使用的actor system通信模型和动态代理来实现的,先简单看下Flink RPC相关的类UML图
通信组件
RpcGateway
Flink RPC远程调用网关,是Flink RPC定义远程调用的接口协议,对外提供可调用的接口,所有实现RPC的组件,都要实现这个接口
从RpcGateway继承关系看,Flink的核心组件各自都有相关RpcGateway接口对外接口的实现,组件之间的通信都是通过RpcGateway进行交互的。
FencedRpcGateWay是用于解决集群脑裂问题的,JobMaster、ResourceManager、Dispatcher在高可用的模式下,涉及Leader的选举,可能导致集群脑裂,所以这几个组件都要实现FencedRpcGateWay这个接口。
EndPoint
Endpoint实现了RpcGateway接口,在RpcGateway的基础上提供了RPC服务组件的生命周期管理,Flink中涉及到RPC通信的组件,都要继承Endpoint。
在Flink的设计中,同一个RpcEndpoint中的所有调用只有一个线程来处理,叫做Endpoint主线程,与Actor模型一样,所有对状态数据的修改在同一个线程中执行,所以不存在并发问题。
RpcService
RpcService是Endpoint的成员变量,作用如下:
- 启动和停止RpcServer、连接RpcEndpoint
- 根据指定的连接地址,连接到RpcServer会返回一个RpcGateway。分为带FencingToken和不带Token的版本
- 延迟\立刻调度Runnable和Callable
RpcService会在ClusterEntrypoint(JobMaster)和TaskManagerRunner(TaskExecutor)启动的过程中初始化启动
private RpcService commonRpcService;
ClusterEntrypoint .runCluster ->
initializeServices ->
commonRpcService =
RpcUtils.createRemoteRpcService(
rpcSystem,
configuration,
configuration.get(JobManagerOptions.ADDRESS),
getRPCPortRange(configuration),
configuration.get(JobManagerOptions.BIND_HOST),
configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
PekkoRpcServie是RpcService的唯一实现,PekkoRpcService中包含了一个Actor System,内部提供了RpcServer的创建和启动(自身代理),保存了ActorRef和RpcEndpoint之间的映射关系(其他组件的代理)。RpcService会根据Endpoint类型的不同(Fenced和非Fenced),构建不同的ActorRef(Fenced和非Fenced),并保存RpcActor和Endpoint的关系,创建出来的RpcActor是底层调用的实际接收者,Rpc的请求在客户端封装成RpcInvocation对象,以Pekko/Akka消息的形式发送.
public interface RpcService extends AutoCloseableAsync {
//xxxx省略其他方法
/**
* 获取自身代理对象
*/
<C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServer rpcServer);
//连接远程rpc服务,返回远程代理对象
<C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
<F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
String address, F fencingToken, Class<C> clazz);
}
RpcServer
RpcServer是Rpcendpoint的成员变量,负责接收远端Rpc消息请求。其中有两个实现:PekkoInvocationHandler和FencedPekkoInvocationHandler
RpcServer的启动实质上通知自身的RpcActor切换到START状态(状态初始化为STOP),然后开始处理远程的请求
AkkaRpcActor/PekkoRpcActor
Flink集群内部的通信依赖于Pekko(Akka的一个开源分支),PekkoRpcActor是其具体实现,用来接收Rpc的请求和发送消息,负责处理如下几类消息:
- 本地Rpc调用用LocalRpcInvocation
LocalRpcInvocation类型的调用指派给RpcEndpoint进行处理,如果有结果响应,则将响应结果返回至sender
- RunAsync&CallAsync
RunAsync和CallAsync 类型的消息带有可执行的代码,直接在Actor的线程执行
- 控制类消息 ControllerMessage
ControllerMessage用来控制Actor的行为,ControllerMessags#START启动Actor开始处理消息,Controller#STOP停止处理消息
PekkoRpcServer在RpcService.startServer() 中被创建
AkkaInvocationHandler/PekkoInvocation
继承了InvocationHandler接口,用于生成动态代理的实例,PekkoInvocationHandler类实现了InvocationHandler并实现invoke方法,是将代理类的方法,参数类型,参数封装为RpcInvocation对象,之后通过Pekko.tell 、Pekko.ask方法将RpcInvocation作为消息发送到代理接口所在的进程中。
Flink RPC交互流程
Flink不同组件之间在运行时,需要频繁的进行RPC通信,如JobMaster向TaskManager发送Task,JobMaster向ResourceManager请求Slot资源等,在底层的RPC分为请求和响应两类
RPC请求发送
RPC消息是通过RpcEndpoint的RpcService绑定的ActorRef发送的,以下是请求发送的流程:
- RpcServer中调用connect()方法与对端的Endpoint进行连接,connect方法根据给的地址返回InvocationHandler代理接口的代理对象(PekkoInvocationHandler或者FencedPekkoInvocationHandler的实例)
- 调用代理对象invok方法并传入RPC调用的方法和参数信息
- 代理对象的invoke方法进行判断:如果是RPC方法,则调用invokeRpc方法,将方法封装成RpcInvocation消息。如果是本地则生成LocalRpcInvocation,本地消息不需要序列化,如果是远程则封装成RemoteRpcInvocation
- 判断远程方法是否需要等待结果,如果无需等待(void),则使用Actor tell发送消息,如果需要等待返回,则使用Actor的ask发送消息
RPC请求响应
PekkoRpcActor是消息接收的入口,负责将消息交给不同的放法进行处理,代码如下:
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
//握手消息
.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
//控制消息
.match(ControlMessages.class, this::handleControlMessage)
//RPC消息
.matchAny(this::handleMessage)
.build();
}
PekkoRpcActor收到消息有3种处理方式:
- 握手消息
在客户端构造时会通过ActorSelection传过来,用于确定server侧是否正常
- 控制消息
例如,RpcEndpoint调用start方法后,会向自身发送一条Processing.START消息转换当前Actor的状态
- RPC消息
通过解析RpcInvocation获取方法名和参数类型,并从RpcEndpoint类中找到Method对象,通过反射调用该方法,如果有返回结果,会以Akka消息的形式发送回发送者。
总结
Flink集群内部通信框架的最低层依赖Akka/Pekko,定义了不同类型的消息,并且设计了通用的RPC通信组件,使用了动态代理进行接口管理,利于扩展维护。
标签:flink,Flink,RPC,RpcService,rpc,调用,消息,组件 From: https://blog.csdn.net/qq_40689430/article/details/143135266