首页 > 其他分享 >SpringBoot集成多个RabbitMq(多个MQ链接)

SpringBoot集成多个RabbitMq(多个MQ链接)

时间:2023-12-26 16:36:49浏览次数:38  
标签:SpringBoot 多个 exchange 队列 RabbitMq 死信 dead letter public

##2023年12月16日 20:25:36  项目中使用RabbitMQ作为应用间信息互通,本次梳理下关于MQ的使用。

1、引入依赖

<!--  引入依赖,使用v2.5.6版本 -->
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.5.6</version>
        </dependency>
</dependencies>

 

2、在Nacos中配置MQ信息,此次项目中使用了两个MQ实例(①已方提供MQ Server,本项目内各系统信息互通;②甲方提供MQ Server,与甲方做信息互通),本次介绍mq1

spring:
  rabbitmq:
    mq1:
      host: 127.0.0.1
      port: 5672
      username: admin
      password: ****
      enable: false ##队列是否启用,在Configuration中来确认是否初始化MQ
    mq2:
      host: 127.0.0.2
      port: 5672
      username: admin
      password: ***
      enable: false

 

3、编写配置文件

@Data
@Component("mq1RabbitmqConfig") 
@ConfigurationProperties(prefix = "spring.rabbitmq.mq1") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") //是否启用
public class MQ1RabbitConfiguration {
    private String host;
    private Integer port;
    private String username;
    private String password;

    @Autowired
    private ReturnCallBack returnCallBack;
    @Autowired
    private ConfirmCallBack confirmCallBack;

    @Bean(value = "mq1ConnectionFactory")   //命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此
    public ConnectionFactory createConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        //开启发送到交换机和队列的回调
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        return connectionFactory;
    }
    @Bean(name = "mq1RabbitTemplate") //命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此
    public RabbitTemplate brainRabbitTemplate(@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //发送消息时设置强制标志,仅当提供了returnCallback时才适用
        rabbitTemplate.setMandatory(true);
        //确保消息是否发送到交换机,成功与失败都会触发
        rabbitTemplate.setConfirmCallback(confirmCallBack);
        //确保消息是否发送到队列,成功发送不触发,失败触发
        rabbitTemplate.setReturnsCallback(returnCallBack);
        return rabbitTemplate;
    }
    @Bean("mq1RabbitListenerContainerFactory")//命名mq1的RabbitListenerContainerFactory,如果项目中只有一个mq则不必如此
    SimpleRabbitListenerContainerFactory mq1RabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(listenerContainerFactory, connectionFactory);
        return listenerContainerFactory;
    }
}

 

4、编写回调callback,此一举措是为了记录交换机、队列本身是否健康,如业务无此细纠,也可不记录。

ConfirmCallBack.java :消息发送到交换机成功、失败都会回调
ReturnCallBack.java:消息发送队列失败回调
@Component
@Slf4j
public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if(!ack){
            log.info("消息发送交换机失败:{}",s);
        }else{
            log.info("消息发送交换机成功");
        }
    }
}

@Component
@Slf4j
public class ReturnCallBack implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息发送队列失败:{}", JSON.toJSON(returnedMessage));
    }
}

 

5、MQ工具类,方便各业务点调用,减少代码冗余。

@Component
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") //是否启用
public class RabbitMQUtil {

    @Resource(name = "mq1RabbitTemplate")  //初始化mq1的RabbitTemplate对象,如果项目中只有一个MQ,则无需这么麻烦,
    private RabbitTemplate mq1RabbitTemplate;

    public void test1(List ob){
     //发送到fanout类型的交换机,跟这个交换机绑定的队列都会收到这一条消息,故第二个参数routekey无需填写
         mq1RabbitTemplate.convertAndSend("fanout.exchange",null, JSON.toJSONString(ob));
    }
    public void test2(String str){
      //发送到direct类型的交换机,根据routekey去找发送到哪个队列里,只有这一个队列才能收到这条消息
         mq1RabbitTemplate.convertAndSend("direct.exchange","routekey",str);
    }
}

 

6、生产者Producer

@Service
public class producer {

    @Autowired(required = false)  //一定要加required=false,否则mq配置不启用时,这里会报错
    private RabbitMQUtil rabbitMQUtil;

