首页 > 其他分享 >由一次生产事故思考队列在实际项目中的应用价值

由一次生产事故思考队列在实际项目中的应用价值

时间:2023-02-08 15:03:30浏览次数:40  
标签:思考 事故 队列 DIRECT RabbitMQ 死信 延时 channel

  话说从一名.Neter 转到Java开发也已经有3年多时间了,期间也积累了一些经验。今天就来谈谈 RabbitMQ 在实际项目中的应用。

  那是2020年的某个周末,突然收到反馈,商城页面打开超级慢,无法下单。这是生产级别的事故啊,这个月的绩效要泡汤了。赶紧查看日志了解情况,经过沟通,了解到这个事故是有时间间隔发生的,通过在阿里云服务器上的排查,发现RDS数据库服务器的CPU在某些时间段已经飙到了100%,这就难怪了。

  既然问题找到了,那就得找出是哪些语句消耗了大量的CPU资源,经过一番排查,确定问题是由前几天新上的促销活动,其中有一个结算的计算逻辑,同事使用了MySQL的Event来实现,每隔1分钟执行一次,正是由于该语句消耗了大量的CPU资源,当天赶紧对该语句进行了优化,优化之后进行了监控,发现CPU的资源消耗降到了70%左右。

  虽然暂时解决了,但考虑到后续通过这种定时任务来执行,实际上还是会存在发生类似事故的隐患。就开始思考,应该把这种定时任务异步化来处理。通过对XXL-JOB上面的定时任务进行梳理,发现类似订单未支付30分钟自动取消、会员续约到期的定时推送,用户注册30天未实名的提醒,拼团未成功自动取消等任务,应该是可以通过延迟消息队列来实现的,这样既能做到异步解耦,也能避免定时轮询查询数据库造成资源紧张。

  既然决定了将这些定时任务修改为使用延迟队列来实现,那接下来就是选择哪个消息队列中间件的问题了。鉴于目前系统已经应用了RabbitMQ来实现了部分异步功能,并且RabbitMQ也自带了延迟消息的功能,加之团队对RabbitMQ的使用也有了一定的经验,自然首选中间件就是RabbitMQ。

  关于RabbitMQ原理方面的一些知识,后面专门找时间写一篇总结,本篇着重介绍使用RabbitMQ如何实现这个延迟队列,来替代某些定时任务。

延时队列的概念:

  顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特性,相比之下,延时的特性就是它最大的特点。所谓的延时就是将我们需要的消息,延迟多久之后被消费。普通队列是即时消费的,延时队列是根据延时时间,多久之后才能消费的。

RabbitMQ实现延时队列的方式:

  • 通过RabbitMQ的高级特性TTL和配合死信队列实现
  • 安装rabbitmq_delayed_message_exchange插件,默认该插件不存在,需要下载安装,下载链接:http://www.rabbitmq.com/community-plugins.html,把下载的插件解压后放到该目录/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.5/plugins下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 

何为TTL?

  TTL是RabbitMQ中消息或者队列的一个高级特性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。为什么延时队列要介绍它?TTL就是一种消息过期策略。给我们的消息做过期处理,当消息在队列中存活了指定时间之后,该队列就会将这个消息直接丢弃。在RabbitMQ中并没有直接实现好的延时队列,我们可以使用TTL这种高级特性再配合死信队列,就可以实现延时队列的功能。

  有两种方式可以来设置这个TTL,第一种是通过在创建队列的时候就设置队列的 x-message-ttl 属性,使用这种方式,消息被设定TTL值,一旦消息过期,就会被队列丢弃。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

  第二种方式,通过在发送消息的时候,给消息添加 propeties 属性指定过期时间来实现,使用这种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg
body".getBytes());

 RabbitMQ实现延时队列的代码实现:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DelayQueueProductTest {
  /**
   * 延时队列交换机
   */   
    private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay";
  /**
   * 死信队列交换机
   */     
    private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead";
  
    /**
     * 延时队列
     */
    private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay";
    /**
     * 死信队列
     */
    private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead";
    /**
     * 延时队列ROUTING_KEY
     */
    private static final String DIRECT_DELAY_ROUTING_KEY =
            "delay.queue.routingKey";
    /**
     * 死信队列ROUTING_KEY
     */
    private static final String DIRECT_DEAD_ROUTING_KEY =
            "dead.queue.routingKey";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {
        Connection connection = createConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        sendMsg(channel);
        Thread.sleep(10000);
        closeConnection(connection, channel);
    }

    private static void sendMsg(Channel channel) throws IOException {
        // 创建延时交换器
        channel.exchangeDeclare(DIRECT_EXCHANGE_DELAY,
                BuiltinExchangeType.DIRECT);
        Map<String, Object> map = new HashMap<>(16);
        // 在延时交换器上指定死信交换器
        map.put("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD);
        // 在延时交换器上指定死信队列的routing-key
        map.put("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY);
        // 设定延时队列的延长时长 10s
        map.put("x-message-ttl", 10000);
        // 创建延时队列
        channel.queueDeclare(DIRECT_QUEUE_DELAY, true, false, false, map);
        // 在延时交换器上绑定延时队列
        channel.queueBind(DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY,
                DIRECT_DELAY_ROUTING_KEY);
        // 创建死信交换器
        channel.exchangeDeclare(DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC,
                true, false, null);
        // 创建死信队列
        channel.queueDeclare(DIRECT_QUEUE_DEAD, true, false, false, null);
        // 在死信交换器上绑定死信队列
        channel.queueBind(DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD,
                DIRECT_DEAD_ROUTING_KEY);
        channel.basicPublish(DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY,
                null, "hello world".getBytes());
    }

    private static void closeConnection(Connection connection, Channel channel)
            throws IOException, TimeoutException {
        // 关闭资源
        channel.close();
        connection.close();
    }

    private static Connection createConnection() throws IOException,
            TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ的链接参数
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 和RabbitMQ建立一个链接
        return factory.newConnection();
    }
}
  到这里,不难发现,RabbitMQ实现延时队列无非是利用了TTL这个特性,让消息在过期的时候丢弃到指定队列,死信队列其实也是一个普通队列。

 

标签:思考,事故,队列,DIRECT,RabbitMQ,死信,延时,channel
From: https://www.cnblogs.com/lh1315/p/17026694.html

相关文章

  • 关于一次close socket函数bug事故引发的思考
    前因后果昨天一下午肝完了作业1-12全部部分,就剩下13了。这次作业整体上较为简单,就是写一个webserver,然后用各种IO和IPC去实现多线程,多进程以及线程和进程的通信。这完全属......
  • 队列的顺序存储
      #include<iostream>usingnamespacestd;#defineMAXSize10typedefintElemtype;typedefstruct{ Elemtypedata[MAXSize]; intfront,rear;}SqQueue;boolIsEmp......
  • 【tyvj1305】最大子序和(单调队列)
    problem给你一个长为n的序列求一个长不超过m的连续子段,使子段和最大solution如果n<=10^3,我们很容易写出枚举(s是前缀和,区间[l,r]的和就是s[r]-s[l-1]。枚举l,r即可。for(int......
  • 【POJ2259】Team Queue(队列,模拟)
    problem有n个小组,进行排队。当一个人来到队伍时,若队伍中有自己小组成员时,他就直接站到其后面如果没有,则站到队伍最后面,形成自己小组的第一个入队元素。出队列时,给出出队指令......
  • 阻塞式并发队列
    --BlockingQueue:阻塞式队列--可以实现生产者消费者模式 --LinkedBQ:链表实现    --ArrayBQ:数组实现,有限队列    --Delay......
  • 增量上线java代码引发的生产事故
    事情起因某一天晚上,运维说由于业务流程问题,智能运维告警太多,需要处理,同事就改了一点与业务无关的代码,直接把class文件扔上去。到了早上9点,上游通知整个上午都下单失败,吓得......
  • 12.6用程序来表示人类的思考方式
    到目前为止,我们已经用程序表示了直觉、想法、习惯以及经验等。不过,除此之外,人类还有一个思考方式。思考方式是思考方法的节奏。人类大脑中有类似于“石头、石头、布、剪刀......
  • 让计算机”思考“——12.1作为”工具“的程序和为了”思考“的程序
    程序就如同是由计算机执行的各种指令罗列起来的文章。计算机内部的CPU,通过对该文章的内容进行解析和运行,来控制连接到计算机的各种外围设备。具体来说,控制就是指CPU和各......
  • 12.2用程序来表示人类的思考方式
    那么,如何才能让计算机思考呢?接下来,我们就一边用C语言制作《猜拳游戏》,一边来尝试各种思考方式。在猜拳游戏中,程序需要让计算机像猜拳选手一样来思考。因此,为了制作该游......
  • 12.3用程序来表示人类的思考习惯
    即使是成年人,可能偶尔也会像代码清单12-1这样猜拳时随意决定出什么。不过,并不是所有人都如此。例如,“小A同学喜欢出石头”,像这样,出拳习惯是因人而异的。习惯也是人类的......