首页 > 其他分享 >dremio UserServer 简单说明

dremio UserServer 简单说明

时间:2022-12-31 10:33:37浏览次数:47  
标签:netty dremio java AbstractChannelHandlerContext UserServer io 简单 channel

UserServer 目的是对于UserRPCServer 进行生命周期管理(基于netty 开发的)主要是处理非web 请求(实际上就是直接链接的服务)
从官方源码上UserServer 的创建只能在协调节点(当然候选节点也是可以执行rpc 服务的)

参考创建

DACDaemonModule

 

if (isCoordinator) {
registry.bindSelf(
new UserServer(
config,
registry.provider(java.util.concurrent.ExecutorService.class),
registry.provider(BufferAllocator.class),
registry.provider(UserService.class),
registry.provider(NodeEndpoint.class),
registry.provider(UserWorker.class),
dacConfig.autoPort,
bootstrapRegistry.lookup(Tracer.class),
registry.provider(OptionValidatorListing.class)
)
);
}

UserRPCServer的启动

因为实现了service 接口,包含了start,具体对于UserRPCServer 的启动就是在此处的
UserServer

 

public void start() throws Exception {
final SabotConfig sabotConfig = config.getSabotConfig();
allocator = bufferAllocator.get()
.newChildAllocator(
"rpc:user",
sabotConfig.getLong("dremio.exec.rpc.user.server.memory.reservation"),
sabotConfig.getLong("dremio.exec.rpc.user.server.memory.maximum"));
// 具有netty 的服务启动
final EventLoopGroup eventLoopGroup = TransportCheck
.createEventLoopGroup(sabotConfig.getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-");
eventLoopCloseable = new EventLoopCloseable(eventLoopGroup);

 

server = newUserRPCServer(eventLoopGroup);

 

Metrics.newGauge("rpc.user.current", allocator::getAllocatedMemory);
Metrics.newGauge("rpc.user.peak", allocator::getPeakMemoryAllocation);
// 默认端口为31010
int initialPort = sabotConfig.getInt(DremioClient.INITIAL_USER_PORT);
if(allowPortHunting){
initialPort += 333;
}

 

port = server.bind(initialPort, allowPortHunting);
}

 

 

UserRPCServer 类图

dremio UserServer 简单说明_java

 

 

rpc 内部任务的实行依赖了UserWorker,具体实现是在ForemenWorkManager 中定义的,也比较符合drill 的特点,foreman 是一个long running 任务
主要支持的协议处理
UserRPCServer 中的WorkIngestorImpl 类

 

// 会包含不同的rpc 请求类型
@Override
public void feedWork(UserClientConnectionImpl connection, int rpcType, byte[] pBody, ByteBuf dBody,
ResponseSender responseSender) throws  RpcException {
final UserWorker worker = this.worker.get();
final TerminationListenerRegistry registry = connection;
switch (rpcType) {

 

case RpcType.RUN_QUERY_VALUE: {
logger.debug("Received query to run.  Returning query handle.");
final RunQuery query = parse(pBody, RunQuery.PARSER, RunQuery.class);
UserRequest request = new UserRequest(RpcType.RUN_QUERY, query);
final ExternalId externalId = ExternalIdHelper.generateExternalId();
// UserWorker 请求会直接到ForemenWorkManager的submitWork,实际执行需要依赖CommandPool,可以参考以前写的简单的说明
worker.submitWork(externalId, connection.getSession(), new UserConnectionResponseHandler(connection), request, registry);
responseSender.send(new Response(RpcType.QUERY_HANDLE, ExternalIdHelper.toQueryId(externalId)));
break;
}

 

case RpcType.CANCEL_QUERY_VALUE: {
final QueryId queryId = parse(pBody, QueryId.PARSER, QueryId.class);
final Ack ack = worker.cancelQuery(ExternalIdHelper.toExternal(queryId),
connection.getSession().getCredentials().getUserName());
responseSender.send(new Response(RpcType.ACK, ack));
break;
}

 

case RpcType.RESUME_PAUSED_QUERY_VALUE: {
final QueryId queryId = parse(pBody, QueryId.PARSER, QueryId.class);
final Ack ack = worker.resumeQuery(ExternalIdHelper.toExternal(queryId));
responseSender.send(new Response(RpcType.ACK, ack));
break;
}

 

case RpcType.GET_CATALOGS_VALUE: {
final GetCatalogsReq req = parse(pBody, GetCatalogsReq.PARSER, GetCatalogsReq.class);
UserRequest request = new UserRequest(RpcType.GET_CATALOGS, req);
worker.submitWork(connection.getSession(), new MetadataProvider.CatalogsHandler(responseSender), request, registry);
break;
}

 

case RpcType.GET_SCHEMAS_VALUE: {
final GetSchemasReq req = parse(pBody, GetSchemasReq.PARSER, GetSchemasReq.class);
UserRequest request = new UserRequest(RpcType.GET_SCHEMAS, req);
worker.submitWork(connection.getSession(), new MetadataProvider.SchemasHandler(responseSender), request, registry);
break;
}

 

case RpcType.GET_TABLES_VALUE: {
final GetTablesReq req = parse(pBody, GetTablesReq.PARSER, GetTablesReq.class);
UserRequest request = new UserRequest(RpcType.GET_TABLES, req);
worker.submitWork(connection.getSession(), new MetadataProvider.TablesHandler(responseSender), request, registry);
break;
}

 

case RpcType.GET_COLUMNS_VALUE: {
final GetColumnsReq req = parse(pBody, GetColumnsReq.PARSER, GetColumnsReq.class);
UserRequest request = new UserRequest(RpcType.GET_COLUMNS, req);
worker.submitWork(connection.getSession(), new MetadataProvider.ColumnsHandler(responseSender), request, registry);
break;
}

 

case RpcType.CREATE_PREPARED_STATEMENT_VALUE: {
final CreatePreparedStatementReq req = parse(pBody, CreatePreparedStatementReq.PARSER, CreatePreparedStatementReq.class);
UserRequest request = new UserRequest(RpcType.CREATE_PREPARED_STATEMENT, req);
worker.submitWork(connection.getSession(), new PreparedStatementProvider.PreparedStatementHandler(responseSender), request, registry);
break;
}

 

case RpcType.CREATE_PREPARED_STATEMENT_ARROW_VALUE: {
final CreatePreparedStatementArrowReq req = parse(pBody, CreatePreparedStatementArrowReq.PARSER, CreatePreparedStatementArrowReq.class);
UserRequest request = new UserRequest(RpcType.CREATE_PREPARED_STATEMENT_ARROW, req);
worker.submitWork(connection.getSession(), new PreparedStatementProvider.PreparedStatementArrowHandler(responseSender), request, registry);
break;
}

 

case RpcType.GET_SERVER_META_VALUE: {
final GetServerMetaReq req = parse(pBody, GetServerMetaReq.PARSER, GetServerMetaReq.class);
UserRequest request = new UserRequest(RpcType.GET_SERVER_META, req);
worker.submitWork(connection.getSession(), new ServerMetaProvider.ServerMetaHandler(responseSender), request, registry);
break;
}

 

default:
throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type.  Type was %d.", rpcType));
}
}

 

}

