首页 > 其他分享 >RabbitMq延迟队列

RabbitMq延迟队列

时间:2022-11-16 14:13:52浏览次数:65  
标签:队列 RabbitMq QUEUE 死信 TTL message public 延迟

RabbitMq延迟队列

延迟队列概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延迟队列使用场景

  1. 订单在三十分钟之内未支付则自动取消。
  2. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  3. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

RabbitMQ 中的 TTL

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间。单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

消息设置TTL

队列设置TTL

两者的区别

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

延迟队列实战

创建一个 SpringBoot项目

添加依赖

<!-- rabbitmq依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.18</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<!-- rabbitMq 测试依赖-->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>

添加配置信息

spring:
  rabbitmq:
    # 自己的ip地址
    host: localhost
    # 端口号
    port: 5672
    # 用户名
    username: zjh
    # 密码
    password: zjh

队列 TTL

代码架构图

创建两个队列 QUEUE_A 和 QUEUE_B,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 NORMAL_EXCHANGE 和死信交换机 DEAD_EXCHANGE,它们的类型都是direct,创建一个死信队列 DEAD_QUEUE,它们的绑定关系如下:

声明队列、交换机以及绑定他们的关系

/**
 * @author zjh
 */
@Configuration
public class TtlQueueConfig {

    /**
     * 普通交换器名称
     */
    public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";

    /**
     * 死信交换器名称
     */
    public static final String DEAD_EXCHANGE = "DEAD_EXCHANGE";

    /**
     * 普通队列名称A
     */
    public static final String QUEUE_A = "QUEUE_A";

    /**
     * 普通队列名称B
     */
    public static final String QUEUE_B = "QUEUE_B";

    /**
     * 死信队列名称
     */
    public static final String DEAD_QUEUE = "DEAD_QUEUE";


    /**
     * 声明普通交换机
     */
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    /**
     * 声明死信交换机
     */
    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_EXCHANGE);
    }

    /**
     * 声明普通队列 A TTL为 10s
     */
    @Bean
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>(8);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置routingKey
        arguments.put("x-dead-letter-routing-key", "dead");
        // 设置过期时间 单位:ms
        arguments.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A)
                .withArguments(arguments)
                .build();
    }

    /**
     * 声明普通队列 B TTL为 40s
     */
    @Bean
    public Queue queueB(){
        Map<String, Object> arguments = new HashMap<>(8);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置routingKey
        arguments.put("x-dead-letter-routing-key", "dead");
        // 设置过期时间 单位:ms
        arguments.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B)
                .withArguments(arguments)
                .build();
    }

    /**
     * 声明死信队列
     */
    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    /**
     * 普通交换器和队列A绑定
     */
    @Bean
    public Binding aQueueBindingNormalExchange(@Qualifier("queueA") Queue queueA,
                                               @Qualifier("normalExchange") DirectExchange normalExchange){
        return BindingBuilder.bind(queueA).to(normalExchange).with("A");
    }

    /**
     * 普通交换器和队列B绑定
     */
    @Bean
    public Binding bQueueBindingNormalExchange(@Qualifier("queueB") Queue queueB,
                                              @Qualifier("normalExchange") DirectExchange normalExchange){
        return BindingBuilder.bind(queueB).to(normalExchange).with("B");
    }

    /**
     * 死信交换器和死信队列绑定
     */
    @Bean
    public Binding deadQueueBindingDeadExchange(@Qualifier("deadQueue") Queue deadQueue,
                                               @Qualifier("deadExchange") DirectExchange deadExchange){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead");
    }
}

生产者代码

/**
 * @author zjh
 */
@Api(value = "发送消息")
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public SendMsgController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @ApiOperation(value = "开始发消息", httpMethod = "GET")
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{}, 发送一条信息给两个ttl队列:{}", LocalTime.now(), message);

        rabbitTemplate.convertAndSend("NORMAL_EXCHANGE", "A", "消息来自ttl为10s的队列:" + message);
        rabbitTemplate.convertAndSend("NORMAL_EXCHANGE", "B", "消息来自ttl为40s的队列:" + message);
    }
}

