首页 > 其他分享 >【RocketMQ】路由中心 NameServer

【RocketMQ】路由中心 NameServer

时间:2024-10-26 11:47:36浏览次数:1  
标签:Broker private RocketMQ new NameServer final 路由

1  前言

上节我们准备了源码以及环境的运行,其中我们启动的时候,会首先启动 NameServer,那么这节我们先看下组件 NameServer,看看它主要是干什么的,在整个生产消费的链路中充当了什么角色,发挥着什么作用。

2  NameServer

RocketMQ路由管理、 服务注册及服务发现的机制, NameServer是整个 RocketMQ 的“大脑” 。分布式服务 SOA架 构体系中会有服务注册中心,分布式服务SOA的注册中心主要提供服务调用的解析服务, 指引服务调用方(消费者)找到“远方”的服务提供者,完成网络通信,那么RocketMQ 的 路由中心存储的是什么数据呢?作为一款高性能的消息中间件,如何避免NameServer的单点故障,提供高可用性,我们往下看。

2.1  NameServer 架构设计

消息中间件的设计思路一般基于主题的订阅发布机制消息生产者(Producer)发 送某一主题的消息到消息服务器,消息服务器负责该消息的持久化存储,消息消费者 (Consumer)订阅感兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费 者(PUSH模式)或者消息消费者主动向消息服务器拉取消息(PULL模式),从而实现消息 生产者与消息消费者解调。 为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。 那消息生产者如何知道消息要发往哪台消息服务器呢?如果某一台消息服务器若机了,那么生产者如何在不重启服务的情况下感知呢?NameServer 就是为了解决上述问题而设计的。

RocketMQ 的逻辑部署图如下:

Broker 消息服务器在启动时向所有NameServer 注册(这个我们看过是通过线程池 + 门闩锁CountDownLauch实现),消息生产者(Producer)在发送消息之前先从NameServer 获取 Broker 服务器地址列表,然后根据负载算法从列表中选择一 台消息服务器进行消息发送(也就是说生产者在发送的时候就选择了某个Broker)。 NameServer与每台 Broker服务器保持长连接,并间隔 5s 检测Broker 是否存活,如果检测到 Broker宕机, 则从路由注册表中将其移除。 但是路由变化不会马上通知消息生产者,为什么要这样设计呢?这是为了降低NameServer实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性,那么发送失败了怎么办,这个我们在看消息发送的时候再细看(这里埋个引子发送失败会重试)。

NameServer 本身的高可用可通过部署多台 NameServer服务器来实现,但彼此之间 互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是RocketMQ NameServer设计的一个亮点, RocketMQ NameServer 设计追求简单高效。

2.2  NameServer 启动流程

从源码的角度窥探一下Names巳rver启动流程,重点关注NameServer相关启动参数。

NameServer 启动类:org.apache.rocketmq.namesrv.NamesrvStartup。

// main 启动入口
public static void main(String[] args) {
    main0(args);
    controllerManagerMain();
}

我们看看 main0:

public static NamesrvController main0(String[] args) {
    try {
        // 解析命令行参数和配置文件
        parseCommandlineAndConfigFile(args);
        // 创建并启动 NameServer控制器
        NamesrvController controller = createAndStartNamesrvController();
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        // 异常直接退出
        System.exit(-1);
    }
    return null;
}

两个步骤:

(1)解析命令行以及配置文件(也就是启动前的配置初始化工作)

(2)创建并启动 NamesrcController(NameServerController 实例为NameSerer 核心控制器,别跟 SpringMVC 里的 Controller 搞混= =)

我们简单看下解析配置:

/**
 * 解析命令行参数和配置文件
 * 该方法首先设置Remoting框架的版本属性,然后解析命令行参数,接着加载配置文件(如果有提供)
 * 最后,根据命令行参数和配置文件初始化相关的配置对象
 *
 * @param args 命令行参数数组
 * @throws Exception 如果解析命令行参数或加载配置文件时发生错误,则抛出异常
 */