执行调用参考

可以看看实际rpc 的执行情况

ts=2022-12-29 07:12:15;thread_name=USER-rpc-event-queue;id=95;is_daemon=true;priority=10;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.dremio.sabot.rpc.user.UserRPCServer.handle()
at com.dremio.exec.rpc.RpcBus$RequestEvent.run(RpcBus.java:475)
at com.dremio.common.SerializedExecutor$RunnableProcessor.run(SerializedExecutor.java:96)
at com.dremio.exec.rpc.RpcBus$SameExecutor.execute(RpcBus.java:341)
at com.dremio.common.SerializedExecutor.execute(SerializedExecutor.java:129)
at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:375)
at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:346)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:750)

 

ts=2022-12-29 07:12:15;thread_name=out-of-band-observer;id=ae9;is_daemon=true;priority=10;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.dremio.sabot.rpc.user.UserRPCServer.access$000()
at com.dremio.sabot.rpc.user.UserRPCServer$UserClientConnectionImpl.sendResult(UserRPCServer.java:575)
at com.dremio.exec.work.protector.UserConnectionResponseHandler.completed(UserConnectionResponseHandler.java:36)
at com.dremio.service.jobs.LocalJobsService$QueryListener.execCompletion(LocalJobsService.java:1731)
at com.dremio.exec.planner.observer.OutOfBandQueryObserver$1.run(OutOfBandQueryObserver.java:50)
at com.dremio.common.SerializedExecutor$RunnableProcessor.run(SerializedExecutor.java:96)
at com.dremio.context.RequestContext.run(RequestContext.java:96)
at com.dremio.common.concurrent.ContextMigratingExecutorService.lambda$decorate$3(ContextMigratingExecutorService.java:199)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

 

