首页 > 其他分享 >MQ--二

MQ--二

时间:2023-10-25 16:14:51浏览次数:26  
标签:异步 -- 发送 MQ 消息 commitlog 丢失

一、作用

        MQ作用很简单,就是削峰填谷。  

 使用MQ之后我们的链路变简单了,同事异步发送消息我们的整个系统的抗压能力也上升了。

二、性能对比      

           

 三、消息可靠性保证

        消息丢失可能发生在生产者发送消息、MQ本身丢失消息、消费者丢失消息3个方面。

        【生产者丢失】

         生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消息就丢失了。

         由于同步发送的一般不会出现这样使用方式,所以我们就不考虑同步发送的问题,我们基于异步发送的场景来说。

         异步发送分为两个方式:异步有回调和异步无回调,无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送+回调通知+本地消息表的形式我们就可以做出一个解决方案。以下单的场景举例。

        1、下单后先保存本地数据和MQ消息表,这时候消息的状态是发送中,如果本地事务失败,那么下单失败,事务回滚。

        2、下单成功,直接返回客户端成功,异步发送MQ消息。

        3、MQ回调通知消息发送结果,对应更新数据库MQ发送状态。

        4、JOB轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试。

        5、在监控平台配置或者JOB程序处理超过一定次数一直发送不成功的消息,告警,人工介入。

                  

          一般而言,对于大部分场景来说异步回调的形式就可以了,只有那种需要完全保证不能丢失消息的场景我们做一套完整的解决方案。

【MQ丢失】

        如果生产者保证消息发送到MQ,而MQ收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。        

        比如RocketMQ:

        RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。

       虽然我们可以通过配置的方式来达到MQ本身高可用的目的,但是都对性能有损耗,怎样配置需要根据业务做出权衡。

       【消费者丢失】

         消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。

         RocketMQ默认是需要消费者回复ack确认,而kafka需要手动开启配置关闭自动offset。

         消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理了。(Kafka没有这些)

          

       3.1、一直消费失败导致消息积压如何处理

               1、消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让consumer恢复正常消费。

               2、如果时间来不及处理很麻烦,做转发处理,写一个临时的consumer消费方案,先把消息消费,然后再转发到一个新的topic和MQ资源,这个新的topic的机器资源单独申请,要能承载住当前积压的消息。

               3、处理完积压数据后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状。

        3.2、如果消息积压达到磁盘上限,消息被删除了如何办?

                 最初,我们发送的消息记录是落库保存了的,而转发发送的数据也保存了,那么我们就可以通过这部分数据来找到丢失的那部分数据,再单独跑个脚本重发就可以了。如果转发的程序没有落库,那就和消费方的记录去做对比,只是过程会更艰难一点。

       3.3、RocketMQ实现原理

                RocketMQ由NameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干Broker(RocketMQ进程)组成,它的架构原理是这样的:

  1. Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳
  2. Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息
  3. Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费

         

       3.3、为何RocketMQ不使用Zookeeper作为注册中心

               有以下几点原因:            

  1. 根据CAP理论,同时最多只能满足两个点,而zookeeper满足的是CP,也就是说zookeeper并不能保证服务的可用性,zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
  2. 基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而zookeeper的写是不可扩展的,而zookeeper要解决这个问题只能通过划分领域,划分多个zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。
  3. 持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
  4. 消息发送应该弱依赖注册中心,而RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。

     3.4、Broker是如何保存数据的

              RocketMQ主要的存储文件包括commitlog文件、consumequeue文件、indexfile文件。

              Broker在收到消息之后,会把消息保存到commitlog的文件当中,而同时在分布式的存储当中,每个broker都会保存一部分topic的数据,同时,每个topic对应的messagequeue下都会生成consumequeue文件用于保存commitlog的物理位置偏移量offset,indexfile中会保存key和offset的对应关系。             

               

             CommitLog文件保存于${Rocket_Home}/store/commitlog目录中,从图中我们可以明显看出来文件名的偏移量,每个文件默认1G,写满后自动生成一个新的文件。    

                        

 

         由于同一个topic的消息并不是连续的存储在commitlog中,消费者如果直接从commitlog获取消息效率非常低,所以通过consumequeue保存commitlog中消息的偏移量的物理地址,这样消费者在消费的时候先从consumequeue中根据偏移量定位到具体的commitlog物理文件,然后根据一定的规则(offset和文件大小取模)在commitlog中快速定位。

                

        3.5、Master和Slave之间如何同步数据

         而消息在master和slave之间的同步是根据raft协议来进行的: 

  1. 在broker收到消息后,会被标记为uncommitted状态
  2. 然后会把消息发送给所有的slave
  3. slave在收到消息之后返回ack响应给master
  4. master在收到超过半数的ack之后,把消息标记为committed
  5. 发送committed消息给所有slave,slave也修改状态为committed

        3.6、RocketMQ为何速度快

           是因为使用了顺序存储、Page Cache 和 异步刷盘。           

  1. 我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多
  2. 写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的PageCache
  3. 最后由操作系统异步将缓存中的数据刷到磁盘

        3.7、事务、半事务消息

        事务消息就是MQ提供的类似XA的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。

       半事务消息就是MQ收到了生产者的消息,但是没有收到二次确认,不能投递的消息。

        实现原理:      

  1. 生产者先发送一条半事务消息到MQ
  2. MQ收到消息后返回ack确认
  3. 生产者开始执行本地事务
  4. 如果事务执行成功发送commit到MQ,失败发送rollback
  5. 如果MQ长时间未收到生产者的二次确认commit或者rollback,MQ对生产者发起消息回查
  6. 生产者查询事务执行最终状态
  7. 根据查询事务状态再次提交二次确认

        最终,如果MQ收到二次确认commit,就可以把消息投递给消费者,反之如果是rollback,消息会保存下来并且在3天后被删除。

       

 

