首页 > 其他分享 >SpringBoot集成多个RabbitMq(多个MQ链接)

SpringBoot集成多个RabbitMq(多个MQ链接)

时间:2023-12-16 23:22:05浏览次数:39  
标签:SpringBoot 多个 队列 RabbitMq mq1 死信 dead letter public

##2023年12月16日 20:25:36  项目中使用RabbitMQ作为应用间信息互通,本次梳理下关于MQ的使用。

1、引入依赖

<!--  引入依赖,使用v2.5.6版本 -->
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.5.6</version>
        </dependency>
</dependencies> 

2、在Nacos中配置MQ信息,此次项目中使用了两个MQ实例(①已方提供MQ Server,本项目内各系统信息互通;②甲方提供MQ Server,与甲方做信息互通),本次介绍mq1

spring:
  rabbitmq:
    mq1:
      host: 127.0.0.1
      port: 5672
      username: admin
      password: ****
      enable: false ##队列是否启用,在Configuration中来确认是否初始化MQ
    mq2:
      host: 127.0.0.2
      port: 5672
      username: admin
      password: ***
      enable: false

3、编写配置文件

@Data
@Component("mq1RabbitmqConfig") 
@ConfigurationProperties(prefix = "spring.rabbitmq.mq1") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") //是否启用
public class MQ1RabbitConfiguration {
    private String host;
    private Integer port;
    private String username;
    private String password;

    @Autowired
    private ReturnCallBack returnCallBack;
    @Autowired
    private ConfirmCallBack confirmCallBack;

    @Bean(value = "mq1ConnectionFactory")   //命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此
    public ConnectionFactory createConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        //开启发送到交换机和队列的回调
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        return connectionFactory;
    }
    @Bean(name = "mq1RabbitTemplate") //命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此
    public RabbitTemplate brainRabbitTemplate(@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //发送消息时设置强制标志,仅当提供了returnCallback时才适用
        rabbitTemplate.setMandatory(true);
        //确保消息是否发送到交换机,成功与失败都会触发
        rabbitTemplate.setConfirmCallback(confirmCallBack);
        //确保消息是否发送到队列,成功发送不触发,失败触发
        rabbitTemplate.setReturnsCallback(returnCallBack);
        return rabbitTemplate;
    }
    @Bean("mq1RabbitListenerContainerFactory")//命名mq1的RabbitListenerContainerFactory,如果项目中只有一个mq则不必如此
    SimpleRabbitListenerContainerFactory mq1RabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(listenerContainerFactory, connectionFactory);
        return listenerContainerFactory;
    }
}

4、编写回调callback,此一举措是为了记录交换机、队列本身是否健康,如业务无此细纠,也可不记录。

ConfirmCallBack.java :消息发送到交换机成功、失败都会回调
ReturnCallBack.java:消息发送队列失败回调
@Component
@Slf4j
public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if(!ack){
            log.info("消息发送交换机失败:{}",s);
        }else{
            log.info("消息发送交换机成功");
        }
    }
}

@Component
@Slf4j
public class ReturnCallBack implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息发送队列失败:{}", JSON.toJSON(returnedMessage));
    }
}

 

5、MQ工具类,方便各业务点调用,减少代码冗余。

@Component
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") //是否启用
public class RabbitMQUtil {

    @Resource(name = "mq1RabbitTemplate")  //初始化mq1的RabbitTemplate对象,如果项目中只有一个MQ,则无需这么麻烦,
    private RabbitTemplate mq1RabbitTemplate;

    public void test1(List ob){
        if(ObjectUtil.isNotEmpty(pos)){
          //TODO
        }
    }
    public void test2(String str){
          //TODO
    }
}

  

6、生产者Producer

@Service
public class producer {

    @Autowired(required = false)  //一定要加required=false,否则mq配置不启用时,这里会报错
    private RabbitMQUtil rabbitMQUtil;
public void producer(List ob) { if(rabbitMQUtil != null){ rabbitMQUtil.test1(ob); } }
}

7、消费者Consumer(Listener)

①此处采用动态绑定生成队列的方式,可以满足多据点的模式,部署A据点后,根据A的特征值自动创建关于A的exchage、queue(mq1Queue_A),免去了在java类中显式声明并绑定exchage和queue的代码

②此处使用了死信队列 x-dead-letter-exchange,可以在msg消费失败时让msg进入对应死信队列,进而监听死信队列进行补偿操作。

