首页 > 其他分享 >Day10_11_消息队列之保证RabbitMQ全链路数据完全不丢失

Day10_11_消息队列之保证RabbitMQ全链路数据完全不丢失

时间:2022-12-23 15:37:05浏览次数:47  
标签:11 队列 确认 RabbitMQ Day10 消息 channel 丢失


保证RabbitMQ全链路数据完全不丢失

一. 消息可靠性概述

1. 消息生命周期过程

一条消息从创建到最终被消费掉,也就是从生产端到消费端最终被消费掉大致上要经过3个步骤:

  • 1️⃣.生产端发送消息到RabbitMQ;
  • 2️⃣.RabbitMQ发送消息到消费端;
  • 3️⃣.消费端消费掉这条消息.

这3个步骤中的每一步都有可能导致消息丢失,消息丢失不可怕,可怕的是丢失了我们还不知道,所以要有一些措施来保证系统的可靠性.这里的可靠并不是一定就100%不丢失,磁盘损坏,机房爆炸等都能导致数据丢失,当然发生概率都极小,能做到99.999999%消息不丢失,就是可靠的.下面来具体分析一下问题以及解决方案.

二. 生产端的消息可靠性保障

生产端的消息可靠性保障,即生产端要确保把消息正确投递到RabbitMQ中.生产端投递时消息丢失的原因有很多,比如消息在网络传输的过程中发生网络故障,或者消息投递到RabbitMQ时RabbitMQ挂了,消息都可能丢失,而我们根本不知道发生了什么.针对以上情况,RabbitMQ本身提供了一些机制.

1. 事务消息机制

事务消息机制就是说,在发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事
务(channel.txCommit()).
然而缺点就是会降低RabbitMQ的吞吐量.

事务消息机制由于会严重降低性能,所以一般不采用这种方式,经常采用另一种轻量级的解决方案——confirm消息确认机制.

2. confirm消息确认机制

所谓的confirm消息确认机制就是指生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则就意味着这条消息可能已经丢失了,需要生产端重新发送消息.


Day10_11_消息队列之保证RabbitMQ全链路数据完全不丢失_消息队列

通过下面这句代码来开启确认模式:

channel.confirmSelect();// 开启发送方确认模式

然后异步监听确认和未确认的消息:

channel.addConfirmListener(new ConfirmListener() {

//消息正确到达broker
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("已收到消息");
//做一些其他处理
}

//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag);
//做一些其他处理,比如消息重发等
}
});

一旦channel进入了confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的消息队列之后,RabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果RabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作.

这样就可以让生产端感知到消息是否被成功的投递到RabbitMQ中了.但是这样还不够,可能还会有一些极端情况需要我们采取其他的机制来进行处理.

3. 消息持久化

一般情况下,RabbitMQ收到消息后会将这个消息暂时存在内存中.那么这就可能会有个问题,如果RabbitMQ挂了,然后重启后数据就会丢失.所以相关的数据应该持久化到硬盘中,这样就算RabbitMQ重启后也可以到硬盘中取数据恢复.那如何持久化呢?

message消息到达RabbitMQ后先是被发送到exchange交换机中,然后路由给queue队列,最后才发送给消费端.


Day10_11_消息队列之保证RabbitMQ全链路数据完全不丢失_RabbitMQ_02

所以需要给exchange、queue和message都进行持久化.

这个持久化配置可以和confirm机制配合使用,我们可以在消息持久化到磁盘后,再给生产者发送一个Ack信号.这样,如果消息持久化到磁盘之前,RabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发.

3.1 exchange持久化

//第三个参数true表示这个exchange持久化

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

3.2 queue持久化

//第二个参数true表示这个queue持久化

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

3.3 message持久化

//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化

channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

这样即使RabbitMQ收到消息后挂了,在重启后也会自行恢复消息.

至此,RabbitMQ自身提供的几种可靠性机制都介绍完了,但这样还不足以保证消息100%可靠的投递RabbitMQ中.上面我们也提到了可能会有极端情况发生,比如RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ就挂了,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,就不好做接下来的处理.


Day10_11_消息队列之保证RabbitMQ全链路数据完全不丢失_发送消息_03

所以除了RabbitMQ提供的一些机制外,我们自己也要做一些消息补偿机制,以应对一些极端情况.接下来介绍其中的一种补偿解决方案——消息入库.

4. 消息入库

消息入库,顾名思义就是将要发送的消息保存到数据库中.

首先发送消息前先将消息保存到数据库中,并且设置一个状态字段status=0,表示生产端将消息发送给了RabbitMQ但还没收到确认消息;在生产端收到确认后将status设为1,表示RabbitMQ已收到消息.

这里有可能会出现上面说的两种情况,所以生产端这边设置一个定时器,定时检索消息表,将status=0并且超过固定时间后(可能消息刚发出去还没来得及确认这边定时器刚好检索到这条status=0的消息,所以设置一个固定时间)还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性),可能重发还会失败,所以可以设置一个最大重发次数,超过就做另外的处理.


Day10_11_消息队列之保证RabbitMQ全链路数据完全不丢失_消息可靠性_04

这样消息就可以可靠的投递到RabbitMQ中了,而生产端也可以感知到了.

三. 消费端消息不丢失

通过上面的几种机制,基本已经可以让生产端的消息100%可靠的投递到RabbitMQ了,但是要确保消费端消息不丢失,可以启用手动确认模式来解决这个问题.那么接下来我们看看消费端是如何让消费端不丢失消息的.

1. 消费端消息丢失的几种情况