public static void parseCommandlineAndConfigFile(String[] args) throws Exception {
    // 设置Remoting框架的版本属性 rocketmq.remoting.version
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    // 构建命令行参数选项
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    // 解析命令行参数
    CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());
    // 如果解析失败,退出程序
    if (null == commandLine) {
        System.exit(-1);
        return;
    }
    // 初始化配置对象
    namesrvConfig = new NamesrvConfig();
    nettyServerConfig = new NettyServerConfig();
    nettyClientConfig = new NettyClientConfig();
    // 设置Netty服务器的监听端口  也就是默认的 NameServer 端口是 9876
    nettyServerConfig.setListenPort(9876);
    // 如果命令行参数中包含配置文件选项,加载配置文件
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            // 读取并解析配置文件
            InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));
            properties = new Properties();
            properties.load(in);
            // 将配置属性设置到配置对象中
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            MixAll.properties2Object(properties, nettyClientConfig);
            // 如果配置中启用了控制器功能,初始化并配置控制器
            if (namesrvConfig.isEnableControllerInNamesrv()) {
                controllerConfig = new ControllerConfig();
                MixAll.properties2Object(properties, controllerConfig);
            }
            // 设置配置文件路径
            namesrvConfig.setConfigStorePath(file);
            // 确认配置文件加载成功并关闭输入流
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    // 将命令行参数设置到配置对象中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    // 如果命令行参数中包含打印配置信息的选项,打印配置信息并退出程序
    if (commandLine.hasOption('p')) {
        MixAll.printObjectProperties(logConsole, namesrvConfig);
        MixAll.printObjectProperties(logConsole, nettyServerConfig);
        MixAll.printObjectProperties(logConsole, nettyClientConfig);
        if (namesrvConfig.isEnableControllerInNamesrv()) {
            MixAll.printObjectProperties(logConsole, controllerConfig);
        }
        System.exit(0);
    }
    // 如果未设置RocketMQ的安装路径,提示用户设置环境变量并退出程序
    // 也就是环境变量中要有 ROCKETMQ_HOME 这就是我们上节刚开始启动 NameServer 报错的原因位置
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
    // 打印最终的配置信息
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
}

从代码我们可以知道先创建 NamesrvConfig ( NameServer业务参数)、 NettyServerConfig ( NameServer 网络参数), 然后在解析启动时把指定的配置文件或启动命令中的选项 值,填充到namesrvConfig、nettyServerConfig 对象。

我们看看 NamesrvConfig  属性:

// rocketmq 主目录,可以通过-Drocketmq.home.dir=path 或通过设置环境变量ROCKETMQ_HOME来配置RocketMQ 的主目录
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// NameServer 存储 KV 配置属性的持久化路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// NameServer 默认配置文件路径
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
// 是否支持顺序消息,默认是不支持
private boolean orderMessageEnable = false;
private boolean returnOrderTopicConfigToBroker = true;
/**
 * Indicates the nums of thread to handle client requests, like GET_ROUTEINTO_BY_TOPIC.
 */
private int clientRequestThreadPoolNums = 8;
/**
 * Indicates the nums of thread to handle broker or operation requests, like REGISTER_BROKER.
 */
private int defaultThreadPoolNums = 16;
/**
 * Indicates the capacity of queue to hold client requests.
 */
private int clientRequestThreadPoolQueueCapacity = 50000;
/**
 * Indicates the capacity of queue to hold broker or operation requests.
 */
private int defaultThreadPoolQueueCapacity = 10000;
/**
 * Interval of periodic scanning for non-active broker; 扫描 Broker 是否存活的间隔时间 5 秒
 */
private long scanNotActiveBrokerInterval = 5 * 1000;
private int unRegisterBrokerQueueCapacity = 3000;
/**
 * Support acting master or not.
 *
 * The slave can be an acting master when master node is down to support following operations:
 * 1. support lock/unlock message queue operation.
 * 2. support searchOffset, query maxOffset/minOffset operation.
 * 3. support query earliest msg store time.
 */
private boolean supportActingMaster = false;
private volatile boolean enableAllTopicList = true;
private volatile boolean enableTopicList = true;
private volatile boolean notifyMinBrokerIdChanged = false;
/**
 * Is startup the controller in this name-srv
 */
