FabricServer 主要是对于dremio内部rpc 的通信,包含了调度节点与执行节点的通信以及内部的一些命令处理
FabricServer 是通过FabricServiceImpl 进行实际服务的创建管理(dremio 服务组件的一个套路实现了service 接口会
包含start 方法)
FabricServer提供的能力
可以看到主要是服务的初始化,以及链接管理处理(FabricMessageHandler 进行处理的,内部会使用不同的FabricProtocol 协议实现处理)
参考类图
FabricServer 的启动
FabricServiceImpl 实现中处理的
protected FabricServer newFabricServer() throws Exception {
return new FabricServer(getAddress(), getHandler(), getRpcConfig(), getAllocator(), getRegistry(), getEventLoop());
}
FabricProtocol 协议实现
主要是包装的FabricProtocol,还是比较多,包含了各种组件之间通信的处理(协调节点到执行节点,执行节点到执行节点),具体的可以查看实际源码
命令处理
通过FabricCommandRunner 处理,会委托给ProxyCommand,FabricServiceImpl
- 参考代码
private static class CommandRunner implements FabricCommandRunner {
private FabricProtocol protocol;
private FabricConnectionManager manager;
public CommandRunner(FabricProtocol protocol, FabricConnectionManager manager) {
super();
this.protocol = protocol;
this.manager = manager;
}
@Override
public <R extends MessageLite, C extends RpcCommand<R, ProxyConnection>> void runCommand(C cmd) {
// 对于Fabric服务会通过FabricProtocol 协议处理
manager.runCommand(new ProxyCommand<>(cmd, protocol));
}
}
服务中是通过FabricRunnerFactory 使用的
// 提供了一个协议注册的地方,方便其他服务集成使用(进行FabricProtocol 子类的注册)
@Override
public FabricRunnerFactory registerProtocol(FabricProtocol protocol) {
handler.registerProtocol(protocol);
return new RunnerFactory(protocol);
}
// 通过协议获取
@Override
public FabricRunnerFactory getProtocol(int id) {
FabricProtocol protocol = handler.getProtocol(id);
Preconditions.checkNotNull(protocol);
return new RunnerFactory(protocol);
}
FabricRunnerFactory 的定义,主要提供了如何获取commandrunner会调用进行处理 FabricConnectionManager的runCommand
进行链接的处理
不少服务会调用FabricRunnerFactory的能力进行rpc 命令的执行
rpc 响应
主要是通过接口的实现进行response 的处理(毕竟rpc 是需要一个返回的)
// ResponseSender 接口进行数据响应的处理,具体的实现子类可以参考上边的FabricProtocol 协议介绍
public void handle(final PhysicalConnection connection, final int rpcType, final ByteString pBody, final ByteBuf dBody, final ResponseSender sender) throws RpcException;
类似fabricserver 的实现类
如下图,还是包含了不少的,包含了内部使用的rpc 以及客户端应用使用的,都是以RpcBus 做为核心的
说明
dremio 还包含了一个UserRPCServer,我以前简单介绍过(通过UserServer 启动的),其他类似rpc 的处理都是一致的,可以举一反三
参考资料
services/fabric-rpc/src/main/java/com/dremio/services/fabric/FabricServer.java
services/fabric-rpc/src/main/java/com/dremio/services/fabric/FabricServiceImpl.java
services/fabric-rpc/src/main/java/com/dremio/services/fabric/api/FabricProtocol.java
services/base-rpc/src/main/java/com/dremio/exec/rpc/BasicServer.java
services/base-rpc/src/main/java/com/dremio/exec/rpc/RpcBus.java
services/fabric-rpc/src/main/java/com/dremio/services/fabric/api/FabricCommandRunner.java
services/base-rpc/src/main/java/com/dremio/exec/rpc/RpcCommand.java
services/fabric-rpc/src/main/java/com/dremio/services/fabric/ConnectionManagerRegistry.java
services/fabric-rpc/src/main/java/com/dremio/services/fabric/api/FabricRunnerFactory.java
sabot/kernel/src/main/java/com/dremio/sabot/exec/ExecToCoordTunnelCreator.java
services/fabric-rpc/src/main/java/com/dremio/services/fabric/FabricMessageHandler.java
sabot/kernel/src/main/java/com/dremio/exec/work/rpc/CoordToExecTunnelCreator.java
sabot/kernel/src/main/java/com/dremio/exec/service/executor/ExecutorServiceProductClient.java
services/fabric-rpc/src/main/java/com/dremio/services/fabric/FabricConnection.java
services/fabric-rpc/src/main/java/com/dremio/services/fabric/FabricConnectionManager.java
https://www.cnblogs.com/rongfengliang/p/17013321.html