首页 > 其他分享 >3.【RabbitMQ实战】- 工作队列(Work Queue)

3.【RabbitMQ实战】- 工作队列(Work Queue)

时间:2023-04-12 22:56:03浏览次数:39  
标签:Console 消费者 队列 应答 Work RabbitMQ Queue 消息

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

轮询分发消息

image.png

封装获取Channel代码

using RabbitMQ.Client;

using System.Data.SqlTypes;

namespace rabbitmq.common
{
    /// <summary>
    /// 工具类
    /// </summary>
    public class RabbitmqUntils
    {
        /// <summary>
        /// 对列名称
        /// </summary>
        public static string QueueName { get; set; } = "test_hello";

        public static string WorkQueueName { get; set; } = "test_WorkQueue";

        /// <summary>
        /// 得到一个Channel  作为轻量级的Connection极大减少了操作系统建立TCPconnection的开销
        /// </summary>
        /// <returns></returns>
        public static IModel GetChannel()
        {
            //创建一个连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = "localhost";
            connectionFactory.UserName = "guest";
            connectionFactory.Password = "guest";
            var connection = connectionFactory.CreateConnection();
            var channel = connection.CreateModel();
            return channel;
        }

    }
}

生产者代码

using rabbitmq.common;

using RabbitMQ.Client;

using System.Text;

namespace _02.WorkQueue.Producer
{
    public class Program
    {
        static void Main(string[] args)
        {
            using var channel = RabbitmqUntils.GetChannel();
            /*
                *生成一个队列
                *1.队列名称
                *2.队列里面的消息是否持久化默认消息存储在内存中
                *3.该队列是否只供一个消费者进行消费是否进行共享true可以多个消费者消费
                *4.是否自动删除最后一个消费者端开连接以后该队列是否自动删除true自动删除*
                *5.其他参数
             */
            channel.QueueDeclare(RabbitmqUntils.WorkQueueName, false, false, false, null);//创建一个消息队列
            Console.WriteLine("请输入要发送的消息:");
            string message = Console.ReadLine();
            if (string.IsNullOrEmpty(message))
            {
                while (true)
                {
                    Console.WriteLine("请输入要发送的消息: {0}", message);
                }
            }
            else
            {

                while (true)
                {
                    var body = Encoding.UTF8.GetBytes(message);
                    /*
                    * 发送一个消息
                    * 1.发送到那个交换机
                    * 2.路由的 key 是哪个
                    * 3.其他的参数信息
                    * 4.发送消息的消息体
                    */
                    channel.BasicPublish(exchange: "", RabbitmqUntils.WorkQueueName, null, body); //开始传递
                    Console.WriteLine("已发送: {0}", message);
                    Console.WriteLine("请输入要发送的消息:");
                    message = Console.ReadLine();
                }
            }
           
            Console.ReadKey();
        }
    }
}

消费者代码

using rabbitmq.common;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

using System.Text;

namespace _02.WorkQueue.Consumer1
{
    public class Program
    {
        static void Main(string[] args)
        {
            if (args == null || args.Length == 0)
            {               
                Console.WriteLine("请输入客户端名称:");
                string clientName = Console.ReadLine();
                while (string.IsNullOrEmpty(clientName))
                {
                    Console.WriteLine("请输入客户端名称:");
                }
                args = new string[clientName.Length];
                args[0] = clientName;
            }

            Console.WriteLine($"{args[0]}:等待消费消息");
            using var channel = RabbitmqUntils.GetChannel();
            // 创建队列/交换机(如队列/交换机已存在的情况可不用再次创建/此创建为:确保先开启消费者,生产者未创建队列/交换机而引发报错)
            channel.QueueDeclare(RabbitmqUntils.WorkQueueName, false, false, false, null);
            // 事件对象
            var consumer = new EventingBasicConsumer(channel);
            /*
           * 消费者消费消息
           * 1.消费哪个队列
           * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
           * 3.消费者未成功消费的回调
           */
            channel.BasicConsume(RabbitmqUntils.WorkQueueName, false, consumer);
            // 接收消息回调
            consumer.Received += (sender, e) =>
            {
                var body = e.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("接收消息: {0}", message);
            };

            // 取消消息回调
            consumer.Registered += (sender, e) =>
            {
                var body = e.ConsumerTags.ToArray();        
                Console.WriteLine("取消消息: {0}", body);
            };

            Console.WriteLine($"{args[0]}:消费消息完成!");           
            Console.ReadKey();
        }
    }
}

测试效果

11.gif

消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收 到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失 了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当 然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使 得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以 某种速率能够处理这些消息的情况下使用

手动应答

手动应答的三个方法

  • 用于肯定应答 RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

image.png

  • 用于否定应答

image.png

  • 用于否定应答 与 Channel.BasicNack 相比少一个 multiple 参数不处理该消息了直接拒绝,可以将其丢弃了

