首页 > 其他分享 >RocketMQ - 生产者消息发送流程

RocketMQ - 生产者消息发送流程

时间:2023-02-21 09:12:18浏览次数:40  
标签:topic 流程 Broker 发送 topicPublishInfo 消息 RocketMQ

RocketMQ客户端的消息发送通常分为以下3层

  • 业务层:通常指直接调用RocketMQ Client发送API的业务代码。
  • 消息处理层:指RocketMQ Client获取业务发送的消息对象后,一系列的参数检查、消息发送准备、参数包装等操作。
  • 通信层:指RocketMQ基于Netty封装的一个RPC通信服务,RocketMQ的各个组件之间的通信全部使用该通信层。

消息发送流程首先是RocketMQ客户端接收业务层消息,然后通过DefaultMQProducerImpl发送一个RPC请求给Broker,再由Broker处理请求并保存消息。
image

下面以DefaultMQProducer.send(Messagemsg)接口为例讲解发送流程,

  • 第一步:调用defaultMQProducerImpl.send()方法发送消息。
  • 第二步:通过设置的发送超时时间,调用defaultMQProducerImpl.send()方法发送消息。设置的超时时间可以通过sendMsgTimeout进行变更,其默认值为3s。
  • 第三步:执行 defaultMQProducerImpl.sendDefaultImpl()方法。
private SendResult sendDefaultImpl(
    Message msg,
    //通信模式,同步、异步还是单向
    final CommunicationMode communicationMode,
    //对于异步模式,需要设置发送完成后的回调
    final SendCallback sendCallback,
    final long timeout
){}

该方法是发送消息的核心方法,执行过程分为5步:
第一步: 两个检查:生产者状态、消息及消息内容。没有运行的生产者不能发送消息。消息检查主要检查消息是否为空,消息的Topic的名字是否为空或者是否符合规范;消息体大小是否符合要求,最大值为4MB,可以通过maxMessageSize进行设置。
第二步: 执行tryToFindTopicPublishInfo()方法:获取Topic路由信息,如果不存在则发出异常提醒用户。如果本地缓存没有路由信息,就通过Namesrv获取路由信息,更新到本地,再返回。具体实现代码如下:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

第三步: 计算消息发送的重试次数,同步重试和异步重试的执行方式是不同的。
第四步: 执行队列选择方法selectOneMessageQueue()。根据队列对象中保存的上次发送消息的Broker的名字和Topic路由,选择(轮询)一个Queue将消息发送到 Broker 。 我们可以通过sendLatencyFaultEnable 来设置是否总是发送到延迟级别较低的 Broker,默认值为False。
第五步: 执行sendKernelImpl()方法。该方法是发送消息的核心方法,主要用于准备通信层的入参(比如Broker地址、请求体等),将请求传递给通信层,内部实现是基于Netty的,在封装为通信层request对象RemotingCommand前,会设置RequestCode表示当前请求是发送单个消息还是批量消息。

标签:topic,流程,Broker,发送,topicPublishInfo,消息,RocketMQ
From: https://www.cnblogs.com/vipsoft/p/17136280.html

相关文章

  • day15-SpringMVC执行流程
    SpringMVC执行流程1.SpringMVC执行流程分析图例子(1)创建HaloHandlerpackagecom.li.web.debug;importorg.springframework.stereotype.Controller;importorg.s......
  • 客户端跟redis的一次通信流程
    1.在redis启动初始化的时候,redis会将连接应答处理器跟AE_READABLE事件关联起来,接着如果一个客户端跟redis发起连接,此时会产生一个AE_READABLE事件,然后由连接应答处理......
  • activiti 6 遍历流程任务
    XMLInputFactoryxif=XMLInputFactory.newInstance();List<ProcessDefinition>list=repositoryService.createProcessDefinitionQuery().latestVersi......
  • app上架需要准备什么以及上架流程
     上架前需要准备的材料:众所周知,应用市场主要分为两类,一类是AppStore,另一类是安卓市场。而安卓市场又分为:第三方市场(如:应用宝、360手机助手、豌豆荚),和手机厂商市......
  • Window部署RocketMQ
    预备环境JDK1.8、Maven、Git,具体安装可自行百度一、安装包下载从官网下载https://rocketmq.apache.org/release-notes选择合适的版本下载,我这里下载的ApacheRocketMQ......
  • Rocketmq的tag显示积压
    背景公司有一个topic,消费者160多个,全都使用了tag来区分消息,在压测的时候,发现一个问题,消费者触发了积压告警,压测的consumerA还没开始压测,平台显示consumerA的积压值在不停的......
  • 流程控制
    引子流程控制即控制流程,具体指控制程序的执行流程,而程序的执行流程分为三种结构:顺序结构(之前我们写的代码都是顺序结构)、分支结构(用到if判断)、循环结构(用到while与for)一......
  • 完整商业软件的开发流程
    完整商业软件的开发流程1、产品立项,确定要做的内容、效果和收益2、开始产品原型,确定初步逻辑和技术——>需求评审,定开发节点、UI节点3、根据原型出设计图,确定实现细节......
  • RocketMQ启动
     1.启动1.1启动NAMESERVERWindows键+R进入至‘MQ文件夹\bin’下执行:startmqnamesrv.cmd2.启动BROKER从新windows+R打开cmd-->进入‘MQ文件夹\bin’下......
  • Java流程控制
    Java流程控制Scanner对象next():一定要读取到有效字符后才可以结束输入对输入有效字符之前遇到的空白,next()方法会自动将其去掉只有输入有效字符后才能将其后面的......