首页 > 其他分享 >RocketMQ关键技术整理

RocketMQ关键技术整理

时间:2023-07-25 11:11:53浏览次数:41  
标签:事务 关键技术 Producer Broker 消息 整理 Consumer RocketMQ

form https://gitee.com/apache/rocketmq/tree/master/docs/cn

技术架构

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。

  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。

    1. Remoting Module:整个Broker的实体,负责处理来自Client端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
    3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
    5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

存储结构

(1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

(2) ConsumeQueue:消息的消费索引,引入的主要目的是提高消息消费的性能。ConsumeQueue保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset、消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构。consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;

(3) IndexFile:消息的索引文件,提供了一种可以通过key或时间区来查询消息的方法。Index文件的存储位置是:$HOME/store/index/{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故RocketMQ的索引文件其底层实现为hash索引。

RocketMQ针对Producer和Consumer采用了数据和索引分离的混合存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。

另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中,将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。

通信机制

rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。

为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块

在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。

在RocketMQ消息队列中支持通信的方式主要有同步(sync)、异步(async)、单向(oneway) 三种。其中“单向”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。

在RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端知道从Broker端的哪一个消息队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。

至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。

消息过滤

RocketMQ的消息过滤是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的。

主要支持如下2种的过滤方式:

(1) Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。Broker从文件存储层(store)读取数据之前,会先构建一个MessageFilter,然后传给Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

(2) SQL92的过滤方式:这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行

负载均衡

RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。

事务消息

RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,采用了2PC的思想来实现了提交事务消息,通过事务消息能达到分布式事务的最终一致。

事务消息流程:

1.事务消息发送及提交:

(1) 发送消息(half消息)

(2) 服务端响应消息写入结果

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见

2.补偿流程:

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

 

事务消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。

值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。

延迟消息

// RocketMQ延迟消息的定义
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

延迟消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费

broker会调度地消费SCHEDULE_TOPIC_XXXX(通过java.util.Timer每秒轮询),将消息写入真实的topic。需要注意的是,延迟消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。 

 

标签:事务,关键技术,Producer,Broker,消息,整理,Consumer,RocketMQ
From: https://www.cnblogs.com/morningao/p/17569324.html

相关文章

  • 华为20级大牛最新整理500页网络协议手册,Github竟一夜标星186K
    网络协议是每个程序员入门的必修课,但是完全掌握网络协议知识并进行实际应用却并非易事。在本书中,作者将结合自己从业多年的“泣血”经验,以通俗易懂、更加贴近日常生活的方式,从底层到上层对最基础、最常用、最重要的网络协议进行解析,并将深入分析网络协议在云计算、容器和微服务等......
  • RocketMQ 生产端与消费端
    参考:en_oc:https://www.cnblogs.com/enoc/p/rocketmq-so-no-roku.html田守枝(rebalance):https://cloud.tencent.com/developer/article/1554950官方文档:https://rocketmq.apache.org/zh/docs/  发送消息RocketMQ中定义了如下三种消息通信的方式:SYNC:同步发送,生产端会阻塞等......
  • 分布式开放消息系统(RocketMQ)的原理与实践
    备注:1.如果您此前未接触过RocketMQ,请先阅读附录部分,以便了解RocketMQ的整体架构和相关术语2.文中的MQServer与Broker表示同一概念分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:消息的顺序问......
  • 四边形不等式小整理
    原文:四边形不等式优化dp以下为内容摘录:若二元函数满足当\(a\leqb\leqc\leqd\),有\(w(a,d)+w(b,c)\geqw(a,c)+w(b,d)\),则称二元函数满足四变形不等式。二维递推优化优化式:\(f[l][r]=\min_{l\leqk<r}\{f[l][k]+f[k+1][r]+w[l][r]\}\)(区间递推)\(f[i][j]=\min_{i−1......
  • 数百种PCB元器件封装整理,看看有没有你想要的!
    常见的PCB元器件有数百种,它们都有不同的封装形式,不同封装的元器件很有可能具有相同的外观,但其内部结构和应用场景却不一样,不管是简单的电子电路还是复杂的电子电路,皆是由这些电子元器件组成。目前优恩半导体具备市场所需全品类过压过流保护元件及分立功率器件产品,如ESD静电保护器件......
  • C#中的重写与多态知识点整理(刘铁锰老师课堂笔记)
    在C#中,重写(Override)和多态(Polymorphism)是面向对象编程中的重要概念。通过重写和多态,我们可以更好地组织和管理代码,提高代码的可维护性和可扩展性。重写(Override)重写是指在派生类中重新实现基类中已经定义的方法。通过重写一个方法,我们可以为派生类中的该方法提供新的实现,同时让......
  • 工作流学习,工作流定义工具部分(未整理)
    工作流定义工具需求分析工作流分类:管理型、设定型、协作型、生产型。以通讯为中心、以文档为中心、以过程为中心、基于文件、基于消息、基于web。工作流模型包括了描述一个能够由工作流执行服务软件系统执行的过程所需的所有信息。这些信息包括:过程的开始、完成条件,构成过程的......
  • 从互联网到云时代,Apache RocketMQ 是如何演进的?
    作者:隆基2022年,RocketMQ5.0的正式版发布。相对于4.0版本而言,架构走向云原生化,并且覆盖了更多业务场景。消息队列演进史操作系统、数据库、中间件是基础软件的三驾马车,而消息队列属于最经典的中间件之一,已经有30多年的历史。消息队列的发展主要经历了以下几个阶段:第一......
  • 关于使用RocketMQ搭建多Master多Slave模式(同步)集群时遇到的问题
    搭建多Master多Slave模式(同步)集群时的java.lang.NullPointerException异常一、运行环境等基本描述(问题产生原因是权限问题,即权限不够导致无法启动broker,甚至broker线程无法通过jps命令查出。下面阐述分析思路)1.1)操作系统:Linux虚拟机:VMwareWorkstation16Pro、WSL ......
  • rocketmq
        ......