A brief look at dremio UserServer

Date: 2022-12-29 19:26:04
Tags: netty, dremio, java, AbstractChannelHandlerContext, UserServer, io, channel

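UserServer exists to manage the lifecycle of UserRPCServer (which is built on netty). It mainly handles non-web requests, that is, services that clients connect to directly.
According to the official source, UserServer is only created on coordinator nodes (candidate nodes can of course run the rpc service as well).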

Where it is created

DACDaemonModule

 
if (isCoordinator) {
  registry.bindSelf(
    new UserServer(
      config,
      registry.provider(java.util.concurrent.ExecutorService.class),
      registry.provider(BufferAllocator.class),
      registry.provider(UserService.class),
      registry.provider(NodeEndpoint.class),
      registry.provider(UserWorker.class),
      dacConfig.autoPort,
      bootstrapRegistry.lookup(Tracer.class),
      registry.provider(OptionValidatorListing.class)
    )
  );
}
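
To make the lifecycle role more concrete, here is a minimal sketch of the wrapper pattern. Everything in it is an assumption for illustration: `Service`, `RpcServerStub` and `UserServerSketch` are hypothetical names, not Dremio classes, and the real UserServer also sets up an allocator and an event loop group before binding.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleDemo {
    // Hypothetical lifecycle contract: start() brings the service up, close() tears it down.
    interface Service extends AutoCloseable {
        void start() throws Exception;
    }

    // Stand-in for the netty-based rpc server; it only records what was asked of it.
    static class RpcServerStub implements AutoCloseable {
        private final List<String> events;

        RpcServerStub(List<String> events) {
            this.events = events;
        }

        int bind(int port) {
            events.add("bind:" + port);
            return port;
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    // The wrapper owns the server: it creates and binds it in start() and releases it in close().
    static class UserServerSketch implements Service {
        final List<String> events = new ArrayList<>();
        private final int initialPort;
        private RpcServerStub server;
        private int port = -1;

        UserServerSketch(int initialPort) {
            this.initialPort = initialPort;
        }

        @Override
        public void start() {
            server = new RpcServerStub(events);
            port = server.bind(initialPort);
        }

        @Override
        public void close() {
            if (server != null) {
                server.close();
                server = null;
            }
        }

        int getPort() {
            return port;
        }
    }

    public static void main(String[] args) throws Exception {
        boolean isCoordinator = true; // assumption: this node has the coordinator role
        if (isCoordinator) {
            try (UserServerSketch userServer = new UserServerSketch(31010)) {
                userServer.start();
                System.out.println("listening on " + userServer.getPort());
            }
        }
    }
}
```

The point of the pattern is that whoever holds the registry only needs to call start() and close(); the rpc server itself never has to be managed directly.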

Starting UserRPCServer

Because UserServer implements the Service interface, which includes start(), this is where UserRPCServer is actually started.
UserServer

 
 public void start() throws Exception {
    final SabotConfig sabotConfig = config.getSabotConfig();
    allocator = bufferAllocator.get()
        .newChildAllocator(
            "rpc:user",
            sabotConfig.getLong("dremio.exec.rpc.user.server.memory.reservation"),
            sabotConfig.getLong("dremio.exec.rpc.user.server.memory.maximum"));
    // start the netty-based service
    final EventLoopGroup eventLoopGroup = TransportCheck
        .createEventLoopGroup(sabotConfig.getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-");
    eventLoopCloseable = new EventLoopCloseable(eventLoopGroup);
 
    server = newUserRPCServer(eventLoopGroup);
 
    Metrics.newGauge("rpc.user.current", allocator::getAllocatedMemory);
    Metrics.newGauge("rpc.user.peak", allocator::getPeakMemoryAllocation);
    // the default port is 31010
    int initialPort = sabotConfig.getInt(DremioClient.INITIAL_USER_PORT);
    if(allowPortHunting){
      initialPort += 333;
    }
 
    port = server.bind(initialPort, allowPortHunting);
  }
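
The last line of start() passes allowPortHunting to bind. As a rough illustration of what port hunting means, the sketch below tries the requested port and moves to the next one when it is already taken. It uses a plain `java.net.ServerSocket` rather than netty, and the `bind` helper here is a hypothetical stand-in, not the actual BasicServer code.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;

public class PortHuntingDemo {
    // Hypothetical helper: bind to initialPort, or to the next free port above it
    // when hunting is allowed. Without hunting, a busy port is an error.
    static ServerSocket bind(int initialPort, boolean allowPortHunting) throws IOException {
        int port = initialPort;
        while (true) {
            try {
                return new ServerSocket(port);
            } catch (BindException e) {
                if (!allowPortHunting || port >= 65535) {
                    throw e;
                }
                port++; // the port is in use, try the next one
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Occupy an arbitrary free port first so that the second bind has to hunt.
        try (ServerSocket occupied = new ServerSocket(0)) {
            int busy = occupied.getLocalPort();
            try (ServerSocket hunted = bind(busy, true)) {
                System.out.println("requested " + busy + ", bound " + hunted.getLocalPort());
            }
        }
    }
}
```

With hunting enabled the caller must use the returned port rather than the configured one, which is why start() stores the result of bind in port.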
   

UserRPCServer class diagram

[Figure: UserRPCServer class diagram]

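Internally, rpc tasks are executed through UserWorker, whose concrete implementation is defined in ForemenWorkManager. This also matches the way Drill works, where a foreman is a long-running task.
Main protocol messages handled
The WorkIngestorImpl class in UserRPCServer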

 
  // dispatches on the different rpc request types
  @Override
  public void feedWork(UserClientConnectionImpl connection, int rpcType, byte[] pBody, ByteBuf dBody,
                       ResponseSender responseSender) throws RpcException {
    final UserWorker worker = this.worker.get();
    final TerminationListenerRegistry registry = connection;
    switch (rpcType) {
 
      case RpcType.RUN_QUERY_VALUE: {
        logger.debug("Received query to run.  Returning query handle.");
        final RunQuery query = parse(pBody, RunQuery.PARSER, RunQuery.class);
        UserRequest request = new UserRequest(RpcType.RUN_QUERY, query);
        final ExternalId externalId = ExternalIdHelper.generateExternalId();
        // The UserWorker request goes straight to ForemenWorkManager.submitWork; actual execution relies on CommandPool (see my earlier short note on it)
        worker.submitWork(externalId, connection.getSession(), new UserConnectionResponseHandler(connection), request, registry);
        responseSender.send(new Response(RpcType.QUERY_HANDLE, ExternalIdHelper.toQueryId(externalId)));
        break;
      }
 
      case RpcType.CANCEL_QUERY_VALUE: {
        final QueryId queryId = parse(pBody, QueryId.PARSER, QueryId.class);
        final Ack ack = worker.cancelQuery(ExternalIdHelper.toExternal(queryId),
          connection.getSession().getCredentials().getUserName());
        responseSender.send(new Response(RpcType.ACK, ack));
        break;
      }
 
      case RpcType.RESUME_PAUSED_QUERY_VALUE: {
        final QueryId queryId = parse(pBody, QueryId.PARSER, QueryId.class);
        final Ack ack = worker.resumeQuery(ExternalIdHelper.toExternal(queryId));
        responseSender.send(new Response(RpcType.ACK, ack));
        break;
      }
 
      case RpcType.GET_CATALOGS_VALUE: {
        final GetCatalogsReq req = parse(pBody, GetCatalogsReq.PARSER, GetCatalogsReq.class);
        UserRequest request = new UserRequest(RpcType.GET_CATALOGS, req);
        worker.submitWork(connection.getSession(), new MetadataProvider.CatalogsHandler(responseSender), request, registry);
        break;
      }
 
      case RpcType.GET_SCHEMAS_VALUE: {
        final GetSchemasReq req = parse(pBody, GetSchemasReq.PARSER, GetSchemasReq.class);
        UserRequest request = new UserRequest(RpcType.GET_SCHEMAS, req);
        worker.submitWork(connection.getSession(), new MetadataProvider.SchemasHandler(responseSender), request, registry);
        break;
      }
 
      case RpcType.GET_TABLES_VALUE: {
        final GetTablesReq req = parse(pBody, GetTablesReq.PARSER, GetTablesReq.class);
        UserRequest request = new UserRequest(RpcType.GET_TABLES, req);
        worker.submitWork(connection.getSession(), new MetadataProvider.TablesHandler(responseSender), request, registry);
        break;
      }
 
      case RpcType.GET_COLUMNS_VALUE: {
        final GetColumnsReq req = parse(pBody, GetColumnsReq.PARSER, GetColumnsReq.class);
        UserRequest request = new UserRequest(RpcType.GET_COLUMNS, req);
        worker.submitWork(connection.getSession(), new MetadataProvider.ColumnsHandler(responseSender), request, registry);
        break;
      }
 
      case RpcType.CREATE_PREPARED_STATEMENT_VALUE: {
        final CreatePreparedStatementReq req = parse(pBody, CreatePreparedStatementReq.PARSER, CreatePreparedStatementReq.class);
        UserRequest request = new UserRequest(RpcType.CREATE_PREPARED_STATEMENT, req);
        worker.submitWork(connection.getSession(), new PreparedStatementProvider.PreparedStatementHandler(responseSender), request, registry);
        break;
      }
 
      case RpcType.CREATE_PREPARED_STATEMENT_ARROW_VALUE: {
        final CreatePreparedStatementArrowReq req = parse(pBody, CreatePreparedStatementArrowReq.PARSER, CreatePreparedStatementArrowReq.class);
        UserRequest request = new UserRequest(RpcType.CREATE_PREPARED_STATEMENT_ARROW, req);
        worker.submitWork(connection.getSession(), new PreparedStatementProvider.PreparedStatementArrowHandler(responseSender), request, registry);
        break;
      }
 
      case RpcType.GET_SERVER_META_VALUE: {
        final GetServerMetaReq req = parse(pBody, GetServerMetaReq.PARSER, GetServerMetaReq.class);
        UserRequest request = new UserRequest(RpcType.GET_SERVER_META, req);
        worker.submitWork(connection.getSession(), new ServerMetaProvider.ServerMetaHandler(responseSender), request, registry);
        break;
      }
 
      default:
        throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type.  Type was %d.", rpcType));
    }
  }
 
  }
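
One thing worth noticing in RUN_QUERY is that the query handle is sent back right away, while the result arrives later through the connection. The sketch below reproduces only that shape with plain Java types. The type codes, `Connection`, `ResponseSender` and the string payloads are all assumptions made for illustration; the real code works with protobuf messages and UserWorker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

public class DispatchDemo {
    // Hypothetical type codes; the real values come from the RpcType enum.
    static final int RUN_QUERY = 1;
    static final int CANCEL_QUERY = 2;

    private static final AtomicLong NEXT_ID = new AtomicLong();

    // Sends the immediate reply to one rpc request.
    interface ResponseSender {
        void send(String response);
    }

    // Delivers query results later, outside the request/response exchange.
    interface Connection {
        void sendResult(String queryId, String result);
    }

    static void feedWork(Connection connection, int rpcType, String body,
                         ResponseSender responseSender, Executor worker) {
        switch (rpcType) {
            case RUN_QUERY: {
                String queryId = "query-" + NEXT_ID.incrementAndGet();
                // Hand the work over; the result is pushed through the connection when it is done.
                worker.execute(() -> connection.sendResult(queryId, "rows for: " + body));
                // Reply immediately with the handle, without waiting for the work.
                responseSender.send("QUERY_HANDLE " + queryId);
                break;
            }
            case CANCEL_QUERY: {
                responseSender.send("ACK " + body);
                break;
            }
            default:
                throw new UnsupportedOperationException(
                    String.format("received rpc of unknown type. Type was %d.", rpcType));
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Runnable> pending = new ArrayList<>();
        // The "worker" only queues tasks, so the ordering of handle and result is visible.
        feedWork((id, result) -> log.add("RESULT " + id + " " + result),
                 RUN_QUERY, "select 1", log::add, pending::add);
        pending.forEach(Runnable::run);
        log.forEach(System.out::println);
    }
}
```

The metadata and prepared-statement cases follow the other shape in the listing above: they pass the responseSender into a handler, so their reply is only sent once the work has finished.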

Call stack reference

The traces below show how an rpc request is actually executed at runtime

ts=2022-12-29 07:12:15;thread_name=USER-rpc-event-queue;id=95;is_daemon=true;priority=10;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.dremio.sabot.rpc.user.UserRPCServer.handle()
        at com.dremio.exec.rpc.RpcBus$RequestEvent.run(RpcBus.java:475)
        at com.dremio.common.SerializedExecutor$RunnableProcessor.run(SerializedExecutor.java:96)
        at com.dremio.exec.rpc.RpcBus$SameExecutor.execute(RpcBus.java:341)
        at com.dremio.common.SerializedExecutor.execute(SerializedExecutor.java:129)
        at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:375)
        at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:346)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:750)
 
ts=2022-12-29 07:12:15;thread_name=out-of-band-observer;id=ae9;is_daemon=true;priority=10;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.dremio.sabot.rpc.user.UserRPCServer.access$000()
        at com.dremio.sabot.rpc.user.UserRPCServer$UserClientConnectionImpl.sendResult(UserRPCServer.java:575)
        at com.dremio.exec.work.protector.UserConnectionResponseHandler.completed(UserConnectionResponseHandler.java:36)
        at com.dremio.service.jobs.LocalJobsService$QueryListener.execCompletion(LocalJobsService.java:1731)
        at com.dremio.exec.planner.observer.OutOfBandQueryObserver$1.run(OutOfBandQueryObserver.java:50)
        at com.dremio.common.SerializedExecutor$RunnableProcessor.run(SerializedExecutor.java:96)
        at com.dremio.context.RequestContext.run(RequestContext.java:96)
        at com.dremio.common.concurrent.ContextMigratingExecutorService.lambda$decorate$3(ContextMigratingExecutorService.java:199)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
 
ts=2022-12-29 07:12:15;thread_name=USER-rpc-event-queue;id=95;is_daemon=true;priority=10;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.dremio.sabot.rpc.user.UserRPCServer.getResponseDefaultInstance()
        at com.dremio.exec.rpc.RpcBus$ResponseEvent.run(RpcBus.java:518)
        at com.dremio.common.SerializedExecutor$RunnableProcessor.run(SerializedExecutor.java:96)
        at com.dremio.exec.rpc.RpcBus$SameExecutor.execute(RpcBus.java:341)
        at com.dremio.common.SerializedExecutor.execute(SerializedExecutor.java:129)
        at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:380)
        at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:346)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:750)
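
Both USER-rpc-event-queue traces pass through SerializedExecutor.execute with RpcBus$SameExecutor underneath. The following is a minimal sketch of what a serialized executor of this kind generally does, namely run submitted tasks one at a time in submission order on top of another executor. It is written from the class names in the trace only and is an assumption, not Dremio's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

public class SerializedExecutorDemo {
    // Runs tasks strictly one after another, in the order they were submitted.
    static class SerializedExecutor implements Executor {
        private final Queue<Runnable> queue = new ArrayDeque<>();
        private final Executor underlying;
        private boolean draining = false;

        SerializedExecutor(Executor underlying) {
            this.underlying = underlying;
        }

        @Override
        public void execute(Runnable task) {
            synchronized (queue) {
                queue.add(task);
                if (draining) {
                    return; // the active drain loop will pick this task up
                }
                draining = true;
            }
            underlying.execute(this::drain);
        }

        private void drain() {
            while (true) {
                Runnable next;
                synchronized (queue) {
                    next = queue.poll();
                    if (next == null) {
                        draining = false;
                        return;
                    }
                }
                try {
                    next.run();
                } catch (RuntimeException e) {
                    // Keep draining so one failing task does not block the ones behind it.
                    System.err.println("task failed: " + e);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> order = new ArrayList<>();
        // Runnable::run plays the role of a "same thread" executor.
        SerializedExecutor executor = new SerializedExecutor(Runnable::run);
        executor.execute(() -> {
            order.add("A-start");
            executor.execute(() -> order.add("B")); // queued, runs only after A returns
            order.add("A-end");
        });
        System.out.println(order);
    }
}
```

With a same-thread executor underneath, the work runs directly on the calling thread, which is consistent with the request handling above appearing on the netty event loop stack.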

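Notes

dremio's default jdbc requests are handled through UserServer, which wraps UserRPCServer; the network layer underneath is implemented with netty.
This is a useful reference if you want to understand what actually happens when a client runs a request, and I also recommend reading the source in more depth.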

References

sabot/kernel/src/main/java/com/dremio/sabot/rpc/user/UserServer.java
services/base-rpc/src/main/java/com/dremio/exec/rpc/BasicServer.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java
sabot/kernel/src/test/java/com/dremio/exec/server/SabotNode.java
sabot/kernel/src/test/java/com/dremio/sabot/rpc/user/TestUserRpcServer.java
sabot/kernel/src/main/java/com/dremio/exec/work/protector/UserWorker.java
sabot/kernel/src/main/java/com/dremio/sabot/rpc/user/WorkIngestor.java
sabot/kernel/src/main/java/com/dremio/exec/work/protector/ForemenWorkManager.java

From: https://www.cnblogs.com/rongfengliang/p/17013321.html
