首页 > 其他分享 >消息中间件——rabbitmq,kafka,rocketmq

消息中间件——rabbitmq,kafka,rocketmq

时间:2024-12-28 16:55:16浏览次数:7  
标签:分区 broker rabbitmq kafka mq 消息 节点 leader rocketmq

目录

mq

mq解决什么问题

rabbitmq

工作原理

消息路由

如何保证消息不丢失

实现高可用

kafka

能支持这么大吞吐量的原因

如何保证消息不丢失

避免重复消费

如何保证消息顺序消费

数据存储原理

IRS

leader选举

rocketmq

为什么不使用zookeeper

分布式事务


mq

message Queue,消息队列,分布式应用之间实现异步通信的一种方式

生产者,生产消息,消息的发起方,负责创建承载业务信息的消息

消息服务端,处理消息的单元,用来创建和保存消息队列,负责消息的存储和传递,是mq的核心部分。

消费者,消费信息,是根据消息承载的信息处理业务逻辑。

mq解决什么问题

流量削峰,解决流量过大,业务需要短时间响应的问题,流量过大但是服务器性能无法满足,导致大量请求被积压,出现客户端超时的场景。为了保证高可用,把请求发送给mq,mq再将请求发送给其他服务器,从而平稳的处理后续业务,起到对大流量缓冲的作用。

例如,订单系统最大QPS是1万,这个处理能力正常时段1秒钟返回结果。但是在流量高峰时期,比如促销秒杀,如果QPS到达2万,订单系统就处理不过来,只能在超过负载后不允许用户下单,如果使用消息队列做缓冲,就可以取消这个限制,把超出负载的订单分散到一段时间处理,这样虽然有些用户下单十几秒后才收到下单成功的提示,但是比不能下单要好。

应用解耦,把相关但是耦合度不高的系统联系起来,解决不同系统之间使用不同框架后不同编程语言产生的兼容性问题,提高整个系统的灵活性。

异步处理,应用于实时性不高的场景,例如用户登录发送的验证码,支付成功通知等。生产者只需要将协商好的消息发送给消息队列,之后由消费者处理,不需要等待消费者返回结果。

rabbitmq

工作原理

核心组件,

producter,生产者

consumer,消费者

broker,代理或中介

connection,连接,消息的发送者和接收者都必须同broker建立一个连接

channel,管道,amqp协议里引入管道的概念,相当于一个虚拟连接,我们可以使用已经连接好的tcp长连接,避免了每次发送和接收消息都要创建和释放tcp长连接,这样降低了资源的消耗。不同的channel是相互隔离的。

Queue 队列,用来存储消息,队列是生产者和消费者的纽带,生产者发送的消息会存储到队列里,而消费者从队列里消费消息。

exchange,交换机,消息的路由器,他不存储消息,根据规则分发消息。

binding,绑定,exchange和队列必须建立一个绑定关系,为每个队列指定一个特殊标识。exchange和队列是多对对的关系,一个交换器的消息可以路由给多个队列,一个队列可以接收多个交换机的消息。在绑定关系建立好之后,生产者发送消息到exchange,会携带一个特殊标识。当这个标识与绑定的标识匹配时,消息就会发送给一个或者多个符合规则的队列。

vhost,虚拟主机,解决不同业务系统之间的消息隔离,节约硬件成本,实现资源的隔离和权限控制。

消息路由

rabbitmq是一个基于amqp实现的分布式消息中间件,生产者把消息发送给broker上的exchange。exchange把收到的消息根据路由规则发送给绑定的队列,最后把消息投递给订阅了这个消息的消费者,完成消息的异步通信。

其中交换机定义了消息的路由规则,消息路由到哪个队列。

队列表示消息的载体,每个消息可以根据路由规则路由到一个或多个队列里。

在消息的路由机制里,核心组件是交换机。负责接收生产者的消息,然后把消息路由到消息队列,而消息的路由规则由exchangeType 和 binging决定。binging表示建立队列和交换机之间的绑定关系,每一个绑定关系存在一个bingingkey,通过这种方式在交换机里建立一个路由关系表。

