1. Spring Boot 集成 RabbitMQ 多个 Broker 发送、消费消息
1.1. 版本说明
构件 | 版本 |
---|---|
spring-boot | 2.7.18 |
spring-boot-starter-amqp | 2.7.18 |
1.2. 概述
flowchart TB
    subgraph app["Spring Boot 应用"]
        direction TB
        template1["第一个 RabbitTemplate"]
        listener1["第一个监听器"]
        template2["第二个 RabbitTemplate"]
        listener2["第二个监听器"]
    end
    subgraph mq1["第一个 RabbitMQ"]
        direction TB
        exchange1["第一个交换机"] --> queue1["第一个队列"]
    end
    subgraph mq2["第二个 RabbitMQ"]
        direction TB
        exchange2["第二个交换机"] --> queue2["第二个队列"]
    end
    template1 --> exchange1
    queue1 --> listener1
    template2 --> exchange2
    queue2 --> listener2
1.3. RabbitMQ 信息
IP:端口 | 用户名 | virtual host |
---|---|---|
127.0.0.1:5672 | first-user | /first-virtual-host |
127.0.0.1:5672 | second-user | /second-virtual-host |
1.4. Spring 配置
spring:
  application:
    name: spring-rabbit-multiple-broker-demo

# Custom top-level section (not spring.rabbitmq): it is bound by
# @ConfigurationProperties(prefix = "rabbitmq"), one sub-section per broker.
rabbitmq:
  first-broker:
    addresses: 127.0.0.1:5672
    username: first-user
    password: first-user
    virtual-host: /first-virtual-host
  second-broker:
    # Same host and port as the first broker; only the user and virtual host differ.
    addresses: 127.0.0.1:5672
    username: second-user
    password: second-user
    virtual-host: /second-virtual-host
1.5. 定义常量
/**
 * Queue and exchange names used by the multiple-broker demo.
 *
 * <p>This is a pure constants holder: it is {@code final} and has a private
 * constructor so it can be neither instantiated nor subclassed.
 */
public final class SpringRabbitMultipleBrokerConstants {

    /** Name of the queue consumed by the first listener. */
    public static final String FIRST_QUEUE = "spring-rabbit-multiple-broker-demo-first-queue";

    /** Name of the exchange the first template publishes to. */
    public static final String FIRST_EXCHANGE = "spring-rabbit-multiple-broker-demo-first-exchange";

    /** Name of the queue consumed by the second listener. */
    public static final String SECOND_QUEUE = "spring-rabbit-multiple-broker-demo-second-queue";

    /** Name of the exchange the second template publishes to. */
    public static final String SECOND_EXCHANGE = "spring-rabbit-multiple-broker-demo-second-exchange";

    private SpringRabbitMultipleBrokerConstants() {
        // Constants holder; not meant to be instantiated.
    }
}
1.6. 定义配置属性
/**
 * Configuration properties bound from the {@code rabbitmq} prefix, holding one
 * {@link RabbitProperties} instance per broker.
 */
@ConfigurationProperties(prefix = "rabbitmq")
@Data
public class SpringRabbitMultipleBrokerProperties {

    /** Connection settings of the first broker ({@code rabbitmq.first-broker}). */
    private RabbitProperties firstBroker;

    /** Connection settings of the second broker ({@code rabbitmq.second-broker}). */
    private RabbitProperties secondBroker;
}
1.7. 定义两个 ConnectionFactory
/**
 * Connection factory for the first broker. It is marked {@code @Primary}, so it
 * is the one chosen whenever a {@link ConnectionFactory} is injected by type
 * without a qualifier.
 *
 * @param multipleBrokerProperties bound properties holding the settings of both brokers
 * @return a connection factory built from the first broker's settings
 * @throws Throwable if building the underlying connection factory fails
 */
@Primary
@Bean
public ConnectionFactory firstConnectionFactory(SpringRabbitMultipleBrokerProperties multipleBrokerProperties) throws Throwable {
    return generateConnectionFactory(multipleBrokerProperties.getFirstBroker());
}
/**
 * Connection factory for the second broker.
 *
 * @param multipleBrokerProperties bound properties holding the settings of both brokers
 * @return a connection factory built from the second broker's settings
 * @throws Throwable if building the underlying connection factory fails
 */
