具体分析可参考Gitee项目NameServer解析部分 =》
代码地址
什么是NameServer
简易Topic路由注册中心,用于支持Broker的服务注册与发现。类似Dubbo的zookeeper
主要能力
- Broker集群管理:管理Broker集群注册信息,心跳检测broker存活
- 路由信息管理:保存Broker集群路由信息,然后producer、consumer通过nameserver获取路由信息进行投递、消费
NameServer启动流程
步骤
- 创建controller
- 解析命令及配置文件参数
- 创建NamesrvController
- 启动
- 初始化:处理netty相关,创建远程服务与工作线程。开启定时任务,移除不活跃的broker
- 启动:start启动nettyServer
启动入口为NamesrvStartup#main。代码如下:
public class NamesrvStartup {
public static NamesrvController main0(String[] args) {
// ...........其他省略逻辑
// 第一步:创建controller
NamesrvController controller = createNamesrvController(args);
// 第二步:启动
start(controller);
// ...........其他省略逻辑
}
// ...........其他省略逻辑
}
第一步:创建controller
步骤如下:
- 解析命令及配置文件参数
- 含-p、-n、-h、-c等命令参数
- 创建NamesrvController
public class NamesrvStartup {
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 设置版本信息
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
// 解析命令行代码
// 维护了一个options list,会把这些参数解析进去(-h情况下会终止执行,返回各个参数的说明)
// eg:-c D:\code\opensource\rocketmq\conf\xxxx.conf
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {// -h 情况下 commandLine == null,终止进程
System.exit(-1);
return null;
}
// nameServer的相关配置
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// nettyServer的相关配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);// nettyServer的端口固定为9876,其他项目不要跟这个冲突了
// 解析配置命令选项 'c',解析文件,将参数信息解析到properties里面
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 处理-p 参数,用于打印namesrvConfig、nettyServerConfig配置信息(打印后直接exit终止)
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
// 将 commandLine 的所有配置设置到 namesrvConfig 中(将properties中属性赋予namesrvConfig中)
/*************************************************************************************
* 1. 先获取到object中的所有setXxx(...)方法
* 2. 得到setXxx(...)中的Xxx
* 3. 首字母小写得到xxx
* 4. 从properties获取xxx属性对应的值,并根据setXxx(...)方法的参数类型进行转换
* 5. 反射调用setXxx(...)方法进行赋值
************************************************************************************/
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 如果不设置 ROCKETMQ_HOME,就会在这里报错
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 加载日志配置...........省略
// 将namesrvConfig, nettyServerConfig传入,其实只是为了创建一个controller实例用于后面启动用
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// 将所有配置记录下来,避免忘记 remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
// ...........其他省略逻辑
}
第二步:启动
步骤如下:
- 初始化:处理netty相关,创建远程服务与工作线程。开启定时任务,移除不活跃的broker
- 启动:start启动nettyServer
public class NamesrvStartup {
public static NamesrvController start(final NamesrvController controller) throws Exception {
// 省略其他。。。。。。。。。。。。。。。。。。。。。。。。。。。
/** 初始化,主要做如下两件事
* 1. 处理netty相关:创建远程服务与工作线程
* 2. 开启定时任务:移除不活跃的broker *****************/
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 添加一个关闭钩子
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
controller.shutdown();
return null;
}));
// 真正执行启动的地方
controller.start();
return controller;
}
}
controller初始化
主要做的事:处理netty相关,创建远程服务与工作线程。开启定时任务,移除不活跃的broker
初始化解析入口代码分析如下
public class NamesrvStartup {
public boolean initialize() {
// 加载kv配置
this.kvConfigManager.load();
// 创建 netty 远程服务
/**********************************************************************************************************
* 1. serverBootstrap:熟悉netty的小伙伴应该对这个很熟悉了,这个就是netty服务端的启动类
* 2. publicExecutor:这里创建了一个名为publicExecutor线程池
* 3. eventLoopGroupBoss与eventLoopGroupSelector线程组:熟悉netty的小伙伴应该对这两个线程很熟悉了,
* -- 这就是netty用来处理连接事件与读写事件的线程了,
* -- eventLoopGroupBoss对应的是netty的boss线程组,eventLoopGroupSelector对应的是worker线程组
*********************************************************************************************************/
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// netty 远程服务线程
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册,就是把 remotingExecutor 注册到 remotingServer
// -- 就是最终会把controller和remotingServer绑定起来
this.registerProcessor();
// 开启定时任务,每隔10s扫描一次broker,移除不活跃的broker
// -- 两分钟内不活跃(lastUpdateTimestamp没更新),就移除channel
this.scheduledExecutorService.scheduleAtFixedRate((Runnable) () ->
NamesrvController.this.routeInfoManager.scanNotActiveBroker(), 5, 10, TimeUnit.SECONDS);
// 开启打印kv配置的定时任务
this.scheduledExecutorService.scheduleAtFixedRate((Runnable) () ->
NamesrvController.this.kvConfigManager.printAllPeriodically(), 1, 10, TimeUnit.MINUTES);
// Tls安全传输 过程省略。。。。。。。。。。。。。。。。。。。。。
return true;
}
}
start启动nettyServer
这一步步骤的核心逻辑就是启动一个nettyServer服务器
public class NamesrvStartup {
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
ServerBootstrap childHandler =
// 在 NettyRemotingServer#init 中准备的两个线程组 eventLoopGroupBoss、eventLoopGroupSelector
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
// 根据是否可使用epoll来判断用EpollServerSocketChannel?还是NioServerSocketChannel?
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// 初始化服务端可连接队列
// - 多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
// 允许重复使用本地地址和端口
.option(ChannelOption.SO_REUSEADDR, true)
// 假设设置为true,在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.option(ChannelOption.SO_KEEPALIVE, false)
// 禁止使用Nagle算法
.childOption(ChannelOption.TCP_NODELAY, true)
// 发送、接受缓冲区大小
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
// 绑定ip与端口
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
// 处理读写请求的NettyServerHandler,即serverHandler。最终会调用到processMessageReceived(ctx, msg)方法
serverHandler
);
}
});
// 是否开启缓存
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
//周期性处理请求:1000ms执行一次 扫描和过滤废弃的请求
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
// 省略其他。。。。。。。。。。。。。。。。。。。。。
}