首页 > 其他分享 >利用RabbitMQ 的死信队列来做定时任务

利用RabbitMQ 的死信队列来做定时任务

时间:2023-07-09 18:32:39浏览次数:37  
标签:DL return 队列 RabbitMQ 死信 交换机 import

常用的应用场景

死信队列常常用作延时关闭订单(如订单的超时后的取消订单等),虽然小项目中可以用定时轮询的方法进行检查,但是数据量一旦比较大时,定时轮询将给数据库带来不小的压力,而且定时间隔无法进行动态调整,特别是一个系统中,同时存在好几个定时器的时候,就显得非常的麻烦,同时给数据库造成巨大的访问压力。这时候就可以使RabbitMQ的死信队列。

概念解释

DLX

  • Dead Letter Exchange 的缩写
  • DLX也叫死信邮箱(网上的译法),死信交换机(字面翻译)。归根结底就是一个交换机,当队列中出现死信时,通过这个交换机将死信重新发送到死信队列中(指定好rabbitmq会自动发送)。

什么是死信

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

死信交换机

  • 在定义业务队列的时候,要考虑指定一个死信交换机,死信交换机可以和任何一个普通的队列进行绑定,然后在业务队列出现死信的时候就会将数据发送到死信队列。

什么是死信队列

  • 死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信而已。

消息变成死信后,会被重新投递(publish)到另一个交换机上(Exchange),这个交换机就被称作DLX及死信交换机,然后交换机根据绑定规则转发到对应的队列上,监听该队列就可以被重新消费。

生产者-->发送消息-->交换机-->队列-->变成死信队列-->DLX交换机-->队列-->监听-->消费者

示例代码

添加依赖

implementation 'org.springframework.boot:spring-boot-starter-amqp'

添加配置

spring:
  rabbitmq:
    host: 10.0.0.19
    port: 5672
    username: guiyun
    password: 111222

添加配置类

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Guiyun
 * @date 2020/1/7 下午 1:51
 */
@Configuration
public class RabbitConfig{

    /**
     * 死信队列 交换机标识符
     */
    private static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
    /**
     * 死信队列 交换机绑定键标识符
     */
    private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";


    /**
     * 创建死信交换机
     * @return
     */
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
    }


    /**
     * 创建一个死信队列.
     * @return
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>(2);
        //声明 死信交换机
        args.put(DEAD_LETTER_QUEUE_KEY, "DL_EXCHANGE");
        //声明 死信路由键
        args.put(DEAD_LETTER_ROUTING_KEY, "KEY_R");
        return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
    }

    /**
     * 定义死信队列转发队列.
     * @return the queue
     */
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        return QueueBuilder.durable("REDIRECT_QUEUE").build();
    }

    /**
     * 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);

    }

    /**
     * 死信路由通过 KEY_R 绑定键绑定到死信队列上.
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
    }

}

生产者向业务队列发送消息

@Autowired
RabbitTemplate rabbitTemplate;

public void spend(JSONObject data) {
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    MessagePostProcessor messagePostProcessor = message -> {
        MessageProperties messageProperties = message.getMessageProperties();
        // 设置编码
        messageProperties.setContentEncoding("utf-8");
        // 设置过期时间5秒
        messageProperties.setExpiration(5000);
        return message;
    };
    rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", data, messagePostProcessor, correlationData);
}

死信队列消费者

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author Guiyun
 * @date 2020/1/7 上午 11:09
 */
@Component
@RabbitListener(queues = "DL_QUEUE")
public class DeadLetterConsumer {

    private static final Logger log = LoggerFactory.getLogger(DeadLetterConsumer.class);


    @RabbitListener(queues = {"REDIRECT_QUEUE"})
    public void redirect(JSONObject json) throws IOException {
        log.info(json.toJSONString());
    }
}


标签:DL,return,队列,RabbitMQ,死信,交换机,import
From: https://blog.51cto.com/zhangzhixi/6668691

相关文章

  • ds:队列的基本实现
     一.顺序队1.入队判断队满,出队判断队空;2.顺序队定义时,要注意front、rear是下标,不是指针。typedefstruct{intdata[maxsize];intrear,front;//front:队头元素的下标。rear:队尾元素的后一个位置的下标(下一个待插入的位置),}sqListQueue;3,如果判断队......
  • centos7 安装 rabbitmq
    1、下载RabbitMQ安装包(请自行下载erlang和对应版本的rabbitmq)2、上传安装包到Linux中将上面三个软件上传到/usr/local/software目录下(如果没有software需要自己创建)3、安装文件(分别按照以下顺序安装)进入software文件夹,依次使用如下命令 rpm-ivherlang-21.3-1.el7.......
  • 2023-07-08:RabbitMQ如何做到消息不丢失?
    2023-07-08:RabbitMQ如何做到消息不丢失?答案2023-07-08:1.持久化发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。2.确认机制消费者通过basic.ack命令向......
  • RabbitMQ基本配置
    1.用户角色配置自带的guest/guest超级管理员五中不同角色配置:普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。策略制定者(policymaker):可登陆管理控制台,同时可以对policy进行管理。但无法查看节点的相关信息。监控者(monitoring):登录......
  • 谈谈队列(Queue)
    写在前面蒟蒻发第二篇博客了!作者依然是个新手,依然没有脑子,因此本文可能存在大量不足之处,还请多多指教。对于各种错误,欢迎批评指正!队列队列(Queue),是一种数据结构,在STL中可直接调用。具体地来说,队列是一种操作受限的线性表,只允许在表的一端进行插入,而在表的另一端进行删除。这也......
  • 2023年7月7日,线程池的调用原理,线程池底层,任务队列
    线程池的调用原理线程池的七大参数:核心线程数、最大线程数、任务队列、拒绝策略、闲置时间、时间单位、线程工厂任务进入线程池后线程池的执行顺序:核心线程(用完)---处理完一个任务后会取出任务队列中的第一个任务来执行任务队列(装满)普通线程(用完)拒绝策略深入线程池ExecutorServicep......
  • 消息队列-八股文
    消息队列选型-√kafka:优点:吞吐量高,性能高缺点:功能单一,有丢失消息的风险rocketMQ:优点:功能完善,性能好缺点:客户端仅支持JavaRocketMQ事务消息实现-※RocketMQ底层实现原理-※消息队列如何保证可靠传输可靠传输:不能多不能少1.消费者实现幂等性,哪怕多收消息,......
  • RabbitMq
    1,RabbitMq简介是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。官网安装2,RabbitMq几个术语1.Exchange-交换......
  • BZOJ 1915: [Usaco2010 Open]奶牛的跳格子游戏 单调队列优化dp
    1915:[Usaco2010Open]奶牛的跳格子游戏TimeLimit: 4Sec  MemoryLimit: 64MBSubmit: 281  Solved: 110[Submit][Status][Discuss]Description奶牛们正在回味童年,玩一个类似跳格子的游戏,在这个游戏里,奶牛们在草地上画了一行N个格子,(3<=N<=250,000),编号为1..N......
  • 2023-07-06:RabbitMQ中的AMQP是什么?
    2023-07-06:RabbitMQ中的AMQP是什么?答案2023-07-06:AMQPAMQP(AdvancedMessageQueuingProtocol)是一个应用层协议的开放标准,旨在设计面向消息的中间件。基于AMQP协议的客户端和消息中间件可以自由地传递消息,不受客户端、中间件产品或开发语言的限制。其目标是实现一种被广泛应用......