    public void producer(List ob) {
        
        if(rabbitMQUtil != null){
            rabbitMQUtil.test1(ob);
        }
    }
}

 

7、消费者Consumer(Listener)

①此处采用动态绑定生成队列的方式,可以满足多据点的模式,部署A据点后,根据A的特征值自动创建关于A的exchage、queue(mq1Queue_A),免去了在java类中显式声明并绑定exchage和queue的代码

②此处使用了死信队列 x-dead-letter-exchange,可以在msg消费失败时让msg进入对应死信队列,进而监听死信队列进行补偿操作。

③我们也可以用死信队列去做延时队列:给一个队列提前绑定好死信队列,发送消息到队列并设置消息的timeout,到点未消费也会扔到死信队列,通过监听对应的死信队列达到延时队列的作用

@Slf4j
@Component
@ConditionalOnExpression(value = "${spring.rabbitmq.mq1.enable:false} && '${aa.bb}' eq 'cc' ")  //mq1队列启用,并且满足某些条件才初始化
public class MQ1ccRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "mq1Queue_A"
                    , arguments = {@Argument(name = "x-dead-letter-exchange", value = "dead.letter.exchange"),
                    @Argument(name = "x-dead-letter-routing-key", value = "dead.letter.routekey")}
        ),
            exchange = @Exchange(name = "mq1_fanout_exchange",type = ExchangeTypes.FANOUT)
    ),ackMode = "MANUAL")
    public void mq1listener1(Message message,Channel channel, String poJson) throws IOException {
        try {
            MyClass po = JSONUtil.toBean(poJson,MyClass.class);
            if(myService.saveOrUpdate(po)){
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }else {
                //第二个参数为false时,进入死信队列消费,true时重新进入队列头
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }
        } catch (Exception e) {
            log.info("队列{}消费省级目录信息失败:{},异常信息:{}",RabbitMqConstants.DOWN_REGION_QUEUES + "_${unified.areaCode}",poJson,e.getMessage());
            //第二个参数为false时,进入死信队列消费,true时重新进入队列头
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
}

 

8、显式绑定死信队列的exchange和queue(其他队列也可以这样绑,死信队列和其他队列没有任何区别,只是说可以在普通队列中去用 x-dead-letter-exchange 去标记一个队列的死信队列,让普通队列消费失败时,将消息扔到死信队列。

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true")
public class BrainDiedConfig {

    @Resource(name = "mq1RabbitTemplate")
    private RabbitTemplate brainRabbitTemplate;
    @Bean("dead.letter.exchange")
    public Exchange diedExchange(){
        return new DirectExchange("dead.letter.exchange");
    }

    @Bean("dead.letter.queues")
    public Queue diedQueue(){
        return new Queue("dead.letter.queues",true);
    }

    @Bean
    public Binding bindingDied(@Qualifier("dead.letter.queues") Queue diedQueue,@Qualifier("dead.letter.exchange") Exchange diedExchange){
        return BindingBuilder.bind(diedQueue).to(diedExchange).with("dead.letter.routeke").noargs();
    }
}

 

9、消费死信队列

@RabbitListener(queues = "dead.letter.queues")
    public void diedLetterConsumer(Message message, Channel channel, String poJson){
        try {
            String queueName = (String) message.getMessageProperties().getHeaders().get("x-first-death-queue");
            String exchangeName = (String) message.getMessageProperties().getHeaders().get("x-first-death-exchange");
            dao.save(queueName, exchangeName, poJson);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

 

10、死信队列建表语句

create table died_letter
(
    id            bigint auto_increment
        primary key,
    queue_name    varchar(50)                        not null comment '队列名称'
    exchange_name varchar(50)                        not null comment '交换机名称',
    msg_json      mediumtext                         not null comment '队列信息',
    create_time   datetime default CURRENT_TIMESTAMP not null comment '生成时间',
    is_handle     tinyint  default 0                 not null comment '是否被处理,0:未处理;1:已处理'
)
    comment '死信队列信息';



标签:SpringBoot,多个,exchange,队列,RabbitMq,死信,dead,letter,public
From: https://blog.51cto.com/u_15002821/8984728

相关文章

  • rabbitMq怎么查看队列日志消息-Tracing日志
    Trace是Rabbitmq用于记录每一次发送的消息,方便使用Rabbitmq的开发者调试、排错。1、启动Tracing插件在RabbitMQ中默认是关闭的,需手动开启。此处rabbitMQ是使用docker部署的##进入rabbitMq中dockerexec-itrabbitmq1bash##启动日志插件rabbitmq-pluginsenablerabbitmq_tr......
  • 基于SpringBoot+Vue的毕业设计系统的开发设计实现(源码+lw+部署文档+讲解等)
    (文章目录)前言:heartpulse:博主介绍:✌全网粉丝10W+,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌:heartpulse:......
  • RabbitMQ vs Kafka:正面交锋!
    介绍作为一名有着大量微服务系统处理经验的软件架构师,我经常遇到一个不断重复的问题:“我应该使用RabbitMQ还是Kafka?”出于某种原因,许多开发人员认为这些技术是可以互换的。虽然在某些情况下确实如此,但RabbitMQ还是Kafka之间存在根本上的差异。因此,不同的场景需要不同的,选择......
  • SpringBoot项目导入的时候没有显示父工程
    一、出现错误导入选中要导入的项目的文件夹,然后点击OK,接下来一直next即可。但是导入完了之后没有出现父模块,只有子模块。二、修正错误CTRL+SHIFT+A打开下面的菜单输入maven,选择AddMavenProjects.选择你要导入的工程的pom文件,然后点击OK,父模块就导入了。三、较好的导入方法从......
  • 使用 PostgreSQL 16.1 + Citus 12.1 作为多个微服务的分布式 Sharding 存储后端
    在本教程中,我们将使用PostgreSQL16.1+Citus12.1作为多个微服务的存储后端,演示此类集群的样例设置和基本操作。Citus12.1实验环境设置Docker快速启动Citus分布式集群docker-compose.ymlversion:"3"services:master:container_name:"${COMPOSE_PROJECT......
  • Qt/C++音视频开发61-多屏渲染/一个解码渲染到多个窗口/画面实时同步
    一、前言多屏渲染就是一个解码线程对应多个渲染界面,通过addrender这种方式添加多个绘制窗体,我们经常可以在展会或者卖电视机的地方可以看到很多电视播放的同一个画面,原理应该类似,一个地方负责打开解码播放,将画面同步传输到多个显示的地方,完全保证了画面的一致性。这样相当于复用......
  • 【SpringBootWeb入门-18】案例-准备工作
    1、案例前言在前面的章节中,我们学习完了后端springbootweb开发的基础知识、MySQL数据库以及Mybatis框架,接下来我们来完成一个Web开发的综合案例,通过这个案例我们学习到前端程序、后端程序以及数据库之间是如何交互协作的,同时通过这个案例我们也学习到根据接口文档、开发服务端接......
  • 启动springboot的测试类,报红:Java HotSpot(TM) 64-Bit Server VM warning: Sharing is
    启动springboot的测试类时,报红:JavaHotSpot(TM)64-BitServerVMwarning:Sharingisonlysupportedforbootloaderclassesbecausebootstrapclasspathhasbeenappended原因:JavaHotSpot(TM)64位服务器虚拟机已附加引导程序类路径解决办法:IDEA—》Settings—》Build......
  • Rabbitmq消息大量堆积,我慌了!
    背景记得有次公司搞促销活动,流量增加,但是系统一直很平稳(我们开发的系统真牛),大家很开心的去聚餐,谈笑风声,气氛融洽,突然电话响起....运维:小李,你们系统使用的rabbitmq的消息大量堆积,导致服务器cpu飙升,赶紧回来看看,服务器要顶不住了小李:好的系统架构描述我们使用rabbitmq主要是为了系统......
  • SpringBoot多级缓存
    SpringBoot实现项目更删改查后,会有新的问题需要解决,就是并发大的问题,一般而言,解决查询并发大的问题,常见的手段是为查询接口增加缓存,从而可以减轻持久层的压力。按照我们以往的经验,在查询接口中增加Redis缓存即可,将查询的结果数据存储到Redis中,执行查询时首先从Redis中命中,如果命中......