首页 > 其他分享 >RabbitMQ如何保证消息可靠性?

RabbitMQ如何保证消息可靠性?

时间:2023-06-13 09:45:22浏览次数:32  
标签:producerMessage 可靠性 confirm ack RabbitMQ 保证 消息 MQ channel

RabbitMQ如何保证消息可靠性?

     为什么会提到MQ的消息可靠性?

     线上环境中,最近偶尔出现了这样的情况:业务执行过程中突然被中断了,后面的不再执行。问题定位到了MQ生产消息的位置 

    一、 如何保证消息的可靠性?

    一个消息会经历四个节点,只有保证这四个节点的可靠性才能保证整个系统的可靠性。

    1.  生产者发出后保证到达了MQ。
    2. MQ收到消息保证分发到了消息对应的Exchange。
    3. Exchange分发消息入队之后保证消息的持久性。
    4. 消费者收到消息之后保证消息的正确消费。
    经历了这四个保证,我们才能保证消息的可靠性,从而保证消息不会丢失。

    二、 生产者发送消息到MQ失败?
    我们的生产者发送消息之后可能由于网络闪断等各种原因导致我们的消息并没有发送到MQ之中,但是这个时候我们生产端又不知道我们的消息没有发出去,这就会造成消息的丢失

    为了解决这个问题,RabbitMQ引入了事务机制和发送方确认机制(publisher confirm)
    1.  RabbitMQ 支持使用事务,在没有生产者确认之前,保证消息不丢失的唯一方法就是使用事务,是 channel 具有事务性,然后该 channel 上的消息都具有事务性。但是事务会将 RabbitMQ 的吞吐量降低到250倍,由于事务机制过于耗费性能,所以一般不用,这里我着重讲述发送方确认机制

    2. 发送方确认

    开启confirm:

    $channel->confirm_select();

    注册ack/nack回掉方法:

1 //ack callback function
2 $channel->set_ack_handler(function (AMQPMessage $message){
3 echo 'ack' . $message->getBody() . PHP_EOL;
4 });
5 //nack callback function
6 $channel->set_nack_handler(function (AMQPMessage $message){
7 echo 'ack' . $message->getBody() .PHP_EOL;
8 });

    设置ack/nack超时时间:
    $channel->wait_for_pending_acks_returns(5);//set wait time

    3.  下面摘取hyperf中amqp组件中生产消息的代码

    

 1     private function produceMessage(ProducerMessageInterface $producerMessage, bool $confirm = false, int $timeout = 5)
 2     {
 3         $result = false;
 4 
 5         $this->injectMessageProperty($producerMessage);
 6 
 7         $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
 8         $connection = $this->factory->getConnection($producerMessage->getPoolName());
 9 
10         try {
11             if ($confirm) {
12                 $channel = $connection->getConfirmChannel();
13             } else {
14                 $channel = $connection->getChannel();
15             }
16 
17             $channel->set_ack_handler(function () use (&$result) {
18                 $result = true;
19             });
20             $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
21             $channel->wait_for_pending_acks_returns($timeout);
22         } catch (\Throwable $exception) {
23             isset($channel) && $channel->close();
24             throw $exception;
25         }
26 
27         if ($confirm) {
28             $connection->releaseChannel($channel, true);
29         } else {
30             $result = true;
31             $connection->releaseChannel($channel);
32         }
33 
34         return $result;
35     }

 

 

 

   注意: 事务机制和普通confirm方式的吞吐量都很低,在实际生产环境中推荐使用批量confirm和异步confirm

   三、MQ接收失败或者路由失败


   到这一步基本都是一些很小概率的问题了,比如MQ突然宕机了或者被关闭了,这种问题就必须要对消息做持久化,以便MQ重新启动之后消息还能重新恢复过来

  消息的持久化要做,但是不能只做消息的持久化,还要做队列的持久化和Exchange的持久化。

  五、消费者无法正常消费