生产者在发送消息的时候,需要声明一个routingkey路由键,交换机拿到路由键之后,与路由表里面的bingingkey进行匹配,匹配方式由exchangeType决定。

exchange的类型,1.direct,完整匹配,路由键和绑定键完全一致,相当于点对点发送

2.fanout,广播,把消息广播给绑定在交换器上的所有队列。

3.topic,正则匹配,根据路由键使用正则表达式匹配bingingkey,符合匹配规则的队列收到这个消息。

如何保证消息不丢失

消息的传递过程,

1.生产者发送消息到mq,为了保证mq服务器收到生产者发送的消息,使用消息确认机制,生产者需要接收一条确认消息已被接收的通知。否则,就重新发送消息。通过channel的confirmselect开启发送方确认模式。

handleACK方法,表示broker接收到了消息,handleNack,表示消息丢失。这里可以做重试机制,例如一条消息发送3次还没有成功,就认为发送失败了。如果这个消息非常重要不能丢失,例如订单消息,我们这类消息放到数据库进行持久化。给每条消息设置一个状态标识,假设初始状态是0,服务器接收之后,触发回调,这时把这条消息的状态设置1.表示已收到。

补偿机制,用一个定时任务轮询数据库里状态为0的消息,并重新发送。设置一个最大重试次数,防止一条消息不断重试,消耗内存资源降低数据库性能。

但是不断轮询数据库会增加数据库压力,每发送一条消息都有和数据库进行交互,更新消息的状态,优化方法,1.创建一个专用的数据库处理发送失败的消息。

2.采用Redis缓存更新消息状态,减少轮询数据库压力。

mq存储的信息不丢失

开启mq的持久化机制,基于raid刷盘,2种模式

raid0,磁盘集成,将多块磁盘当做一块使用。这种模式可以提高磁盘存储量。

raid1,磁盘镜像,将磁盘分成2半,将同样的数据在2块区域里各存储一份。这样避免了在消息存储的过程里,因为某一块磁盘道坏了,导致消息丢失。

消费者保证消息不丢失,

1.在处理消息的过程里出现了异常,导致消息丢失

2.消费者因为网络中断或者网络抖动,没有接收到消息

3.消费者刚接收到消息,服务器宕机了,导致后续操作中断。

使用信息确认机制。只需要消费者在收到消息后手动确认。当mq没有收到消息确认的时候重发消息。

实现高可用

高可用在分布式系统里,如果出现某些节点不可用的情况,保证客户端能够连接其他节点,不影响业务执行。实现高可用用2种集群部署和负载均衡。

集群部署,集群节点有2种,磁盘节点,内存节点,在集群环境里至少需要一个磁盘节点,用来持久化元数据,避免内存节点都崩溃时,无法同步元数据

集群模式有2种,普通集群模式,不同节点之间只会相互同步元素数据,不同同步消息内容。好处是可以节省存储和同步数据的网络开销。如果需要保证队列的高可用,需要开启镜像队列模式。

镜像队列模式,消息内容在所有镜像节点同步,可用性更高。但系统性能下降,在节点过多的情况下同步数据的代价比较大。

集群搭建成功后,保证高可用,使用负载均衡组件来做路由。对于生产者和消费者只需要连接到组件的虚拟IP地址就可以了。

1.可以监控集群里节点的状态,如果某个节点发生异常或故障把他剔除。

2.为了提高可用性,部署多个服务,能够自主完成主从选举

3.master需要对外提供一个虚拟IP地址,客户端只需要连接到这个虚拟的IP地址就可以完成真实的IP地址路由。

kafka

一个能处理超千万亿消息吞吐量的实时消息处理平台。

能支持这么大吞吐量的原因

