首页 > 其他分享 >Spring Boot 集成 RabbitMQ 多个 Broker 发送、消费消息

Spring Boot 集成 RabbitMQ 多个 Broker 发送、消费消息

时间:2024-10-12 09:44:13浏览次数:7  
标签:payload Spring Boot Broker broker second message public first

1. Spring Boot 集成 RabbitMQ 多个 Broker 发送、消费消息

1.1. 版本说明

构件 版本
spring-boot 2.7.18
spring-boot-starter-amqp 2.7.18

1.2. 概述

flowchart TB subgraph app["Spring Boot 应用"] direction TB template1["第一个 RabbitTemplate"] listener1["第一个监听器"] template2["第二个 RabbitTemplate"] listener2["第二个监听器"] end subgraph mq1["第一个 RabbitMQ"] direction TB exchange1["第一个交换机"] --> queue1["第一个队列"] end subgraph mq2["第二个 RabbitMQ"] direction TB exchange2["第二个交换机"] --> queue2["第二个队列"] end template1 --> exchange1 queue1 --> listener1 template2 --> exchange2 queue2 --> listener2

1.3. RabbitMQ 信息

IP:端口 用户名 virtual host
127.0.0.1:5672 first-user /first-virtual-host
127.0.0.1:5672 second-user /second-virtual-host

1.4. Spring 配置

spring:
  application:
    name: spring-rabbit-multiple-broker-demo
rabbitmq:
  first-broker:
    addresses: 127.0.0.1:5672
    username: first-user
    password: first-user
    virtual-host: /first-virtual-host
  second-broker:
    addresses: 127.0.0.1:5672
    username: second-user
    password: second-user
    virtual-host: /second-virtual-host

1.5. 定义常量

public class SpringRabbitMultipleBrokerConstants {
    public static final String FIRST_QUEUE = "spring-rabbit-multiple-broker-demo-first-queue";
    public static final String FIRST_EXCHANGE = "spring-rabbit-multiple-broker-demo-first-exchange";
    public static final String SECOND_QUEUE = "spring-rabbit-multiple-broker-demo-second-queue";
    public static final String SECOND_EXCHANGE = "spring-rabbit-multiple-broker-demo-second-exchange";
}

1.6. 定义配置属性

@Data
@ConfigurationProperties(prefix = "rabbitmq")
public class SpringRabbitMultipleBrokerProperties {
    private RabbitProperties firstBroker;
    private RabbitProperties secondBroker;
}

1.7. 定义两个 ConnectionFactory

@Bean
@Primary
public ConnectionFactory firstConnectionFactory(SpringRabbitMultipleBrokerProperties multipleBrokerProperties) throws Throwable {
    RabbitProperties rabbitProperties = multipleBrokerProperties.getFirstBroker();
    return generateConnectionFactory(rabbitProperties);
}

@Bean
public ConnectionFactory secondConnectionFactory(SpringRabbitMultipleBrokerProperties multipleBrokerProperties) throws Throwable {
    RabbitProperties rabbitProperties = multipleBrokerProperties.getSecondBroker();
    return generateConnectionFactory(rabbitProperties);
}

private ConnectionFactory generateConnectionFactory(RabbitProperties rabbitProperties) throws Throwable {
    RabbitConnectionFactoryBean connectionFactoryBean = new RabbitConnectionFactoryBean();
    PropertyMapper map = PropertyMapper.get();
    map.from(rabbitProperties.determineHost()).whenNonNull().to(connectionFactoryBean::setHost);
    map.from(rabbitProperties::determinePort).to(connectionFactoryBean::setPort);
    map.from(rabbitProperties::determineUsername).whenNonNull().to(connectionFactoryBean::setUsername);
    map.from(rabbitProperties::determinePassword).whenNonNull().to(connectionFactoryBean::setPassword);
    map.from(rabbitProperties::determineVirtualHost).whenNonNull().to(connectionFactoryBean::setVirtualHost);
    connectionFactoryBean.afterPropertiesSet();
    com.rabbitmq.client.ConnectionFactory factory = connectionFactoryBean.getObject();
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
    return connectionFactory;
}

1.8. 定义两个 RabbitTemplate

@Bean
public RabbitTemplate firstRabbitTemplate(ConnectionFactory firstConnectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    template.setConnectionFactory(firstConnectionFactory);
    return template;
}

@Bean
public RabbitTemplate secondRabbitTemplate(ConnectionFactory secondConnectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    template.setConnectionFactory(secondConnectionFactory);
    return template;
}

1.9. 定义两个 SimpleRabbitListenerContainerFactory

