首页 > 其他分享 >RocketMQ NameServer启动流程解析

RocketMQ NameServer启动流程解析

时间:2023-04-03 15:02:32浏览次数:54  
标签:namesrvConfig controller 线程 nettyServerConfig new NameServer 解析 public RocketMQ


具体分析可参考Gitee项目NameServer解析部分 =》
代码地址

什么是NameServer

简易Topic路由注册中心,用于支持Broker的服务注册与发现。类似Dubbo的zookeeper

主要能力

  1. Broker集群管理:管理Broker集群注册信息,心跳检测broker存活
  2. 路由信息管理:保存Broker集群路由信息,然后producer、consumer通过nameserver获取路由信息进行投递、消费

NameServer启动流程

步骤

  1. 创建controller
  1. 解析命令及配置文件参数
  2. 创建NamesrvController
  1. 启动
  1. 初始化:处理netty相关,创建远程服务与工作线程。开启定时任务,移除不活跃的broker
  2. 启动:start启动nettyServer

启动入口为NamesrvStartup#main。代码如下:

public class NamesrvStartup {
   public static NamesrvController main0(String[] args) {
      // ...........其他省略逻辑
      // 第一步:创建controller
      NamesrvController controller = createNamesrvController(args);
      // 第二步:启动
      start(controller);
      // ...........其他省略逻辑
   }
   // ...........其他省略逻辑
}

第一步:创建controller

步骤如下:

  1. 解析命令及配置文件参数
  1. 含-p、-n、-h、-c等命令参数
  1. 创建NamesrvController
public class NamesrvStartup {
   public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
      // 设置版本信息
      System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

      // 解析命令行代码
      // 维护了一个options list,会把这些参数解析进去(-h情况下会终止执行,返回各个参数的说明)
      // eg:-c D:\code\opensource\rocketmq\conf\xxxx.conf
      Options options = ServerUtil.buildCommandlineOptions(new Options());
      commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
      if (null == commandLine) {// -h 情况下 commandLine == null,终止进程
         System.exit(-1);
         return null;
      }

      // nameServer的相关配置
      final NamesrvConfig namesrvConfig = new NamesrvConfig();
      // nettyServer的相关配置
      final NettyServerConfig nettyServerConfig = new NettyServerConfig();
      nettyServerConfig.setListenPort(9876);// nettyServer的端口固定为9876,其他项目不要跟这个冲突了

      // 解析配置命令选项 'c',解析文件,将参数信息解析到properties里面
      if (commandLine.hasOption('c')) {
         String file = commandLine.getOptionValue('c');
         if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
         }
      }

      // 处理-p 参数,用于打印namesrvConfig、nettyServerConfig配置信息(打印后直接exit终止)
      if (commandLine.hasOption('p')) {
         InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
         MixAll.printObjectProperties(console, namesrvConfig);
         MixAll.printObjectProperties(console, nettyServerConfig);
         System.exit(0);
      }

      // 将 commandLine 的所有配置设置到 namesrvConfig 中(将properties中属性赋予namesrvConfig中)
      /*************************************************************************************
       * 1. 先获取到object中的所有setXxx(...)方法
       * 2. 得到setXxx(...)中的Xxx
       * 3. 首字母小写得到xxx
       * 4. 从properties获取xxx属性对应的值,并根据setXxx(...)方法的参数类型进行转换
       * 5. 反射调用setXxx(...)方法进行赋值
       ************************************************************************************/
      MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

      // 如果不设置 ROCKETMQ_HOME,就会在这里报错
      if (null == namesrvConfig.getRocketmqHome()) {
         System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
         System.exit(-2);
      }

      // 加载日志配置...........省略
      
      // 将namesrvConfig, nettyServerConfig传入,其实只是为了创建一个controller实例用于后面启动用
      final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

      // 将所有配置记录下来,避免忘记       remember all configs to prevent discard
      controller.getConfiguration().registerConfig(properties);

      return controller;
   }
   // ...........其他省略逻辑
}

第二步:启动

步骤如下:

  1. 初始化:处理netty相关,创建远程服务与工作线程。开启定时任务,移除不活跃的broker
  2. 启动:start启动nettyServer
