首页 > 其他分享 >【深入理解RabbitMQ】七大工作模式

【深入理解RabbitMQ】七大工作模式

时间:2024-11-24 20:01:04浏览次数:6  
标签:String 队列 七大 factory RabbitMQ 交换机 深入 Constants channel

文章目录

七种工作模式介绍

  • 简单模式
  • 工作队列
  • 订阅与发布模式
  • 路由模式
  • 主题模式
  • RPC模式
  • 发布确认模式

简单模式

基本概念

在这里插入图片描述

  • 特点:一个生产者(Producer),一个消费者(Consumer),一个队列(Queue),没有交换机。消费者监听该队列。
  • 应用场景:适用于简单的点对点消息传递场景。

代码实现

Producer端实现:

public class ProducerDemo {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.参数设置
        factory.setHost("123.60.91.50");//ip地址
        factory.setPort(5162);//默认的提供rabbitmq服务的端口号
        factory.setUsername("admin");//用户名
        factory.setPassword("fengadmin");//密码
        factory.setVirtualHost("fbl");//虚拟机名称
        System.out.println("成功设置");
        Connection connection=factory.newConnection();
        System.out.println("连接成功");
        //3.创建一个通道
        Channel channel =connection.createChannel();
        //4.创建一个队列
        /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                    Map<String, Object> arguments) throws IOException;
        * 参数:  1.queue:队列名称
                2.durable:是否持久化,当mq重启之后,消息还在,相当于存储在硬盘当中
                3.exclusive:是否独占,只有一个消费者监听队列;当connection关闭时,是否删除队列
                4.autoDelete:是否自动删除,当没有consumer时,自动删除掉
                5.arguments:一些其他参数
         */
        channel.queueDeclare("hello",true,false,false,null);

        //5.通过channel将消息发送到队列中
        for (int i = 0; i < 10; i++) {
            String msg="hello rabbitmq--- :"+i;
        /*void basicPublish(String exchange,
                 String routingKey, BasicProperties props, byte[] body)
                                 throws IOException;
        *参数: 1.exchange:交换机的名称,简单模式下,交换机会使用默认的""
              2.routingKey:路由名称,在内置交换机下,路由名称=队列名称
              3.props:配置信息
              4.body:发送消息的数据 (参数形式时byte[])
        */
            channel.basicPublish("","hello",null,msg.getBytes());
            System.out.println(msg+" 消息发送成功");
        }

        //6.释放资源

        channel.close();
        connection.close();//注意先关channel,再关connection,如果顺序反了,会出问题
                            //或者直接关闭connectin即可

    }
}

Consumer的实现:

//消费者
public class ConsumerDemo1 {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.参数设置
        factory.setHost("123.60.91.50");//ip地址
        factory.setPort(5162);//默认的提供rabbitmq服务的端口号
        factory.setUsername("admin");//用户名
        factory.setPassword("fengadmin");//密码
        factory.setVirtualHost("fbl");//虚拟机名称
        System.out.println("成功设置");
        Connection connection=factory.newConnection();
        System.out.println("连接成功");
        //3.创建一个通道
        /*为什么在消费者方也要去创建一个通道?
        * 因为一般来说生产者和消费者并不在同一个项目中开发的
        * 如果直接生产者的项目还没有上线,那么就不存在channel这个通道,那么消费者
        * 去该通道中获取的时候,会出现报错*/
        Channel channel =connection.createChannel();

        //4.声明队列
        channel.queueDeclare("hello",true,false,false,null);
        
        //5.接收消息,并进行消费
        /*
        * String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
            参数名称:
            * 1.queue:队列名称
            * 2.autoAck:是否自动确认,消费者收到消息之后,手动和MQ进行确认,还是自动进行确认
            * 3.callback:回调对象
         * */

        //回调方法:当收到消息后,会自动执行该方法
        /*参数: 1.consumerTag:标识
               2.envelope:获取一些消息,交换机,路由key...
               3.properties:配置信息
               4.body:数据
        * */
        DefaultConsumer consumer=new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("接收到消息: "+new String(body));
            }
        };
        channel.basicConsume("hello",true,consumer);

        //try { TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}

        //6.释放资源,消费者相当于是一个监听程序,不需要关闭资源
//        channel.close();
//        connection.close();


    }
}

