首页 > 其他分享 >RocketMQ 是如何发送消息

RocketMQ 是如何发送消息

时间:2022-11-24 11:25:15浏览次数:44  
标签:MessageQueue 写入 Broker 发送 消息 CommitLog response RocketMQ

创建Topic的时候为何要指定MessageQueue数量?

  • 简单来说,就是你要指定你的这个Topic对应了多少个队列,也就是多少个MessageQueue。
  • MessageQueue就是RocketMQ中非常关键的一个数据分片机制,他通过MessageQueue将一个Topic的数据拆分为了很多个数据分片,然后在每个Broker机器上都存储一些MessageQueue。
  • Topic是一个逻辑上的概念,实际上在每个broker上以queue的形式保存,也就是说每个topic在broker上会划分成几个逻辑队列,每个逻辑队列保存一部分消息数据,但是保存的消息数据实际上不是真正的消息数据,而是指向commit log的消息索引。

生产者发送消息的时候写入哪个MessageQueue?

  • 生产者会跟NameServer进行通信获取Topic的路由数据, 以生产者从NameServer中就会知道,一个Topic有几个MessageQueue,哪些MessageQueue在哪台Broker机器上
  • 让一个Topic中的数据分散在多个MessageQueue中,进而分散在多个Broker机器上,实现RocketMQ集群分布式存储海量的消息数据了

如果某个Broker出现故障该怎么办?

  • 如果某个Broker临时出现故障了,比如Master Broker挂了,此时正在等待的其他Slave Broker自动热切换为Master Broker,那么这个时候对这一组Broker就没有Master Broker可以写入了
  • 如果你还是按照之前的策略来均匀把数据写入各个Broker上的MessageQueue,那么会导致你在一段时间内,每次访问到这个挂掉的Master Broker都会访问失败,这个似乎不是我们想要的样子。
  • 对于这个问题,通常来说建议大家在Producer中开启一个开关,就是sendLatencyFaultEnable
  • 一旦打开了这个开关,那么他会有一个自动容错机制,比如如果某次访问一个Broker发现网络延迟有500ms,然后还无法访问,那么就会自动回避访问这个Broker一段时间,比如接下来3000ms内,就不会访问这个Broker了

RocketMQ 是如何持久化消息的

1、为什么Broker数据存储是最重要的一个环节

  • roker数据存储实际上才是一个MQ最核心的环节,他决定了生产者消息写入的吞吐量,决定了消息不能丢失,决定了消费者获取消息的吞吐量,这些都是由他决定的

2、CommitLog消息顺序写入机制

  • 当生产者的消息发送到一个Broker上的时候,他接收到了一条消息,接着他会对这个消息做什么事情?首先第一步,他会把这个消息直接写入磁盘上的一个日志文件,叫做CommitLog,直接顺序写入这个文件
  • 这个CommitLog是很多磁盘文件,每个文件限定最多1GB,Broker收到消息之后就直接追加写入这个文件的末尾,就跟上面的图里一样。如果一个CommitLog写满了1GB,就会创建一个新的CommitLog文件。

RocketMq是如何写入数据的

设定一个topic -> 根据设定的MessageQueue个数 -> 分不在不同的master Broker里边 -> 每个MessageQueue是由多个 CommitLog组成 -> Commit是采用顺序读写。加上OS PageCache来保证写入性能 -> 首先。OS PageCache是基于内存的缓冲池。采用异步刷盘或者同步刷盘顺序写入磁盘 (异步刷盘宕机是会有可能导致数据丢失的

  • DLedger 相当于替换了 CommitLog
  • DLedger CommitLog 来构建出机器上的MessageQueue
  • Broker机器刚刚启动的时候,就是靠这个DLedger基于Raft协议实现的leader选举机制,互相投票选举出来一个Leader,其他人就是Follower,然后只有Leader可以接收数据写入,Follower只能接收Leader同步过来的数据
  • DLedger收到一条数据之后,会标记为uncommitted状态,然后他会通过自己的DLedgerServer组件把这个uncommitted数据发送给Follower Broker的DLedgerServer。
  • 接着Follower Broker的DLedgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的DLedgerServer,然后如果Leader Broker收到超过半数的Follower Broker返回ack之后,就会将消息标记为committed状态。
  • 也就是说。当leaderBroker收到消息之后会同步给 FollowerBroker 节点。当节点响应ack之后主节点才会返回给生产者ack

源码索引

  • 消息发送
  • producer.send(msg);
  • -> defaultMQProducerImpl.sendDefaultImpl
    • -> this.tryToFindTopicPublishInfo 从 NameService 获取 Topic路由信息(本地有缓存就从缓存中获取)
    • -> this.selectOneMessageQueue 选择一个消息队列 queue
    • -> this.sendKernelImpl 调用发送核心方法
    • -> mQClientFactory.getMQClientAPIImpl().sendMessage 进行发送
  • -> MQClientAPIImpl#sendMessageSync
  • -> remotingClient.invokeSync 调用netty方法发送 RequestCode.SEND_MESSAGE 消息
  • broker接受到消息的处理
  • -> NettyServerHandler#channelRead0
  • -> NettyRemotingAbstract#processMessageReceived
  • -> NettyRemotingAbstract#processRequestCommand 处理客户端的请求消息
  • -> processor.asyncProcessRequest 客户端发送的是异步消息,不需要同步返回成功
  • -> SendMessageProcessor#asyncProcessRequest 进入消息处理
  • -> AbstractSendMessageProcessor#parseRequestHeader 解析请求
  • -> SendMessageProcessor#asyncSendMessage 异步保存发送的消息
  • -> this.brokerController.getMessageStore().asyncPutMessage(msgInner) MessageStore存储消息
  • -> MessageStore#asyncPutMessage 异步保存发送的消息
  • -> MessageStore#putMessage 保存发送的消息
  • -> DefaultMessageStore#asyncPutMessages DefaultMessageStore保存消息默认实现
  • -> this.commitLog.asyncPutMessages(messageExtBatch) 保存发送的消息
  • -> CommitLog#asyncPutMessages 保存发送的消息
  • -> mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext) mappedFile
  • -> CommitLog#submitFlushRequest 提交刷盘 (异步 / 同步)
  • -> CommitLog#submitReplicaRequest 将消息同步到从节点。可配置备份多少个
  • -> 消息保存完毕