消费者代码

/**
 * @author zjh
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    /**
     * 接收消息
     */
    @RabbitListener(queues = "DEAD_QUEUE")
    public void receiveDeadQueue(Message message, Channel channel) {
        // 获取消息
        String msg = new String(message.getBody());
        log.info("当前时间:{}, 收到死信队列的消息:{}", LocalTime.now(), msg);
    }
}

测试

请求地址:http://localhost:8080/ttl/sendMsg/测试消息

结果

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

延时队列优化

代码架构图

在这里新增了一个队列 QUEUE_C ,绑定关系如下, 该队列不设置TTL 时间

在上方 TtlQueueConfig 类中增加以下配置

/**
 * 普通队列名称C
 */
public static final String QUEUE_C = "QUEUE_C";

/**
 * 声明普通队列C
 */
@Bean
public Queue queueC(){
    Map<String, Object> arguments = new HashMap<>(8);
    // 设置死信交换机
    arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    // 设置routingKey
    arguments.put("x-dead-letter-routing-key", "dead");
    return QueueBuilder.durable(QUEUE_C)
            .withArguments(arguments)
            .build();
}

/**
 * 普通交换器和队列C绑定
 */
@Bean
public Binding cQueueBindingNormalExchange(@Qualifier("queueC") Queue queueC,
                                           @Qualifier("normalExchange") DirectExchange normalExchange){
    return BindingBuilder.bind(queueC).to(normalExchange).with("C");
}

消息生产者代码

ttlTime单位:ms

@ApiOperation(value = "发送过期时间 TTL", httpMethod = "GET")
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg (@PathVariable String message, @PathVariable String ttlTime) {
    // RabbitMq 只会检查第一个消息是否过期, 如果过期就会丢到死信队列, 如果第一个消息延时时长很长, 而第二个消息延时时长很短, 第二个并不会优先得到执行

    log.info("当前时间:{}, 发送一时长{}毫秒TTL信息给队列QUEUE_C:{}",
            LocalTime.now(), ttlTime, message);

    rabbitTemplate.convertAndSend("NORMAL_EXCHANGE", "C", message, msg -> {
        // 发送消息的时候 延迟时长
        msg.getMessageProperties().setExpiration(ttlTime);
        return msg;
    });
}

测试

请求地址:http://localhost:8080/ttl/sendExpirationMsg/你好20000毫秒/20000
请求地址:http://localhost:8080/ttl/sendExpirationMsg/你好2000毫秒/2000

结果

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

Rabbitmq 插件解决上述问题

安装延时队列插件

官网地址 https://www.rabbitmq.com/community-plugins.html 下载rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录,我这边是 windows下的rabbitmq,插件版本下载为 3.8.0, 具体安装步骤麻烦自行百度...

安装成功后可以在 Add Exchange 中看到以下选项

代码架构图

在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

声明队列、交换机以及绑定他们的关系

/**
 * @author zjh
 */
@Configuration
public class DelayedQueueConfig {

    /**
     * 延迟交换器名称
     */
    public static final String DELAYED_EXCHANGE = "delayed.exchange";

    /**
     * routingKey
     */
    public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";

    /**
     * 延迟队列名称
     */
    public static final String DELAYED_QUEUE = "delayed.queue";

    /**
     * 声明队列
     */
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE);
    }

    /**
     * 声明交换机
     */
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<>(8);
        arguments.put("x-delayed-type", "direct");

        /*
            1.交换器的名称
            2.交换机的类型
            3.是否需要持久化
            4.是否需要自动删除
            5.其他的参数
         */
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message",
                true, false, arguments);
    }

    /**
     * 绑定
     */
    @Bean
    public Binding delayedQueueBindingDelayedExchange(
            @Qualifier("delayedQueue") Queue delayedQueue,
            @Qualifier("delayedExchange") CustomExchange delayedExchange){

        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

消息生产者代码

@ApiOperation(value = "开始发消息 基于插件的", httpMethod = "GET")
@GetMapping("/sendDelayedMsg/{message}/{delayedTime}")
public void sendDelayedMsg (@PathVariable String message, @PathVariable Integer delayedTime) {

    log.info("当前时间:{}, 发送一时长{}毫秒信息给延迟队列delayed.queue:{}",
            LocalTime.now(), delayedTime, message);

    rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routing.key", message, msg -> {
        // 发送消息的时候 延迟时长 单位ms
        msg.getMessageProperties().setDelay(delayedTime);
        return msg;
    });
}