private boolean enableControllerInNamesrv = false;
private volatile boolean needWaitForService = false;
private int waitSecondsForService = 45;
/**
 * If enable this flag, the topics that don't exist in broker registration payload will be deleted from name server.
 *
 * WARNING:
 * 1. Enable this flag and "enableSingleTopicRegister" of broker config meanwhile to avoid losing topic route info unexpectedly.
 * 2. This flag does not support static topic currently.
 */
private boolean deleteTopicWithBrokerRegistration = false;

再看看 NettyServerConfig  属性:

// NameServer 默认绑定地址
private String bindAddress = "0.0.0.0";
// NameServer 监昕端口,该值默认会被初始化为 9876
private int listenPort = 0;
// Netty 业务线程池线程个数
private int serverWorkerThreads = 8;
// Netty public 任务线程池线程个数, Netty 网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池, 则由 public线程池执行
private int serverCallbackExecutorThreads = 0;
//  IO 线程池线程个数,主要是 NameServer、Broker 端解析请求、返回相应的线程个数,这类线程主要是处理网络请求的,解析请求包, 然后转发到各个业务线程池完成具体的业务操作,然后将结果再返回调用方
private int serverSelectorThreads = 3;
// send oneway 消息请求井发度(Broker 端参数)
private int serverOnewaySemaphoreValue = 256;
// 异步消息发送最大并发度(Broker 端参数)
private int serverAsyncSemaphoreValue = 64;
// 网络连接最大空闲时间,默认 120s。如果连接空闲时间超过该参数设置的值,连接将被关闭
private int serverChannelMaxIdleTimeSeconds = 120;
// 网络 socket 发送缓存区大小, 默认 64k
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
// 网络 socket 接收缓存区大小,默认 64k
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
/**
 * 高水位标记,当Channel的待写入数据量达到此值时,Netty会自动关闭Channel的写操作,
 * 需要用户手动调用Channel的flush方法来刷新缓冲区以继续写入数据。
 * 这有助于防止应用程序过度缓冲数据,导致内存使用过多。
 */
private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
/**
 * 低水位标记,当Channel的待写入数据量减少到此值时,Netty会自动重新打开Channel的写操作,
 * 允许数据再次被写入缓冲区。
 * 这有助于在数据量减少到一个合理水平时恢复写操作,保证数据传输的流畅。
 */
private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
// 同时处理的连接请求的最大数量 默认1024
private int serverSocketBacklog = NettySystemConfig.socketBacklog;
// ByteBuffer 是否开启缓存, 建议开启
private boolean serverPooledByteBufAllocatorEnable = true;
// 是否启用 Epoll IO 模型, Linux 环境建议开启
private boolean useEpollNativeSelector = false;

然后我们看看创建以及启动 NamesrvController:

public static NamesrvController createAndStartNamesrvController() throws Exception {
    // 创建 NameServer 控制器实例
    NamesrvController controller = createNamesrvController();
    // 启动 NameServer控制器
    start(controller);
    // 获取 Netty服务器配置
    NettyServerConfig serverConfig = controller.getNettyServerConfig();
    // 格式化输出
    String tip = String.format("The Name Server boot success. serializeType=%s, address %s:%d", RemotingCommand.getSerializeTypeConfigInThisServer(), serverConfig.getBindAddress(), serverConfig.getListenPort());
    // 记录启动日志
    log.info(tip);
    // 控制台输出启动信息
    System.out.printf("%s%n", tip);
    // 返回创建并启动的 NameServer控制器实例
    return controller;
}

格式化输出的信息,就是我们上节启动 NameServer 后输出的信息:

我们这里主要看下启动 start:

public static NamesrvController start(final NamesrvController controller) throws Exception {
    // 检查传入的 NamesrvController 实例是否为null,如果是,则抛出异常
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    // 初始化 NamesrvController,如果初始化失败,则关闭控制器并退出程序
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
    // 注册关闭钩子,确保在程序退出时优雅地关闭 NamesrvController
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
        controller.shutdown();
        return null;
    }));
    // 启动NamesrvController
    controller.start();
    // 返回启动后的 NamesrvController 实例
    return controller;
}

可以看到启动主要做了三件事情:

(1)初始化

(2)注册关闭钩子函数

(3)启动

先看下 NamesrvController 的初始化方法 initialize:

public boolean initialize() {
    // 加载系统配置,这是系统运行所必需的配置信息
    loadConfig();
    // 初始化网络组件,为网络通信做准备
    initiateNetworkComponents();
    // 初始化线程池,用于管理和执行系统中的各种任务
    initiateThreadExecutors();
    // 注册处理器,这些处理器将处理系统中的各种请求和任务
    registerProcessor();
    // 启动定时服务,比如检查 Broker 的存活状态
    startScheduleService();
    // 初始化SSL上下文,为安全通信做准备
    initiateSslContext();
    // 初始化RPC钩子,用于在远程过程调用时执行特定操作
    initiateRpcHooks();
    // 返回
    return true;
}

我们这里看下启动的调度任务 startScheduleService:

private void startScheduleService() {
    // 定时扫描不活跃的Broker并移除 默认每隔 5秒
    this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
        5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
    // 周期性地打印所有配置信息 默认每隔 10分钟
    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,
        1, 10, TimeUnit.MINUTES);
    // 定期打印水位信息 队列大小等信息 默认每隔 1秒
    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            NamesrvController.this.printWaterMark();
        } catch (Throwable e) {
            LOGGER.error("printWaterMark error.", e);
        }
    }, 10, 1, TimeUnit.SECONDS);
}

最后我们看下 NamesrvController 的启动 start:

public void start() throws Exception {
    // 启动服务端
    this.remotingServer.start();
    // 这里不会走 因为看前边端口会被设置为 9876 
    if (0 == nettyServerConfig.getListenPort()) {
        nettyServerConfig.setListenPort(this.remotingServer.localListenPort());
    }
    this.remotingClient.updateNameServerAddressList(Collections.singletonList(NetworkUtil.getLocalAddress()
        + ":" + nettyServerConfig.getListenPort()));
    // 启动客户端
    this.remotingClient.start();
    // 如果文件监视服务已初始化,则启动该服务
    // initialize 初始化的时候, initiateSslContext 初始化 ssl的时候,会初始化 fileWatchService 
    // 监听的文件列表是 {TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath}
    // tls.server.certPath tls.server.keyPath tls.server.trustCertPath
    // 都是 ssl 认证相关的 也就是当你开启了 ssl 会监听证书的变化
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
    // 启动路由信息管理器
    this.routeInfoManager.start();
}

最后我们看下服务端的启动 this.remotingServer.start():

public void start() {
    // 创建一个DefaultEventExecutorGroup实例,用于处理连接和请求
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactoryImpl("NettyServerCodecThread_"));
    // 准备共享的处理器,这些处理器可以在多个通道中共享
    prepareSharableHandlers();
    // 配置服务器的引导程序
    serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .option(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.SO_KEEPALIVE, false)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getBindAddress(),
            this.nettyServerConfig.getListenPort()))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                configChannel(ch);
            }
        });
    // 添加自定义配置,如果有的话
    addCustomConfig(serverBootstrap);
    try {
        // 尝试绑定服务器到指定的端口,并等待操作完成  这里就是绑定我们的 9876端口
        ChannelFuture sync = serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        // 如果配置的端口为0,则使用分配的端口
        if (0 == nettyServerConfig.getListenPort()) {
            this.nettyServerConfig.setListenPort(addr.getPort());
        }
        // 记录服务器启动信息
        log.info("RemotingServer started, listening {}:{}", this.nettyServerConfig.getBindAddress(),
            this.nettyServerConfig.getListenPort());
        // 将服务器实例添加到服务器表中
        this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this);
    } catch (Exception e) {
        // 如果绑定失败,抛出一个异常
        throw new IllegalStateException(String.format("Failed to bind to %s:%d", nettyServerConfig.getBindAddress(),
            nettyServerConfig.getListenPort()), e);
    }
    // 如果存在通道事件监听器,则启动Netty事件执行器
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
    // 创建并启动一个定时任务,定期扫描响应表
    TimerTask timerScanResponseTable = new TimerTask() {
        @Override
        public void run(Timeout timeout) {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            } finally {
                timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS);
            }
        }
    };
    this.timer.newTimeout(timerScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS);
    // 定期执行任务,打印远程代码分布
    scheduledExecutorService.scheduleWithFixedDelay(() -> {
        try {
            NettyRemotingServer.this.printRemotingCodeDistribution();
        } catch (Throwable e) {
            TRAFFIC_LOGGER.error("NettyRemotingServer print remoting code distribution exception", e);
        }
    }, 1, 1, TimeUnit.SECONDS);
}

