首页 > 其他分享 >SpringBoot 集成多个rabbitmq

SpringBoot 集成多个rabbitmq

时间:2023-02-24 10:55:15浏览次数:32  
标签:集成 connectionFactory SpringBoot springframework String rabbitmq org import amqp

1. pom.xml 配置

<!-- RabbitMQ -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置文件

spring:
  rabbitmq:
    first:
      host: 101.22.08.161
      port: 5672
      username: dhcs
      password: dhcs
      virtualhost: /
    second:
      host: 101.32.80.138
      port: 5672
      username: admin
      password: admin
      virtualhost: /account

3. 编写配置类

package com.example.mybatis.config.mq;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class RabbitMQConfig {

    // ============================================ 定义连接connectionFactory ====================================================================
    @Bean(name = "mainConnectionFactory")
    @Primary
    public ConnectionFactory mainConnectionFactory(
            @Value("${spring.rabbitmq.first.host}") String host,
            @Value("${spring.rabbitmq.first.port}") int port,
            @Value("${spring.rabbitmq.first.username}") String username,
            @Value("${spring.rabbitmq.first.password}") String password,
            @Value("${spring.rabbitmq.first.virtualhost}") String virtualhost) {
        return connectionFactory(host, port, username, password, virtualhost);
    }

    @Bean(name = "secondConnectionFactory")
    public ConnectionFactory secondConnectionFactory(
            @Value("${spring.rabbitmq.second.host}") String host,
            @Value("${spring.rabbitmq.second.port}") int port,
            @Value("${spring.rabbitmq.second.username}") String username,
            @Value("${spring.rabbitmq.second.password}") String password,
            @Value("${spring.rabbitmq.second.virtualhost}") String virtualhost) {
        return connectionFactory(host, port, username, password, virtualhost);
    }


    public CachingConnectionFactory connectionFactory(String host, int port, String username, String password, String virtualhost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        return connectionFactory;
    }

    // ============================================ 定义RabbitTemplate ====================================================================
    @Bean(name = "mainRabbitTemplate")
    @Primary
    public RabbitTemplate mainRabbitTemplate(@Qualifier("mainConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate mainRabbitTemplate = new RabbitTemplate(connectionFactory);
        mainRabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return mainRabbitTemplate;
    }

    @Bean(name = "secondRabbitTemplate")
    public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);
        secondRabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return secondRabbitTemplate;
    }


    // ============================================ 定义监听Listener ====================================================================
    @Bean(name = "mainFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory mainFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mainConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "secondFactory")
    public SimpleRabbitListenerContainerFactory secondFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(10);
        factory.setMaxConcurrentConsumers(20);
        factory.setPrefetchCount(1);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

4. 发送消息

package com.example.mybatis.config.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
@Slf4j
public class Sender1 {

    @Resource(name = "mainRabbitTemplate")
    protected RabbitTemplate rabbitTemplate;

    public void test() {
        this.sendMessage("dhcs-exchange", "DHCS2KF1", "1111111111");
    }

    /**
     * @param exchange   交换机
     * @param routingKey 路由
     * @param message    消息内容
     * @return boolean
     * @Description 发送消息
     */
    private boolean sendMessage(String exchange, String routingKey, String message) {
        long start = System.currentTimeMillis();
        try {
            Message messageObj = MessageBuilder.withBody(message.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .build();
            rabbitTemplate.convertAndSend(exchange, routingKey, messageObj);
        } catch (Exception e) {
            log.error("【mq发送失败】", e);
            return false;
        } finally {
            long end = System.currentTimeMillis();
            log.info("【mq发送】入参:{},耗时:{}ms", message, (end - start));
        }
        return true;
    }
}

package com.example.mybatis.config.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
@Slf4j
public class Sender2 {

    @Resource(name = "secondRabbitTemplate")
    protected RabbitTemplate rabbitTemplate;

    public void test() {
        this.sendMessage("ssoAccount_exchange", "", "123456789");
    }

    /**
     * @param exchange   交换机
     * @param routingKey 路由
     * @param message    消息内容
     * @return boolean
     * @Description 发送消息
     */
    private boolean sendMessage(String exchange, String routingKey, String message) {
        long start = System.currentTimeMillis();
        try {
            Message messageObj = MessageBuilder.withBody(message.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .build();
            rabbitTemplate.convertAndSend(exchange, routingKey, messageObj);
        } catch (Exception e) {
            log.error("【mq发送失败】", e);
            return false;
        } finally {
            long end = System.currentTimeMillis();
            log.info("【mq发送】入参:{},耗时:{}ms", message, (end - start));
        }
        return true;
    }
}

5. 接收消息

package com.example.mybatis.config.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Listener1 {

    @RabbitListener(queuesToDeclare = @Queue("DHCS2KF1"), containerFactory = "mainFactory")
    @RabbitHandler
    public void process(String message) {
        log.info("Listener1 接收消息:{} ", message);
        try {
            // 逻辑处理
            System.out.println("Listener1确认消费完成 ..............................");
        } catch (Exception e) {
            log.error("Listener1确认消费异常", e);
        } finally {
            log.error("Listener1确认消费:{}", message);
        }
    }
}
package com.example.mybatis.config.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Listener2 {

    @RabbitListener(queuesToDeclare = @Queue("sso.account.chewu.queue"), containerFactory = "secondFactory")
    @RabbitHandler
    public void process(String message) {
        log.info("Listener2 接收消息:{} ", message);
        try {
            // 逻辑处理
            System.out.println("Listener2确认消费完成 ..............................");
        } catch (Exception e) {
            log.error("Listener2确认消费异常", e);
        } finally {
            log.error("Listener2确认消费:{}", message);
        }
    }
}

标签:集成,connectionFactory,SpringBoot,springframework,String,rabbitmq,org,import,amqp
From: https://www.cnblogs.com/lovedaodao/p/17150506.html

相关文章

  • docker 安装rabbitmq3.11-management
    1,dockerpullrabbitmq:3.11-managementmanagement版本表示带有web管理后台2,创建映射目录mkdir-p/docker/rabbitmq/data3,启动dockerrun-p5672:5672-p15672......
  • websocket接口自动化集成pytest测试框架
    每天进步一点点,关注我们哦,每天分享测试技术文章本文章出自【码同学软件测试】码同学公众号:自动化软件测试,领取资料可加:magetest码同学抖音号:小码哥聊软件测试 01web......
  • SpringBoot 集成 MybatisPlus
    MybatisPlus是Mybatis的升级版本,是对Mybatis的简化,因为他们的口号就是“为简化开发而生”。1、创建数据表CREATETABLE​​User​​(​​id​​INTNOTNULL,​​us......
  • 118、商城业务---分布式事务---RabbitMQ延时队列定时关单模拟
    1、使用RabbitMq实现延时队列方法12、基于我们的业务我们使用下面这种方式实现延时队列1、导入依赖<dependency><groupId>org.springfram......
  • springboot集成flume实现多系统日志收集
    本次demo实现的功能:使用flume框架收集目标工程的日志信息,并发送到kafka,最终完成kafka的消费1、配置工程配置logback:<!--此处为flume日志采集的配置--><appende......
  • RabbitMQ 总结
    为什么使用MQ:异步,解耦,肖峰使用消息队列带来的一些问题:系统可用性降低:系统可用性在某种程度上降低,为什么这样说呢?在加入MQ之前,你不用考虑      消息丢失......
  • 117、商城业务---分布式事务---RabbitMQ延时队列
    1、定时任务存在的问题即任务过期时间为30min,任务在第31min过期,但是在第60分钟才被扫描到2、延时队列是先设置一个过期队列,里面消息过期后不会丢弃而是通过交换机放......
  • SpringBoot解决跨域方案
    SpringBoot解决跨域的几种方式跨域资源共享(CORS):通过修改Http协议header的方式,实现跨域。说的简单点就是,通过设置HTTP的响应头信息,告知浏览器哪些情况在不符合同源策略的条......
  • 115、商城业务---分布式事务---使用Springboot提供的Seata解决分布式事务
    https://seata.io/zh-cn/seata使用SeataAT模式控制分布式事务的步骤:1、每一个想控制分布式事务的服务对应的数据库都需要创建一个UNDO_LOG表CREATETABLE`undo_log`......
  • Springboot 集成 Fastjson2
    Springboot整合Fastjson2排除默认的Jackson<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>......