磁盘顺序读写,磁盘的盘片不停的旋转,磁头在磁盘表面画出一个圆形轨迹,叫做磁道,由内到外,因为半径不同,所以有许多磁道,用半径线,把磁道分割成扇区。如果需要读写,就需要找到数据对应的扇区,这个过程叫寻址。随机io,读写数据在磁盘上分散存储,寻址比较耗时。顺序io,读写数据在磁盘上集中,不需要重复寻址。

kafka的消息是不断追加到磁盘文件的末尾,是顺序写入,这样提高了吞吐量。

稀疏索引,插入一批消息才会产生一条索引记录。后续使用二分法查找,可以提高检索效率。

批量文件压缩,默认不会删除数据,把所有的消息变成一个批量文件,把相同key合并成一个value,这样对消息进行合理的批量压缩,减少网络io。

零拷贝机制,kafka文件传输使用Java nio库里transferto方法,这个方法使用Linux系统调用sendFile函数,sendfile函数,实现了零拷贝,不需要用户经过缓冲区,可以直接把数据拷贝到网卡。

把磁盘里的文件发送到远程服务器的过程

1.从磁盘里读取目标文件的内容,拷贝到内核空间的缓冲区。

2.cup把内核空间的数据拷贝到用户空间缓冲区。

3.在应用里调用Write方法,把用户空间缓冲区的数据拷贝到内核空间的socket缓冲区。

4.把socket缓冲区里的数据拷贝到网卡

5.网卡把数据传输到目标服务器上。

这个过程需要拷贝4次,其中2次拷贝是浪费的

1.从内核空间拷贝的用户空间

2.再从用户空间拷贝到内核空间

由于用户空间和内核空间切换带来cup的上下文切换,影响了CPU的性能。零拷贝把这2次拷贝省略掉,应用程序直接把磁盘里的数据从内核传输到socket缓冲区,不需要经过用户空间。

如何保证消息不丢失

kafka是一个用来实现异步消息的中间件

producer,需要保证消息能够到达broker实现消息存储。默认采用的是异步发送,要确保消息能够发送成功,有2个方法,

1.把异步消息改成同步消息,这样producer能够知道消息发送结果。

2.添加异步回调函数监听消息发送结果。如果发送失败可以重试。

producer 提供重试参数retries,因为网络或者broker故障,producer自动重试。

broker保证收到的消息不丢失,只需要把消息持久化到磁盘。kafka采用异步批量刷盘实现,按照一定的消息量和时间间隔来刷盘,刷盘动作由操作系统调用,如果在刷盘之前系统破溃了,就会导致数据丢失。

kafka没有同步刷盘机制,这里采用分区副本和acks机制解决。

分区副本机制,是针对每个数据分区的高可用策略。每个分区副本集都包含唯一的leader和多个follower,leader处理事务请求,follower处理leader同步数据。

acks参数,producer可以设置acks参数,结合broker副本机制来共同保证数据可靠性,1.acks=0,表示producer不需要等待broker响应,就认为消息发送成功。

2.acks=1,表示broker里的leader分区在收到消息后,不在等待其他follower分区同步完成,就给producer就返回确认,这种情况下,如果leader 分区挂了,导致消息丢失。3.acks=-1,表示broker里leader分区已收到消息,并且isr列表里的follower分区同步完成,再给生产者返回确认,这样可以保证消息的可靠性。

保证消费者收到消息,如果消费者没有消费完这个消息就提交了,还可以通过调整offset重新消费。

避免重复消费

kafka存储的消息都有一个offset,kafka的消费是通过offset值来维护当前已经消费的数据。每消费一批消息,就会更新offset值,避免重复消费,默认情况下,在消息消费完成后,kafka会自动提交offset,消费端的自动提交有5秒间隔,在消费者消费的过程里,应用程序关闭或者机器宕机,这样offset没有提交,导致重复消费问题。

分区再平衡机制,把多个分区均匀的分配给消费者,消费者从分区消费消息,如果消费者在5分钟里没有消费完这批消息,就会触发再平衡机制,导致offset提交失败。再平衡之后,消费者还是从之前没有提交的offset开始消费,导致重复消费问题。