@Slf4j
@Component
@ConditionalOnExpression(value = "${spring.rabbitmq.mq1.enable:false} && '${aa.bb}' eq 'cc' ")  //mq1队列启用,并且满足某些条件才初始化
public class MQ1ccRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "mq1Queue_A"
                    , arguments = {@Argument(name = "x-dead-letter-exchange", value = "dead.letter.exchange"),
                    @Argument(name = "x-dead-letter-routing-key", value = "dead.letter.routekey")}
        ), exchange = @Exchange(name = "mq1_fanout_exchange",type = ExchangeTypes.FANOUT) ),ackMode = "MANUAL") public void mq1listener1(Message message,Channel channel, String poJson) throws IOException { try { MyClass po = JSONUtil.toBean(poJson,MyClass.class); if(myService.saveOrUpdate(po)){ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }else { //第二个参数为false时,进入死信队列消费,true时重新进入队列头 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } } catch (Exception e) { log.info("队列{}消费省级目录信息失败:{},异常信息:{}",RabbitMqConstants.DOWN_REGION_QUEUES + "_${unified.areaCode}",poJson,e.getMessage()); //第二个参数为false时,进入死信队列消费,true时重新进入队列头 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } } }

 

8、显式绑定死信队列的exchange和queue(其他队列也可以这样绑,死信队列和其他队列没有任何区别,只是说可以在普通队列中去用 x-dead-letter-exchange 去标记一个队列的死信队列,让普通队列消费失败时,将消息扔到死信队列。

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true")
public class BrainDiedConfig {

    @Resource(name = "mq1RabbitTemplate")
    private RabbitTemplate brainRabbitTemplate;
    @Bean("dead.letter.exchange")
    public Exchange diedExchange(){
        return new DirectExchange("dead.letter.exchange");
    }

    @Bean("dead.letter.queues")
    public Queue diedQueue(){
        return new Queue("dead.letter.queues",true);
    }

    @Bean
    public Binding bindingDied(@Qualifier("dead.letter.queues") Queue diedQueue,@Qualifier("dead.letter.exchange") Exchange diedExchange){
        return BindingBuilder.bind(diedQueue).to(diedExchange).with("dead.letter.routeke").noargs();
    }
}

9、消费死信队列

    @RabbitListener(queues = "dead.letter.queues")
    public void diedLetterConsumer(Message message, Channel channel, String poJson){
        try {
            String queueName = (String) message.getMessageProperties().getHeaders().get("x-first-death-queue");
            String exchangeName = (String) message.getMessageProperties().getHeaders().get("x-first-death-exchange");
            dao.save(queueName, exchangeName, poJson);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

10、死信队列建表语句

create table died_letter
(
    id            bigint auto_increment
        primary key,
    queue_name    varchar(50)                        not null comment '队列名称'
    exchange_name varchar(50)                        not null comment '交换机名称',
    msg_json      mediumtext                         not null comment '队列信息',
    create_time   datetime default CURRENT_TIMESTAMP not null comment '生成时间',
    is_handle     tinyint  default 0                 not null comment '是否被处理,0:未处理;1:已处理'
)
    comment '死信队列信息';

  

  

  

  

 

 

标签:SpringBoot,多个,队列,RabbitMq,mq1,死信,dead,letter,public
From: https://www.cnblogs.com/lansetuerqi/p/17908336.html

相关文章

  • 第八章:SpringBoot-Java工程及关系和修改启动logo(了解)
    一、springbootJava工程(了解)二、关闭&修改启动logo(了解)......
  • Springboot整合MybatisPlus
    1、引入mybatis-plus坐标<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.2.0</version></dependency>2、配置数据源spring:datasource:use......
  • 第三章:SpringBoot集成jsp、mybatis的逆向工程和集成
    一、springboot继承jsp二、mybatis逆向工程三、集成mybatis四、案例-集成mybatis五、集成mybatis总结......
  • 第四章:mapper映射文件存放位置、springboot支持事务
    一、mapper映射文件存放位置二、springboot支持事务......
  • 二、SpringBoot的配置文件
    1、核心配置文件properties2、核心配置文件yml3、两种核心配置文件同时存在4、设置maven私服仓库5、多环境下核心配置文件6、多环境下核心配置文件yml7、获取自定义配置8、将自定义配置映射到对象......
  • SpringBoot-Validate优雅实现参数校验
    1、是什么?它简化了JavaBeanValidation的集成。JavaBeanValidation通过JSR380,也称为BeanValidation2.0,是一种标准化的方式,用于在Java应用程序中对对象的约束进行声明式验证。它允许开发人员使用注解来定义验证规则,并自动将规则应用于相应的字段或方法参数为了我们......
  • 【转载】Springboot2.x 事务
    参考https://blog.csdn.net/MinggeQingchun/article/details/119579941https://www.cnblogs.com/myitnews/p/12364455.htmlhttps://blog.csdn.net/dndndnnffj/article/details/109092116本文代码下载环境环境版本操作windows10vscode1.84.2SpringBo......
  • springboot008基于位置的多分馆图书馆推荐系统vue
    第一章整体文档第二章需求分析2.3开发工具及技术2.3.1SpringBoot框架2.3.2Maven环境2.3.3MySQL数据库2.3.4Vue.js框架第四章网站功能实现4.1系统实现4.2后台功能4.2.1管理员登录界面图4.2.1.1登录界面4.2.2系统首页4.3前台功能4.3.1新用户注册登录4.3.2首页4.4数据......
  • springboot009保险信息网站(vue)
    1绪论1.1研究背景随着互联网的不断发展,现在人们获取最新资讯的主要途径来源于网上新闻,当下的网上信息宣传门户网站的发展十分的迅速。而保险产品,作为当下人们非常关注的一款能够给人们带来医疗、生活、养老或者理财需求的产品逐渐的被广大的人们所接受。当前的保险公司对于保险......
  • 基于SpringBoot的网上租赁系统-计算机毕业设计源码+LW文档
    摘要本课题是根据用户的需要以及网络的优势建立的一个基于SpringBoot的网上租贸系统,来满足用户网络商品租赁的需求。本网上租贸系统应用Java技术,MYSQL数据库存储数据,基于SpringBoot框架开发。在网站的整个开发过程中,首先对系统进行了需求分析,设计出系统的主要功能模块,其次对网......