好啦,到这里我们的启动过程就看的差不多了,核心是 NamesrvController 控制器,看完我们最起码要知道的是,默认的端口是 9876,并且会有一个调度任务是每隔 5秒 扫描 Broker 的状态,不存活的直接移除。

public void scanNotActiveBroker() {
    try {
        // 开始扫描不活跃的 Broker 
        log.info("start scanNotActiveBroker");
        
        // 遍历BrokerLiveTable中的每个Broker信息
        for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {
            // 获取Broker最后一次更新时间
            long last = next.getValue().getLastUpdateTimestamp();
            // 获取Broker的心跳超时时间 默认是 120秒 DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
            long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
            
            // 判断Broker是否超时 也就是超过120秒没有更新的话 就认为不存活 则进行 destory 剔除
            if ((last + timeoutMillis) < System.currentTimeMillis()) {
                // 关闭Broker的通信通道
                RemotingHelper.closeChannel(next.getValue().getChannel());
                // 记录Broker通道过期警告日志
                log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
                // 调用Broker通道销毁后的处理方法
                this.onChannelDestroy(next.getKey());
            }
        }
    } catch (Exception e) {
        // 记录扫描不活跃Broker时遇到的异常
        log.error("scanNotActiveBroker exception", e);
    }
}

2.3  NameServer 路由注册、故障剔除

NameServer 主要作用是为消息生产者和消息消费者提供关于主题Topic 的路由信息, 那么NameServer 需要存储路由的基础信息,还要能够管理Broker节点,包括路由注册、 路由删除等功能。

2.3.1  路由元信息

NameServer 路由实现类: org.apache.rocketmq.namesrv.routeinfo.RoutelnfoManager, 在了解路由注册之前,我们首先看一下 NameServer 到底存储哪些信息。

// Broker 默认超时时间 120秒
private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
// Broker 基础信息, 包含 brokerName、 所属集群名称、 主备 Broker地址
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
// Broker 集群信息,存储集群中所有 Broker 名称
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Broker 状态信息。 NameServer 每次收到心跳包时会替换该信息
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Broker 上的 FilterServer 列表,用于类模式消息过滤
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// Topic 的队列信息映射
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
private final BatchUnregistrationService unRegisterService;
// 所属配置
private final NamesrvController namesrvController;
private final NamesrvConfig namesrvConfig;

RocketMQ 基于订阅发布机制, 一个Topic 拥有多个消息队列,一个Broker为每一主题默认创建4个读队列4个写队列。 多个Broker组成一个集群, BrokerName 由相同的多台 Broker 组成Master-Slave 架构, brokerId 为 0 代表 Master, 大于 0 表示 Slave。 BrokerLivelnfo 中 的 lastUpdateTimestamp 存储上次收到 Broker 心跳包的时间

QueueData、 BrokerData、 BrokerLiveinfo 类图信息图下:

比如RocketMQ2 主 2 从部署图如下:

对应运行时数据结构如下:

2.3.2  路由注册

RocketMQ 路由注册是通过 Broker 与 NameServer 的心跳功能实现的。 Broker启动时向集群中所有的NameServer发送心跳语句,每隔 30s 向集群中所有NameServer发送心跳包, NameServer 收到 Broker 心跳包时会更新 brokerLiveTable 缓存中 BrokerLivelnfo 的 l astUpdateTimestamp ,然后 NameServer 每隔 5s 扫描 brokerLiveTable,如果连续 120s 没有收到心跳包, NameServer将移除该Broker 的路由信息同时关闭 Socket连接。

2.3.2.1  Broker 发送心跳包

Broker 发送心跳包的核心代码如下:

// BrokerController#start
public void start() throws Exception {
    ...
    scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
        @Override
        public void run0() {
            try {
                if (System.currentTimeMillis() < shouldStartTime) {
                    BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
                    return;
                }
                if (isIsolated) {
                    BrokerController.LOG.info("Skip register for broker is isolated");
                    return;
                }
                // 注册
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                BrokerController.LOG.error("registerBrokerAll Exception", e);
            }
        }
        // 延迟10秒启动 
        // brokerConfig.getRegisterNameServerPeriod() 默认 30秒 private int registerNameServerPeriod = 1000 * 30; 
        // 也就是每隔 30秒执行一次
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));
    ...
}

最后落点是在 BrokerOuterAPI#registerBrokerAll:

// BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean enableActingMaster,
    final boolean compressed,
    final Long heartbeatTimeoutMillis,
    final BrokerIdentity brokerIdentity) {

    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    // 获取所有的 NameServer 信息
    List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setEnableActingMaster(enableActingMaster);
        requestHeader.setCompressed(false);
        if (heartbeatTimeoutMillis != null) {
            requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);
        }

        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        // 计数器锁
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        // 扔到线程池中注册
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
                @Override
                public void run0() {
                    try {
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }

                        LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
                    } catch (Exception e) {
                        LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
                LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
            }
        } catch (InterruptedException ignore) {
        }
    }

    return registerBrokerResultList;
}

该方法主要是遍历NameServer列表, Broker 消息服务器依次向 NameServer发送心跳包。

发送心跳包具体逻辑,首先封装请求包头(Header):

brokerAddr: broker 地址

broker Id: brokerld,O:Master:,大于 0: Slave

brokerName: broker 名称

clusterName: 集群名称

haServerAddr: master 地址,初次请求时该值为空, slave 向 Nameserver 注册后返回

requestBody:filterServerList。 消息过滤服务器列表;topicConfigWrapper。 主题配置, topicConfigWrapper 内部封装的是 TopicConfigManager 中的 topicConfigTable,内部存储的是 Broker启动时默认的一些 Topic, MixAll. SELF_TEST_ TOPIC 、 MixAll.DEFAULT_TOPIC ( AutoCreateTopic Enable=true )., MixAll.BENCHMARK_TOPIC 、 MixAll.OFFSET_MOVED_EVENT、 BrokerConfig#brokerClusterName、 BrokerConfig#brokerName。 Broker 中 Topic 默认存储在${Rocket_Home}/store/confg/topic. json 中。

RocketMQ 网络传输基于 Netty, 具体网络实现细节本书不会过细去剖析,在这里介绍 一下网络跟踪方法: 每一个请求, RocketMQ 都会定义一个RequestCode,然后在服务端会 对应相应的网络处理器(processor包中), 只需整库搜索 RequestCode 即可找到相应的处理 逻辑。

2.3.2.2  NameServer 处理心跳包

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 网络处理器解析请求类 型, 如果请求类型为RequestCode.REGISTER_BROKER,则请求最终转发到 RoutelnfoManager#registerBroker。

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final String zoneName,
    final Long timeoutMillis,
    final Boolean enableActingMaster,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        // 加锁 这个是不是有点问题  lock 是不是应该写在 try 上边?
        this.lock.writeLock().lockInterruptibly();
        //init or update the cluster info
        Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
        brokerNames.add(brokerName);

        boolean registerFirst = false;
        // 获取当前的 broker 信息
        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
        if (null == brokerData) {
            registerFirst = true;
            brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
            // 更新
            this.brokerAddrTable.put(brokerName, brokerData);
        }
        // 具体的主从的逻辑 我就没看了哈
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }

    return result;
}

路由注册需要加写锁,防止并发修改RoutelnfoManager 中的路由表。 首先判断 Broker 所属集群是否存在, 如果不存在则创建,然后将broker名加入到集群Broker集合中。

BrokerLivelnfo,存活 Broker 信息表, BrokeLivelnfo 是执行路由删除的重要依据。

2.3.3  路由删除

上面看到Broker每隔 30s 向 NameServer发送一个心跳包,心跳包中包含 Broker Id 、 Broker 地址、 Broker 名称、 Broker 所属集群名称、 Broker 关联的 FilterServer 列表。 但是如果Broker若机, NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker 呢? Name Server 会每隔 5s 扫描 brokerLiveTable 状态表,如果 BrokerLive 的 lastUpdateTimestamp 的时间戳距当前时间超过 120s,则认为 Broker失效,移除该 Broker, 关闭与Broker连接,并同时更新topicQueueTable、 brokerAddrTable、 brokerLive Table、 filterServerTable。