解决方法,

1.提高消费者的性能避免触发再平衡,使用异步方式处理消息,缩短单个消息消费时间,调整消息处理的超时时间,减少一次性从broker里获取消息的数量。

2.针对消息生成MD5,保存到数据库里,在处理消息之前,先去查询下消息没有有被消费。

如何保证消息顺序消费

kafka使用分区机制存储消息,同一个topic,可以维护多个分区实现消息分片,生产者在生产消息的时候,会根据消息的key进行取模,决定把消息存储到哪个分区里,消息是按照先后顺序有序的存储到分区里

假设topic有3个分区,消息正好被路由到3个独立的分区里面,消费端有3个消费者通过再平衡机制分别被指派了对应的分区,因为消费者是独立的网络节点,所以消费的顺序与发送的顺序不同,出现了乱序消费问题。

解决方法,自定义消息分区路由算法,把指定的key发送给同一个分区,指定特定的消费者来消费某个分区数据,保证消费的顺序。

数据存储原理

kafka存储消息的队列叫做topic,是一个逻辑概念,可以理解为一组消息的集合。topic,生产者,消费者是多对对的关系,一个生产者可以发送消息到多个topic,一个消费者可以从多个topic获取消息。

为了实现横向扩展,kafka会把不同的数据存放在不同的broker上,同时为了降低单体服务器的访问压力,会把一个topic的数据分割成多个分区,在服务器上每一个分区都有一个物理目录,topic名字后面的数字编号代表分区。例如,创建一个mytopic主题,数据目录存储到了3台机器上。mytopic0存储在A节点,mytopic1存储在B节点,mytopic2存储在C节点

为了提高分区的可靠性,设计了副本机制,在创建topic时,通过设定replication-factor副本因子确定副本数量。副本因子必须小于副本数,为了保证不会有1个分区的2个副本存储在一个节点上。所有的副本有2种角色,leader对外提供读写服务,follower从leader异步拉取数据。

为了防止log不断追加导致文件过大,检索消息的效率降低,当一个分区超出一定的大小的时候,会被切割成多个segment,在磁盘上每个segment由一个。log文件,index文件,timeindex文件组成。

log文件用来存储具体的数据文件。

index文件用来存储消费者的offset的索引文件,

Timeindex文件用来存储时间戳的索引文件,

并以切割时记录的offset值作为文件名。

kafka设计2种索引文件,1.偏移量索引文件,记录offset和消息在log文件里的映射关系。

2.时间戳索引文件,记录时间戳和offset的关系。

为了提高检索效率,kafka采用的是稀疏索引,间隔一批消息才产生一条索引记录。

我们可以通过参数设置索引的稀疏程度,索引越密集检索的效率越快,但是占用磁盘空间越多,越稀疏的索引占用的磁盘空间越小,在插入和删除时所需要维护开销越小。kafka在检索数据时,采用的是二分查找法,效率高。

IRS

消息发送到broker上,以分区的形式存储在磁盘上,为了保证分区的可靠性,提供副本机制。在分区的副本里,有leader和follower2种分区,生产者的消息首先存储到leader分区,然后复制到follower分区,这样设计的好处是一旦leader分区的节点挂了,可以重新从剩余的分区里面选举出新的leader分区。这样消费者可以从新的leader分区获取未消费的数据。

在分区副本机制里有2个重要功能,副本数据同步和leader选举。这些功能涉及到网络通信,为了避免通信延迟带来的性能问题,尽可能保证新选出来的leader分区的数据时最新的,设计了ISR方法,全称是in sync replica,是一个列表,保存的是和leader分区节点数据最接近follower分区。如果这个follower分区里面的数据落后leader分区太多,从isr列表删除,那么在列表里面的节点,同步数据是最新的,所以后续的leader选举,只需要从ISR列表进行筛选就可以了。

