首页 > 其他分享 >RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程

RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程

时间:2024-01-09 13:34:32浏览次数:36  
标签:RocketMQ request Broker 发送 消息 注册 NameServer 路由

欢迎关注公众号:【11来了】 发送 “资料” 可以下载Redis、JVM系列文章PDF版本!

作者为在读研究生,目前研二,计划在公众号记录学习常用中间件笔记,以及明年更新面试经历!


NameServer 路由注册机制

在 Broker 启动时,通过 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); 向 NameServer 中注册自己

那么 NameServer 中,注册 Broker 信息的入口在: DefaultRequestProcessor # processRequest

  • 判断请求码,如果是 Broker 注册,则进行注册 Broker 信息
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
        // ... 省略
        // 如果是 Broker 注册
        case RequestCode.REGISTER_BROKER:
            return this.registerBroker(ctx, request);
        // ... 省略
    }
}
  • this.registerBroker 真正开始注册 Broker 信息
    在注册信息之前,会先使用 crc32 来检验消息的正确性(安全检查)

之后会调用 this.namesrvController.getRouteInfoManager().registerBroker() 来注册 Broker 的信息,这个 Broker 的信息是 BrokerController 启动时通过 Netty 发送过来的

通过 getRouteInfoManager 获取 RouteInfoManager,在该类中注册 Broker 信息,那么 RouteInfoManager 肯定是管理了 Broker 的信息

可以点进去 RouteInfoManager,可以发现其中管理了很多路由的信息

RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程_开发者

其中 brokerLiveTable 存储的是存活的 Broker 列表,那么可以查看该变量的引用链,来判断 Nameserver 在哪里进行心跳扫描

RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程_开发者_02

可以看到在 scanNotActiveBroker 方法中,会将 brokerLiveTable 中不活跃的 Broker 给剔除掉

RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程_开发者_03


生产者的发送消息流程

下面会将整体的一个发送消息的流程图片先展示出来,再通过代码进行一步一步梳理:

RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程_开发者_04

既然要看生产者的发送消息流程,就先通过方法的调用作为入口,一步一步探究流程:

RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程_发送消息_05

那么通过这个 send 方法点进去,入口为:DefaultMQProducer # send(Message msg) 方法,从该方法点击进入,调用链如下:

RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程_发送消息_06

如果你在看源码的话,可以从上边的调用链一步一步点击,最后发送消息的逻辑就在 this.sendDefaultImpl 方法中展开

  1. 首先,会先根据 Topic 获取对应的路由信息,表示该 Topic 需要向哪个 MessageQueue 中进行发送,这个路由信息会先从本地缓存中取,如果没有取到,会向 NameServer 发送请求来获取 Topic 的路由信息
  2. 设置消息发送失败的 重试次数 ,同步情况下重试次数为预设次数 +1,异步情况下默认重试次数为 1
  3. 接下来就根据 重试次数 循环发送消息,为 Topic 选择要发送的队列 MessageQueue 进行消息发送

选择队列之后,就进入到发送消息的核心逻辑:this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

  1. 在该方法中,先通过队列 MessageQueue 找到对应的 brokerAddr
  2. 之后,会尝试对消息进行压缩
  3. 判断是否存在一些需要对消息进行 禁止发送前置拦截 的钩子函数,进行一些消息的拦截处理
  4. 判断通信模式:ASYNC、ONEWAY、SYNC,将消息以对应的方式发送出去,这里以同步 SYNC 为例

如果是同步的话,会通过 this.mQClientFactory.getMQClientAPIImpl().sendMessage() 方法将消息发送出去,接下来又是层层的调用,最后真正通过 Netty 将消息发送出去的地方在 NettyRemotingClient # invokeSync() 的方法中

在这个方法中,还会对消息进行前置拦截和后置拦截,为开发者的使用提供了很多的扩展点,在这里就 真正通过 Netty 将消息发送出去了

RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程_发送消息_07



标签:RocketMQ,request,Broker,发送,消息,注册,NameServer,路由
From: https://blog.51cto.com/u_16186397/9160204

