首页 > 其他分享 >RabbitMQ 使用细节 → 优先级队列与ACK超时

RabbitMQ 使用细节 → 优先级队列与ACK超时

时间:2024-02-18 09:11:58浏览次数:35  
标签:优先级 ACK RabbitMQ 队列 rabbit org java 超时

开心一刻

  今天坐在太阳下刷着手机

  老妈走过来问我:这么好的天气,怎么没出去玩

  我:我要是有钱,你都看不见我的影子

  老妈:你就不知道带个碗,别要边玩?

  我:......

优先级队列

  说到队列,相信大家一定不陌生,是一种很基础的数据结构,它有一个很重要的特点:先进先出

  但说到优先级队列,可能有些小伙伴还不清楚,因为接触的不多嘛

  示例基于: RabbitMQ 3.9.11 

  业务场景

  我手头上正好有一个项目,系统之间通过 RabbitMQ 通信,调度系统 是消息生产者, 文件生成系统 是消息消费者

  默认情况下,先下发的消息会先被消费,也就是先进队列的消息会先出队列

  业务高峰期,重要程度不同的文件都需要生成,那如何保证重要文件先生成了?

  1、调整调度

    1.1 将重要文件的调度提前,保证重要文件的消息先进入队列;但需要考虑调度能否提前,如果生成文件依赖的上游数据还未就绪了?

    1.2 将普通文件的调度延后,有点围魏救赵的感觉,万一某一天不需要生成重要文件,那服务器岂不是有一段时间的空置期,而这段空置期本可以生成普通文件

    总的来说就是不够灵活:有重要文件的时候先生成重要文件,没有重要文件的时候生成普通文件

  2、提高服务器配置

  这个就不用过多解释了把,加大 文件生成系统 的硬件配置,提高其文件生成能力

  保证文件(不论重要还是普通)都能在调度的时间开始生成,也就无需区分重要与普通了

  那么重要文件先生成这个命题就不成立了

  想想都美,可实际情况,大家都懂的!

  3、优先级队列

  RabbitMQ 的 Priority Queue 非常契合这个业务场景,详情请往下看

  队列优先级

  相较于普通队列,优先级队列肯定有一个标志来标明它是一个优先级队列

  这个标志就是参数: x-max-priority ,定义优先级的最大值

  我们先来看下 RabbitMQ 控制台如何配置

  相关参数配置好之后,点击 Add queue 即创建出了一个 优先级队列 

  创建完成之后,你会发现队列上有一个 Pri 标志,说明这是一个优先级队列

  实际开发工程中,一般不会在 RabbitMQ 控制台创建队列,往往是服务启动的时候,通过服务自动创建 exchange 、 queue 

  实现也非常简单

@Configuration
public class RabbitConfig {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(QSL_EXCHANGE, true, false);
    }

    @Bean
    public Queue queue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", 5);
        return new Queue(QSL_QUEUE, true, false, false, args);
    }

    @Bean
    public Binding bindingQueue() {
        return BindingBuilder.bind(queue()).to(directExchange()).with("com.qsl");
    }
}
View Code

  服务启动成功后,我们可以在 RabbitMQ 控制台看到队列: com.qsl.queue ,其 x-max-priority 等于 5

  消息优先级

  消息属性 priority 可以指定消息的优先级

  停止服务后,我们手动往队列 com.qsl.queue 中放一些带有优先级的消息

  优先级分别是: 3,1,5,5,10,4 对应的消息体分别是: 3,1,5_1,5_2,10,4 

  此时队列中共有 6 个消息准备就绪

  启动服务,进行消息消费,消费顺序如下

  可以总结出一个规律:优先级高的先出队列,优先级相同的,先进先出

  那优先级是 10 的那个消息是什么情况,它为什么不是第一个出队?

  因为队列 com.qsl.queue 的最大优先级是 5,即使消息的优先级设置成 10,其实际优先级也只有 5,这样是不是就理解了?

  实际开发工程中,基本不会在 RabbitMQ 控制台手动发消息,肯定是由服务发送消息

  我们模拟下带有优先级的消息发送

  是不是 so easy ! 

  x-max-priority

  值支持范围是 1 ~ 255 ,推荐使用 1 ~ 5 之间的值,如果需要更高的优先级则推荐 1 ~ 10 

   1 ~ 10 已经足够使用,不推荐使用更高的优先级,更高的优先级值需要更多的 CPU 和 内存资源 

  没有设置优先级的消息将被视为优先级为 0,优先级高于队列最大优先级的消息将被视为以队列最大优先级发布的消息

  数据结构

  底层数据结构:堆

  具体请看:数据结构之堆 → 不要局限于堆排序