IRS优点,1.尽可能保证了数据同步的效率,因为同步效率不高的节点已被删除。

2.避免数据丢失。ISR列表里的节点的数据是和leader最接近的。

leader选举

早期版本使用zookeeper完成选举,利用zookeeper的watch机制,所以kafka节点不允许重复写入,但允许设置临时节点。这样实现简单,但是存在一些问题,例如当分区和副本数量过多时,如果所有副本都直接参与选举,一旦出现节点增减,就会造成大量的watch事件触发,导致zookeeper负载过重。

新版本里的实现方法,不是所有的replica数据集都参与leader选举,而是由其中的一个broker统一指挥,他叫做controller。kafka先选出控制器。

所有的broker会尝试在zookeeper里创建临时节点/controller,谁先创建成功,谁就是控制器。如果控制器挂掉或者出现网络问题,zookeeper上的临时节点就会消失。其他broker通过watch监听到控制器下线后,然后重新选出控制器,这个控制器相当于选举委员会主席。

当一个节点成为控制器后,就会监听,管理broker,topic,partition的信息。

控制器确定后,就可以开始分区选举了,只有ISR保持心跳同步的副本才有资格参与选举,让ISR里第一个replica变成leader,例如,isr是1,3,5,优先让1成为leader。

rocketmq

为什么不使用zookeeper

rocketmq使用nameServer实现broker注册和发现,为了保证高可用,nameServer可以集群部署,broker在nameServer上注册自己,生产者和消费者使用nameServer发现broker,每个broker节点在启动的时候,根据配置遍历nameServer列表,与每个nameServer建立长连接,注册自己的信息,之后每隔30s发送心跳信息。每个nameServer每隔10S会检查一些各个broker的最近一次心跳时间,如果发现某个broker超过120s都没有发送心跳信息,就把这个broker从路由信息删除。

rocketmq是一个保持最终一致性的架构设计,他的架构决定了他只需要一个轻量级元数据服务器就够了,不需要像zookeeper这样强一致性解决方案。不依赖另一个中间件可以减少整体的维护成本。根据cap理论,zookeeper选择cp,而nameServer选择ap

分布式事务

半消息,是一种暂时不能投递给消费者的消息,消息生产者已经将消息成功发送到mq服务器,但是mq服务器没有收到生产者对这条消息的二次确认,这时这条消息被标记为暂时不能投递。

消息回查,由于网络阻塞或生产者应用重启等原因,导致某条事务消息的二次确认丢失,mq服务器通过扫描发现某条消息长期处于半消息状态时,就主动向消息的生产者询问该消息的最终状态,要么commit,要么rollback.

生产者向mq服务器发送半消息

mq服务器将消息持久化成功之后,向生产者发送ACK,确认消息已经发送成功,这个时候消息标记为半消息。

生产者开始执行本地事务

生产者根据本地事务执行结果向mq提交二次确认。如果mq收到commit状态,将消息标记为可投递,向消费者发送消息,如果mq服务器收到回滚,则删除半消息,消费者将不会收到该消息。

如果断网或生产者应用重启的情况下,提交的二次确认没有到达mq服务器,经过固定时候后,mq服务器将对该消息发起消息回查。

生产者收到消息回查后,检查对应消息的本地事务执行的结果

生产者根据检查得到的本地事务执行的最终结果再次提交二次确认,如果mq收到commit状态,将消息标记为可投递,向消费者发送消息,如果mq服务器收到回滚,则删除半消息,消费者将不会收到该消息。

标签:分区,broker,rabbitmq,kafka,mq,消息,节点,leader,rocketmq
From: https://blog.csdn.net/z524635690/article/details/144649884

