首页 > 其他分享 >体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景

体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景

时间:2023-04-18 12:40:51浏览次数:44  
标签:优先级 队列 Rabbitmq hello var dic 面对现实 channel

         说到队列的话,大家一定不会陌生,但是扯到优先级队列的话,还是有一部分同学是不清楚的,可能是不知道怎么去实现吧,其实呢,,,这东西已

经烂大街了。。。很简单,用“堆”去实现的,在我们系统中有一个订单催付的场景,我们客户的客户在tmall,taobao下的订单,taobao会及时将订单推送给

我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客

户的对吧,比如像施华蔻,百雀林这样大商家一年起码能够给我们贡献几百万,所以理应当然,他们的订单必须得到优先处理,而曾今我们的后端系统是使

用redis来存放的定时轮询,大家都知道redis只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用rabbitmq进行

改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级,好了,废话不多说,我们来看看如何去设置。

 

一:优先级队列

  既然是优先级队列,那么必然要在Queue上开一个口子贴上一个优先级的标签,为了看怎么设置,我们用一下rabbitmq的监控UI,看看这个里面是如何

手工的创建优先级队列。

体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景_其他

      从这个图中可以看到在Arguments栏中有特别多的小属性,其中有一项就是"Maximum priority",这项的意思就是说可以定义优先级的最大值,其实

想想也是,不可能我们定义的优先级是一个非常大的数字,比如int.MaxValue,大多情况下都是10以内的数字就可以了,再或者我们曾今接触过的 MSMQ,

它的优先级只是一些枚举值,什么High,Normal,Low,不知道大家可否记得? 下面来看下代码中该如何实现呢???

 

1. 在Queue上附加优先级属性

Dictionary<string, object> dic = new Dictionary<string, object>();
dic.Add("x-max-priority", 20);
channel.QueueDeclare(queue: "hello",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: dic);

 

 上面的代码做了一个简单的队列声明,queuename="hello",持久化,排外。。。然后把"x-max-priority"塞入到字典中作为arguments参数,看起来还

是非常简单吧~~~

 

2. 在Message上指定优先级属性

 var properties = channel.CreateBasicProperties();
 properties.Priority = 1;
 channel.BasicPublish(exchange: "",
                      routingKey: "hello",
                      basicProperties: null,
                      body: body);

体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景_其他_02

 

通过上面的代码可以看到,在Message上设置优先级,我是通过在channel通道上设置Priority属性,之后塞到basicProperties中就可以了,好了,有上面这两

个基础之后,下面就可以开始测试了,准备向rabbitmq推送10条记录,其中第5条的优先级最高,所以应该首先就print出来,如下图:

static void Main(string[] args)
{
    var sb = new StringBuilder();

    for (int i = 0; i < 11; i++)
    {
        sb.Append(i);
    }

    var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip" };

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "mydirect", type: ExchangeType.Direct, durable: true);

            Dictionary<string, object> dic = new Dictionary<string, object>();
            dic.Add("x-max-priority", 20);

            for (int i = 0; i < 10; i++)
            {
                channel.QueueDeclare(queue: "hello",
                                            durable: true,
                                            exclusive: false,
                                            autoDelete: false,
                                            arguments: dic);

                string message = string.Format("{0} {1}", i, sb.ToString());
                var body = Encoding.UTF8.GetBytes(message);

                var properties = channel.CreateBasicProperties();

                properties.Priority = (i == 5) ? (byte)10 : (byte)i;

                channel.BasicPublish(exchange: "",
                                     routingKey: "hello",
                                     basicProperties: properties,
                                     body: body);

                Console.WriteLine(" [x] Sent {0}", i);
            }
        }
    }

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}

体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景_优先级_03

 

图中可以看到10条消息我都送到rabbitmq中去了,接下来打开consume端,来看看所谓的index=5 是否第一个送出来??

