RabbitMQ-笔记
目录零、资料
一、RabbitMQ 常见的3种模型
1.基本队列
2.工作队列
3.发布订阅(解决前2个模型阅后即焚的问题):Fanout(广播交换机,一对多)、Dircet(直连交换机,一对一)、Topic(主题交换机,模糊匹配)
1.1、基本队列
前提:已经手动在RabbitMQ中创建了用户、虚拟主机、队列 步骤: > 1、添加依赖 > 2、添加配置 > 3、生产者发布消息到队列 > 4、消费者监听消息队列
1、添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、添加配置
spring:
rabbitmq:
host: localhost
port: 5672
username: zs
password: root
virtual-host: itcast
3、生产者发布消息到队列
@Autowired
private RabbitTemplate rabbitTemplate;
public void test1(){
String queueName = "simple.queue";
String msg = "hello mq";
rabbitTemplate.convertAndSend(queueName, msg);
}
4、消费者监听消息队列
@Component
public class ConsumerListener {
@RabbitListener(queues = "simple.queue")
public void listen(String msg) throws InterruptedException{
System.out.println("消费的消息为:"+msg);
}
}
1.2、工作队列
前提:已经手动在RabbitMQ中创建了用户、虚拟主机、队列 步骤: > 1、添加依赖 > 2、添加配置 > 3、生产者发布消息到队列 > 4、消费者监听消息队列
1、添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、添加配置
spring:
rabbitmq:
host: localhost
port: 5672
username: zs
password: root
virtual-host: itcast
listener:
simple:
pre-fetch: 1 # 每次只获取1条消息,处理完再拿下一个消息
3、生产者发布消息到队列
@Autowired
private RabbitTemplate rabbitTemplate;
public void test1(){
String queueName = "simple.queue";
String msg = "hello mq ";
for(int i=0;i<10;i++){
rabbitTemplate.convertAndSend(queueName, msg+i);
}
}
4、消费者监听消息队列
@Component
public class ConsumerListener {
// 消费者1
@RabbitListener(queues = "simple.queue")
public void listen1(String msg) throws InterruptedException{
System.out.println("消费的消息为:"+msg);
}
// 消费者2
@RabbitListener(queues = "simple.queue")
public void listen2(String msg) throws InterruptedException{
System.out.println("消费的消息为:"+msg);
}
}
1.3、发布订阅
交换机类型: > Fanout(广播交换机,一对多) > Dircet(直连交换机,一对一) > Topic(主题交换机,模糊匹配)
1.3.1 Fanout交换机
配置类:
@Configuration
public class FanoutExchangeConfig {
@Bean
public FanoutExchange fanoutExchange(){
String fanoutExchangeName = "itcast.fanout";
return new FanoutExchange(fanoutExchangeName);
}
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
绑定RabbitMQ的监听器:
@Component
public class ConsumerListener3 {
@RabbitListener(queues = "fanout.queue1")
public void listen1(String msg){
System.out.println(msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listen2(String msg){
System.out.println(msg);
}
}
问题:SpringBoot集成rabbitMQ后不会自动创建队列?
原因:https://blog.csdn.net/weixin_48841931/article/details/130137092
1.3.2 Dirct交换机
此处以1个交换机,2个队列为例,2个消费者为例。
spring:
rabbitmq:
host: localhost
port: 5672
username: zs
password: root
virtual-host: itcast
listener:
simple:
prefetch: 1
配置类:
@Configuration
public class DirectConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("itcast.direct");
}
@Bean
public Queue queue1() {
return new Queue("direct.queue1");
}
@Bean
public Binding binding1(Queue queue1, DirectExchange directExchange) {
return BindingBuilder.bind(queue1).to(directExchange).with("key1");
}
@Bean
public Queue queue2(){
return new Queue("direct.queue2");
}
@Bean
public Binding binding2(Queue queue2,DirectExchange directExchange){
return BindingBuilder.bind(queue2).to(directExchange).with("key2");
}
}
监听器(消费者):
@Component
public class ConsumerListener4 {
// 消费者1 接收到消息后的处理方法
@RabbitListener(bindings =@QueueBinding(
value = @Queue(name = "direct.quque1"),
exchange = @Exchange(name="itcast.direct",type = ExchangeTypes.DIRECT),
key="key1"
))
public void listen1(String msg) {
System.out.println("key1 "+msg);
}
// 消费者2 接收到消息后的处理方法
@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct.queue2"),
exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT),
key = "key2"
))
public void listen2(String msg) {
System.out.println("key2 "+msg);
}
}
生产者:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test4_1() {
String msg = "hello mq --direct";
String exchange = "itcast.direct";
rabbitTemplate.convertAndSend(exchange, "key1", msg);
}
@Test
public void test4_2() {
String msg = "hello mq --direct";
String exchange = "itcast.direct";
rabbitTemplate.convertAndSend(exchange, "key2", msg);
}
1.3.3 Topic交换机
此处以1个交换机,2个队列为例,2个消费者为例。
spring:
rabbitmq:
host: localhost
port: 5672
username: zs
password: root
virtual-host: itcast
listener:
simple:
prefetch: 1
配置类:
@Configuration
public class TopicConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("itcast.topic");
}
@Bean
public Queue queue1(){
return new Queue("topic.queue1");
}
@Bean
public Binding binding1(Queue queue1,TopicExchange topicExchange){
return BindingBuilder.bind(queue1).to(topicExchange).with("china.#");
}
@Bean
public Queue queue2(){
return new Queue("topic.queue2");
}
@Bean
public Binding binding2(Queue queue2,TopicExchange topicExchange){
return BindingBuilder.bind(queue2).to(topicExchange).with("us.#");
}
}
监听器(消费者)【注意:这里的注解中的key里的#
表示大于等于0个字符,*
代表1个字符】:
@Configuration
public class ConsumerListener5 {
// 消费者1
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue1"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listen1(String msg){
System.out.println("china : "+msg);
}
// 消费者2
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue2"),
exchange = @Exchange(value = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "us.#"
))
public void listen2(String msg){
System.out.println("us : "+msg);
}
}
生产者:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test5_1() {
String msg = "hello mq --topic";
String exchange = "itcast.topic";
rabbitTemplate.convertAndSend(exchange, "china.zj", msg);
}
@Test
public void test5_2() {
String msg = "hello mq --topic";
String exchange = "itcast.topic";
rabbitTemplate.convertAndSend(exchange, "us.bst", msg);
}
1.4 消息转化器
在上面的使用中,生产者发送消息是通过:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test5_2() {
// 此处省略了 exchange, routingKey, msg 的定义
rabbitTemplate.convertAndSend(exchange, routingKey, msg);
}
这里的 msg 通过代码提示可以知道是Object类型,但是在RabbitMQ的管理界面可以看到这个Object类型使用的是JDK的序列化,数据又长又看不到,因此需要重写消息转化器。
注意:消费者和生产者如果在2个模块中,则都需要引入依赖、写配置类
依赖:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
配置类:
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyConfig {
@Bean
public MessageConverter jsonMessageConvert(){
return new Jackson2JsonMessageConverter();
}
}
标签:return,String,笔记,Queue,Bean,RabbitMQ,msg,public
From: https://www.cnblogs.com/cywdder/p/18081078