@Bean
public SimpleRabbitListenerContainerFactory firstRabbitListenerContainerFactory(ConnectionFactory firstConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(firstConnectionFactory);
    return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory secondRabbitListenerContainerFactory(ConnectionFactory secondConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(secondConnectionFactory);
    return factory;
}

1.10. 测试

@Component
@Slf4j
public class SpringRabbitMultipleBrokerDemo implements ApplicationRunner {

    @Resource
    private RabbitTemplate firstRabbitTemplate;
    @Resource
    private RabbitTemplate secondRabbitTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        String firstPayload = "first payload";
        firstRabbitTemplate.convertAndSend(FIRST_EXCHANGE, null, firstPayload);
        log.info("sent a message to first broker, payload: {}", firstPayload);

        String secondPayload = "second payload";
        secondRabbitTemplate.convertAndSend(SECOND_EXCHANGE, null, secondPayload);
        log.info("sent a message to second broker, payload: {}", secondPayload);
    }

    @RabbitListener(
            containerFactory = "firstRabbitListenerContainerFactory",
            bindings = {
                    @QueueBinding(
                            value = @Queue(
                                    name = FIRST_QUEUE,
                                    durable = "true",
                                    declare = "true"
                            ),
                            exchange = @Exchange(
                                    name = FIRST_EXCHANGE,
                                    durable = "true",
                                    type = FANOUT,
                                    declare = "true"
                            )
                    )
            }
    )
    public void listenToFirstBroker(Message<String> message) {
        log.info(
                "received a message from first broker, payload: {}",
                message.getPayload()
        );
    }

    @RabbitListener(
            containerFactory = "secondRabbitListenerContainerFactory",
            bindings = {
                    @QueueBinding(
                            value = @Queue(
                                    name = SECOND_QUEUE,
                                    durable = "true",
                                    declare = "true"
                            ),
                            exchange = @Exchange(
                                    name = SECOND_EXCHANGE,
                                    durable = "true",
                                    type = FANOUT,
                                    declare = "true"
                            )
                    )
            }
    )
    public void listenToSecondBroker(Message<String> message) {
        log.info(
                "received a message from second broker, payload: {}",
                message.getPayload()
        );
    }
}

启动程序,控制台将输出以下日志:

sent a message to first broker, payload: first payload
sent a message to second broker, payload: second payload
received a message from second broker, payload: second payload
received a message from first broker, payload: first payload

标签:payload,Spring,Boot,Broker,broker,second,message,public,first
From: https://www.cnblogs.com/jason207010/p/18459782

相关文章

  • 深入解析Spring AI框架:在Java应用中实现智能化交互的关键
    今天我们的SpringAI源码分析主题即将结束。我已经对自己感兴趣的基本内容进行了全面的审视,并将这些分析分享给大家。如果你对这个主题感兴趣,可以阅读以下几篇文章。每篇文章都层层递进,深入探讨相关内容。考虑到长文可能让大家感到疲惫,我采用了逐步推进的方式,确保每一篇都简明易懂......
  • 【02】手把手教你0基础部署SpringCloud微服务商城教学-Mybatis篇(下)
    上期回顾:【01】手把手教你0基础部署SpringCloud微服务商城教学-Mybatis篇(上)Part1.续接上文Mybatis-plus的批处理功能接下来我们学习一下IService的批量查询,我们用以往的for循环做一个对比这是for循环部分的代码privateUserbuilderUser(inti){Useruser=new......
  • oracle 19c dgbroker 报错ORA-16664 with ORA-12514如何解决
    alert中一堆这个保存一新***********************************************************************FatalNIconnecterror12504,connectingto:(DESCRIPTION=(CONNECT_DATA=(SERVICE_NAME=)(INSTANCE_NAME=hrz)(CID=(PROGRAM=oracle)(HOST=sd4)(USER=oracle)))(ADDRESS......
  • Spring源码理解 类接口说明
    FactoryBean、BeanFactoryBeanFactoryBeanFactory是管理和生产Bean的,它是用于访问SpringBean容器的根接口。,定义了管理Bean的方法,获取Bean、包含Bean、是否单例Bean、获取Bean类型等。Spring根据他提供了很多实现,如DefaultListableBeanFactory、XmlBeanFactory、Applica......
  • springboot+vue基于Web的辅助教学平台【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展和普及,教育领域正经历着深刻的变革。传统的教学方式逐渐难以满足现代学生的学习需求,他们渴望更加灵活、便捷的学习方式。与此同时,教师也面临教学资源分配不均、管理效率低下等问题。因此,开发一个基于Web的辅......
  • springboot+vue基于springboot+vue的线上学习系统【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景随着互联网技术的迅猛发展和信息技术的不断革新,线上学习系统逐渐成为教育领域的重要组成部分。近年来,受疫情影响,线上学习需求更是急剧增长,为教育领域带来了前所未有的挑战与机遇。传统的线下教学模式已经难以满足现代教育的多元化需求......
  • springboot+vue基于Springboot校园招聘系统【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景在当今信息化高速发展的时代,校园招聘已成为企业与高校毕业生双向选择的重要渠道。然而,传统的校园招聘方式存在信息不对称、流程繁琐、效率低下等问题,严重制约了招聘双方的沟通与选择。随着互联网的普及和大数据技术的应用,构建一个高效......
  • springboot+vue+java高中素质评价档案系统ssm
    目录功能和技术介绍系统实现截图技术范围开发核心技术介绍:代码执行流程核心代码部分展示系统测试其他springboot项目推荐详细视频演示源码获取功能和技术介绍探索如何设计一个用户友好、响应迅速的系统界面,确保系统后端逻辑的高效和稳定性。研究如何通过SpringBoot......
  • 毕设项目案例实战II基于Java+Spring Boot+MySQL的学生选课系统的设计与实现(源码+数据
    目录一、前言二、技术介绍三、系统实现四、论文参考五、核心代码六、源码获取全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者,专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末一、前言随着信息技术的飞速发展和教育信息化的不......
  • 基于SpringBoot+Vue+uniapp的税务门户网站的详细设计和实现(源码+lw+部署文档+讲解等)
    文章目录前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我代码参考数据库参考源码获取前言......