首页 > 其他分享 >rabbitmq

rabbitmq

时间:2023-07-09 20:55:42浏览次数:35  
标签:return String rabbitmq Bean msg message public

一.死信队列

1.Config配置类

package com.yufou.studyrabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: YuFou
 * @Description: ttl队列
 * @Date: Created in 16:18 2023/7/9
 */
@Configuration
public class TtlQueueConfig {
    //普通交换机
    public static final String X_EXCHANGE = "X";
    //死信交换机
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    public static final String QUEUE_C = "QC";
    //死信队列
    public static final String DEAD_LETTER_QUEUE_D = "QD";

    @Bean
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    @Bean
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    @Bean("qa")
    public Queue qa(){
        return QueueBuilder
                .durable(QUEUE_A)
                .deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey("YD")
                .ttl(10000) //(创建Queue时设置ttl延迟时间)
                .build();
    }

    @Bean("qb")
    public Queue qb(){
        return QueueBuilder
                .durable(QUEUE_B)
                .deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey("YD")
                .ttl(40000)
                .build();
    }
    @Bean("qc")
    public Queue qc(){
        return QueueBuilder
                .durable(QUEUE_C)
                .deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey("YD")
                .build();
    }
    @Bean("qd")
    public Queue qd(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
    }

    @Bean
    public Binding queueABingX(@Qualifier("qa") Queue qa,
                               @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(qa).to(xExchange).with("XA");
    }

    @Bean
    public Binding queueBBingX(@Qualifier("qb") Queue qb,
                               @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(qb).to(xExchange).with("XB");
    }
    @Bean
    public Binding queueCBingX(@Qualifier("qc") Queue qc,
                               @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(qc).to(xExchange).with("XC");
    }
    @Bean
    public Binding queueDBingY(@Qualifier("qd") Queue qd,
                               @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(qd).to(yExchange).with("YD");
    }
}

2.Controller发送消息

 @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable("message") String msg) {
        log.info("当前时间:{},发送一条消息给两个TTL队列,消息是:{}",new Date(),msg);
        rabbitTemplate.convertAndSend("X","XA","消息来自延迟10秒TTL队列:"+msg);
        rabbitTemplate.convertAndSend("X","XB","消息来自延迟40秒TTL队列:"+msg);
    }
    @GetMapping("/sendMsg/{message}/{ttl}")
    public void sendMessageTTL(@PathVariable("message") String msg,
                               @PathVariable("ttl") String ttl) {
        log.info("当前时间:{},发送一条延迟{}秒的消息给TTL队列,消息是:{}",new Date(),Integer.valueOf(ttl),msg);
        rabbitTemplate.convertAndSend("X","XC",msg,message -> {
            message.getMessageProperties().setExpiration(String.valueOf(Integer.valueOf(ttl)*1000));
            return message;
        });
    }

3.Consumer消费消息

@Component
@Slf4j
public class DeadLetterConsumer {
    @RabbitListener(queues = "QD")
    public void receiveDDLMessage(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间为:{},接收到死信队列的消息:{}",new Date(),msg);
    }
}

二.基于插件的延迟队列

1.Config配置类

package com.yufou.studyrabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * @Author: YuFou
 * @Description: 基于插件的延迟队列
 * @Date: Created in 18:20 2023/7/9
 */
@Configuration
public class DelayQueueConfig {
    //交换机
    public static final String DELAYED_EXCHANGE = "delayed.exchange";
    //队列
    public static final String DELAYED_QUEUE = "delayed.queue";
    //routingkey
    public static final String DELAYED_ROUTINGKEY = "delayed.routingKey";

    // CustomExchange 自定义交换机
    @Bean("delayedExchange")
    public CustomExchange delayedExchange() {
//        Map<String, Object> arguments = new HashMap<>();
//        arguments.put("x-delayed-type", "direct");
        CustomExchange customExchange = new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, null);
        customExchange.getArguments().put("x-delayed-type", "direct");
        return customExchange;
    }

    @Bean("delayedQueue")
    public Queue delayedQueue() {
       return QueueBuilder.durable(DELAYED_QUEUE)
                .build();
    }

    @Bean
    public Binding delayedQueueBindDelayedExchange(){
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTINGKEY).noargs();

    }
}

2.Controller发送消息

