1. pom.xml 配置
<!-- Spring Boot AMQP starter; used in this project to talk to RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置文件
# Connection settings for two independent RabbitMQ brokers.
# The keys below are read one by one through @Value("${spring.rabbitmq.first.*}") and
# @Value("${spring.rabbitmq.second.*}") in RabbitMQConfig, so the nesting must match exactly.
spring:
  rabbitmq:
    # Broker used by the "main" (primary) connection factory.
    first:
      # NOTE(review): the third octet is written "08" (leading zero); some address parsers
      # reject or reinterpret such literals - confirm the intended address.
      host: 101.22.08.161
      port: 5672
      username: dhcs
      password: dhcs
      virtualhost: /
    # Broker used by the "second" connection factory.
    second:
      host: 101.32.80.138
      port: 5672
      username: admin
      password: admin
      virtualhost: /account
3. 编写配置类
package com.example.mybatis.config.mq;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
 * Declares two independent RabbitMQ connections ("main" and "second") and, for each of them,
 * a {@link RabbitTemplate} for publishing and a {@link SimpleRabbitListenerContainerFactory}
 * for {@code @RabbitListener} endpoints.
 *
 * <p>The "main" beans are annotated with {@link Primary}; the "second" beans have to be
 * requested by bean name (for example through {@link Qualifier}).
 */
@Configuration
public class RabbitMQConfig {

    // ============================================ Connection factories ============================================

    /**
     * Connection factory for the first broker, built from the {@code spring.rabbitmq.first.*} properties.
     *
     * @param host        broker host
     * @param port        broker port
     * @param username    login user
     * @param password    login password
     * @param virtualhost virtual host to connect to
     * @return the primary connection factory
     */
    @Bean(name = "mainConnectionFactory")
    @Primary
    public ConnectionFactory mainConnectionFactory(
            @Value("${spring.rabbitmq.first.host}") String host,
            @Value("${spring.rabbitmq.first.port}") int port,
            @Value("${spring.rabbitmq.first.username}") String username,
            @Value("${spring.rabbitmq.first.password}") String password,
            @Value("${spring.rabbitmq.first.virtualhost}") String virtualhost) {
        return connectionFactory(host, port, username, password, virtualhost);
    }

    /**
     * Connection factory for the second broker, built from the {@code spring.rabbitmq.second.*} properties.
     *
     * @param host        broker host
     * @param port        broker port
     * @param username    login user
     * @param password    login password
     * @param virtualhost virtual host to connect to
     * @return the secondary connection factory
     */
    @Bean(name = "secondConnectionFactory")
    public ConnectionFactory secondConnectionFactory(
            @Value("${spring.rabbitmq.second.host}") String host,
            @Value("${spring.rabbitmq.second.port}") int port,
            @Value("${spring.rabbitmq.second.username}") String username,
            @Value("${spring.rabbitmq.second.password}") String password,
            @Value("${spring.rabbitmq.second.virtualhost}") String virtualhost) {
        return connectionFactory(host, port, username, password, virtualhost);
    }

    /**
     * Builds a {@link CachingConnectionFactory} from plain connection settings.
     * Shared by both connection-factory beans; not a bean itself.
     *
     * @param host        broker host
     * @param port        broker port
     * @param username    login user
     * @param password    login password
     * @param virtualhost virtual host to connect to
     * @return a new, fully populated connection factory
     */
    public CachingConnectionFactory connectionFactory(String host, int port, String username, String password, String virtualhost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        return connectionFactory;
    }

    // ============================================ RabbitTemplates ============================================

    /**
     * Template that publishes through the main connection and uses a JSON message converter.
     *
     * @param connectionFactory the main connection factory
     * @return the primary {@link RabbitTemplate}
     */
    @Bean(name = "mainRabbitTemplate")
    @Primary
    public RabbitTemplate mainRabbitTemplate(@Qualifier("mainConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate mainRabbitTemplate = new RabbitTemplate(connectionFactory);
        mainRabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return mainRabbitTemplate;
    }

    /**
     * Template that publishes through the second connection and uses a JSON message converter.
     *
     * @param connectionFactory the second connection factory
     * @return the secondary {@link RabbitTemplate}
     */
    @Bean(name = "secondRabbitTemplate")
    public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);
        secondRabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return secondRabbitTemplate;
    }

    // ============================================ Listener container factories ============================================

    /**
     * Listener container factory bound to the main connection.
     *
     * @param configurer        Spring Boot's configurer for simple listener container factories
     * @param connectionFactory the main connection factory
     * @return the primary listener container factory, referenced as {@code containerFactory = "mainFactory"}
     */
    @Bean(name = "mainFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory mainFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mainConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Let the configurer apply its settings first, then customize: anything set before
        // configure(...) could be overwritten by it.
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    /**
     * Listener container factory bound to the second connection, with explicit consumer
     * concurrency (10 to 20 consumers) and a prefetch count of 1.
     *
     * @param configurer        Spring Boot's configurer for simple listener container factories
     * @param connectionFactory the second connection factory
     * @return the secondary listener container factory, referenced as {@code containerFactory = "secondFactory"}
     */
    @Bean(name = "secondFactory")
    public SimpleRabbitListenerContainerFactory secondFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Configure first so the explicit values below always win over whatever the
        // configurer applies.
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(10);
        factory.setMaxConcurrentConsumers(20);
        factory.setPrefetchCount(1);
        return factory;
    }
}
4. 发送消息
package com.example.mybatis.config.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Publishes messages to the first broker through the {@code mainRabbitTemplate} bean.
 */
@Component
@Slf4j
public class Sender1 {

