有关Namesrv的概念及功能,详见RocketMQ(三):架构设计中技术架构组成namesrv,这里不再赘述。
RocketMQ中Namesrv启动入口:org.apache.rocketmq.namesrv.NamesrvStartup。
Namesrv启动,NamesrvStartup#main0() 核心伪代码:
1 public static NamesrvController main0(String[] args) { 2 try { 3 // 创建NamesrvController对象 4 NamesrvController controller = createNamesrvController(args); 5 // 启动Namesrv服务端 6 start(controller); 7 return controller; 8 } catch (Throwable e) { 9 e.printStackTrace(); 10 System.exit(-1); 11 } 12 return null; 13 }
启动Namesrv流程包含两个核心的步骤:
1、NamesrvController对象的创建;
2、netty服务端启动。其中NamesrvController对象的创建是核心中的核心,会加载配置、初始化必要对象、设置默认的属性、初始化定时任务监听等。
1、NamesrvController对象的创建
1 public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { 2 // 设置版本 3 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); 4 5 // 命令行选项解析 6 Options options = ServerUtil.buildCommandlineOptions(new Options()); 7 // 解析 `mqnamesrv` 命令行参数,用于启动namesrv的参数 8 commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); 9 // 命令行无 `mqnamesrv` 参数,退出 10 if (null == commandLine) { 11 System.exit(-1); 12 return null; 13 } 14 15 // 创建NamesrvConfig 16 final NamesrvConfig namesrvConfig = new NamesrvConfig(); 17 // 创建NettyServerConfig 18 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); 19 // 设置启动监听端口 20 nettyServerConfig.setListenPort(9876); 21 // 解析命令行参数 -c 22 if (commandLine.hasOption('c')) { 23 String file = commandLine.getOptionValue('c'); 24 if (file != null) { 25 // 加载资源文件 26 InputStream in = new BufferedInputStream(new FileInputStream(file)); 27 properties = new Properties(); 28 // 将资源文件解析成Properties对象 29 properties.load(in); 30 // 填充namesrvConfig属性 31 MixAll.properties2Object(properties, namesrvConfig); 32 // 填充nettyServerConfig属性 33 MixAll.properties2Object(properties, nettyServerConfig); 34 35 // 设置MQ配置文件存储路径 36 namesrvConfig.setConfigStorePath(file); 37 38 System.out.printf("load config properties file OK, %s%n", file); 39 // 关闭流 40 in.close(); 41 } 42 } 43 44 // 解析命令行参数 -p 45 if (commandLine.hasOption('p')) { 46 InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); 47 MixAll.printObjectProperties(console, namesrvConfig); 48 MixAll.printObjectProperties(console, nettyServerConfig); 49 System.exit(0); 50 } 51 52 // 启动参数填充到namesrvConfig,nettyServerConfig的属性中 53 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); 54 55 // RocketMQ主目录为空,退出 56 if (null == namesrvConfig.getRocketmqHome()) { 57 System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); 58 System.exit(-2); 59 } 60 61 // 日志输出配置 62 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); 63 JoranConfigurator configurator = new JoranConfigurator(); 64 configurator.setContext(lc); 65 lc.reset(); 66 configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); 67 68 log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); 69 70 MixAll.printObjectProperties(log, namesrvConfig); 71 MixAll.printObjectProperties(log, nettyServerConfig); 72 73 // 创建NamesrvController 74 final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); 75 76 // remember all configs to prevent discard 77 // 将当前解析启动参数的配置信息设置进全局配置configuration的allConfigs属性中,以防丢失 78 controller.getConfiguration().registerConfig(properties); 79 80 return controller; 81 }
1、解析命令行参数mqnamesrv,若为空,namesrv启动流程终止
2、NamesrvController构造函数的参数准备,创建NamesrvConfig、NettyServerConfig对象
3、解析命令行参数-c,并将解析到的属性信息填充到NamesrvConfig、NettyServerConfig对象中
4、日志输出配置
5、初始化NamesrvController
6、将当前启动参数配置信息设置进Namesrv全局配置configuration对象中,以防丢失。
其中2、5为核心步骤,先看看NamesrvConfig、NettyServerConfig对象详情,再分析NamesrvController初始化流程。
1.1、NamesrvController构造函数的参数
1.1.1、NamesrvConfig - namesrv配置
NamesrvConfig配置中包含namesrv的基本配置信息,如rocketmq主目录、配置文件路径等信息,属性详情如下:
1 // 默认rocketmq的主目录 2 private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); 3 // 默认kv属性配置文件路径 4 private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; 5 // 默认namesrv配置文件存储路径 6 private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; 7 private String productEnvName = "center"; 8 // 集群测试标识,默认为false 9 private boolean clusterTest = false; 10 // 是否支持顺序消息标识,默认为false 11 private boolean orderMessageEnable = false;
1.1.2、NettyServerConfig - netty服务配置
NettyServerConfig配置中包含Netty服务的配置信息,属性详情如下:
1 // 默认监听端口 2 private int listenPort = 8888; 3 // 服务端工作线程数 4 private int serverWorkerThreads = 8; 5 // 回调处理线程数 6 private int serverCallbackExecutorThreads = 0; 7 // 轮询线程数 8 private int serverSelectorThreads = 3; 9 // 服务端单向请求信号量最大值 10 private int serverOnewaySemaphoreValue = 256; 11 // 服务端异步请求信号量最大值 12 private int serverAsyncSemaphoreValue = 64; 13 // 服务端通道最大空闲时间 14 private int serverChannelMaxIdleTimeSeconds = 120; 15 // socket发送缓冲大小, 默认65535 16 private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; 17 // socket接收缓冲大小, 默认65535 18 private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
1.2、创建NamesrvController对象
NamesrvController构造函数:
1 public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { 2 this.namesrvConfig = namesrvConfig; 3 this.nettyServerConfig = nettyServerConfig; 4 // 初始化kv配置管理器 5 this.kvConfigManager = new KVConfigManager(this); 6 // 初始化路由管理器 7 this.routeInfoManager = new RouteInfoManager(); 8 // 初始化broker监测服务 9 this.brokerHousekeepingService = new BrokerHousekeepingService(this); 10 // configuration封装namesrvConfig、nettyServerConfig 11 this.configuration = new Configuration( 12 log, this.namesrvConfig, this.nettyServerConfig 13 ); 14 // 通过反射,将namesrvConfig中的配置存储路径,设置到configuration的storePathField属性中 15 this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); 16 }
1.2.1、kv配置管理器构造方法
初始化NamesrvController属性,方便后续调用加载方法 - load(),从NamesrvController中获取KV配置文件路径属性。
private final NamesrvController namesrvController; public KVConfigManager(NamesrvController namesrvController) { this.namesrvController = namesrvController; }
1.2.2、路由信息管理器构造方法
路由信息管理器构造器,主要是初始化相关属性。
1 /** 2 * 初始化相关属性 3 */ 4 public RouteInfoManager() { 5 // topic与queue映射关系 6 this.topicQueueTable = new HashMap<String, List<QueueData>>(1024); 7 // broker名称与broker相关数据(集群名称、broker等)映射关系 8 this.brokerAddrTable = new HashMap<String, BrokerData>(128); 9 // 集群名称与broker名称映射关系 10 this.clusterAddrTable = new HashMap<String, Set<String>>(32); 11 // broker地址与broker活动信息映射关系 12 this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); 13 // broker与对应消息过滤服务映射关系 14 this.filterServerTable = new HashMap<String, List<String>>(256); 15 }
路由信息管理器,主要namesrv用于对broker、topic的管理,主要方法如下:
RouteInfoManager的lock属性是读写锁ReadWriteLock,读写互斥,对于读多写少的场景,可极大的提高性能。
// 读写锁 private final ReadWriteLock lock = new ReentrantReadWriteLock();
以下针对Topic的删除与创建,看看RocketMQ在路由信息中对读写锁的应用:
删除主题:
获取主题:
1.2.3、broker监测服务
初始化NamesrvController属性,方便后续监听broker相关网络连接事件,从NamesrvController中获取路由信息管理器RouteInfoManager属性。
private final NamesrvController namesrvController; public BrokerHousekeepingService(NamesrvController namesrvController) { this.namesrvController = namesrvController; }
BrokerHousekeepingService继承自ChannelEventListener接口, 通道事件监听器ChannelEventListener定义了netty网络连接相关事件处理的方法,便于namesrv处理broker的注册、剔除。
1 /** 2 * 通道事件监听器 3 */ 4 public interface ChannelEventListener { 5 // 通道连接 6 void onChannelConnect(final String remoteAddr, final Channel channel); 7 // 通道关闭 8 void onChannelClose(final String remoteAddr, final Channel channel); 9 // 通道异常 10 void onChannelException(final String remoteAddr, final Channel channel); 11 // 通道空闲 12 void onChannelIdle(final String remoteAddr, final Channel channel); 13 }
1.2.4、namesrv全局配置Configuration
Configuration构造函数如下,将配置对象中的配置信息注册到Configuration的allConfigs属性中。1 private Properties allConfigs = new Properties(); 2 3 public Configuration(InternalLogger log, Object... configObjects) { 4 this.log = log; 5 if (configObjects == null || configObjects.length == 0) { 6 return; 7 } 8 // 遍历配置对象 9 for (Object configObject : configObjects) { 10 registerConfig(configObject); 11 } 12 }
1.3、小结
创建NamesrvController对象时,优先加载配置信息。创建namesrv核心配置类、nettyServer核心配置类用于初始化NamesrvController对象。
在NamesrvController的构造函数中,会初始化KV配置管理器、路由信息管理器、broker监测服务、初始化configuration用于封装namesrvConfig, nettyServerConfig配置信息。
NamesrvController中有namesrv整个流程中所有的配置、路由信息管理器及注册、剔除功能。
2、Namesrv服务端启动
namesrv启动,NamesrvStartup#start() 核心代码:
1 /** 2 * 启动namesrv服务 3 */ 4 public static NamesrvController start(final NamesrvController controller) throws Exception { 5 // Namesrv控制器为空,抛出异常 6 if (null == controller) { 7 throw new IllegalArgumentException("NamesrvController is null"); 8 } 9 10 // 初始化Namesrv控制器 11 boolean initResult = controller.initialize(); 12 // 初始化失败,结束启动流程 13 if (!initResult) { 14 controller.shutdown(); 15 System.exit(-3); 16 } 17 18 // 钩子方法 19 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { 20 @Override 21 public Void call() throws Exception { 22 controller.shutdown(); 23 return null; 24 } 25 })); 26 27 // 启动netty服务端 28 controller.start(); 29 30 return controller; 31 }
2.1、初始化NamesrvController
1 public boolean initialize() { 2 // 加载KV配置 3 this.kvConfigManager.load(); 4 // 创建NettyServer网络处理对象 5 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); 6 // 初始化请求处理线程池 7 this.remotingExecutor = 8 Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); 9 // 注册默认的请求处理器 10 this.registerProcessor(); 11 // 开启定时任务,每隔10s扫描一次不活动Broker 12 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 13 @Override 14 public void run() { 15 NamesrvController.this.routeInfoManager.scanNotActiveBroker(); 16 } 17 }, 5, 10, TimeUnit.SECONDS); 18 // 开启定时任务:每隔10min打印一次KV配置 19 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 20 @Override 21 public void run() { 22 NamesrvController.this.kvConfigManager.printAllPeriodically(); 23 } 24 }, 1, 10, TimeUnit.MINUTES); 25 // ... 26 return true; 27 }
2.1.1、加载KV配置
解析KV配置文件,并将配置内容添加进KVConfigManager的配置表属性configTable中。
2.1.2、创建NettyServer网络处理对象remotingServer
初始化netty服务端相关设置
2.1.3、初始化请求处理线程池remotingExecutor
创建处理请求的线城池,指定工作线程的数量及线程池前缀。
2.1.4、注册默认的请求处理器
注册默认的请求处理器DefaultRequestProcessor,填充NettyRemotingAbstract的defaultRequestProcessor属性。
2.1.5、开启定时任务处理扫描不活动的broker,打印KV配置信息
扫描并移除不活动的broker, RouteInfoManager#scanNotActiveBroker() 核心代码:
1 // 心跳监测超时事件 2 private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; 3 4 // 扫描不活动的broker并剔除 5 public void scanNotActiveBroker() { 6 // 获取broker活动表 7 Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); 8 // 遍历活动表中活动的broker信息 9 while (it.hasNext()) { 10 Entry<String, BrokerLiveInfo> next = it.next(); 11 // 获取活动broker的最后更新时间戳 12 long last = next.getValue().getLastUpdateTimestamp(); 13 // broker最后时间戳2min内未被更新 14 if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { 15 // 关闭broker的通道 16 RemotingUtil.closeChannel(next.getValue().getChannel()); 17 // 剔除broker 18 it.remove(); 19 log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); 20 // 通道销毁监听事件注册 21 this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); 22 } 23 } 24 }
2.1.6、小结
在初始化namesrv的操作:
1、加载KV配置信息;
2、初始化NettyServer网络处理对象,设置工作线程等;
3、初始化请求处理线程池;
4、创建并注册请求处理器;
5、设置定时任务,监听不活动的broker,并剔除不活动的broker;打印KV配置信息。
2.2、启动netty服务
netty服务端启动,NamesrvController#start() 核心伪代码:
1 public void start() throws Exception { 2 // netty通信服务启动 3 this.remotingServer.start(); 4 // 文件监控服务 5 if (this.fileWatchService != null) { 6 this.fileWatchService.start(); 7 } 8 }
3、NameSrv启动核心流程
namesrv启动核心流程图如下:
至此NameSrv启动的源码分析完成。
标签:初始化,namesrv,NameSrv,broker,private,源码,new,NamesrvController,RocketMQ From: https://www.cnblogs.com/RunningSnails/p/17412149.html