首页 > 其他分享 >【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例

【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例

时间:2023-04-09 22:05:44浏览次数:34  
标签:插件 示例 队列 args rabbitmq 死信 交换机 message public


文章目录

  • 设置队列ttl
  • 配置文件
  • 生产者
  • 消费者
  • 设置消息ttl
  • 延迟插件的使用
  • 修改配置文件
  • 修改生产者
  • 修改消费者

设置队列ttl

代码架构:

【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例_spring


创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后在创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD

配置文件

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
@Configuration
public class Rabbitmqconfig {
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB"; 
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; 
    public static final String DEAD_LETTER_QUEUE = "QD";
    // 声明xExchange
   @Bean("xExchange")
   public DirectExchange xExchange(){
       return new DirectExchange(X_EXCHANGE); } // 声明xExchange
   @Bean("yExchange")
   public DirectExchange yExchange(){
       return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); }
    //声明队列A ttl为10s并绑定到对应的死信交换机
    @Bean("queueA")
    public Queue queueA()
    {
        Map<String,Object> args = new HashMap<String,Object>();
        args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//声明当前队列绑定的死信交换机
        args.put("x-dead-letter-routing-key","YD");
        args.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }
    @Bean
    public Binding queueaBingX(@Qualifier("queueA")Queue queueA,
                               @Qualifier("xExchange")DirectExchange exchange)
    {
        return BindingBuilder.bind(queueA).to(exchange).with("XA");//通过XA路由键让交换机与队列A绑定
    }
    //声明队列A ttl为40s并绑定到对应的死信交换机
    @Bean("queueB")
    public Queue queueB()
    {
        Map<String,Object> args = new HashMap<String,Object>();
        args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//声明当前队列绑定的死信交换机
        args.put("x-dead-letter-routing-key","YD");
        args.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }
    @Bean
    public Binding queuebBingX(@Qualifier("queueB")Queue queueA,
                               @Qualifier("xExchange")DirectExchange exchange)
    {
        return BindingBuilder.bind(queueA).to(exchange).with("XB");//通过XB路由键让交换机与队列B绑定
        //这里队列A和队列B绑定的是同一个交换机
    }
    @Bean("queueD")
    public Queue queueD()
    {
        return new Queue(DEAD_LETTER_QUEUE);//声明死信队列
    }
    @Bean
    //死信队列与交换机通过yd路由键绑定 这里队列绑定的是y交换机而不是x交换机,上面两个队列绑定的是x交换机
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange){
       return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

生产者

@RestController
public class Producer {
@Autowired
    private RabbitTemplate rabbitTemplate;
        @GetMapping("sendMsg/{message}")
        public void sendMsg(@PathVariable String message){
            System.out.println("当前时间"+new Date().toString()+" 发送的消息:"+message);
    rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10的队列"+message);
    rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40的队列"+message);
        }
}

消费者

@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = "QD")
    //配置文件已经声明了死信队列 Queue("QD");//声明死信队列
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("当前时间:" +new Date().toString()+",收到死信队列信息"+msg); }
}

启动项目:

【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例_配置文件_02


【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例_java_03


控制台:

【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例_java_04

设置消息ttl

上面是对队列属性设置了过期时间,但如果有很多数据需要设置不同的过期时间则需要很多队列,这样明显浪费不必要的内存,这里也可以对消息设置不同过期时间:
再定义一个新队列,这里队列不再设置ttl属性:

@Bean("queueC")
  public Queue queueC(){
      Map<String, Object> args = new HashMap<>();
      //声明当前队列绑定的死信交换机
   args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由key
      args.put("x-dead-letter-routing-key", "YD"); //没有声明TTL属性
       return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
  }
    //声明队列B绑定X交换机
    @Bean public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
                                        @Qualifier("xExchange") DirectExchange xExchange)
    {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC"); 
  }

修改生产者:

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime)
    { 
        rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
        correlationData.getMessageProperties().setExpiration(ttlTime); return correlationData; });
        System.out.println("当前时间:{}"+ new Date().toString()+"发送一条时长"+ttlTime+"毫秒TTL信息给队列C:"
                + message); }

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

延迟插件的使用

官网https://www.rabbitmq.com/community-plugins.html

下载:

【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例_配置文件_05


https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

允许使用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例_自定义_06


修改绑定关系:

【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例_自定义_07

修改配置文件

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

