首页 > 其他分享 >RabbitMQ学习笔记(三)

RabbitMQ学习笔记(三)

时间:2022-11-19 10:23:15浏览次数:56  
标签:队列 确认 笔记 学习 死信 消息 RabbitMQ channel

三 利用RabbitMQ高级特性,完善项目的可靠性

3.1 如何保证消息的可靠性

3.1.1 发送方

需要使用RabbitMQ发送端确认机制,确认消息成功发送到RabbitMQ并被处理

需要使用RabbitMQ消息返回机制,若没发现目标队列,中间件会通知发送方

3.1.2 消费方

需要使用RabbitMQ消费端确认机制,确认消息没有发生处理异常

需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定

3.1.3 RabbitMQ自身

大量堆积的消息会给RabbitMQ产生很大的压力,需要使用RabbitMQ消息过期时间,防止消息大量积压

过期后会直接被丢弃,无法对系统运行异常发出警报,需要使用RabbitMQ死信队列,收集过期消息,以供分析

3.1.4 总结

需要引入RabbitMQ新特性,来确保消息可靠性:

发送端确认机制
消费端确认机制
消息返回机制
消费端限流机制
消息过期机制
死信队列

3.2 发送端确认机制

3.2.1 什么是发送端确认机制

消息发送后,若中间件收到消息,会给发送端一个应答
生产者接收应答,用来确认这条消息是否正常发送到中间件

3.2.2 三种确认机制

单条同步确认机制的实现方法(最优

配置channel,开启确认模式: channel.confirmSelect(),每发送一条消息,调用channel.waitForConfirms()方法,等待确认

try (Connection connection = connectionFactory.newConnection();
     Channel channel = connection.createChannel()) {
    channel.confirmSelect();
    String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
    channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
     log.info("message sent");
     if (channel.waitForConfirms()){
         log.info("message success");
     }else {
         log.info("message failed");
     }
}
多条同步确认机制的实现方法(不推荐)

配置channel,开启确认模式: channel.confirmSelect()

发送多条消息后,调用channel.waitForConfirms()方法,等待确认

//发送多条消息有一条确认,不推荐使用
try (Connection connection = connectionFactory.newConnection();
     Channel channel = connection.createChannel()) {
    channel.confirmSelect();
    for (int i = 0; i < 10; i++) {
        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
        channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
        log.info("message sent");
    }
     if (channel.waitForConfirms()){
         log.info("message success");
     }else {
         log.info("message failed");
     }
}
异步确认机制的实现方法(不推荐)

配置channel,开启确认模式: channel.confirmSelect()

在channel上添加监听: addConfirmListener,发送消息后,会回调此方法,通知是否发送成功

异步确认有可能是单条,也有可能是多条,取决于MQ

image-20221117154544052

//异步确认产生并发,问题,而且接受的应答还不在一个线程
try (Connection connection = connectionFactory.newConnection();
     Channel channel = connection.createChannel()) {
    channel.confirmSelect();
    ConfirmListener confirmListener = new ConfirmListener() {
        @Override
        /**
         * deliveryTag 发送端消息序号
         * multiple true多条消息 false单条消息
         */
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        log.info("成功==deliveryTag:{},multiple:{}",deliveryTag,multiple);
        }

        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            log.info("失败==deliveryTag:{},multiple:{}",deliveryTag,multiple);
        }
    };
    channel.addConfirmListener(confirmListener);
    for (int i = 0; i < 10; i++) {
        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
        channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
        log.info("message sent");
    }

    Thread.sleep(100000);
}

image-20221118175459023

3.3 消息返回机制

3.3.1 消息真被路由了吗?

消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃

消息丢弃后,订单处理流程停止,业务异常

需要使用RabbitMQ消息返回机制,确认消息被正确路由

image-20221118182753037

3.3.2 消息返回机制的原理

消息发送后,中间件会对消息进行路由

若没有发现目标队列,中间件会通知发送方

Return Listener 会被调用

3.3.4 消息返回的开启方法

在RabbitMQ基础配置中有一个关键配置项: Mandatory

Mandatory若为false, RabbitMQ将直接丢弃无法路由的消息

Mandatory若为true, RabbitMQ才会处理无法路由的消息

代码实现:

餐厅微服务给订单微服务发消息,没有正确的陆游会回调俩接口都可以:ReturnListener ReturnCallback

 try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {

//                channel.addReturnListener(new ReturnListener() {
//                    @Override
//                    public void handleReturn(int replyCode,
//                                             String replyText,
//                                             String exchange,
//                                             String routingKey,
//                                             AMQP.BasicProperties properties,
//                                             byte[] body) throws IOException {
//                        log.info("消息路由失败==replyCode:{} replyCode:{} exchange:{} routingKey:{} properties:{} body:{} ",
//                                replyCode,replyText,exchange,routingKey,properties,new String(body)
//                                );
//                    }
//                });
                
                channel.addReturnListener(new ReturnCallback() {
                    @Override
                    public void handle(Return returnMessage) {
                        log.info("消息路由失败==replyCode:{}",returnMessage);
                    }
                });
                String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                channel.basicPublish("exchange.order.restaurant",
                        "key.order",
                        true,
                        null,
                        messageToSend.getBytes());
            Thread.sleep(1000);
            }