    /** Template bound to the main connection; injected by bean name. */
    @Resource(name = "mainRabbitTemplate")
    protected RabbitTemplate rabbitTemplate;

    /**
     * Sends a fixed sample payload to exchange {@code dhcs-exchange} with routing key {@code DHCS2KF1}.
     */
    public void test() {
        this.sendMessage("dhcs-exchange", "DHCS2KF1", "1111111111");
    }

    /**
     * Sends a text payload, labelled as JSON, to the given exchange.
     *
     * @param exchange   target exchange
     * @param routingKey routing key
     * @param message    message content; encoded as UTF-8 bytes
     * @return {@code true} if the send call completed without throwing, {@code false} otherwise
     */
    private boolean sendMessage(String exchange, String routingKey, String message) {
        long start = System.currentTimeMillis();
        try {
            // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default charset,
            // which can corrupt non-ASCII content on hosts whose default is not UTF-8.
            Message messageObj = MessageBuilder.withBody(message.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .build();
            rabbitTemplate.convertAndSend(exchange, routingKey, messageObj);
        } catch (Exception e) {
            // Best effort: any failure (including a null message) is logged and reported as false.
            log.error("【mq发送失败】", e);
            return false;
        } finally {
            // Runs on both success and failure, so the elapsed time is always recorded.
            long end = System.currentTimeMillis();
            log.info("【mq发送】入参:{},耗时:{}ms", message, (end - start));
        }
        return true;
    }
}
package com.example.mybatis.config.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Publishes messages to the second broker through the {@code secondRabbitTemplate} bean.
 */
@Component
@Slf4j
public class Sender2 {

    /** Template bound to the second connection; injected by bean name. */
    @Resource(name = "secondRabbitTemplate")
    protected RabbitTemplate rabbitTemplate;

    /**
     * Sends a fixed sample payload to exchange {@code ssoAccount_exchange} with an empty routing key.
     */
    public void test() {
        this.sendMessage("ssoAccount_exchange", "", "123456789");
    }

    /**
     * Sends a text payload, labelled as JSON, to the given exchange.
     *
     * @param exchange   target exchange
     * @param routingKey routing key
     * @param message    message content; encoded as UTF-8 bytes
     * @return {@code true} if the send call completed without throwing, {@code false} otherwise
     */
    private boolean sendMessage(String exchange, String routingKey, String message) {
        long start = System.currentTimeMillis();
        try {
            // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default charset,
            // which can corrupt non-ASCII content on hosts whose default is not UTF-8.
            Message messageObj = MessageBuilder.withBody(message.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .build();
            rabbitTemplate.convertAndSend(exchange, routingKey, messageObj);
        } catch (Exception e) {
            // Best effort: any failure (including a null message) is logged and reported as false.
            log.error("【mq发送失败】", e);
            return false;
        } finally {
            // Runs on both success and failure, so the elapsed time is always recorded.
            long end = System.currentTimeMillis();
            log.info("【mq发送】入参:{},耗时:{}ms", message, (end - start));
        }
        return true;
    }
}
5. 接收消息
package com.example.mybatis.config.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumes messages from queue {@code DHCS2KF1} using the {@code mainFactory} listener container factory.
 */
@Component
@Slf4j
public class Listener1 {

    /**
     * Handles one message from {@code DHCS2KF1}.
     *
     * <p>Exceptions raised by the business logic are caught and logged, so they do not
     * propagate to the listener container.
     *
     * @param message the message payload as text
     */
    // NOTE(review): @RabbitHandler is normally paired with a class-level @RabbitListener;
    // it looks redundant next to a method-level one - confirm before removing.
    @RabbitListener(queuesToDeclare = @Queue("DHCS2KF1"), containerFactory = "mainFactory")
    @RabbitHandler
    public void process(String message) {
        log.info("Listener1 接收消息:{} ", message);
        try {
            // Business logic goes here.
            log.info("Listener1确认消费完成 ..............................");
        } catch (Exception e) {
            log.error("Listener1确认消费异常", e);
        } finally {
            // Routine confirmation, not a failure: logged at INFO so it does not pollute error logs.
            log.info("Listener1确认消费:{}", message);
        }
    }
}
package com.example.mybatis.config.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumes messages from queue {@code sso.account.chewu.queue} using the {@code secondFactory}
 * listener container factory.
 */
@Component
@Slf4j
public class Listener2 {

    /**
     * Handles one message from {@code sso.account.chewu.queue}.
     *
     * <p>Exceptions raised by the business logic are caught and logged, so they do not
     * propagate to the listener container.
     *
     * @param message the message payload as text
     */
    // NOTE(review): @RabbitHandler is normally paired with a class-level @RabbitListener;
    // it looks redundant next to a method-level one - confirm before removing.
    // NOTE(review): queuesToDeclare relies on an admin bean to declare the queue; verify that the
    // declaration actually reaches the second broker and not only the primary one.
    @RabbitListener(queuesToDeclare = @Queue("sso.account.chewu.queue"), containerFactory = "secondFactory")
    @RabbitHandler
    public void process(String message) {
        log.info("Listener2 接收消息:{} ", message);
        try {
            // Business logic goes here.
            log.info("Listener2确认消费完成 ..............................");
        } catch (Exception e) {
            log.error("Listener2确认消费异常", e);
        } finally {
            // Routine confirmation, not a failure: logged at INFO so it does not pollute error logs.
            log.info("Listener2确认消费:{}", message);
        }
    }
}
标签:集成,connectionFactory,SpringBoot,springframework,String,rabbitmq,org,import,amqp
From: https://www.cnblogs.com/lovedaodao/p/17150506.html