image.png
手动应答的好处是可以批量应答并且减少网络拥堵

multiple 参数

true 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是8 那么此时
5-8 的这些还未应答的消息都会被确认收到消息应答
false 同上面相比
只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
image.png

消息重新入队列

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

手动应答的关键代码

image.png

手动应答测试效果

image.png
image.png
image.png
image.png

消息重新入队列测试效果

Consumer1 休眠 1s
Consumer1 休眠 10s
发送消息 11,22,33,44
image.png
Consumer1 接受消息 11,33
image.png
Consumer2 接受消息 22 此时杀死 Consumer2进程可看到消息44被Consumer1消费
image.png
image.png

持久化

保障当 RabbitMQ 服务停掉以后消 息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化

队列持久化

申明队列时设置参数durable:true
image.png
注意:可能会遇到的错误
如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误
image.png

显示队列持久化
image.png

消息持久化

image.png

不公平分发

最开始RabbitMQ分发消息采用的是轮询分发,但是有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发

Consumer1,Consumer2 都配置如下代码
image.png

image.png

image.png

预取值

限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题
该值定义通道上允许的未确认消息的最大数量
虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)

配置 prefetchCount > 1 即可
image.png

标签:Console,消费者,队列,应答,Work,RabbitMQ,Queue,消息
From: https://www.cnblogs.com/imtudou/p/17311530.html

相关文章

  • 10.【RabbitMQ实战】- RabbitMQ集群
    搭建集群镜像队列默认情况下node1创建的队列不会同步到node2上此时如果已经发送到了一条消息到node1上的队列,该队列并不会备份到node2上此时node1宕机并重启,该消息会丢失,配置对应策略可保证集群上队列备份并且消息不丢失负载均衡生产者给node1发消息,此时node1宕机,但是......
  • 9.【RabbitMQ实战】- RabbitMQ其他知识点
    幂等性MQ消费者的幂等性的解决一般使用全局ID或者写个唯一标识比如时间戳或者UUID或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过在海量订单生成的业务高峰期,生产端有可能就会重复发......
  • SNMP(Simple Network Management Protocol)——简单网络管理协议
    SNMP(SimpleNetworkManagementProtocol)——简单网络管理协议 目录一、SNMP简介1.背景2.SNMP管理的网络架构二、SNMPMIB1.SNMPMIB简介2.MIB分类(1)公有MIB(2)私有MIB3.被管理设备的基本属性(1)对象表示符(2)对象的状态(3)对象的访问权限(4)对象的数据类型三、SNMPv1工作......
  • RobotFramework 简介
    一、RobotFramework(一) Introduction RobotFrameworkisaPython-based,extensiblekeyword-drivenautomationframeworkforacceptancetesting,acceptancetestdrivendevelopment(ATDD),behaviordrivendevelopment(BDD)androboticprocessautomation(RPA)......
  • win10、win2016离线安装 .netframework3.5
    下载地址:(网上收集的)   https://pan.baidu.com/s/1O24nLgXhehHveae25p9SLg密码:amgu   https://url93.ctfile.com/f/29519493-531656763-281351(访问密码:8843)   https://soft.3dmgame.com/down/205311.html下载NetFx3.cab后将其放于C盘WINDOWS文件夹下(C:\Windows)点击“......
  • Collection - PriorityQueue源码解析
    前面以JavaArrayDeque为例讲解了Stack和Queue,其实还有一种特殊的队列叫做PriorityQueue,即优先队列。优先队列的作用是能保证每次取出的元素都是队列中权值最小的(Java的优先队列每次取最小元素,C++的优先队列每次取最大元素)。这里牵涉到了大小关系,元素大小的评判可以通过元素本身......
  • 【Java 线程池】【四】ThreadPoolExector中的Worker工作者原理
    1 前言上一节我们看了ThreadPoolExecutor线程池的execute内部方法流程,addWorker方法流程,看到Worker是线程池内部的工作者,每个Worker内部持有一个线程,addWorker方法创建了一个Worker工作者,并且放入HashSet的容器中,那么这节我们就来看看Worker是如何工作的。2  内部属性我们......
  • IWorkflowBlueprint 蓝图构建器
    前言:学习的过程总是很奇妙....下面是我对Elsa工作流Builder的一个理解,我觉得用思维导图很适合做概括性的描述。如有错误,望大伙们指导......
  • Python queue (队列)
     importthreadingimporttimeimportqueuedefproducer():count=1while1:q.put('No.%i'%count)print('ProducerputNo.%i'%count)time.sleep(1)count+=1defcustomer(name):whi......
  • Raspberry Pi crontab not work bug All In One
    RaspberryPicrontabnotworkbugAllInOneRaspberryPicrontab不执行bug???pi@raspberrypi:~/Desktop$sudocrontab-epi@raspberrypi:~/Desktop$sudocrontab-l#Editthisfiletointroducetaskstoberunbycron.##Eachtasktorunhastobe......