ts=2022-12-29 07:12:15;thread_name=USER-rpc-event-queue;id=95;is_daemon=true;priority=10;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.dremio.sabot.rpc.user.UserRPCServer.getResponseDefaultInstance()
at com.dremio.exec.rpc.RpcBus$ResponseEvent.run(RpcBus.java:518)
at com.dremio.common.SerializedExecutor$RunnableProcessor.run(SerializedExecutor.java:96)
at com.dremio.exec.rpc.RpcBus$SameExecutor.execute(RpcBus.java:341)
at com.dremio.common.SerializedExecutor.execute(SerializedExecutor.java:129)
at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:380)
at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:346)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:750)

说明

dremio 的默认jdbc 请求处理是基于UserServer的包装处理的,实际是UserRPCServer ,内部实现基于了netty进行网络处理
对于想了解实际客户端执行的可以参考,同时建议深入再看看源码

参考资料

sabot/kernel/src/main/java/com/dremio/sabot/rpc/user/UserServer.java
services/base-rpc/src/main/java/com/dremio/exec/rpc/BasicServer.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java
sabot/kernel/src/test/java/com/dremio/exec/server/SabotNode.java
sabot/kernel/src/test/java/com/dremio/sabot/rpc/user/TestUserRpcServer.java
sabot/kernel/src/main/java/com/dremio/exec/work/protector/UserWorker.java
sabot/kernel/src/main/java/com/dremio/sabot/rpc/user/WorkIngestor.java
sabot/kernel/src/main/java/com/dremio/exec/work/protector/ForemenWorkManager.java

标签:netty,dremio,java,AbstractChannelHandlerContext,UserServer,io,简单,channel
From: https://blog.51cto.com/rongfengliang/5982001

相关文章

  • 微信聊天记录删除了怎么恢复,简单几步搞定!
    在当下社会,微信已经成为了,我们生活中不可缺少的一部分,更有夸张的说法,你可以一天不吃饭,可是不能一天不打开微信APP进行聊天,而很多用户在使用微信进行聊天的时候会发现,与QQ不......
  • C# WinForm国际化实现的简单方法
    本文是完善:https://www.jb51.net/article/45675.htm文章描述很详细,重要的代码都贴出来了我帮大家懒到家,将窗口代码和资源文件也传上来到手可运行,直观的了解多......
  • 基于WPF实现简单放大镜效果
    原文网址:https://www.jb51.net/article/271074.htmWPF如何实现简单放大镜框架使用.NET40;VisualStudio2019;实现此功能需要用到 VisualBrush ,放大镜展现使用 Can......
  • 使用Python3+PyQT5+Pyserial实现简单的串口工具方法
    练手项目,先上图先实现一个简单的串口工具,为之后的上位机做准备代码如下:github下载地址pyserial_demo.pyimportsysimportserialimportserial.tools......
  • RAM——单口、双口、简单双口、真双口的区别
    单口 RAM(SingleRAM)、双口RAM(DualRAM)、简单双口RAM(Simple-Dual RAM)、真双口RAM(True-DualRAM)有什么不同?对于 分布式RAM,支持简单双口RAM和双口RAM,不能配置成真......
  • MySQL 5.7 版本的安装及简单使用(图文教程)
    MySQL5.7版本的安装使用详细教程写得还是比较详细,甚至有些繁琐,有很多步骤在其他的教程文档里都是省略掉的,但是我还是要写出来,因为我当时走了很多弯路,我希望你们能够避免我......
  • 简单hibernate5.2.10入门配置
    1、官网下载hibernate5相关jar包​​http://hibernate.org/orm/​​2、解压后,在lib目录中找到required目录下的jar包添加至项目。再添加数据库驱动包,博主用MySQLjar包添加......
  • eclipse下简单配置struts2.5.8
    1.下载structs2.5.8jar包。首先去官方下载structs2.5.8的jar包。下载地址:http://struts.apache.org/download.cgi#struts258点击下载完成之后,将该压缩包进行解压。2.在eclip......
  • Hibernate环境部署与注解简单使用
    Hibernate是对象关系映射框架,它对JDBC进行了非常轻量级的对象封装,对对象进行持久化。它可以自动生成SQL语句,自动执行。简单的说就是帮你把XXXDAO的繁琐工作都给自动完成了,要......
  • nmcli 命令简单使用
    centos7/8机器上默认有安装nmcli,可直接使用修改ip。nmclicaddtypeethernetifnameeth0#会提示‘ethernet-eth0’创建成功nmclicmodethernet-eth0ipv4.me......