rocketMq 同步消息的原理

  • netty调用方会触发的动作

  • RemotingClient#invokeSync

  • RemotingClient#invokeSyncImpl 发送同步方法的实现

  • this.responseTable.put(opaque, responseFuture) 这个很关键,将opaque 存到响应的 responseTable里边

  • 然后下方 responseFuture.waitResponse(timeoutMillis) 会阻塞当前请求

  • netty被调用方会触发的动作

  • NettyRemotingAbstract 然后我们看此处的方法。

  • RemotingResponseCallback callback = new RemotingResponseCallback() 构建了一个远程

  • 如果请求是同步请求的话,一定会触发 callback.callback(response);

final RemotingResponseCallback callback = new RemotingResponseCallback() {
    @Override
    public void callback(RemotingCommand response) {
        doAfterRpcHooks(remoteAddr, cmd, response);
        if (!cmd.isOnewayRPC()) {
            if (response != null) {
                response.setOpaque(opaque);
                response.markResponseType();
                try {
                    ctx.writeAndFlush(response);
                } catch (Throwable e) {
                    log.error("process request over, but response failed", e);
                    log.error(cmd.toString());
                    log.error(response.toString());
                }
            } else {
            }
        }
    }
};
  • 请观察一下这个的实现。 response.setOpaque(opaque); 想当于将请求的 opaque 塞入到了response里边。

  • 然后将 ctx.writeAndFlush(response); 到调用方

  • 然后回到调用方

  • NettyRemotingAbstract#processMessageReceived 检查到是 RESPONSE_COMMAND 响应的请求

  • responseFuture.putResponse 会设置 responseCommand 并且 countDownLatch.countDown 释放之前阻塞的请求

标签:MessageQueue,写入,Broker,发送,消息,CommitLog,response,RocketMQ
From: https://www.cnblogs.com/yunlongn/p/16921223.html

相关文章

  • 1、RocketMQ(安装与测试)
    1、RocketMQ(安装与测试)本文编写时间:2022年11月23日Version:RocketMQ4.8版本虚拟机:2核4GJDK:1.832位下载......
  • Python:企业微信批量发工资条工具 -应用消息发送模块
    16年python练手,开发了采用企业微信批量发工资条的程序,其中对企业微信发消息的功能做了包装。在此记录和分享一下。整体程序,使用了wxPythonGUI,以excel来存放数据和消息......
  • win32开发(按键消息)
      对于一个应用来说,按键和鼠标都是基本的消息。当然,win32也需要独立处理按键消息和鼠标消息。今天,我们就讨论一下按键消息。一般认为,当键盘上一个key按下去之后,os会给ap......
  • 嵌入式操作系统内核原理和开发(消息队列)
         消息队列是线程交互的一种方法,任务可以通过消息队列来实现数据的沟通和交换。在嵌入式系统上,这可以说这是用的最多的一种方法。通过消息队列,无论是发送者,还是接......
  • ftk学习记(消息框篇)
      上一篇说到了输入框。闲话不多说,首先看结果显示,  大家看看效果是不是和我们之前说的一样。今天,我们谈一下消息狂。这种消息框其实应用得特别多,有警告用的,有提问的......
  • autojs git判断差异python发送的手机上
    #!usr/bin/python#-*-coding:utf-8-*-#根据git变化,将变化的文件推送到手机上importosimportsysprint("文件编码格式:"+sys.getdefaultencoding())#############......
  • 解决PostMessage发送字符串造成数据错乱问题
       以前一直使用PostMessage来发送字符串数据到主界面,由于字符串是临时变量,而PostMessage是异步发送,有时候由于主界面接收到数据的时候,系统已经将字符串占用的内存释放......
  • 消息队列中间件nsq安装与使用
    安装与运行nsq的镜像开启容器时并不是默认开启三个服务的,而是需要手动开启。dockerpullnsqio/nsqdockerrun-itd--restart=on-failure:20-p4150:4150-p4151:4151-p......
  • 精华推荐 | 【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketM
    RocketMQ的发展历史RocketMQ是一个统一消息引擎、轻量级数据处理平台。RocketMQ是一款阿里巴巴开源的消息中间件。2016年11月28日,阿里巴巴向广西党性培训Apache软......
  • freertos消息队列的值传递和指针传递
    消息队列的使用方法总结:1、消息队列初始化(定义一个消息队列的结构体),一般在main.c中完成。2、消息队列的发送:  aextern消息队列   b定义一个结构体的指针指向消......