ACK超时

  之前一直不知道这一点,直到有一次碰到了如下异常

  一查才知道ACK超时了

  超时异常

  从消费者获取到消息(消息投递成功)开始,在超时时间(默认30分钟)内未确认回复,则关闭通道,并抛出 PRECONDITION_FAILED 通道异常

  并且消息会重新进入队列,等待再次被消费

  ACK超时的配置项: consumer_timeout ,默认值是 1800000 ,单位是毫秒,也就是 30 分钟

  可用命令 rabbitmqctl eval 'application:get_env(rabbit,consumer_timeout).' 查看

  判断是否ACK超时的调度间隔是一分钟,所以 consumer_timeout 不支持低于一分钟的值,也不建议低于五分钟的值

  我们将 consumer_timeout 调整成 2 分钟,看看超时异常

  有 2 种调整方式

  1、修改 /etc/rabbitmq.conf 

    配置文件没有则新建,然后在配置文件中将 consumer_timeout 设置成 120000 (没有该配置项则新增)

    然后重启 rabbitmq 

  2、动态修改

    执行命令 rabbitmqctl eval 'application:set_env(rabbit,consumer_timeout,120000).' 即可

    不需要重启 rabbitmq 

    需要注意的是,这种修改不是永久生效,一旦 rabbitmq 重启, consumer_timeout 将会恢复到默认值

  我们用第 2 种方式进行调整

  然后我们在消费端睡眠 3 分钟后进行ACK

 

  最后在 rabbitmq 控制台手动发送一个消息,异常信息如下

2024-02-15 13:08:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|com.qsl.rabbit.listener.TestListener|INFO|28|消费者接收到消息:6
2024-02-15 13:10:47|AMQP Connection 192.168.3.225:5672|org.springframework.amqp.rabbit.connection.CachingConnectionFactory|ERROR|1575|Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 120000 ms. This timeout value can be configured, see consumers doc guide to learn more, class-id=0, method-id=0)
2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|com.qsl.rabbit.listener.TestListener|ERROR|33|消息确认异常:
java.lang.IllegalStateException: Channel closed; cannot ack/nack
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1175)
    at com.sun.proxy.$Proxy50.basicAck(Unknown Source)
    at com.qsl.rabbit.listener.TestListener.onMessage(TestListener.java:31)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:53)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:220)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
    at java.lang.Thread.run(Thread.java:745)