相关文章

  • Kafka为什么这么快? 高性能背后的原理?
    Kafka是一款性能非常优秀的消息队列,每秒处理的消息体量可以达到千万级别。今天来聊一聊Kafka高性能背后的技术原理。1批量发送Kafka收发消息都是批量进行处理的。我们看一下Kafka生产者发送消息的代码:privateFuture<RecordMetadata>doSend(ProducerRecord<K,V>re......
  • Kafka_2.13-3.6.0 常用命令快速指南
    Kafka_2.13-3.6.0常用命令及说明1.环境配置下载并解压Kafka#下载Kafka_2.13-3.6.0安装包wgethttps://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz#解压文件tar-xzfkafka_2.13-3.6.0.tgz#进入Kafka目录cdkafka_2.13-3.6.0启动Zookeeper......
  • Flink CDC MySQL 同步数据到 Kafka实践中可能遇到的问题
    FlinkCDCMySQL同步数据到Kafka实践中可能遇到的问题一、问题场景[ERROR]CouldnotexecuteSQLstatement.Reason:org.apache.flink.table.api.ValidationException:Theprimarykeyisnecessarywhenenable'Key:'scan.incremental.snapshot.enabled',defau......
  • Kafka日志存储全解析
    文章目录1.1.日志存储结构1.1.1.日志文件结构1.1.2.topic1.1.3.partition1.1.4.segment索引文件1.1.5.message结构1.1.6.message查找过程1.2.存储策略1.2.1.顺序写1.2.2.页缓存1.2.3.零拷贝1.2.4.缓存机制1.3.日志格式演变1.3.1.V0版本1.3.2.V1版本1.3.3.V0/V1消息集......
  • kafka的备份策略:从备份到恢复
    文章目录一、全量备份二、增量备份三、全量恢复四、增量恢复前言:Kafka的备份的单元是partition,也就是每个partition都都会有leaderpartiton和followpartiton。其中leaderpartition是用来进行和producer进行写交互,follow从leader副本进行拉数据进行同步,从而保证数据......
  • Kafka数据迁移全解析:同集群和跨集群
    文章目录一、同集群迁移二、跨集群迁移Kafka两种迁移场景,分别是同集群数据迁移、跨集群数据迁移。一、同集群迁移应用场景:broker迁移主要使用的场景是broker上线,下线,或者扩容等.基于同一套zookeeper的操作。实践:将需要新添加的broker列表一并添加到kafk......
  • Kafka Broker、Producer、Consumer配置参数
    参数的设置对Kafka性能有着至关重要的影响。以下是一些关键参数及其对性能的具体影响:KafkaBroker配置参数num.network.threads:控制Kafka网络线程的数量,这些线程负责处理网络I/O操作。增加此参数的值可以提高网络I/O处理能力,但也会增加内存消耗。num.io.threads:控制KafkaI/O......
  • (七).NET6.0部署RabbitMQ
    1.下载erlang语言包OTP。官网地址:https://www.erlang.org/downloads2.Rabbitmq官网下载地址:https://www.rabbitmq.com/download.html需要先安装Erlang语言包,然后再安装RabbitMQ,安装RabbitMQ的服务器名称(电脑名称),以及用户名称,不要带中文,有可能会导致服务无法识别服务器,导致一......
  • 【杂谈】Kafka 消息偏移量:如何高效地定位和管理消息?
    前言在Kafka中,消息偏移量是什么?是文件中的索引吗?又是如何通过偏移量快速定位消息的?本文将深入探讨这些问题,帮助你更好地理解Kafka的偏移量机制。Kafka的偏移量是什么?Kafka中的偏移量实际上是每条消息的序号。它为每条消息提供了一个唯一的标识。通过偏移量,消费者可以......
  • C5GAME 游戏饰品交易平台借助 RocketMQ Serverless 保障千万级玩家流畅体验
    作者:邹星宇、刘尧C5GAME:安全便捷,国内领先的游戏饰品交易平台C5GAME游戏饰品交易平台(www.c5game.com)是国内领先的STEAM游戏饰品交易的服务平台,专注于CS:GO以及DOTA2等热门游戏装备C2C中介交易。自网站上线以来,C5GAME凭借其安全便捷的交易和流畅友好的体验,迅速在玩家......