@Bean
public ConnectionFactory secondConnectionFactory(SpringRabbitMultipleBrokerProperties multipleBrokerProperties) throws Throwable {
    return generateConnectionFactory(multipleBrokerProperties.getSecondBroker());
}
/**
 * Builds a {@link CachingConnectionFactory} from one broker's {@link RabbitProperties}.
 *
 * <p>Only host, port, username, password, virtual host and the address list are
 * applied; any other setting carried by {@code rabbitProperties} is not mapped here.
 *
 * @param rabbitProperties settings of the broker to connect to; must not be {@code null}
 * @return a caching connection factory for that broker
 * @throws NullPointerException if {@code rabbitProperties} is {@code null}, which
 *         happens when the corresponding configuration section is missing
 * @throws Exception if the underlying RabbitMQ client factory cannot be created
 */
private ConnectionFactory generateConnectionFactory(RabbitProperties rabbitProperties) throws Exception {
    java.util.Objects.requireNonNull(rabbitProperties,
            "RabbitMQ broker properties are missing; check the 'rabbitmq.*-broker' configuration");

    RabbitConnectionFactoryBean connectionFactoryBean = new RabbitConnectionFactoryBean();
    PropertyMapper map = PropertyMapper.get();
    map.from(rabbitProperties::determineHost).whenNonNull().to(connectionFactoryBean::setHost);
    map.from(rabbitProperties::determinePort).to(connectionFactoryBean::setPort);
    map.from(rabbitProperties::determineUsername).whenNonNull().to(connectionFactoryBean::setUsername);
    map.from(rabbitProperties::determinePassword).whenNonNull().to(connectionFactoryBean::setPassword);
    map.from(rabbitProperties::determineVirtualHost).whenNonNull().to(connectionFactoryBean::setVirtualHost);
    connectionFactoryBean.afterPropertiesSet();

    com.rabbitmq.client.ConnectionFactory clientFactory = connectionFactoryBean.getObject();
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(clientFactory);
    // Host/port above only reflect the first configured address; hand over the
    // complete address list so that additional entries are not silently ignored.
    map.from(rabbitProperties::determineAddresses).whenNonNull().to(connectionFactory::setAddresses);
    return connectionFactory;
}
1.8. 定义两个 RabbitTemplate
/**
 * {@link RabbitTemplate} used to publish messages to the first broker.
 *
 * @param firstConnectionFactory connection factory the template publishes through
 * @return the template bound to that connection factory
 */
@Bean
public RabbitTemplate firstRabbitTemplate(ConnectionFactory firstConnectionFactory) {
    final RabbitTemplate firstTemplate = new RabbitTemplate();
    firstTemplate.setConnectionFactory(firstConnectionFactory);
    return firstTemplate;
}
/**
 * {@link RabbitTemplate} used to publish messages to the second broker.
 *
 * <p>The parameter carries an explicit qualifier: there are two
 * {@link ConnectionFactory} beans and {@code firstConnectionFactory} is
 * {@code @Primary}, so an unqualified by-type injection would pick the first
 * broker's factory regardless of the parameter name. The annotation is written
 * fully qualified so this method does not depend on an additional import.
 *
 * <p>NOTE(review): once this template really targets the second broker, the
 * exchange it publishes to must exist there — confirm how declarations are
 * performed for the second broker.
 *
 * @param secondConnectionFactory connection factory of the second broker
 * @return the template bound to the second broker
 */
@Bean
public RabbitTemplate secondRabbitTemplate(
        @org.springframework.beans.factory.annotation.Qualifier("secondConnectionFactory")
        ConnectionFactory secondConnectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    template.setConnectionFactory(secondConnectionFactory);
    return template;
}
1.9. 定义两个 SimpleRabbitListenerContainerFactory
/**
 * Listener container factory for consumers of the first broker; referenced by
 * name from {@code @RabbitListener(containerFactory = ...)}.
 *
 * @param firstConnectionFactory connection factory the listener containers consume through
 * @return the container factory bound to that connection factory
 */
