首页 > 其他分享 >RabbitMq队列优先级

RabbitMq队列优先级

时间:2022-11-17 09:44:42浏览次数:58  
标签:优先级 队列 factory RabbitMq 消息 设置

RabbitMq队列优先级

使用场景

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。

添加优先级队列

  1. 声明队列时增加一个参数
// 官方允许是 0-255 之间 此处设置10 允许优化级范围0-10 不要设置过大 浪费CPU与内存
arguments.put("x-max-priority", 10);
  1. 发布消息时设置优先级 不能高于声明队列时设置的参数
// 设置优先级, 不得高于 x-max-priority 设置的值
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
        .builder().priority(5).build();

实战

消息生产者

/**
 * 优先级生产者
 */
public static void priorityProducer() throws Exception {
    // 创建一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 工厂IP 连接RabbitMq的队列
    factory.setHost("localhost");
    // 用户名
    factory.setUsername("zjh");
    // 密码
    factory.setPassword("zjh");

    // 创建连接
    Connection connection = factory.newConnection();
    // 获取信道
    Channel channel = connection.createChannel();
    /*
        生成一个队列
        1.队列名称
        2.队列里面的消息是否持久化,默认情况消息存储在内存中
        3.该队列是否只提供一个消费者进行消费 是否进行消息共享,true可以多个消费者消费
        4.是否自动删除 最后一个消费者断开连接以后 该队列是否自动删除 true自动删除 false不自动删除
        5.其他参数
     */
    Map<String, Object> arguments = new HashMap<>(8);
    // 官方允许是 0-255 之间 此处设置10 允许优化级范围0-10 不要设置过大 浪费CPU与内存
    arguments.put("x-max-priority", 10);
    channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);

    // 设置循环次数
    long length = 11;
    for (int i = 1; i < length; i++) {
        String message = "info" + i;
        // 设置等于五的优先级高
        if (i == 5){
            // 设置优先级, 不得高于 x-max-priority 设置的值
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
                    .builder().priority(5).build();
            // 发送消息
            channel.basicPublish("", QUEUE_NAME, basicProperties, message.getBytes());
        } else {
            // 发送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        }
    }
    System.out.println("消息发送完毕");
}

消息消费者


/**
 * @author zjh
 */
public class Consumer {

    /**
     * 队列名称
     */
    public static final String QUEUE_NAME = "hello";

    /**
     * 接收消息
     */
    public static void main(String[] args) throws Exception {
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 工厂IP 连接RabbitMq的队列
        factory.setHost("localhost");
        // 用户名
        factory.setUsername("zjh");
        // 密码
        factory.setPassword("zjh");

        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信道
        Channel channel = connection.createChannel();

        // 声明接收消息回调
        DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(new String(message.getBody()));

        // 声明取消接收消息回调
        CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费被中断");

        /*
            消费者消费信息
            1.消费哪个队列
            2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答
            3.消费者未成功消费的回调
            4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

测试步骤

  1. 确保 RabbitMq Management中没有该队列,有的话自行删除

  2. 先启动 生产者,不要启动消费者,因为如果启动了消费者就会一直监听着队列,只要队列有消息就会消费掉就达不到我们想要的效果,所以我们先发送消息到队列中,确保消息已经存在了在启动消费者。

  1. 可以看到已经有十条消息了,这时在启动消费者,就会发现我们设置的info5优先级高,就会提前消费。如图所示:

标签:优先级,队列,factory,RabbitMq,消息,设置
From: https://www.cnblogs.com/zjh0420/p/16898391.html

相关文章

  • 数据结构篇——栈和队列
    数据结构篇——栈和队列本次我们介绍基础算法中的栈和队列,我们会从下面几个角度来介绍:栈和队列简述模拟栈模拟队列栈和队列简述首先我们要简单了解一下栈和队列:......
  • Django Celery RabbitMQ访问被拒绝(403) ACCESS_REFUSED
    报错代码:(403)ACCESS_REFUSED-LoginwasrefusedusingauthenticationmechanismPLAI(省略) 解决方案:    在rabbitmq中注册用户具体代码实现:列出用户rabbitm......
  • RabbitMq发布确认高级
    RabbitMq发布确认高级在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才......
  • 3-RabbitMQ面试题
    为什么使用MQ?MQ的优点简答异步处理-相比于传统的串行、并行方式,提高了系统吞吐量。应用解耦-系统间通过消息通信,不用关心其他系统的处理。流量削锋-可以通过消......
  • Linux下进程间通信方式之管道、信号、共享内存、消息队列、信号量、套接字
    /*1,进程间通信(IPC)Inter-ProcessCommunication比较好理解概念的就是进程间通信就是在不同进程之间传播或交换信息。2,linux下IPC机制的分类:管道、信号、共享内存、......
  • RabbitMq延迟队列
    RabbitMq延迟队列延迟队列概念延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队......
  • 「Java数据结构」手撕数组队列及环形数组队列。
    目录​​一、队列​​​​1、基本介绍​​​​2、示意图​​​​3、队列的特点​​​​二、数组模拟队列​​​​1、数组队列初始化​​​​2、判断方法​​​​3、增删改查......
  • RabbitMQ
    RabbitMQ1.初识MQ1.1.同步和异步通讯微服务间通讯有同步和异步两种方式:同步通讯:就像打电话,需要实时响应。异步通讯:就像发邮件,不需要马上回复。两种方式各有优劣,打电......
  • 080_阻塞队列 BlockingQueue
    目录简介演示代码抛出异常add()添加元素队列已满时抛出异常remove()移除元素为空时抛出异常有返回值,不抛出异常offer()添加元素队列已满时返回false不抛异常poll()移除......
  • RabbitMQ
    RabbitMQ使用场景服务解耦假设有这样一个场景,服务A产生数据,而服务B,C,D需要这些数据,那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可但是,随着......