首页 > 其他分享 >rabbitmq系列03---发布确认

rabbitmq系列03---发布确认

时间:2024-04-27 11:22:38浏览次数:28  
标签:03 确认 System rabbitmq --- 发布 消息 queueName channel

一、发布确认逻辑

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了
当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。

 二、发布发布确认的策略

发布确认默认是没有开启的,如果要开启,需要调用方法 confirmSelect每当你要想使用发布确认,都需要在 channel 上调用该方法

 

1、单个发布确认

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

//单个确认
    public static void publishMessageSingle() throws Exception{
        Channel channel = Util.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            //单个消息就马上发布确认
            boolean flag = channel.waitForConfirms();
            if(flag){
                System.out.println("消息发送成功");
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end-begin)+"ms");
    }

 

2、批量确认发布
与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

//批量确认
    public static void publishMessageBatch() throws Exception{
        Channel channel = Util.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //批量确认消息大小
        int batchSize = 100;
        //开始时间
        long begin = System.currentTimeMillis();
        //批量发消息 批量发布确认
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            //每达到100次,批量发布确认一次
            if(i%batchSize == 0){
                //发布确认
              channel.waitForConfirms();
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时"+(end-begin)+"ms");
    }

3、异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

//异步发布确认
    public static void publishMessageAsync() throws Exception{
        Channel channel = Util.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        /**
         * deliveryTag:消息的标记
         * multiple:是否批量确认
         */
        //消息确认成功 回调函数
        ConfirmCallback ackCallback = (deliveryTag,multiple)->{
            System.out.println("确确认消息"+deliveryTag);
        };
        //消息确认失败 回调函数
        ConfirmCallback nackCallback = (deliveryTag,multiple)->{
            System.out.println("未确认消息"+deliveryTag);
        };
        //准备消息监听器 监听那些消息成功,那些消息失败
        /**
         * 1.监听成功方法
         * 2.监听失败方法
         */
         channel.addConfirmListener(ackCallback,nackCallback);//异步通知
        //批量发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个异步发布确认消息,耗时"+(end-begin)+"ms");
    }

 

标签:03,确认,System,rabbitmq,---,发布,消息,queueName,channel
From: https://www.cnblogs.com/zhengwei-cq/p/18161851

相关文章

  • SQL217 对所有员工的薪水按照salary降序进行1-N的排名
    示例:droptableifexists`salaries`;CREATETABLE`salaries`(`emp_no`int(11)NOTNULL,`salary`int(11)NOTNULL,`from_date`dateNOTNULL,`to_date`dateNOTNULL,PRIMARYKEY(`emp_no`,`from_date`));INSERTINTOsalariesVALUES(10001,88958,'2......
  • access数据库批量更新中无法使用replace,出现“表达式中 'replace' 函数未定义”的替代
    如果我们想要批量修改数据库table_name表中aa字段中数据,将“|bbbb”删除sql的批量更新中,通用语法是:UPDATEtable_nameSETaa=REPLACE(aa,'|bbbb','')但是,如果是access数据库,就可能出现以下的报错信息:MicrosoftJETDatabaseEngine错误'80040e14'表达式中'replace'函......
  • 力扣-419. 甲板上的战舰
    1.题目题目地址(419.甲板上的战舰-力扣(LeetCode))https://leetcode.cn/problems/battleships-in-a-board/题目描述给你一个大小为mxn的矩阵board表示甲板,其中,每个单元格可以是一艘战舰'X'或者是一个空位'.',返回在甲板board上放置的战舰的数量。战舰只能水平......
  • SystemVerilog -- 6.4 Interface ~ Clocking Blocks
    SystemVerilogClockingBlocks默认情况下,模块端口和接口不指定信号之间的任何时序要求或同步方案。在clocking和endclocking之间定义的时钟块正是这样做的。它是与特定时钟同步的信号集合,有助于指定时钟和信号之间的定时要求。这将允许测试编写者更多地关注事务,而不是担心信号......
  • 攻防世界---unseping
    一.题干二.pop链的构造反序列化执行__wakeup魔术方法->waf函数对传入的参数进行过滤->执行__destruct魔术方法->利用call_user_func_array执行回调函数ping三.绕过过滤1.首先,waf函数过滤了ls命令,可以使用空变量${Z}或者''或者""绕过:<?phphighlight_file(__FILE__);class......
  • Rabbitmq系列02---Exchange
    个人理解:交换机的类型划分个人理解是能过routingkey来划分的,一是否按routingkey找队列;fanout就是不按routingkey找队列,Direct和Topicr按routingkey找队列,只是一个模糊找,一个精准找,而headers不按routingkey是按消头中的内容找队列。一、Fanout(订阅模式|广播模式)  Fanout......
  • Java面向对象03——三大特性之继承
    一、继承(extends)1.1、继承是什么继承就是Java允许我们用extends关键字,让一个类与另一个类建立起一种父子关系;被继承的类称为父类(基类、超类),继承父类的类都称为子类(派生类) ,当子类继承父类后,就可以直接使用父类公共的属性和方法了当子类继承父类后,就可以直接使用父类公共的......
  • 17-项目风险管理(8/10 十大管理)
    15.1管理基础15.1.1项目风险概述项目风险是一种不确定的事件或条件,一旦发生,会对项目目标产生某种正面或负面的影响。项目风险既包括对项目目标的威胁,也包括促进项目目标的机会。风险源于所有项目之中的不确定因素。项目在不同阶段会有不同的风险。风险会随着项目的进展而变......
  • 18-项目采购管理(9/10 十大管理)
    16.1管理基础16.1.1协议/采购合同项目采购管理包括从项目团队外部采购或获取所需产品、服务或成果的各个过程。例如合同、订购单、协议备忘录(MOA)和服务水平协议(SLA)。被授权采购项目所需货物、服务的人员可以是项目团队、管理层或组织采购部的成员。因应用领域不同,协议可以是......
  • 19-项目干系人管理(10/10 十大管理)
    17.1管理基础17.1.1管理的重要性每个项目干系人,他们会受到项目积极或消极的影响,或者能对项目施加积极或消极的影响。项目经理和团队管理干系人的能力决定着项目的成败。为提高项目成功的概率,尽早开始识别干系人并引导干系人参与。干系人满意度应作为项目目标加以识别和管理......