public class NamesrvStartup {
   public static NamesrvController start(final NamesrvController controller) throws Exception {
       // 省略其他。。。。。。。。。。。。。。。。。。。。。。。。。。。
       /** 初始化,主要做如下两件事
       * 1. 处理netty相关:创建远程服务与工作线程
       * 2. 开启定时任务:移除不活跃的broker *****************/
      boolean initResult = controller.initialize();
      if (!initResult) {
         controller.shutdown();
         System.exit(-3);
      }
      // 添加一个关闭钩子
      Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
         controller.shutdown();
         return null;
      }));
      // 真正执行启动的地方
      controller.start();

      return controller;
   }
}

controller初始化

主要做的事:处理netty相关,创建远程服务与工作线程。开启定时任务,移除不活跃的broker
初始化解析入口代码分析如下

public class NamesrvStartup {
   public boolean initialize() {
      // 加载kv配置
      this.kvConfigManager.load();
      // 创建 netty 远程服务
      /**********************************************************************************************************
       * 1. serverBootstrap:熟悉netty的小伙伴应该对这个很熟悉了,这个就是netty服务端的启动类
       * 2. publicExecutor:这里创建了一个名为publicExecutor线程池
       * 3. eventLoopGroupBoss与eventLoopGroupSelector线程组:熟悉netty的小伙伴应该对这两个线程很熟悉了,
       * -- 这就是netty用来处理连接事件与读写事件的线程了,
       * -- eventLoopGroupBoss对应的是netty的boss线程组,eventLoopGroupSelector对应的是worker线程组
       *********************************************************************************************************/
      this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
      // netty 远程服务线程
      this.remotingExecutor =
              Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
      // 注册,就是把 remotingExecutor 注册到 remotingServer
      // -- 就是最终会把controller和remotingServer绑定起来
      this.registerProcessor();
      // 开启定时任务,每隔10s扫描一次broker,移除不活跃的broker
      // -- 两分钟内不活跃(lastUpdateTimestamp没更新),就移除channel
      this.scheduledExecutorService.scheduleAtFixedRate((Runnable) () -> 
              NamesrvController.this.routeInfoManager.scanNotActiveBroker(), 5, 10, TimeUnit.SECONDS);
      // 开启打印kv配置的定时任务
      this.scheduledExecutorService.scheduleAtFixedRate((Runnable) () ->
              NamesrvController.this.kvConfigManager.printAllPeriodically(), 1, 10, TimeUnit.MINUTES);
      // Tls安全传输 过程省略。。。。。。。。。。。。。。。。。。。。。

      return true;
   }
}

start启动nettyServer

这一步步骤的核心逻辑就是启动一个nettyServer服务器

public class NamesrvStartup {
   @Override
   public void start() {
      this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
              nettyServerConfig.getServerWorkerThreads(),
              new ThreadFactory() {
                 private AtomicInteger threadIndex = new AtomicInteger(0);

                 @Override
                 public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                 }
              });
      prepareSharableHandlers();

      ServerBootstrap childHandler =
              // 在 NettyRemotingServer#init 中准备的两个线程组 eventLoopGroupBoss、eventLoopGroupSelector
              this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                      // 根据是否可使用epoll来判断用EpollServerSocketChannel?还是NioServerSocketChannel?
                      .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                      // 初始化服务端可连接队列
                      // - 多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                      .option(ChannelOption.SO_BACKLOG, 1024)
                      // 允许重复使用本地地址和端口
                      .option(ChannelOption.SO_REUSEADDR, true)
                      // 假设设置为true,在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                      .option(ChannelOption.SO_KEEPALIVE, false)
                      // 禁止使用Nagle算法
                      .childOption(ChannelOption.TCP_NODELAY, true)
                      // 发送、接受缓冲区大小
                      .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                      .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                      // 绑定ip与端口
                      .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                      .childHandler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                    .addLast(defaultEventExecutorGroup,
                                            encoder,
                                            new NettyDecoder(),
                                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                            connectionManageHandler,
                                            // 处理读写请求的NettyServerHandler,即serverHandler。最终会调用到processMessageReceived(ctx, msg)方法
                                            serverHandler
                                    );
                         }
                      });
      // 是否开启缓存
      if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
         childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
      }

      try {
         ChannelFuture sync = this.serverBootstrap.bind().sync();
         InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
         this.port = addr.getPort();
      } catch (InterruptedException e1) {
         throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
      }

      if (this.channelEventListener != null) {
         this.nettyEventExecutor.start();
      }

      //周期性处理请求:1000ms执行一次   扫描和过滤废弃的请求
      this.timer.scheduleAtFixedRate(new TimerTask() {
         @Override
         public void run() {
            try {
               NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
               log.error("scanResponseTable exception", e);
            }
         }
      }, 1000 * 3, 1000);
   }
   // 省略其他。。。。。。。。。。。。。。。。。。。。。
}