标签:producerMessage,可靠性,confirm,ack,RabbitMQ,保证,消息,MQ,channel
From: https://www.cnblogs.com/hld123/p/17476614.html

相关文章

  • rabbitMq
    rabbitMq一、安装安装准备工具1.Eralng,底下连接已提供otp_win64_20.2.exe存放地址:otp_win64_25.3.2.exe2.rabbitmq,底下链接已提供rabbitmq-server-3.7.4.exe链接:rabbitmq-server-3.11.16.exe————————————————安装第一步:安装otp_win64_20.2.exe,一......
  • 黑马rabbitmq
    消息中间件面试题-参考回答面试官:RabbitMQ-如何保证消息不丢失候选人:嗯!我们当时MYSQL和Redis的数据双写一致性就是采用RabbitMQ实现同步的,这里面就要求了消息的高可用性,我们要保证消息的不丢失。主要从三个层面考虑第一个是开启生产者确认机制,确保生产者的消息能到达队列,如果......
  • 6-11|Python中保证两位小数的方法
    如果你在Python编程过程中需要对输出结果限制小数点位数,那么这篇文章将为你提供多种可靠的方法。一、使用round()函数round()函数是Python内置函数,用于四舍五入,也可以限制小数点位数。num=3.1415926result=round(num,2)print(result)输出结果为3.14。这种方法非常简单,但是需......
  • 使用双重检查锁定技术保证多线程中单例模式的线程安全
    使用双重检查锁定技术保证多线程中单例模式的线程安全前言单例模式是一种设计模式,保证一个类只有一个实例,并且在整个应用中共享。它适用于需要控制对共享资源的访问,例如数据库连接、配置文件或日志记录器。但是,在多线程环境下实现单例模式可能比较棘手。如果多个线程同时尝试创......
  • RabbitMQ 消息队列处理库存解锁及关单
    添加RMQ配置<!--使用高级消息队列来解决分布式事务一致性--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>application.properties#=====......
  • 基于k8s环境部署RabbitMQ集群
    一、前言本次案例是基于Statefulset部署RabbitMQ集群,同时基于StorageClass(存储类)来作为集群数据的持久化后端,因此在此之前已经部署好了NSF作为后端存储  1、创建服务命名空间Namespace#kubectlcreatensrabbitmq2、创建rbac权限ServiceAccount是kubernetesPod中的......
  • WinUI 保证程序单开
     参考代码如下:protectedstaticvoidOnlyOneApp(){stringappName=typeof(App).Namespace;_=newMutex(true,appName,outboolcreateNew);if(!createNew){ShowNotification("提......
  • RabbitMQ 延时队列
    分布式事务-最终一致性库存解锁逻辑一、Seata的不足Seata的AT模式是二阶段提交协议(2PC),第一阶段将本地事务直接提交,第二阶段想要回滚的时候,是通过回滚日志(日志表)做的反向补偿,数据库原来是多少又改了回来。Seata应用场景:后台管理系统,比如添加商品,优惠、库存、积分、会员要成功都成......
  • .Net全网最简RabbitMQ操作【强烈推荐】
    【前言】本文自1年前的1.0版本推出以来,已被业界大量科技公司采用。同时也得到了.Net圈内多位大佬的关注+推荐,文章也被多家顶级.Net/C#公众号转载。现在更新到了7.0版本,更好的服务各位.Neter。 【正文】支持.Net/.NetCore/.NetFramework,可以部署在Docker,Windows,Linux,......
  • 十一、Redis扩容如何保证哈希一致性
    横向扩容,保证哈希一致性一致性哈希将整个哈希值空间组织成一个虚拟的圆环,如假设某哈希函数H的值空间为0-2^32-1(即哈希值是一个32位无符号整形)下一步将各个服务器使用Hash进行一个哈希,具体可以选择服务器的ip或主机名作为关键字进行哈希,这样每台机器就能确定其在哈希环上的位置......