首页 > 其他分享 >4.【RabbitMQ实战】- 发布确认

4.【RabbitMQ实战】- 发布确认

时间:2023-04-12 22:56:48浏览次数:32  
标签:实战 false 确认 RabbitMQ 发布 消息 var channel

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消
息都将会被指派一个唯一的 ID
(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会
发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产
者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的
multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道
返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方
法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,
生产者应用程序同样可以在回调方法中处理该 nack 消息

  • 设置队列持久化
  • 设置消息持久化
  • 发布确认(RabbitMQ 告诉生产者已经确定持久化了)

只有设置了上述三个条件才能保证消息一定不会丢失

image.png

发布确认三种模式

单个确认

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。 这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了

image.png

批量确认

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

image.png

异步发布确认

image.png

处理异步未确认的消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

发布确认代码

using rabbitmq.common;

using RabbitMQ.Client.Events;

using System;
using System.Collections.Concurrent;
using System.Text;

namespace PublishConfirm.Producer
{
    public class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("发布确认");
            //SinglePublishConfirm();  //单个发布确认耗时:00:00:00.6743354
            //MulitPublishConfirm(); //批量发布确认耗时:00:00:00.4526167
            PublishConfirmAsync(); // 异步发布确认:00:00:00.3534272
        }

        /// <summary>
        /// 单个发布确认
        /// </summary>
        public static void SinglePublishConfirm()
        {
            var begin = DateTime.Now;
            using var channel = RabbitmqUntils.GetChannel();
            string queueName = Guid.NewGuid().ToString();
            channel.QueueDeclare(queue:queueName,durable:false,exclusive:false,autoDelete:false,null);
            // 开启发布确认
            channel.ConfirmSelect();
            var begintime = DateTime.Now;
            for (int i = 0; i < 100; i++)
            {                                                   
                var body = Encoding.UTF8.GetBytes($"{i}");
                channel.BasicPublish(exchange: "", queueName, false, null, body);
                if (channel.WaitForConfirms())// 等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false
                {
                    Console.WriteLine($"消息发送成功:{i}");
                }
                else
                {
                    //服务端返回 false 或超时时间内未返回,生产者可以消息重发 需添加补救措施
                }

            }
            var end = DateTime.Now;
            Console.WriteLine($"单个发布确认耗时:{end - begin}");
        }

        /// <summary>
        /// 批量发布确认
        /// </summary>
        public static void MulitPublishConfirm()
        {
            var begin = DateTime.Now;
            using var channel = RabbitmqUntils.GetChannel();
            string queueName = Guid.NewGuid().ToString();
            channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, null);
            // 开启发布确认
            channel.ConfirmSelect();
            var begintime = DateTime.Now;
            for (int i = 0; i < 1000; i++)
            {
                var body = Encoding.UTF8.GetBytes($"{i}");
                channel.BasicPublish(exchange: "", queueName, false, null, body);

                // 批量确认:100条确认异常
                if (i%100 == 0)
                {

                    if (channel.WaitForConfirms())// 等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false
                    {
                        Console.WriteLine($"消息发送成功:{i}");
                    }
                    else
                    {
                        //服务端返回 false 或超时时间内未返回,生产者可以消息重发 需添加补救措施
                    }
                }
               

            }
            var end = DateTime.Now;
            Console.WriteLine($"批量发布确认耗时:{end - begin}");
        }

        /// <summary>
        /// 异步发布确认
        /// </summary>
        public static void PublishConfirmAsync()
        {
            var begin = DateTime.Now;
            using var channel = RabbitmqUntils.GetChannel();
            string queueName = Guid.NewGuid().ToString();
            channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, null);

            // 开启发布确认
            channel.ConfirmSelect();
            ConcurrentDictionary<ulong, string> confirmDic = new ConcurrentDictionary<ulong, string>();
            var begintime = DateTime.Now;
            for (int i = 1; i <= 1000; i++)
            {
                string msg = $"msg_{i}";
                var body = Encoding.UTF8.GetBytes(msg);
                confirmDic.TryAdd(channel.NextPublishSeqNo, msg);
                channel.BasicPublish(exchange: "", queueName, false, null, body);
            }

            // 监听确认的消息
            channel.BasicAcks += (sender, e) =>
            {
                Console.WriteLine($"监听确认的消息的序列号:{e.DeliveryTag}");
                confirmDic.Remove(e.DeliveryTag, out string body);
                Console.WriteLine($"删除确认的消息:{body}");
            };

            // 监听未确认的消息
            channel.BasicNacks += (sender, e) =>
            {
                Console.WriteLine($"监听未确认的消息序列号:{e.DeliveryTag}");
                confirmDic.TryGetValue(e.DeliveryTag, out string body);
                Console.WriteLine($"监听未确认的消息:{body}");

            };
            var end = DateTime.Now;
            Console.WriteLine($"异步发布确认:{end - begin}");
        }
    }
}