重点API:

  • 声明队列:queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments);

    • queue:队列名称
    • durable:是否持久化,当mq重启之后,消息还在,相当于存储在硬盘当中
  • autoDelete:是否自动删除,当没有consumer时,自动删除掉

  • arguments:一些其他参数

工作队列模式

基本概念

  • 特点:一个生产者(Producer),无交换机,一个队列,多个消费者(Consumer)。多个消费者监听一个队列,对消息的消费是轮询方式接收。
    应用场景:适用于处理消息较多的情况,多个消费者共同处理同一个队列中的消息,提高消息处理速度。

代码实现

发布订阅模式

基本概念

在这里插入图片描述

  • 特点:一个生产者(Producer),一个交换机(Exchange),多个队列,多个消费者(Consumer)。消费者监听队列。交换机只负责将消息绑定到队列中,不会存储消息。发布订阅模式的交换机类型属于Fanout类型。
  • 应用场景:适用于一对多的消息广播场景,如日志记录、即时通知等。

交换机类型:
交换机分为三种,分别代表着不同的消息路由类型:

  1. Fanout:广播,将消息交给所有绑定到交换机的队列,即(Publish/Subcribe模式)
  2. Direct:定向,将消息交给符合指定的routing key的队列(`Routing``模式)
  3. Topic:通配符,将消息交给符合routing pattern(路由模式)的队列(Topics模式)

Routing KeyBinding Key之间的区别:
Routing KeyBinding Key之间是不存在明显的区分的,在方法中参数的名称也经常混用,我们可以从意义上来区分:
Binding Key是用来将交换机和队列之间进行绑定的,Routing Key是生产者和交换机之间进行路由的标记

代码实现

//生产者:
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();

        //2.参数设置
        factory.setHost(Constants.IP);//ip地址
        factory.setPort(Constants.PORT);//默认的提供rabbitmq服务的端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.USER_PASSWORD);//密码
        factory.setVirtualHost(Constants.VHOST);//虚拟机名称
        System.out.println("成功设置");
        Connection connection=factory.newConnection();
        System.out.println("连接成功");
        //3.创建一个通道
        Channel channel =connection.createChannel();
        //4.声明一个交换机
        /*参数:
        * 1.交换机的名称
        * 2.交换机的类型(4种类型)
        * 3.是否持久化
        * 4.是否自动删除
        * 5.是否是内置的交换机(如果是内置交换机,就无法从客户端处指定交换机)
        * 6.一些参数*/
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);
        //5.声明两个队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
        //6.声明交换机和队列之间的绑定关系
        /*queueBind(String exchange, String queue, String routingKey) throws IOException;
        * 参数:
        * 1.exchange:交换机名称
        * 2.queue:队列名称
        * 3.routingKey:相当于BingKey
        * */
        channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
        channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
        System.out.println("交换机和队列绑定成功");

        //7.发布消息
//5.通过channel将消息发送到队列中
        for (int i = 0; i < 10; i++) {
            String msg="hello fanout--- :"+i;
            channel.basicPublish(Constants.FANOUT_EXCHANGE,"",null,msg.getBytes());
            System.out.println(msg+" 消息发送成功");
        }
        //8.释放资源
        channel.close();
        connection.close();
    }
}

路由模式

基本概念

在这里插入图片描述

  • 特点:一个生产者,一个交换机,多个队列,多个消费者。生产者发送消息时会发送一个Routing Key给交换机,交换机通过这个Routing Key绑定到对应规则的队列上,实现消息的分发。
  • 应用场景:适用于需要将不同级别的消息数据路由到特定的消息队列中的场景。

代码实现

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();

        //2.参数设置
        factory.setHost(Constants.IP);//ip地址
        factory.setPort(Constants.PORT);//默认的提供rabbitmq服务的端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.USER_PASSWORD);//密码
        factory.setVirtualHost(Constants.VHOST);//虚拟机名称
        System.out.println("成功设置");
        Connection connection=factory.newConnection();
        System.out.println("连接成功");
        //3.创建一个通道
        Channel channel =connection.createChannel();
        //4.声明一个交换机
        /*参数:
         * 1.交换机的名称
         * 2.交换机的类型(4种类型)
         * 3.是否持久化
         * 4.是否自动删除
         * 5.是否是内置的交换机(如果是内置交换机,就无法从客户端处指定交换机)
         * 6.一些参数*/
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true);
        //5.声明三个队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);

        //6.声明交换机和队列之间的绑定关系
        /*queueBind(String exchange, String queue, String routingKey) throws IOException;
         * 参数:
         * 1.exchange:交换机名称
         * 2.queue:队列名称
         * 3.routingKey:相当于BingKey
         * */

        channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");

        channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");
        channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");
        channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");

        System.out.println("交换机和队列绑定成功");

        //7.发布消息
        //只发送到routingKey=="a"中
        for (int i = 0; i < 10; i++) {
            String msg="hello direct--- :"+i;
            channel.basicPublish(Constants.DIRECT_EXCHANGE,"a",null,msg.getBytes());
            System.out.println(msg+" 消息发送成功");
        }
        for (int i = 0; i < 20; i++) {
            String msg="hello direct--- :"+i;
            channel.basicPublish(Constants.DIRECT_EXCHANGE,"b",null,msg.getBytes());
            System.out.println(msg+" 消息发送成功");
        }

        //8.释放资源
        channel.close();
        connection.close();
    }
}

重要API:

  • 声明交换机:channel.exchangeDeclare(String exchange, BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal);
    • exchange:交换机名称
    • type:交换机的类型,一共四种类型(direct,fanout,topic,headers)
    • durable:是否持久化
    • autoDelete:是否自动删除
    • internal:是否是内置交换机.(如果是内置交换机,不能通过客户端直接发送消息到该交换机上)
  • 队列和交换机绑定:queueBind(String exchange, String queue, String routingKey)
    • Exchange:交换机的名称
    • Queue:队列的名称
    • routingKey:路由参数
  • 发送消息到指定的队列中:basicPublish(String exchange, String routingKey, BasicProperties prop, byte[] message)
    • exchange:交换机名称
    • routingKey:路由参数
    • prop:配置参数
    • message:信息

通配符模式

基本概念

  • 特点:一个生产者,一个交换机,多个队列,多个消费者。Topics模式在Routing模式的基础上,给队列绑定带通配符的路由关键字。只要消息的Routing Key能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用Topic交换机。
    *号:代表匹配一个词(只能一个,不能多也不能少)。
    #号:代表匹配一个或多个词(可以没有,或者有很多)。
  • 应用场景:适用于需要对消息数据中的Routing Key进行通配符匹配,将满足条件的消息数据分发到特定的队列中的场景。
    在这里插入图片描述

topic类型的交换机在匹配规则上,有一些要求:

  1. RoutingKey是一系列用.分割的单词,比如a.orange.b,b.rabbit.c
  2. BindingKeyRoutingKey一样,也是点.分割的字符串
  3. BindingKey中存在两种特殊的字符串,用于模糊匹配
    a. *表示任意一个单词
    b. #表示任意多个单词

代码实现

//生产者
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();

        //2.参数设置
        factory.setHost(Constants.IP);//ip地址
        factory.setPort(Constants.PORT);//默认的提供rabbitmq服务的端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.USER_PASSWORD);//密码
        factory.setVirtualHost(Constants.VHOST);//虚拟机名称
        System.out.println("成功设置");
        Connection connection=factory.newConnection();
        System.out.println("连接成功");
        //3.创建一个通道
        Channel channel =connection.createChannel();
        //4.声明一个交换机
        /*参数:
         * 1.交换机的名称
         * 2.交换机的类型(4种类型)
         * 3.是否持久化
         * 4.是否自动删除
         * 5.是否是内置的交换机(如果是内置交换机,就无法从客户端处指定交换机)
         * 6.一些参数*/
        channel.exchangeDeclare(Constants.TOPICS_EXCHANGE, BuiltinExchangeType.TOPIC,true);
        //5.声明三个队列
            channel.queueDeclare(Constants.TOPICS_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.TOPICS_QUEUE2,true,false,false,null);

        //6.声明交换机和队列之间的绑定关系
        /*queueBind(String exchange, String queue, String routingKey) throws IOException;
         * 参数:
         * 1.exchange:交换机名称
         * 2.queue:队列名称
         * 3.routingKey:相当于BingKey
         * */

        channel.queueBind(Constants.TOPICS_QUEUE1,Constants.TOPICS_EXCHANGE,"*.orange.*");

        channel.queueBind(Constants.TOPICS_QUEUE2,Constants.TOPICS_EXCHANGE,"*.*.rabbit");
        channel.queueBind(Constants.TOPICS_QUEUE2,Constants.TOPICS_EXCHANGE,"lazy.*");

        System.out.println("交换机和队列绑定成功");

        //7.发布消息
        //只发送到routingKey=="a"中
        for (int i = 0; i < 10; i++) {
            String msg="hello direct--- :"+i;
            channel.basicPublish(Constants.TOPICS_EXCHANGE,"a.orange.b",null,msg.getBytes());
            System.out.println(msg+" 消息发送成功");
        }
        for (int i = 0; i < 20; i++) {
            String msg="hello direct--- :"+i;
            channel.basicPublish(Constants.TOPICS_EXCHANGE,"a.b.rabbit",null,msg.getBytes());
            System.out.println(msg+" 消息发送成功");
        }

        //8.释放资源
        channel.close();
        connection.close();
    }
}

RPC(远程过程调用模式)

基本概念

在这里插入图片描述

  • 特点:客户端通过RabbitMQ发送消息到服务端,服务端调用函数对消息进行处理,再将处理结果通过另一消息队列返回给客户端。严格意义来说,这种模式违背了消息队列(MQ)的初衷,它需要等待服务端放回结果。可以视为RabbitMQ的一种扩展,为了实现消息可靠性投递。
  • 应用场景:适用于需要客户端与服务端之间进行双向通信的场景。

rpc通信的工作流程:
Client:

  1. 发送请求Request,且携带reply_tocorrelation_id两个参数
  • 在消息属性中设置replyTo字段,这个字段指定了一个回调队列,服务器处理之后,会将响应的结果发送到这个队列中.
  • 客户端在回调队列上等待消息.一旦接收到响应,客户端就会检查消息的correlation_id属性,以确保是它所期望的响应
  1. 接收响应(校验correlation_id)
    Server:
  2. 接收请求,进行响应
  3. 发送响应(按照客户端指定的replyTo,设置correlation_id)

代码实现

client代码:

{

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();

        //2.参数设置
        factory.setHost(Constants.IP);//ip地址
        factory.setPort(Constants.PORT);//默认的提供rabbitmq服务的端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.USER_PASSWORD);//密码
        factory.setVirtualHost(Constants.VHOST);//虚拟机名称
        System.out.println("成功设置");
        Connection connection=factory.newConnection();
        System.out.println("连接成功");
        //3.创建一个通道
        Channel channel =connection.createChannel();

        //4.声明一个请求队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);
        channel.queueDeclare(Constants.RPC_REPONSE_QUEUE,true,false,false,null);
        //唯一标志本次请求
        String corrId= UUID.randomUUID().toString();
        //生成附带的参数属性
        AMQP.BasicProperties properties=new AMQP.BasicProperties()
                .builder()
                .correlationId(corrId)//唯一标志本次请求
                .replyTo(Constants.RPC_REPONSE_QUEUE)//设置回调队列
                .build();

        String msg="hello rpc....";
        //5.向请求队列发送请求
        channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,properties,msg.getBytes());

        //接受响应

        //阻塞队列,用于存储回调的结果
        final BlockingDeque<String> reponse=new LinkedBlockingDeque<>(1);

        //8.接收消息
        DefaultConsumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息: "+new String(body));
                if(properties.getCorrelationId().equals(corrId)){
                    //校验一致
                    reponse.offer(new String(body,"UTF-8"));
                }
            }
        };
        //9.从响应队列中消费
        channel.basicConsume(Constants.RPC_REPONSE_QUEUE,true,consumer);
        String result=reponse.take();

        System.out.println("[RPC Client接收到响应: ]"+result);

        //释放资源
        channel.close();
        connection.close();
    }
}

重要的API:

  • 唯一标志本次请求: String corrId= UUID.randomUUID().toString();
  • 生成附带的参数属性:AMQP.BasicProperties properties=new AMQP.BasicProperties().builder() .correlationId(corrId)//唯一标志本次请求 .replyTo(Constants.RPC_REPONSE_QUEUE)//设置回调队列 .build();

server代码:

public class RpcServer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();

        //2.参数设置
        factory.setHost(Constants.IP);//ip地址
        factory.setPort(Constants.PORT);//默认的提供rabbitmq服务的端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.USER_PASSWORD);//密码
        factory.setVirtualHost(Constants.VHOST);//虚拟机名称
        System.out.println("成功设置");
        Connection connection=factory.newConnection();
        System.out.println("连接成功");
        //3.创建一个通道
        Channel channel =connection.createChannel();

        //4.声明一个请求队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);
        channel.queueDeclare(Constants.RPC_REPONSE_QUEUE,true,false,false,null);
        //5.接收一个请求
        channel.basicQos(1);//设置最多只能接受一个消息

        DefaultConsumer consumer=new DefaultConsumer(channel){
            //回调机制
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //生成附带的参数
                AMQP.BasicProperties prop=new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                //生成响应结果
                String response=new String(body);
                //发送响应到响应队列中
                channel.basicPublish("",properties.getReplyTo(),prop,response.getBytes());
                //对消息进行应答
                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        };
        //从request队列中接受请求
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);

    }
}

Publisher Confirms(发布确认模式)

MQ是如何保证消息的可靠性的

在这里插入图片描述
作为消息中间件,都会面临消息丢失的问题,消息丢失的问题大概可以分为三种情况:

  1. 生产者问题:因为应用程序故障,网络抖动等原因,生产者没有向Borker发送消息;
  2. 消息中间件自身的问题:生产者成功将消息发送给了Borker,但是Borker并没有把消息保存好,导致消息丢失.
  3. 消费者问题:Broker发送消息给到消费者,消费者在消费消息时,因为没有处理好,导致Borker将发送失败的消息从队列中删除了.

这里我们可以针对这三种情况,给出三种不同的解决方案:

  • 生产者产生问题:发布确认机制
  • Borker产生问题:持久化机制
  • 消费者产生问题:消息应答机制

现在我们来学习发布确认机制

什么是发布确认机制

在这里插入图片描述

  • 特点:指的是提供者可靠投递到交换机的过程,不会因为网络或者其他问题导致消息丢失。在这个模式下,可以在提供者注册一个回调函数。当消息发送后,不管如何都会触发这个回调函数,可以通过这个回调函数来判断是否到达交换机。
  • 应用场景:适用于需要确保消息可靠投递到交换机的场景。

发布确认机制是RabbitMQ的七大工作模式之一.

  • 生产者将信道设定为confirm(确认)模式
  • 信道进入confirm模式,会将在该信道上发布的消息都指派一个唯一标识ID(从1开始);
  • 一旦消息进入到所匹配的队列时,RabbitMQ就会发送一个确认给生产者(包含消息的唯一ID);
  • 生产者就可以得知消息已经正确到达目的队列中了
  • 如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出
  • borker回传给生产者的确认消息中存在两个属性:deliveryTagmultiple,其中deliveryTag 表示确认消息的序号,multiple表示是否批量处理

发布确认机制的好处

发布确认机制最大的好处在于异步,生产者可以同时发布消息和等待信道返回确认消息

  1. 当消息最终得到确认之后,生产者可以通过回调的方法来处理该确认信息
  2. 如果RabbitMQ因为Borker内部错误导致消息丢失,那么就会向生产者发送一条nack命令,生产者同样可以在回调方法中处理该nack命令(也就是说Borker会给生产者发送的ACK分别为ack或者nack)

发布确认的三大策略

单独确认策略

特点:每发送一条消息,就会等待确认信息,收到确认信息之后,才会接着发送消息

代码实现
//单独确认策略
public static void individually() throws Exception {

    try (Connection connection = createConnection()) {
        Channel channel = connection.createChannel();
        //开启信道的确认模式
        channel.confirmSelect();
        //声明队列
        channel.queueDeclare(Constants.PUBLISHER_QUEUE1, true, false, false, null);
        //循环发送消息,调用等待确认消息方法
        long start = System.currentTimeMillis();
        for (int i = 0; i < 200; i++) {
            String msg = "消息:" + i;
            channel.basicPublish("", Constants.PUBLISHER_QUEUE1, null, msg.getBytes());
            channel.waitForConfirmsOrDie(5_000);//等待确认消息,只要消息被确认,该方法就会返回
        }
        long end = System.currentTimeMillis();
        System.out.format("单独确认策略,用时:%d\n", end - start);
    }
}

重点API:
channel.waitForConfirmsOrDie(5_000);:等待确认消息,只要消息被确认,该方法就会返回

批量确认策略
代码实现
//批量确认策略
public static void inBatch () throws Exception {
    try (Connection connection = createConnection()) {
        Channel channel = connection.createChannel();
        //开启信道的确认模式
        channel.confirmSelect();
        //声明队列
        channel.queueDeclare(Constants.PUBLISHER_QUEUE2, true, false, false, null);
        long start = System.currentTimeMillis();
        int batchSize=100;
        int curSize=0;
        for (int i = 0; i < 200; i++) {
            String msg="消息: "+i;
            channel.basicPublish("", Constants.PUBLISHER_QUEUE2, null, msg.getBytes());

            curSize++;
            if(batchSize==curSize){
                channel.waitForConfirmsOrDie(5_000);//等待确认消息,只要消息被确认,该方法就会返回
                curSize=0;
            }
        }
        if(curSize>0){//保证消息确认完毕
            channel.waitForConfirmsOrDie(5_000);//等待确认消息,只要消息被确认,该方法就会返回
            curSize--;
        }
        long end = System.currentTimeMillis();
        System.out.format("批量确认策略,用时:%d\n", end - start);
    }
}

相比于单独确认策略,批量确认大大提升了效率,但其缺点是:如果出现了Nack信息或者超时,我们不清楚是哪条消息出现了问题,客户端这时需要将这一批次的消息全部重发,这样会带来明显的重复消息数量。当消息经常丢失的时候,批量确认的性能不升反降。

异步确认策略
代码实现
//异步确认策略
public static void Asynchronously () throws Exception {
    try (Connection connection = createConnection()) {
        Channel channel = connection.createChannel();
        //开启信道的确认模式
        channel.confirmSelect();
        //声明队列
        channel.queueDeclare(Constants.PUBLISHER_QUEUE3, true, false, false, null);

        //有序集合,元素按照自然顺序排序,存储为confirm消息序号
        SortedSet<Long> confirmSet= Collections.synchronizedSortedSet(new TreeSet<>());
		
		//生产者收到消息的ack和nack的消息,进行处理:
        channel.addConfirmListener(new ConfirmListener() {
            /**
            *这里是生产者收到了ack信息,然后进行消息的处理
           */
            @Override
            public void handleAck(long l, boolean b) throws IOException {
                /*l: deliveryTag,每一个信息的ID
                * b: mutiple,表示是否批量处理*/
                if(b){
                    //批量处理
                    confirmSet.headSet(l+1).clear();
                }else{
                	//单独处理
                    confirmSet.remove(l);
                }
            }
			/**
            *这里是生产者收到了nack信息,然后进行消息的处理
           */
            @Override
            public void handleNack(long l, boolean b) throws IOException {
                /*l:deliveryTag,每一个信息的ID
                 * b:mutiple,表示是否批量处理*/
                if(b){
                    //批量处理
                    confirmSet.headSet(l+1).clear();
                }else{
                	//单独处理
                    confirmSet.remove(l);
                }
                //重发信息的逻辑
            }
        });

        //循环发送消息
        long start = System.currentTimeMillis();

        for (int i = 0; i < 200; i++) {
            String msg="消息:"+i;
            long nextId=channel.getNextPublishSeqNo();//得到下一次发送消息的序号,从1开始
            channel.basicPublish("",Constants.PUBLISHER_QUEUE2,null,msg.getBytes());
            //将序号存入集合中
            confirmSet.add(nextId);
        }

        //消息确认完毕
        while(!confirmSet.isEmpty()){
            Thread.sleep(10);
        }

        long end = System.currentTimeMillis();
        System.out.format("异步确认策略,用时:%d\n", end - start);
    }
}

重点API:

  • Channel接口提供了addConfirmListener方法,这个方法可以添加ConfirmListener回调接口
  • ConfirmListener该方法实现了对处于Confirm模式的信道的监听,可以分别对应处理RabbitMQ发送给生产者的acknack
  • ConfirmListener接口中包含两个重写方法:handleAck(long deliveryTag,boolean multiple)handleNack(long deliveryTag,boolean multiple)
    其中的两个参数deliveryTag 表示发送消息的序号,multiple 表示是否批量确认

三种策略的对比:
在这里插入图片描述

标签:String,队列,七大,factory,RabbitMQ,交换机,深入,Constants,channel
From: https://blog.csdn.net/2301_78320637/article/details/144010244

相关文章

  • RabbitMQ5:Fanout交换机、Direct交换机、Topic交换机
    欢迎来到“雪碧聊技术”CSDN博客!在这里,您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者,还是具有一定经验的开发者,相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导,我将不断探索Java的深邃世界,分享最新的技术动态、实战经验以及项目......
  • 深入理解Rust的所有权和借用
    文章目录所有权所有权基本规则值的拷贝传递函数借用借用分类借用的原则总结Rust编程语言的所有权机制和借用是它的核心特性之一,旨在确保内存安全、并发安全以及避免数据竞争。由于所有权机制,不需要通过垃圾回收进行内存处理,在保证高性能的同时,还保证了内存安全。通......
  • 深入理解MySQL中的默认值:从NULL到数据完整性
    深入理解MySQL中的默认值:从NULL到数据完整性引言在数据库设计中,字段的默认值是一个看似微不足道,却可能引发大问题的话题。特别是在MySQL中,字段的默认值处理方式直接影响数据的完整性和一致性。本文将深入探讨MySQL中默认值的机制,并通过实例引导你理解如何在实际开发中正确处理默......
  • RabbitMQ4:work模型
    欢迎来到“雪碧聊技术”CSDN博客!在这里,您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者,还是具有一定经验的开发者,相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导,我将不断探索Java的深邃世界,分享最新的技术动态、实战经验以及项目......
  • 【Python】 深入理解Python的单元测试:用unittest和pytest进行测试驱动开发
    《PythonOpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门!单元测试是现代软件开发中的重要组成部分,通过验证代码的功能性、准确性和稳定性,提升代码质量和开发效率。本文章深入介绍Python中两种主流单元测试框架:unittest和pytest,并结合测试驱动开发(TDD)的思想,展示如......
  • MySQL原理简介—4.深入分析Buffer Pool
    大纲1.BufferPool是什么2.如何配置BufferPool的大小3.数据页是MySQL中抽象出来的数据单位4.数据页如何对应BufferPool中的缓存页5.缓存页对应的描述信息是什么6.BufferPool简单总结7.数据库启动时如何初始化BufferPool8.free链表可判断哪些缓存页是空闲的9.free链表......
  • spring框架必知点整理(点到为止非深入篇)
    1.springboot的启动流程springboot的启动流程分两阶段:构造函数,实例化SpringApplication阶段实例化后,调用SpringApplication的run方法阶段 启动流程@SpringBootApplicationpublicclassMySpringBootWebApplication{publicstaticvoidmain(String[]args)......
  • 深入理解数据库连接池:从概念到实践
    深入理解数据库连接池:从概念到实践引言在现代Web应用开发中,数据库连接的管理是一个至关重要的环节。传统的数据库连接管理方式在高并发环境下存在性能瓶颈,而数据库连接池技术通过预先创建和管理数据库连接,显著提高了系统的性能和稳定性。本文将深入探讨数据库连接池的概念、优势......
  • 深入理解JDBC API:从SQL注入到PreparedStatement的安全解决方案
    深入理解JDBCAPI:从SQL注入到PreparedStatement的安全解决方案引言在现代Web应用开发中,数据库操作是不可或缺的一部分。Java数据库连接(JDBC)API为Java开发者提供了一种与数据库交互的标准方式。然而,随着应用的复杂性增加,安全问题也随之而来。其中,SQL注入是最常见且危险的安全漏洞......
  • Oracle 深入学习 Part 7: Maintaining Online Redo Log Files(维护联机重做日志文件)
            联机日志文件又叫重做日志文件,记录了对数据库的任何改变。Oracle遵循WAL(Write-AheadLogging)原则,即在提交事务前,先写到RedoBuffer(日志缓冲区),再由LGWR(日志写入进程)写入到物理的联机重做日志文件中。1.1组的概念每个日志组包含多个成员文件(Member),用......