3.4 消费方确认机制

3.4.1 消费端处理异常怎么办?

默认情况下,消费端接收消息时,消息会被自动确认(ACK)

消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况

需要使用RabbitMQ消费端确认机制,确认消息被正确处理

3.4.2 消费端ACK类型

自动ACK:消费端收到消息后,会自动签收消息

手动ACK:消费端收到消息后,不会自动签收消息,需要我们在业务代码中显式签收消息

3.4.3 手动ACK类型

单条手动ACK: multiple=false

channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

多条手动ACK: multiple=true

//每五条签收
if (message.getEnvelope().getDeliveryTag() % 5 ==0){
    channel.basicAck(message.getEnvelope().getDeliveryTag(),true);
}

推荐使用单条ACK

3.4.4 重回队列

若设置了重回队列,消息被NACK之后,会返回队列末尾,等待进一步被处理

一般不建议开启重回队列,因为第一次处理异常的消息,再次处理,基本上也是异常

 //重回队列
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);

3.5 消费端限流机制

3.5.1 消费端处理的过来吗?

业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务崩溃

需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定

3.5.2 RabbitMQ-QoS

针对以上问题, RabbitMQ开发了QoS (服务质量保证)功能

QoS功能保证了在一定数目的消息未被确认前,不消费新的消息

QoS功能的前提是不使用自动确认

3.5.3 QoS原理

QoS原理是当消费端有一定数量的消息未被ACK确认时,RabbitMQ不给消费端推送新的消息

RabbitMQ使用QoS机制实现了消费端限流

3.5.4 消费端限流机制参数设置

prefetchCount:针对一个消费端最多推送多少未确认消息

global: true:针对整个消费端限流false: 针对当前channel

prefetchSize : 0(单个消息大小限制,一般为0)

prefetchSize与global两项, RabbitMQ暂时未实现

没使用QoS会把消息全部推送到消费端,消费端消息全处在unacked状态,然后消费端一条一条的处理。

优点并不一定是怕把消费者给挤爆,主要原因是,堆积到一个消费者上面,导致新的消费者也抢不过来,因为旧的消息早就推送了。

代码:

每次只推送2个ACK应答,这样其他的消费者就可以拿到了

channel.queueBind(
        "queue.restaurant",
        "exchange.order.restaurant",
        "key.restaurant");
channel.basicQos(2);
channel.basicConsume("queue.restaurant", false, deliverCallback, consumerTag -> {
});
while (true) {
    Thread.sleep(100000);
}

3.6 消息过期机制

3.6.1 队列爆满怎么办?

默认情况下,消息进入队列,会永远存在,直到被消费
大量堆积的消息会给RabbitMQ产生很大的压力

3.6.2 RabbitMQ的过期时间(TTL)

RabbitMQ的过期时间称为TTL (Time to Live),生存时间
RabbitMQ的过期时间分为消息TTL和队列TTL

消息TTL设置了单条消息的过期时间:代码

channel.addConfirmListener(confirmListener);
    String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("15000").build();
    channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes());
    log.info("message send");

image-20221118221830703

队列TTL设置了队列中所有消息的过期时间:代码

HashMap<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl", 15000);
channel.queueDeclare(
        "queue.restaurant",
        true,
        false,
        false,
        args
);

注意:args.put("x-expire", 15000);不要设置这个,这样队列就被删除了。

3.6.3 如何找到适合自己的TTL?

TTL的设置主要考虑技术架构与业务
TTL应该明显长于服务的平均重启时间
建议TTL长于业务高峰期时间

注意:不建议直接使用建议和死信一起使用

3.7 死信队列

3.7.1 如何转移过期消息?

消息被设置了过期时间,过期后会直接被丢弃

直接被丢弃的消息,无法对系统运行异常发出警报

需要使用RabbitMQ死信队列,收集过期消息,以供分析

3.7.2 什么是死信队列

死信队列:队列被配置了DLX属性(Dead-Letter-Exchange)

当一个消息变成死信(dead message)后,能重新被发布到另一个Exchange,这个Exchange也是一个普通交换机

死信被死信交换机路由后,一般进入一个固定队列

image-20221118223359307

3.7.3 死信队列设置方法

设置转发、接收死信的交换机和队列:

Exchange: dlx.exchange

Queue: dlx.queue

RoutingKey:#

在需要设置死信的队列加入参数:

x-dead-letter-exchange = dlx.exchange