默认情况下,以下3种情况会导致消息丢失:

  • 1️⃣.在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;
  • 2️⃣.在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失;
  • 3️⃣.消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失.


Day10_11_消息队列之保证RabbitMQ全链路数据完全不丢失_消息可靠性_05

其实,上述3种情况导致消息丢失归根结底是因为RabbitMQ的自动ack机制,也就是默认情况下,RabbitMQ在消息发出后就立即将这条消息删除了,而不管消费端是否接收到,是否处理完,导致消费端消息丢失后RabbitMQ自己已经没有这条消息了.


Day10_11_消息队列之保证RabbitMQ全链路数据完全不丢失_消息队列_06

所以我们需要将自动ack机制改为手动ack机制.

2. 消息确认模式

  • 1️⃣. 自动确认模式: 如果消费者挂掉,待ack的消息会重回到队列中;如果消费者抛出异常,消息会不断的被重发,直到处理成功,不会丢失消息,即便服务挂掉,没有处理完成的消息也会重回队列,但是异常会让消息不断重试.
  • 2️⃣. 手动确认模式: 在手动确认模式中,一定要注意不能忘记应答消息,因为对于RabbitMQ来说如果处理消息没有超时,只要不应答消息,他就会认为仍在正常处理消息,导致消息队列出现阻塞,影响业务执行.如果消费者来不及处理就死掉时,没有响应ack时,消费者项目在重启后会重复发送一条信息给其他消费者.我们可以选择丢弃消息,这其实也是一种应答.
  • 3️⃣. 不确认模式acknowledge="none": 不使用确认机制,只要消息发送完成就会立即在队列中移除掉该消息,无论客户端异常还是断开,不会重发消息.

3. 消费端手动确认消息

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//接收到消息,做处理
//手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
//出错处理,这里可以让消息重回队列重新发送或直接丢弃消息.
//ack返回false,并重新回到队列,api里面解释得很清楚
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
//拒绝消息
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
};

注意:

在出错处理时,我们想让消息重回消息队列,但是经过开发中的实际测试,当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行回滚,如此反复进行.这种情况会导致消息队列处理出现阻塞,消息堆积,导致正常消息也无法运行.对于消息回滚到消息队列,我们希望比较理想的方式是让出现异常的消息到达消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行.因此我们采取的解决方案是:将消息进行应答,这时消息队列会删除该消息,同时我们再次发送该消息到消息队列,这样就实现了错误的消息进入到消息队列尾部.

//手动进行应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

//重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,JSON.toJSONBytes(new Object()));

对于RabbitMQ服务端而言,队列中的消息分成了两个部分: 一部分是等待投递给消费端的消息;另一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息.如果RabbitMQ一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机(RabbitMQ会自己感知到),则RabbitMQ会安排该消息重新进入队列(放在队列头部),等待投递给下一个消费者,此时也有可能还是原来的那个消费端,当然消费端需要确保幂等性.

标签:11,队列,确认,RabbitMQ,Day10,消息,channel,丢失
From: https://blog.51cto.com/u_7044146/5965784

相关文章

  • Day10_16_消息队列之RocketMQ
    RocketMQ一.RocketMQ简介1.概述消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:削峰填谷: 主要解决瞬时写压力大......
  • Day11_01_Redis教程之非关系型数据库
    非关系型数据库简介一.非关系型数据库1.NoSQL简介NoSQL,泛指非关系型的数据库,NoSQL即Not-OnlySQL,它可以作为关系型数据库的良好补充.随着互联网Web2.0网站的兴起,非关系型......
  • Day11_02_Redis教程之Redis简介
    Redis简介一.Redis是什么?1.Redis开发背景2008年,意大利的一家创业公司Merzia推出了一款基于MySQL的网站实时统计系统LLOOGG,然而没过多久该公司的创始人SalvatoreSanf......
  • Day11_03_Redis教程之Redis服务器客户端安装配置及配置文件详解
    Redis服务器客户端安装配置及配置文件详解一.Redis的安装在ubuntu18.04下,可以直接通过命令安装.1.更新系统环境$sudoapt-getupdate#更新软件列表$sudoapt-getupgra......
  • Day11_04_Redis教程之关闭Redis客户端
    关闭Redis客户端一.ClientKill命令RedisClientKill命令用于关闭客户端连接.redisclientkill命令基本语法:redis127.0.0.1:6379>CLIENTKILLip:port返回值:成功关......
  • 1111
    一、选题的背景二、大数据分析设计方案三、数据分析步骤importpandasaspdimportmatplotlib.pyplotaspltplt.rcParams['font.sans-serif']=['SimHei']#用来正常......
  • 11:高级部分-MySQL
    (目录)(一)view视图1.开场高级部分不属于实习内容,已经超过了实习范围尤其是培训机构不会讲这些,主要是经验规范之谈2.view视图创建、使用以及作用视图主要负责筛选,有......
  • 力扣每日一题2022.12.23---2011. 执行操作后的变量值
    存在一种仅支持4种操作和1个变量X的编程语言:   ++X和X++使变量X的值加1   --X和X--使变量X的值减1最初,X的值是0给你一个字符串数组operati......
  • 2005年11月16日
       从昨天开始整理所有软件测试教材的相关PPT和教案、案例了,这工作看起来简单,却是十分的辛苦,每一页PPT都要看一边,从布局到内容,一天检查500页PPT后,第二天眼睛充满浆糊......
  • 远程服务器返回错误: (411) 所需的长度。
    最近在项目上遇到了问题是:411错误,出现这个错误可能是再请求POST的时候,若没有参数的情况下,需要把 HttpWebRequest的长度设置为0,req.ContentLength=0;publicstringHttpPo......