RocktMQ 有两个触发点来触发路由删除:

(1)NameServer 定时扫描 brokerLiveTable 检测上次心跳包与当前系统时间的时间差, 如果时间戳大于 120s,则需要移除该Broker信息 这个我们上边看过了。

(2)Broker 在正常被关闭的情况下,会执行unrRgisterBroker指令。

由于不管是何种方式触发的路由删除,路由删除的方法都是一样的,就是从topic QueueTable、 brokerAddrTable、 brokerLiveTable、 filterServerTable 删除与该 Broker 相关的信息。

2.3.4  路由发现

RocketMQ 路由发现是非实时的,当 Topic路由 出现变化后, NameServer不主动推送给客户端, 而 是由客户端定时拉取主题最新的路由。 根据主题名 称拉取路由信息的命令编码为: GET_ROUTEINTO_BY_TOPIC。RocketMQ 路由结果如图 2-6 所示。

orderTopicConf :顺序消息配置内容,来自于 kvConfig

List<QueueData> queueDatas: topic 队列元数据

List<BrokerData> brokerDatas : topic 分布的 broker 元数据

HashMap<String/* brokerAddress*/ ,List<String>/*filterServer*/>filterServerTable: broker 上过滤服务器地址列表

NameServer 路由发现实现类:ClientRequestProcessor#getRoutelnfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    // 创建一个响应命令对象,用于后续填充响应信息
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // 解析请求命令的自定义头,获取特定的请求参数
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    // 检查名称服务器是否准备就绪
    boolean namesrvReady = needCheckNamesrvReady.get() && System.currentTimeMillis() - startupTimeMillis >= TimeUnit.SECONDS.toMillis(namesrvController.getNamesrvConfig().getWaitSecondsForService());

    // 如果名称服务器未准备就绪且配置了等待服务,则返回错误响应
    if (namesrvController.getNamesrvConfig().isNeedWaitForService() && !namesrvReady) {
        log.warn("name server not ready. request code {} ", request.getCode());
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("name server not ready");
        return response;
    }

    // 从路由信息管理器中获取指定主题的路由数据
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    // 如果找到了主题的路由数据,则进行后续处理
    if (topicRouteData != null) {
        ...

        // 根据请求版本和是否只接受标准JSON来决定如何序列化主题路由数据
        byte[] content;
        Boolean standardJsonOnly = requestHeader.getAcceptStandardJsonOnly();
        if (request.getVersion() >= MQVersion.Version.V4_9_4.ordinal() || null != standardJsonOnly && standardJsonOnly) {
            content = topicRouteData.encode(SerializerFeature.BrowserCompatible,
                SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
                SerializerFeature.MapSortField);
        } else {
            content = topicRouteData.encode();
        }

        // 填充响应命令的主体,设置成功响应码,并返回响应
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    // 如果没有找到主题的路由信息,则返回错误响应
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

主要就是调用 RouterlnfoManager 的方法,从路由表 topicQueueTable、 brokerAddrTable、 filterServerTable 中分别填充 TopicRouteData 中的 List<QueueData>、 List<BrokerData>和 filterServer 地址表。如果找到主题对应的路由信息并且该主题为顺序消息,则从NameServer KVconfig 中获取关于顺序消息相关的配置填充路由信息。如果找不到路由信息CODE则使用TOPIC NOT_EXISTS,表示没有找到对应的路由。

3  小结

本章主要介绍了NameServer路由功能,包含路由元数据、路由注册与发现机制。 为了 加强对本章的理解,路由发现机制,大致关系如下:

还有我们起码要知道的是路由元信息是存放在 NamesrvConntroller 核心控制器里的 RouteInfoManager类里的几个集合中,Broker路由注册是BrokerOutApi里的 registerBroker 方法,路由心跳是相互的。NamesrvController 里的定时任务每隔5秒看看 Broker列表里是否都存活以及Broker启动的时候启动定时任务每隔 30秒 更新一下自己的元信息保持存活。发送消息的时候通过 ClientRequestProcessor里的getRoutelnfoByTopic方法获取某个 Topic 的路由信息,知道这些关键类的关系哈,有理解不对的地方还请指正哈。

标签:Broker,private,RocketMQ,new,NameServer,final,路由
From: https://www.cnblogs.com/kukuxjx/p/18503628

相关文章

  • HCIP 路由引入
    一、实验拓扑二、实验需求及解法本实验模拟OSPF与IS-IS互联的网络环境,完成以下需求:1.配置所有设备的IP地址。R1:interfaceGigabitEthernet0/0/1ipaddress13.1.1.1255.255.255.0interfaceSerial1/0/0ipaddress12.1.1.1255.255.255.0R2:interfaceGiga......
  • Vue-Router实现路由跳转
    1、官方指导文件1、官方指导文件客户端路由的作用是在单页应用(SPA)中将浏览器的URL和用户看到的内容绑定起来。当用户在应用中浏览不同页面时,URL会随之更新,但页面不需要从服务器重新加载。2、如何定义一个新的路由1)引入相关的组件importHomeViewfrom'../views/H......
  • 【RocketMQ】源码以及环境搭建
    1  前言本节我们开始看一下RocketMQ相关的东西,我们主要看一条链路,大致如下:(1)环境的搭建,源码的下载(2)消息的结构以及相关类可能也会看下消息的存储(3)消息的生产以及发送过程(4)消息的消费过程大概看着四方面的内容,本节主要看下源码的下载以及环境的搭建。在看之前,我们顺便回......
  • 【华为HCIP实战课程十八】OSPF的外部路由类型,网络工程师
    一、外部路由类型:上节讲的外部路由类型,无关乎COST大小,OSPF外部路由类型1优先于外部路由类型2二、转发地址实验拓扑我们再SW3/R5/R6三台设备运行RIP,SW3即运行RIP又运行OSPFSW3配置rip[SW3-rip-1]ver2[SW3-rip-1]network10.0.0.0AR5去掉ospf配置和AR6配置rip[R5-ri......
  • RocketMQ 消息堆积了怎么解决
    目录引言消息堆积的原因RocketMQ的基本架构解决消息堆积的方法4.1扩大消费者规模4.2调整消息优先级4.3优化消费逻辑4.4消息重试与死信队列4.5监控与报警机制实现解决堆积的步骤5.1扩大消费者规模的配置5.2调整消息优先级的配置5.3优化消费逻辑的示例5.4......
  • 互联网十万个为什么之什么是路由?
    路由(Routing)是网络中数据包从源点到目的地的路径选择过程。它可以确定数据包在多个网络互联的设备(如路由器)之间传输的最佳路径。路由器使用预先定义的路由协议和策略来决定如何将数据包转发到下一个节点或最终目的地。这个决策是基于路由表的信息,该信息包含了网络中各路径的......
  • 【NodeJS】NodeJS+mongoDB在线版开发简单RestfulAPI (二):项目文件夹架构及路由的设置
    本项目旨在学习如何快速使用nodejs开发后端api,并为以后开展其他项目的开启提供简易的后端模版。(非后端工程师)由于文档是代码写完之后,为了记录项目中需要注意的技术点,因此文档的叙述方式并非开发顺序(并非循序渐进的教学文档)。建议配合项目源码node-mongodb-template。【NodeJS......
  • 手把手Linux安装RocketMQ教程
    手把手Linux安装RocketMQ教程1.下载rocketmq安装包2.创建目录并将压缩包上传至服务器3.配置RocketMQ4.启动RocketMQ5.关闭RocketMQ6.测试RocketMQ7.mqadmin查看服务状态8.配置启动脚本1.namesrv脚本2.broker脚本3.单脚本启动4.单脚本停止待完善1.开启自启动配置2.安装ro......
  • 浅析RocketMQ
    SpringBoot引入RocketMQ快速构建单机RocketMQhttps://www.haveyb.com/article/3079参考这篇文章,快速构建单机RocketMQ项目引入jar包和配置<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter&......
  • 最新 Seata 集成了RocketMQ事务消息,Seata 越来越 牛X 了! yyds !
    文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+涨薪必备免费赠送:《尼恩技术圣经+高并发系列PDF》,帮你实现技术自由,完成职业升级,薪......