标签:实战,false,确认,RabbitMQ,发布,消息,var,channel
From: https://www.cnblogs.com/imtudou/p/17311528.html

相关文章

  • 3.【RabbitMQ实战】- 工作队列(Work Queue)
    工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。轮询分发消息......
  • 10.【RabbitMQ实战】- RabbitMQ集群
    搭建集群镜像队列默认情况下node1创建的队列不会同步到node2上此时如果已经发送到了一条消息到node1上的队列,该队列并不会备份到node2上此时node1宕机并重启,该消息会丢失,配置对应策略可保证集群上队列备份并且消息不丢失负载均衡生产者给node1发消息,此时node1宕机,但是......
  • 9.【RabbitMQ实战】- RabbitMQ其他知识点
    幂等性MQ消费者的幂等性的解决一般使用全局ID或者写个唯一标识比如时间戳或者UUID或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过在海量订单生成的业务高峰期,生产端有可能就会重复发......
  • Go微服务框架go-kratos实战学习08:负载均衡基本使用
    微服务框架go-kratos中负载均衡使用一、介绍在前面这篇文章负载均衡和它的算法介绍,讲了什么是负载均衡以及作用、算法介绍。go-kratos的负载均衡主要接口是Selector,它是一个可插拔的设计。因为它设计的都是接口,只要实现了接口就实现了负载均衡。go-kratos在目录下提供了......
  • Android Kotlin实战之高阶使用泛型扩展协程懒加载详解
    前言:通过前面几篇文章,我们已基本掌握kotlin的基本写法与使用,但是在开发过程中,以及一些开源的API还是会出现大家模式的高阶玩法以及问题,如何避免,接下来讲解针对原来的文章进行一些扩展,解决大家在工作中遇到的问题,如何去解决如果还有人不了解kotlin,可以查看我的基础篇kotlin。Android......
  • testng+HttpClient项目实战(一)
    参考文档:https://www.cnblogs.com/yingyingja/p/9973960.htmlhttps://www.cnblogs.com/yingyingja/p/9974181.htmlhttps://www.cnblogs.com/yingyingja/p/9974183.htmlhttps://www.cnblogs.com/yingyingja/p/9974186.htmlhttps://www.cnblogs.com/yingyingja/p/9974189.html......
  • VMware ESXi6.7服务器,确认用户名密码正确,但是web管理界面无法正常登陆
    VMwareESXi6.7服务器,确认用户名密码正确,但是web管理界面无法正常登陆。修改完成密码后,连接显示器键盘,操作没有问题。但是远程通过web管理界面访问时,提示:“由于用户名火密码不正确,无法完成登陆”确认用户名密码没错,连接显示器键盘,操作没有问题。 需要restartmanagementage......
  • (之前的项目复习)我的Java项目实战--校园餐饮商户外卖系统03
    开发笔记三分类管理业务开发公共字段自动填充问题分析前面我们已经完成了后台系统的员工管理功能开发,在新增员工时需要设置创建时间、创建人、修改时间、修改人等字段,在编辑员工时需要设置修改时间和修改人等字段。这些字段属于公共字段,也就是很多表中都有这些字段,如下:能......
  • 学习笔记396—自定义Docker镜像推送到Docker Hub实战
    自定义Docker镜像推送到DockerHub实战云原生探索的必经之路—容器化,而容器化目前最主流的技术莫过于Docker了,因为之前也大量的输出过Docker相关的技术博客,如果感兴趣的话可以直接访问专栏:​​《探索云原生》​​,按需学习哦。这篇文章还是从Docker入手,从0开始讲述下如何将自己的D......
  • ansible模块实战练习
    ansible模块实战练习Cloud研习社 Cloud研习社 2023-04-0110:55 发表于山东收录于合集#一站式教程220个#计算机185个#ansible22个#云计算196个#linux209个教程每周二、四、六更新今天我们练习一下ansible模块的用法,也算是一个小复习:准备三个节点:其中一个作......