static void Main(string[] args)
{
    for (int m = 0; m < int.MaxValue; m++)
    {
        var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip" };

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            var result = channel.BasicGet("hello", true);

            if (result != null)
            {
                var str = Encoding.UTF8.GetString(result.Body);
                Console.WriteLine("{0}  消息内容 {1}", m, str);
                System.Threading.Thread.Sleep(1);
            }
        }
    }
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}

体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景_优先级_04

 

一切都是这么的完美,接下来为了进行可视化验证,你可以在WebUI中观察观察,可以发现在Queue上面多了一个 Pri 标记,有意思吧。

 

体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景_优先级_05

 

好了,这么重要的功能,是不是已经让你足够兴奋啦, 希望大家能够好好的在实际场景中运用吧~~~

 

标签:优先级,队列,Rabbitmq,hello,var,dic,面对现实,channel
From: https://blog.51cto.com/u_15353947/6202799

相关文章

  • 搭建高可用的rabbitmq集群 + Mirror Queue + 使用C#驱动连接
    我们知道rabbitmq是一个专业的MQ产品,而且它也是一个严格遵守AMQP协议的玩意,但是要想骚,一定需要拿出高可用的东西出来,这不本篇就跟大家说一下cluster的概念,rabbitmq是erlang写的一个成品,所以知道如何构建erlang的node集群就ok了,他需要一个统一的cookie机制......
  • centos7安装RabbitMQ教程
    转载自:https://www.cnblogs.com/qiansm/p/15241295.html==================== centos7安装RabbitMQ教程erlang版本:22.2.1RabbitMQ版本:3.8.21、安装Erlang1.1安装依赖yuminstallepel-releaseyuminstallgccgcc-c++unixODBCunixODBC-develwxBasewxGTKSDLwxGTK-g......
  • Java运算符优先级分析
    packagecom.zt.javase01;publicclassTest2{publicstaticvoidmain(String[]args){intn=10;n+=(n++)+(++n);System.out.println(n);//输出32/*(n++)(++n)从左到右执行因此(n+......
  • springboot项目打成jar包后 ,配置文件加载的优先级顺序
    SpringBoot会按照以下顺序来加载配置文件:1、内置默认值:SpringBoot会首先加载内置的默认值,这些默认值定义在SpringBoot的代码中,例如,内置的默认端口号为8080。2、应用级别的配置文件:SpringBoot会从以下位置加载应用级别的配置文件,这些位置按照优先级逐一检查:当前目录下的/c......
  • Java连接RabbitMQ报错:An unexpected connection driver error occured(偶尔能连上)
    1、查看rabbitMq的状态。输入命令:rabbitmqctlstatus,发现没有报错,但是rabbit中的host是root,并不是ip地址,所以连接不上。 2、运行命令:echo【ip地址】root>>/etc/hosts。将RabbitMQ服务所在的IP地址添加到/etc/hosts中。 ......
  • 如何保证RabbitMQ消息不重复消费
    如何保证RabbitMQ消息不重复消费消息中间件是无法保证消息重复消费,所以只能从业务上来保证消费不重复消费,在消费端保证接口的幂等性。什么是幂等性幂等性原本是数学上的概念,用在接口上就可以理解为:同一个接口,多次发出同一个请求,必须保证操作只执行一次。调用接口发生异常并且......
  • Linux内核进程管理进程优先级
    **前言:**进程优先级实际上是系统对进程重要性的一个客观评价。根据这个评价的结果来为进程分配不同的系统资源,这个资源包括内存资源和CPU资源。为了保证“公平公正”的评价每个进程,Google工程师为此设计了一套评价系统。为什么要有进程优先级?这似乎不用过多的解释,毕竟自从多任......
  • nginx location配置的优先级
        location正则写法location=/{#精确匹配/,主机名后面不能带任何字符串[configurationA]}location/{#因为所有的地址都以/开头,所以这条规则将匹配到所有请求#但是正则和最长字符串会优先匹配[configurationB]}location/documents......
  • 4.【RabbitMQ实战】- 发布确认
    生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息......
  • 3.【RabbitMQ实战】- 工作队列(Work Queue)
    工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。轮询分发消息......