优先级队列
C# 数据类型 queue----先进先出
RabbitMQ---队列-----默认也是先进先出~~
RabbitMQ 设置优先级----可以配置让 消费顺序,不按照 先进先出的默认规则;
给定的优先级---最终体现在消费者; 优先级越高,消费的时候,就优先消费。就在前面消费
案例:
设置 { "vip1", "hello2", "world3", "common4", "vip5" } 五个消息,带有vip的优先级设置最高,最后消费的顺序为 { "vip1", "vip5", "hello2", "world3", "common4" } 。
生产者
using (var channel = connection.CreateModel())
{
//声明交换机exchang
channel.ExchangeDeclare(exchange: "PriorityExchange", type: ExchangeType.Direct,
durable: true, autoDelete: false, arguments: null);
//声明队列queue---要使用优先级队列,必须在声明队列的时候,就要指定支持优先级的功能
//队列优先级最高为10,不加x-max-priority,计算发布时设置了消息的优先级也不会生效
channel.QueueDeclare(queue: "PriorityQueue", durable: true,
exclusive: false, autoDelete: false,
arguments: new Dictionary<string, object>() { { "x-max-priority", 10 } });
//绑定exchange和queue
channel.QueueBind(queue: "PriorityQueue", exchange: "PriorityExchange",
routingKey: "Priority");
Console.WriteLine("生产者准备就绪....");
//一些待发送的消息
string[] msgs = { "vip1", "hello2", "world3", "common4", "vip5" };
//设置消息优先级
IBasicProperties props = channel.CreateBasicProperties();
foreach (string msg in msgs)
{
//vip开头的消息,优先级设置为9
if (msg.StartsWith("vip"))
{
props.Priority = 9;
channel.BasicPublish(exchange: "PriorityExchange",routingKey: "Priority",
basicProperties: props, body: Encoding.UTF8.GetBytes(msg));
}
//其他消息的优先级为1
else
{
props.Priority = 1;
channel.BasicPublish(exchange: "PriorityExchange",routingKey: "Priority",
basicProperties: props, body: Encoding.UTF8.GetBytes(msg));
}
}
}
消费者
using (var channel = connection.CreateModel())
{
#region EventingBasicConsumer
//定义消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"正常收到消息:{Encoding.UTF8.GetString(ea.Body.ToArray())}");
};
Console.WriteLine("优先级队列:消费者准备就绪....");
//处理消息
channel.BasicConsume(queue: "PriorityQueue", autoAck: true, consumer: consumer);
Console.ReadKey();
#endregion
}
队列和消息配置
队列有消息的数量限制吗?每条的大小有限制吗? 都是可以配置的。
案例:设置消息数量为20,发30条,结果只保留了前20条。
using (var channel = connection.CreateModel())
{
//创建队列
Dictionary<string, Object> map = new Dictionary<string, Object>();
//设置队列最大的条数20条
map.Add("x-max-length", 20);
////设置队列溢出方式保留前20条
map.Add("x-overflow", "reject-publish");
//毫秒为单位,队列自创建起,过去几秒后队列自动删除
map.Add("x-expires", 20000);
//队列上消息过期时间,应小于队列过期时间
map.Add("x-message-ttl", 8000);
channel.QueueDeclare("myQueue", false, false, false, map);
channel.ExchangeDeclare("MyExchages", type: ExchangeType.Direct,
durable: true, autoDelete: false);
//消息队列绑定消息交换机
channel.QueueBind("myQueue", "MyExchages", routingKey: "myroutingkeyTest");
for (int i = 0; i < 30; i++)
{
channel.BasicPublish(exchange: "MyExchages", routingKey: "myroutingkeyTest",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"测试队列消息长度==={i + 1}~"));
}
Console.Read();
}
标签:false,exchange,队列,RabbitMQ,消息,优先级,channel
From: https://www.cnblogs.com/nullcodeworld/p/18669521