@GetMapping("/sendDelayMsg/{msg}/{delayTime}")
    public void sendDelayMessage(@PathVariable("msg") String msg,
                                 @PathVariable("delayTime") Integer delayTime){
        log.info("当前时间:{},发送一条延迟{}秒的消息给delay队列,消息是:{}",new Date(),delayTime,msg);
        rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE,
                DelayQueueConfig.DELAYED_ROUTINGKEY,
                msg, message -> {
            message.getMessageProperties().setDelay(delayTime);
            return message;
        });
    }

3.Consumer消费消息

@Slf4j
@Component
public class DelayedConsumer {
    @RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE)
    public void receiveDelayQueueMsg(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间为:{},接收到延迟队列的消息:{}", new Date(), msg);
    }
}

标签:return,String,rabbitmq,Bean,msg,message,public
From: https://www.cnblogs.com/yufou/p/17539361.html

相关文章

  • 利用RabbitMQ 的死信队列来做定时任务
    常用的应用场景死信队列常常用作延时关闭订单(如订单的超时后的取消订单等),虽然小项目中可以用定时轮询的方法进行检查,但是数据量一旦比较大时,定时轮询将给数据库带来不小的压力,而且定时间隔无法进行动态调整,特别是一个系统中,同时存在好几个定时器的时候,就显得非常的麻烦,同时给数据......
  • centos7 安装 rabbitmq
    1、下载RabbitMQ安装包(请自行下载erlang和对应版本的rabbitmq)2、上传安装包到Linux中将上面三个软件上传到/usr/local/software目录下(如果没有software需要自己创建)3、安装文件(分别按照以下顺序安装)进入software文件夹,依次使用如下命令 rpm-ivherlang-21.3-1.el7.......
  • 2023-07-08:RabbitMQ如何做到消息不丢失?
    2023-07-08:RabbitMQ如何做到消息不丢失?答案2023-07-08:1.持久化发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。2.确认机制消费者通过basic.ack命令向......
  • RabbitMQ基本配置
    1.用户角色配置自带的guest/guest超级管理员五中不同角色配置:普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。策略制定者(policymaker):可登陆管理控制台,同时可以对policy进行管理。但无法查看节点的相关信息。监控者(monitoring):登录......
  • RabbitMq
    1,RabbitMq简介是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。官网安装2,RabbitMq几个术语1.Exchange-交换......
  • 2023-07-06:RabbitMQ中的AMQP是什么?
    2023-07-06:RabbitMQ中的AMQP是什么?答案2023-07-06:AMQPAMQP(AdvancedMessageQueuingProtocol)是一个应用层协议的开放标准,旨在设计面向消息的中间件。基于AMQP协议的客户端和消息中间件可以自由地传递消息,不受客户端、中间件产品或开发语言的限制。其目标是实现一种被广泛应用......
  • Linux安装RabbitMQ详细教程
    一、环境准备1、RabbitMQ版本和Erlang版本兼容性关系https://www.rabbitmq.com/which-erlang.html2、ErLang安装教程https://blog.csdn.net/laterstage/article/details/131513793?spm=1001.2014.3001.55013、RabbitMQ的安装依赖于erlang所以先安装4、RabbitMQ下载链接weg......
  • rabbitmq在springboot中实战技巧
    一.简介rabbitmq是基于AMQP(AdvancedMessageQueuingProtocol:高级消息队列协议),采用Erlang语言编写的消息队列。二、mq能用来做什么异步处理:将非核心业务(比如日志、邮件、监控等)从主流程剥离,提升主流程的响应时效。削峰:当并发大的情况下,可以将消息暂存在消息队列中,消费者按照......
  • docker启动RabbitMQ以及常见问题解决
    docker启动MQ容器下载docker镜像dockersearchrabbitmqdockerpullrabbitmqdockerrun-d--hostnamemy-rabbit--namerabbit-p15672:15672-p5672:5672rabbitmq:latest启动容器后浏览器无法访问dockerexec-it3b124f0c9712/bin/bashrabbitmq-pluginsenab......
  • RabbitMQ03
    1.RabbitMQ死信队列1.1死信队列简介在实际开发项目是,在较为重要的业务场景中,要确保未被消费的消息不被丢弃(例如:订单业务),那为了保证消息数据的不丢失,可以使用RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中进行处理。死信队列:RabbitMQ中并不是直接声明......