2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|INFO|1436|Restarting Consumer@2e6f610d: tags=[[amq.ctag-hE7fVqLNKO44ytMHalsf2A]], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.3.225:5672/,1), conn: Proxy@3b1ed14b Shared Rabbit Connection: SimpleConnection@13275d8 [delegate=amqp://admin@192.168.3.225:5672/, localPort= 55710], acknowledgeMode=MANUAL local queue size=0
2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2|com.qsl.rabbit.listener.TestListener|INFO|28|消费者接收到消息:6
View Code

  从 RabbitMQ 3.12 开始,可以为每个队列配置过期时长,而之前只能为每个 Rabbit 节点配置过期时长

  如何处理

  如果碰到ACK超时,那么我们该如何处理

  1、增加超时时长

  这往往是最容易想到的,默认 30 分钟不行就改成 60 分钟嘛

  但并不是无脑增加超时时长,默认值往往是综合情况下比较优的一个值,并不推荐加长

  2、异步处理

  用线程池处理异步处理消息,及时进行消息ACK

  但需要考虑拒绝策略,如果用的是: CallerRunsPolicy ,还是有可能触发ACK超时

  3、幂等处理

  消息消费做幂等处理,是规范,而不仅仅只是针对ACK超时

  消息正在消费中,或者已经消费完成,这个消息就不应该再次被消费,可以打印日志然后直接ACK,而无需进行业务处理

  4、自动ACK

  虽然自动ACK可以简化消息确认的流程,但它也可能带来一些潜在的问题,例如:

  消息丢失风险:自动ACK意味着一旦消费者接收到消息, RabbitMQ 就会将其从队列中移除。如果消费者在处理消息时发生故障或崩溃,未处理的消息可能会丢失

  限流作用减弱:ACK机制可以帮助限流,即通过控制ACK的发送速率来限制消费者处理消息的速度。如果使用自动ACK,这种限流作用会减弱,可能导致消费者过快地消费消息,超出其实际处理能力

  缺乏灵活性:自动ACK不允许消费者在处理完消息后再决定是否要确认消息,这限制了消费者的灵活性。例如,消费者可能需要根据消息内容或处理结果来决定是否重新入队或丢弃消息

  等等

  总之,自动ACK慎用

  具体如何处理,需要结合具体业务,选择比较合适的方式

总结

  优先级队列

  通过配置 x-max-priority 参数标明队列是优先级队列

  队列的优先级取值范围推荐 1 ~ 5 ,不推荐超过 10

  通过属性 priority 可以指定消息的优先级,没有设置优先级的消息将被视为优先级为 0,优先级高于队列最大优先级的消息将被视为以队列最大优先级发布的消息

  优先级高的消息先出队列(先被处理),优先级低的消息后出队列(后被处理),优先级相同的则是先进先出

  ACK超时

  ACK超时是一种保护机制,其实可以类比 HTTP 请求超时、数据库连接查询超时

   RabbitMQ 的ACK超时默认是 30 分钟,可以修改配置项 consumer_timeout 进行调整

  至于如何避免ACK超时,需要结合具体的业务选择合适的方式

  示例代码

  spring-boot-rabbitmq

参考

  Classic Queues Support Priorities

  acknowledgement-timeout

标签:优先级,ACK,RabbitMQ,队列,rabbit,org,java,超时
From: https://www.cnblogs.com/youzhibing/p/17935323.html

相关文章

  • HackTheBox - Codify [easy]
    打这台靶机时及其古怪。总是莫名其妙断开连接,请求没有响应。提交时表示flag错误等问题访问80端口的web服务,发现使用nodjs和vm2库。搜索到vm2漏洞:SandboxBypassinvm2|CVE-2023-32314|Snyk 可远程执行代码查看当前用户,可登录使用ssh登录,使用linpeas.sh等工具枚举,发......
  • 阿克曼函数(Ackermann function)部分推导
    相关题目已知\(Ackermannfunction\)为Ack(m,n)={n+1__m=0;Ack(m-1,1)__m>0&&n=0;Ack(m-1,Ack(m,n-1)__m>0&&n>0.}当\(m=1\)时有\(Ack(1,n)\)\(=Ack(0,Ack(1,n-1))=Ack(1,n-1)+1;\)\(=Ack(0,Ack(1,n-2))+1=Ack(1,n-2)+2;\)\(=···\)......
  • RabbitMQ 消息模型
         参考链接:  https://blog.csdn.net/qq_40991313/article/details/126801025?spm=1001.2014.3001.5501 3.5.3.总结描述下Direct交换机与Fanout交换机的差异?   Fanout交换机将消息路由给每一个与之绑定的队列   Direct交换机根据RoutingKey判断路由......
  • jackson序列化问题
    在对对象进行jackson序列化的时候,有时候会出现序列化后的变量名称大小写错误的情况。测试的实体类TestEntity2如下:public class TestEntity2 {    private String aBcd;    private String qWER;    private String qWERty;    private String qWERty......
  • HackTheBox - Drive
    #nmap--top-ports=100010.10.11.235StartingNmap7.94SVN(https://nmap.org)at2024-02-1511:10CSTNmapscanreportfordrive.htb(10.10.11.235)Hostisup(0.12slatency).Notshown:997closedtcpports(reset)PORTSTATESERVICE22/tcpop......
  • hdu 5113 Black And White(DFS染色)
    Problem-5113(hdu.edu.cn)hdu没法提交,我以为我账号又崩了...#include<iostream>#include<cstring>usingnamespacestd;intT,n,m,k,kase;intcolor[30],ans[10][10];boolDFS(intx,inty,intcur){if(x>n)returntrue;for(inti=1;i<=k;i++){......
  • 微信小程序页面跳转:wx.switchTab、wx.reLaunch、wx.redirectTo、wx.navigateTo、wx.na
    引言在微信小程序开发中,页面跳转是一项基础且常用的功能。本文将介绍微信小程序中五种常见的页面跳转方式,并分析它们的使用场景和区别。1.wx.switchTab(跳转主页)wx.switchTab方法用于跳转到tabBar页面,并关闭其他所有非tabBar页面,通常用于跳转到小程序的主页。特点:关闭......
  • 【数据库】postgressql设置数据库执行超时时间
    在这篇文章中,我们将深入探讨PostgreSQL数据库中的一个关键设置:SETstatement_timeout。这个设置对于管理数据库性能和优化查询执行时间非常重要。让我们一起来了解它的工作原理以及如何有效地使用它。什么是statement_timeout?statement_timeout是一个PostgreSQL服务器参数,用于设......
  • 解决jstack的报错:Unable to open socket file
    原文网址:​​解决jstack的报错:Unabletoopensocketfile_IT利刃出鞘的博客-CSDN博客​​简介说明本文介绍解决jstack的报错的方法,报错信息为:Unabletoopensocketfile。分享Java技术星球:​​自学精灵-IT技术星球​​详细报错信息:进程号:Unabletoopensocketfile:......
  • 【Java 并发】【队列应用】【一】ArrayBlockingQueue 的使用-Logback异步日志打印
    1 前言看了那么多Java提供的队列工具,那么我们这节开始看看哪些地方用到了这些队列哈。这一节我们讲解logback异步日志打印中ArrayBlockingQueue的使用。2  异步日志打印模型概述在高并发、高流量并且响应时间要求比较小的系统中同步打印日志已经满足不了需求了,这是因为......