代码:


channel.exchangeDeclare(
        "dlx.exchange",
        BuiltinExchangeType.TOPIC,
        true,
        false,
        null
);

channel.queueDeclare(
        "dlx.queue",
        true,
        false,
        false,
        null
);

channel.queueBind(
        "dlx.queue",
        "dlx.exchange",
        "#"
);

//设置死信队列
 HashMap<String, Object> args = new HashMap<>(16);
        args.put("x-message-ttl", 15000);
        args.put("x-dead-letter-exchange", "dlx.exchange");
        channel.queueDeclare(
                "queue.restaurant",
                true,
                false,
                false,
                args
        );

3.7.4 怎样变成死信

  1. 消息被拒绝(reject/nack)并且requeue=false(不让他重回队列)
//不重回队列
channel.basicNack(message.getEnvelope().getDeliveryTag(), false, false);
  1. 消息过期(TTL到期)
 args.put("x-message-ttl", 15000);
  1. 队列达到最大长度
args.put("x-max-length", 5);

3.8 小结

3.8.1 善用RabbitMQ高级特性

对于RabbitMQ的高级特性,要善加利用
接收端确认、死信队列是非常常用的特性

3.8.2 慎用RabbitMQ高级特性

不要无限追求高级,用上所有RabbitMQ的高级特性

重回队列、发送端确认是不常用的特性,谨慎使用

3.8.2 善用RabbitMQ管控台

管控台是RabbitMQ调试的利器

RabbitMQ高级特性多数都涉及交换机、队列的属性配置,可以在管控台确认配置是否生效

RabbitMQ高级特性很多都可以在管控台进行试验

3.8.3 本章小结

为了确保消息发送,使用了发送端确认机制

为了确保消息正确路由,使用了消息返回机制

为了保证消息正常梳理,使用了消费端确认机制

为了保证消费端稳定,使用消费端限流机制

为了中间件问题,使用过期时间机制

为了处理异常消息,使用死信机制

标签:队列,确认,笔记,学习,死信,消息,RabbitMQ,channel
From: https://www.cnblogs.com/mrwyk/p/16905548.html

相关文章

  • Golang学习之路6-goroutine并发
    @目录前言一、goroutine用法二、goroutine循环三、goroutine提前退出四、goroutine双向管道五、goroutine单向管道六、监听管道如下图,可以看到当我们监听到有写入数据时会......
  • 11.18日学习记录
    2022-11-1813:41:421.TOMCAT的配置tomcat的配置搞了一个上午,还让别人帮了忙,主要是因为tomcat位置放的不好。貌似放在桌面不行,改放在了D盘下,终于解决了之前tomcat启动成......
  • Day2学习:dos的常用指令
     day2学习:dos的常用指令打开方式同时按下win键+r打开运行窗,然后输入cmd,回车也可以在同时按下win+x,或者"开始"菜单上右键,在弹出菜单上选择"命令行提示符"或"命令提......
  • day2学习:dos的常用指令
    day2学习:dos的常用指令打开方式同时按下win键+r打开运行窗,然后输入cmd,回车也可以在同时按下win+x,或者"开始"菜单上右键,在弹出菜单上选择"命令行提示符"或"命令提示......
  • 深度学习基础课:用全连接层识别手写数字(中)
    大家好~我开设了“深度学习基础班”的线上课程,带领同学从0开始学习全连接和卷积神经网络,进行数学推导,并且实现可以运行的Demo程序线上课程资料:本节课录像回放加QQ群,获得......
  • Mysql批量更新性能优化学习
    转自:https://juejin.cn/post/70435968558290698611.更新对表做多行更新的时候通常会遇到以下两种情况:单语句批量更新(updatea=a+1wherepk>500)多语句批量更新(updat......
  • 数字高程模型复习笔记
    数字高程模型知识点总结概论数字地面模型DEM的定义是表示区域D上的三维向量有限序列,是以绝对高程或海拔表示的地面模型。是国家基础空间数据的重要组成部分,它表示地表......
  • Mysql批量插入性能优化学习
    转自:https://www.cnblogs.com/myseries/p/11191134.html1.批量insert1.1一条sql将单条insert改为批量insert,其实个人认为改为replaceinto更好,批量insert时,如果其中一......
  • Redis学习(四)之redis中的数据类型之Hashes类型
      1、hset设置值,hget获取值,hmget可以一次获取一个map的多个key值。 2、hsetmapnamekeyvaluekeyvalue  一些指令可以直接操作map中的key对应的value值 ......
  • 初学linux笔记 第二章 虚拟机VMware16 共享文件
    由于需要把我在WINDOWS上写好的QT程序转移过来,因此这里需要设置共享文件。需要在虚拟机系统上安装VMwareTools,在主机上设置共享文件夹,这里参考了https://blog.csdn.net/xi......