标签:namesrvConfig,controller,线程,nettyServerConfig,new,NameServer,解析,public,RocketMQ
From: https://blog.51cto.com/u_13854513/6166439

相关文章

  • 实现一个函数用来解析 URL 的 querystring
    实现如下效果consturl="https://xxxx.com?a=3&b=4&c=5&name=1+1=2";//解析后得到qs如下constqs={a:3,b:4,c:5,name:'1+1=2'};纯碎使用 javascript 完成解析函数,而不利用浏览器DOM特性API,代码如下所示,细节在注释中体现functionparse(url......
  • 详细解析Java异步线程处理队列任务工具类以及实战
    场景待入快速理解小场景描述:【一群人】来到【一个大厅】办理业务,大厅中有【多个窗口】给我们办理业务。每个人都有自己要办事情,处理过程需要消耗时间。大厅根据人群多少,开始窗口梳理。如果把“一群人”理解成一群待处理的n个【任务】,把这群人排成一个长队就形成了一个【任......
  • Mybatis配置文件解析(转载)
    流程图demo案例还是从案例开始。publicstaticvoidmain(String[]args){Stringresource="mybatis-config.xml";InputStreaminputStream=null;SqlSessionsqlSession=null;try{inputStream=Resources.getResourceAsStream(resourc......
  • 软考高项第4版教程-差异点解析来啦(第5章下)!
    第5章信息系统工程,我拆分成2篇来解析第4版教程的差异重点,上篇解析了“软件工程”和“数据工程”知识块,这次带来下篇:“系统集成”和“安全工程”知识块。“信息系统工程”文字版“系统集成”讲了4部分知识点,分别是网络集成、数据集成、软件集成和应用集成。1.系统集成4个基本原则在......
  • Thrift 格式解析
    Thrift格式解析https://www.cnblogs.com/Forever-Kenlen-Ja/p/9649724.html常用数据格式包括CSVJSONXML,这些格式有缺点:CSV没有指定数据类型,如可能将数字开头的字符串无认为数字使用文本存储会浪费空间JSONXML注重可读,提高程序员效率,但数据存储传输效率不高,尤其大数......
  • 时间日期解析配置
    @ConfigurationpublicclassLocalDateTimeConfig{/**序列化内容*LocalDateTime->String*服务端返回给客户端内容**/@BeanpublicLocalDateTimeSerializerlocalDateTimeSerializer(){returnnewLocalDateTimeSeria......
  • 提升集群吞吐量与稳定性的秘诀: Dubbo 自适应负载均衡与限流策略实现解析
    作者:刘泉禄整体介绍本文所说的“柔性服务”主要是指consumer端的负载均衡和provider端的限流两个功能。在之前的Dubbo版本中,负载均衡部分更多的考虑的是公平性原则,即consumer端尽可能平等的从provider中作出选择,在某些情况下表现并不够理想。而限流部分只提供了静态的限......
  • 提升集群吞吐量与稳定性的秘诀: Dubbo 自适应负载均衡与限流策略实现解析
    作者:刘泉禄整体介绍本文所说的“柔性服务”主要是指consumer端的负载均衡和provider端的限流两个功能。在之前的Dubbo版本中,负载均衡部分更多的考虑的是公平性原则,即consumer端尽可能平等的从provider中作出选择,在某些情况下表现并不够理想。而限流部分只提供了静态......
  • 计算机网络实验 实验5 运输层和应用层协议解析
    实验5运输层和应用层协议解析一、实验目的  本实验通过运用Wireshark对网络活动进行分析,观察TCP协议报文,分析通信时序,理解TCP的工作过程,掌握TCP工作原理与实现;学会运用Wireshark分析TCP连接管理、流量控制和拥塞控制的过程,发现TCP的性能问题。二、实验内容任务1:TCP正常......
  • rocketmq-spring : 实战与源码解析一网打尽
    RocketMQ是大家耳熟能详的消息队列,开源项目rocketmq-spring可以帮助开发者在SpringBoot项目中快速整合RocketMQ。这篇文章会介绍SpringBoot项目使用rocketmq-springSDK实现消息收发的操作流程,同时笔者会从开发者的角度解读SDK的设计逻辑。1SDK简介项目地址:......