相关文章

  • 中兴BE7200Pro+的WIFI 7路由器开箱
          上一个讨论的帖子:https://www.chiphell.com/thread-2573626-1-1.html。      对应小米WIFI7路由器BE6500Pro开箱的帖子:https://www.chiphell.com/thread-2573915-1-1.html。      上次开箱了小米的路由,当时与坛友说了已经抢了中兴的这款,然后今天取......
  • 小米WIFI 7路由器BE6500 Pro开箱
          上次发帖与坛子里的网友们聊了小米的这款路由,正好今天拿到货了,所以来个开箱图,让其他彦祖们也见识见识小米家的路由产品。      以前买过小米家的路由器,但是当时就是买来尝鲜,这次咋的也是对WIFI7的尝鲜吧,下面开箱:1、 包装照;正面: 背面: ......
  • RocketMQ系统性学习-RocketMQ原理分析之消息的可靠性以及有序性如何保证
    欢迎关注公众号:【11来了】发送“资料”可以下载Redis、JVM系列文章PDF版本!作者为在读研究生,目前研二,计划在公众号记录学习常用中间件笔记,以及明年更新面试经历!消息的可靠性如何保证?要保证消息的可靠性,先来思考一下从哪些方面保证呢?这要看消息的生命周期,既然保证可靠性,那么就是......
  • RocketMQ系统性学习-RocketMQ原理分析之源码启动、Broker启动流程分析
    欢迎关注公众号:【11来了】发送“资料”可以下载Redis、JVM系列文章PDF版本!作者为在读研究生,目前研二,计划在公众号记录学习常用中间件笔记,以及明年更新面试经历!RocketMQ原理分析启动RocketMQ源码分析RocketMQ之前,先确保可以成功启动起来NameServer启动在Idea中配置ROCK......
  • RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及事务消息收发、最大重试消费
    欢迎关注公众号:【11来了】发送“资料”可以下载Redis、JVM系列文章PDF版本!作者为在读研究生,目前研二,计划在公众号记录学习常用中间件笔记,以及明年更新面试经历!事务消息收发流程如下:发送给MQ一条任务操作MQ的Broker成功收到后,那么发送方就开始执行原子db业务如果执行原子......
  • # Vue3 使用路由 Router
    Vue3使用路由Router之前几篇博文说了一下vue的基本语法和vue的传参,今天这篇博文稍微说一下vue3里面使用路由。介绍众所周知,vue是用来构建单页面应用的前端框架,大于大多数此类型应用来讲,都推荐使用官方支持的vueRouter,在单页面应用,客户端的JavaScript可以连接页面......
  • VueRouter中存储路由的参数是什么?
    一、VueRouter的基本介绍什么是VueRouter是一个Vue.js官方的路由管理器,它可以帮助我们在Vue.js应用中实现页面之间的导航和跳转。它提供了一系列的API和配置选项,使得我们可以更加灵活地管理和控制应用的路由。在VueRouter中,存储路由的参数主要是通过路由对象来实现的。每当我们进行......
  • 手撕Vue-Router-提取路由信息
    前言好了经过上一篇的学习,我们已经知道了如何监听Hash的变化,如何监听路径的一个变化,本篇我们就可以来实现我们自己的VueRouter了,那么怎么实现呢,在实现之前我们先来回顾一下官方的VueRouter是怎么使用的。VueRouter的使用首先需要去下载官方的VueRouter,如果是通过npm的......
  • 手撕Vue-Router-初始化路由信息
    前言经过上一节课的学习,我们已经完成了提取我们想要的路由信息数据格式,提取完毕了之后,接下来我们该干什么,接下来需要做的步骤就是监听路由的变化,保存当前的路由。那么就会遇到几个问题,就是怎么监听,怎么保存,我们先回到VueRouter的官方文档,点击右上角的API参考,然后拖动到底部,在底......
  • 动态路由-RIP协议
    RIP简述RIP协议RIP(RoutingInformationProtocol),路由信息协议,是一种距离矢量算法的协议,使用跳数作为度量来衡量到达目的网络的距离。RIP的特点RIP是距离矢量路由协议,属于IGP协议。直连网络跳数为0,路由器发送路由更新时,会把度量值加1,最大15跳;RIP适用于规模较小的网络,有RIPv1和RIPv2......