标签:异步,--,发送,MQ,消息,commitlog,丢失
From: https://www.cnblogs.com/xiaobaicai12138/p/17787100.html

相关文章

  • 《流畅的Python》 读书笔记 第5章 一等函数 20231025
    第5章一等函数第四章相对偏僻,但时间上一样要花我很久,就先跳过了,回头再补。而这个第5章节是非常重要的。只是最近工作有点忙,我读的越来越慢了~继续坚持吧。在Python中,所有函数都是一等对象,整数、字符串和字典都是一等对象(注:first-classobject)要成为一等对象,需要满足......
  • UE5 Common UI
    1.教程(需FQ)   IntroductiontoCommonUI|InsideUnreal 2.与UMG区别CommonActivatableWidgetStack同一个栈中的窗口一个激活时,其他的可以自动隐藏。通过 PushWidget<CommonActivatableWidget>()函数,可以激活指定类型的窗口。CommonActivatableWidget激活/取消......
  • 认识ant
    一、初体验:新建文件:build.xml<?xmlversion="1.0"?><projectname="helloWorld"><targetname="sayhelloWorld"><echomessage="hello,anna"/></target></project>运行:进入对应的目录,antsayHelloW......
  • Win10没有Hyper-V的解决方法
    windows10家庭版,程序添加的地方没有hyper-v,但是可以手动添加新建一个文本文件,将下面内容复制进去,然后修改后缀为.cmd执行这个文件需要权限,所以要右键,选择以管理员身份运行。pushd"%~dp0"dir/b%SystemRoot%\servicing\Packages\*Hyper-V*.mum>hyper-v.txtfor/f%%iin('......
  • docker 换源 apt-get update
    Docker换源在Docker中,我们可以通过修改/etc/apt/sources.list文件来更换软件源。以下是一个示例Dockerfile文件,展示了如何在构建镜像时更换软件源:FROMubuntu:latest#使用sed命令替换默认的软件源为阿里云镜像源RUNsed-i's/archive.ubuntu.com/mirrors.aliyun.c......
  • Vue 中 axios 的使用和跨域问题的解决
    一、内容:1.Axios是一个基于promise的HTTP库,类似于jQuery的ajax,用于http请求。axios并不是vue插件,所以不能使用Vue.use()。2.它既可以应用于浏览器端,也可以应用于node.js编写的服务端。3.Axios具有以下特性: (1)支持PromiseAPI。 (2)拦截请求与响应,比如:在请求前......
  • Spring异步线程池-TaskDecorator传递线程上下文
    TaskDecorator:TaskDecorator是一个执行回调方法的装饰器,主要应用于线程间传递数据,或者提供任务的监控/统计信息。从主线程拷贝数据到子线程,具体数据实际上是封装到threadlocal里面。实现方式:定义一个TaskDecorator,在线程池中设置使用这个TaskDecorator。注意......
  • 不要再写自然溢出哈希了
    众所周知,自然溢出哈希的模数是$2^{64}$,这不是个质数,就导致了它有被卡的风险,你怎么知道我今天就被卡了。而卡掉这种哈希的方法可以参考这篇博客。以后还是不要嫌麻烦,要手写哈希模数。......
  • Kotlin 协程Job 代替 Handler执行延时任务 带取消
    privatevalhandler=Handler(Looper.getMainLooper())varrunnable=Runnable{dismissProgressDialog()}......handler.postDelayed(runnable,(10*1000).toLong())......//取消任务handler.removeCallbacks(runnable)privatevarjob:Job?=null......job......
  • Linux 脚本加密解密工具
    1、系统自带工具gzexe加密方法:#gzexetest.sh此时在目录下就会产生一个test.sh~文件,该文件是源文件,test.sh是加密后的文件解密方法:#gzexe-dtest.sh在目录下就会产生一个test.sh~文件,该文件是源文件,test.sh是解密后的文件2、shc加密软件,unshc来解密软件#wgethtt......