@Bean
public SimpleRabbitListenerContainerFactory firstRabbitListenerContainerFactory(ConnectionFactory firstConnectionFactory) {
    final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(firstConnectionFactory);
    return containerFactory;
}
/**
 * Listener container factory for consumers of the second broker; referenced by
 * name from {@code @RabbitListener(containerFactory = ...)}.
 *
 * <p>The parameter carries an explicit qualifier: there are two
 * {@link ConnectionFactory} beans and {@code firstConnectionFactory} is
 * {@code @Primary}, so an unqualified by-type injection would pick the first
 * broker's factory regardless of the parameter name. The annotation is written
 * fully qualified so this method does not depend on an additional import.
 *
 * <p>NOTE(review): once listeners really consume from the second broker, their
 * queues must exist there — confirm how declarations are performed for the
 * second broker.
 *
 * @param secondConnectionFactory connection factory of the second broker
 * @return the container factory bound to the second broker
 */
@Bean
public SimpleRabbitListenerContainerFactory secondRabbitListenerContainerFactory(
        @org.springframework.beans.factory.annotation.Qualifier("secondConnectionFactory")
        ConnectionFactory secondConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(secondConnectionFactory);
    return factory;
}
1.10. 测试
/**
 * Demo component: on application startup it publishes one message through each
 * {@link RabbitTemplate}, and it hosts one listener per listener container
 * factory that logs every payload it receives.
 *
 * <p>NOTE(review): the queues, exchanges and bindings are declared through the
 * listener annotations; which broker they end up on depends on the AmqpAdmin
 * bean(s) in the context, which are not shown here — confirm that each broker
 * receives its own declarations.
 */
@Slf4j
@Component
public class SpringRabbitMultipleBrokerDemo implements ApplicationRunner {

    @Resource
    private RabbitTemplate firstRabbitTemplate;

    @Resource
    private RabbitTemplate secondRabbitTemplate;

    /**
     * Sends one message via the first template, then one via the second.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        publishToFirstBroker();
        publishToSecondBroker();
    }

    /** Publishes the fixed first payload to the first exchange and logs it. */
    private void publishToFirstBroker() {
        final String body = "first payload";
        // The routing key is passed as null, exactly as the exchange is a fanout in the listener declaration.
        firstRabbitTemplate.convertAndSend(FIRST_EXCHANGE, null, body);
        log.info("sent a message to first broker, payload: {}", body);
    }

    /** Publishes the fixed second payload to the second exchange and logs it. */
    private void publishToSecondBroker() {
        final String body = "second payload";
        secondRabbitTemplate.convertAndSend(SECOND_EXCHANGE, null, body);
        log.info("sent a message to second broker, payload: {}", body);
    }

    /**
     * Consumes {@code FIRST_QUEUE} through the first container factory and logs the payload.
     *
     * @param message the received message
     */
    @RabbitListener(
            containerFactory = "firstRabbitListenerContainerFactory",
            bindings = @QueueBinding(
                    value = @Queue(name = FIRST_QUEUE, durable = "true", declare = "true"),
                    exchange = @Exchange(name = FIRST_EXCHANGE, durable = "true", type = FANOUT, declare = "true")
            )
    )
    public void listenToFirstBroker(Message<String> message) {
        log.info("received a message from first broker, payload: {}", message.getPayload());
    }

    /**
     * Consumes {@code SECOND_QUEUE} through the second container factory and logs the payload.
     *
     * @param message the received message
     */
    @RabbitListener(
            containerFactory = "secondRabbitListenerContainerFactory",
            bindings = @QueueBinding(
                    value = @Queue(name = SECOND_QUEUE, durable = "true", declare = "true"),
                    exchange = @Exchange(name = SECOND_EXCHANGE, durable = "true", type = FANOUT, declare = "true")
            )
    )
    public void listenToSecondBroker(Message<String> message) {
        log.info("received a message from second broker, payload: {}", message.getPayload());
    }
}
启动程序,控制台将输出以下日志:
sent a message to first broker, payload: first payload
sent a message to second broker, payload: second payload
received a message from second broker, payload: second payload
received a message from first broker, payload: first payload
标签:payload,Spring,Boot,Broker,broker,second,message,public,first
From: https://www.cnblogs.com/jason207010/p/18459782