消息消费者代码

/**
 * @author zjh
 */
@Slf4j
@Component
public class DelayedQueueConsumer {

    /**
     * 监听消息
     * @param message 消息
     */
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE)
    public void receiveDelayedQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{}, 收到延迟队列的消息:{}", LocalTime.now(), msg);
    }
}

测试

请求地址:http://localhost:8080/ttl/sendDelayedMsg/你好20000毫秒/20000
请求地址:http://localhost:8080/ttl/sendDelayedMsg/你好2000毫秒/2000

结果

可以看到第二个消息被先消费掉了,符合预期。

标签:队列,RabbitMq,QUEUE,死信,TTL,message,public,延迟
From: https://www.cnblogs.com/zjh0420/p/16895248.html

相关文章

  • 「Java数据结构」手撕数组队列及环形数组队列。
    目录​​一、队列​​​​1、基本介绍​​​​2、示意图​​​​3、队列的特点​​​​二、数组模拟队列​​​​1、数组队列初始化​​​​2、判断方法​​​​3、增删改查......
  • 昭通玉溪物理机租用供应线路稳定,延迟低
    而这个时候你就会发现,你需要一个小型企业服务器。但是,从哪里开始呢?毕竟服务器并不是一种万能得解决方案,你需要选择适合你企业发展的服务器类型才行。为了能够你充分了解自......
  • 昭通玉溪物理机托管,线路稳定,延迟低
    当然,选择多功能的服务器是最佳的,这样你就不必只将其用于一个目的而已。你需要了解哪些操作和功能有利于你目前业务需求的发展,根据优先级去选择,选择和你业务需求相匹配度更......
  • 昭通玉溪高防物理机,线路稳延迟低
    很多创业者在创业初期都会发现,很多事情其实都可以很简单地去处理。比如说,如果团队之间需要共享文件,通过发送电子邮件或者用U盘分享即可;亦或者如果需要备份数据,则通过插入外......
  • RabbitMQ
    RabbitMQ1.初识MQ1.1.同步和异步通讯微服务间通讯有同步和异步两种方式:同步通讯:就像打电话,需要实时响应。异步通讯:就像发邮件,不需要马上回复。两种方式各有优劣,打电......
  • 080_阻塞队列 BlockingQueue
    目录简介演示代码抛出异常add()添加元素队列已满时抛出异常remove()移除元素为空时抛出异常有返回值,不抛出异常offer()添加元素队列已满时返回false不抛异常poll()移除......
  • RabbitMQ
    RabbitMQ使用场景服务解耦假设有这样一个场景,服务A产生数据,而服务B,C,D需要这些数据,那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可但是,随着......
  • RabbitMq死信队列
    RabbitMq死信队列实战中代码获取信道可参考https://www.cnblogs.com/zjh0420/p/16891557.html死信的概念死信:顾名思义就是无法被消费的消息,字面意思可以这样理解,一般......
  • rabbitmq集群部署-镜像模式
    一、环境RabbitMQ与Erlang的兼容关系详见:https://www.rabbitmq.com/which-erlang.htmlrabbitmq集群最好是奇数节点,所以一般需要3台设备以上。操作系统:CentOS7Erlang:erl......
  • 225. 用队列实现栈
    225.用队列实现栈请你仅使用两个队列实现一个后入先出(LIFO)的栈,并支持普通栈的全部四种操作(push、top、pop和empty)。实现MyStack类:voidpush(intx)将元素x压入......