@Configuration 
public class DelayedQueueConfig {
    //自定义交换机 我们在这里定义的是一个延迟交换机
   @Bean 
  public CustomExchange delayedExchange() 
   { 
       Map<String, Object> args = new HashMap<>(); 
       //自定义交换机的类型 
    args.put("x-delayed-type", "direct"); 
    return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } 
    @Bean 
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, 
            @Qualifier("delayedExchange") CustomExchange delayedExchange) 
    {
      return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    } }

修改生产者

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> {
                    correlationData.getMessageProperties().setDelay(delayTime);
                    return correlationData;
                }
        );
    }

修改消费者

public static final String DELAYED_QUEUE_NAME = "delayed.queue";

 @RabbitListener(queues = DELAYED_QUEUE_NAME) 

 public void receiveDelayedQueue(Message message){ 
 
 String msg = new String(message.getBody()); 
 
 log.info("当前时间:{},收到延时队列的消息:{}", 
 new Date().toString(), msg); 
 }

小结:
延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景


标签:插件,示例,队列,args,rabbitmq,死信,交换机,message,public
From: https://blog.51cto.com/u_15980129/6179216

相关文章

  • Blender动画节点插件用户指南-接口
    推荐:将 NSDT场景编辑器 加入你的3D开发工具链。接口大部分界面位于节点编辑器中,而其余部分该界面存在于其他区域,例如3D视口。动画节点有自己的节点编辑器,您可以通过以下方式选择单击“涂料表”图标在工具栏中。一个新的可以通过单击“新建”按钮来添加节点树。节点......
  • RabbitMQ 10 头部模式
    头部模式是根据头部信息来决定的,在发送的消息中是可以携带一些头部信息的(类似于HTTP),可以根据这些头部信息来决定路由到哪一个消息队列中。定义配置类。importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.sprin......
  • 欢迎使用园子的 vscode 插件
    为了方便大家通过vscode编辑博文,我们做了一个小插件,插件名称是“博客园Cnblogs客户端”,插件列表中搜索“博客园”或者"cnblogs"可以找到。插件商店下载地址:https://marketplace.visualstudio.com/items?itemName=cnblogs.vscode-cnb插件安装好之后会在activitybar中出现......
  • vscode launch&attach及常用插件使用必备指南
    .vscode下创建文件launch.json{//使用IntelliSense了解相关属性。//悬停以查看现有属性的描述。//欲了解更多信息,请访问:https://go.microsoft.com/fwlink/?linkid=830387"version":"0.2.0","configurations":[{"na......
  • 描述符示例详解
    代码这里要创建一个描述符,根据要求(如隐藏敏感信息、正确地设置日期的格式)对属性的值进行变换,并返回修改后的版本:fromdataclassesimportdataclassfromdatetimeimportdatetimefromfunctoolsimportpartialfromtypingimportCallableclassBaseFieldTransformatio......
  • 『0002』 - Atom编辑器编写智能合约(Smart Contract)插件安装配置
    作者:黎跃春,编辑器选择理论上讲任何编辑器都可以编写Solidity合约代码,比如:WebStorm,VSCode,Sublime,等等。我选择的是Atom,没有任何理由,因为Atom轻量并且界面漂亮。移步https://atom.io/地址,下载安装Atom。autocomplete-solidity代码自动补齐linter-solium、linter-solidity代码错误检查......
  • IDEA-常用设置及好用插件记录
    1、常用设置设置开启选择工作空间 设置工作空间编码集    2、好用插件......
  • VSCode插件 : ESLint Prettier Vite
    #创建项目pnpmcreatevitevite-eslint--templatevuecdvite-eslintpnpminstallpnpmrundev#安装插件pnpmiprettier-D#创建配置文件echo{}>.prettierrc.json#安装ESLintpnpmieslinteslint-plugin-vue-D#安装eslint-config-prettierpnpm......
  • 7个最新的时间序列分析库介绍和代码示例
    时间序列分析包括检查随着时间推移收集的数据点,目的是确定可以为未来预测提供信息的模式和趋势。我们已经介绍过很多个时间序列分析库了,但是随着时间推移,新的库和更新也在不断的出现,所以本文将分享8个目前比较常用的,用于处理时间序列问题的Python库。他们是tsfresh,autots,darts......
  • 4.简单示例提示
    笔记软件在2023/4/614:01:12推送该笔记简单示例提示如果出错的开始点和结束点在同一个位置,VSCode会在那个单词的位置上打上波浪线如果你想要把波浪线加到行未为止,就把endposition​设置为Number.MAX_VALUE​运行语言服务器步骤:通过快捷键(Ctrl+Shift+B)启动build任务。......