一、概念
RabbitMQ架构模型分为客户端和服务端两部分,客户端包括生产者和消费者,服务端包括虚拟主机、交换器和队列。两者通过连接和信道进行通信。
整体的流程是生产者将消息发送到服务器,消费者从服务器中获取对应的消息。具体流程是生产者在发送消息前小确定发送给哪个虚拟机的哪个交换器,再由交换器通过路由键将消息发送给与之绑定的队列。最后消费者到指定的队列中获取自己的消息进行消费。
二、客户端和服务端
生产者和消费者都属于客户端,生产者是消息的发送方,将要发送的消息封装成一定的格式,发送给服务端。消费者是消息的接收方,负责消费消息体。
服务端包含虚拟主机、交换器和队列。虚拟主机用来对交换器和队列进行逻辑隔离,在同一个虚拟主机下,交换器和队列的名称不能重复。交换器负责接收生产者的消息,并根据规则分配给对应的队列。队列负责存储消息,生产者发送的消息会被存储到这里,而消费者从这里获取消息。
三、工作模式
模式 | 介绍 |
简单模式 | 简单模式就是生产者将消息发送给队列,消费者从队列中获取消息即可 |
工作队列模式 | 存在多个消费者,生产者将消息放到队列,多个消费者会依次进行消费。只有在消息条数是消费者数量的整数倍才能做到公平分配 |
广播模式 | 广播模式下,即使只发送一条消息,它对应的所有消费者都可以全部收到 |
路由模式 | 路由模式下,交换器会根据不同的路由键将消息发送给指定的队列,从而被特定的消费者消费。 |
动态路由模式 | 路由模式需要指定明确的路由键,动态路由模式可以支持带通配符的路由键 |
四、管理平台
启动RabbitMQ服务,访问http://localhost:15672/
默认的账户和密码都是guest,我们用这个账户登录后,会进入管理页面。
我们在Admin的标签页面里面添加一个虚拟主机。
五、项目实战
1.引入依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.11</version>
</dependency>
<!--lomobk-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>
2.添加相关配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 虚拟主机名称
spring.rabbitmq.virtual-host=my-virtual
3.简单模式测试
生产者发送消息示例代码如下:
package com.example.rabbitmqtest.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author qx
* @date 2023/07/30
* @desc 生产者
*/
@RestController
@RequestMapping("/rabbit")
@Slf4j
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 简单模式-生产者发送消息
*
* @param routingKey 路由键
* @param message 消息
*/
@GetMapping("/producer")
public String send(String routingKey, String message) {
rabbitTemplate.convertAndSend(routingKey, message);
log.info("生产者发送消息:{}", message);
return "生产者发送消息成功";
}
}
消费者接收消息示例代码如下:
package com.example.rabbitmqtest.components;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author qx
* @date 2023/07/30
* @desc 消费者
*/
@Slf4j
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class Consumer {
@RabbitHandler
public void receive(String message) {
log.info("接收到的消息:{}", message);
}
}
我们启动项目,访问生产者发送消息的请求。
我们在控制台可以看到消费者接收到了消息。
2023-07-30 10:34:54.151 INFO 6716 --- [nio-8080-exec-1] c.e.r.controller.ProducerController : 生产者发送消息:hello
2023-07-30 10:34:54.244 INFO 6716 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer : 接收到的消息:hello
4.工作队列模式测试
生产者示例代码如下:
package com.example.rabbitmqtest.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author qx
* @date 2023/07/30
* @desc 生产者
*/
@RestController
@RequestMapping("/rabbit")
@Slf4j
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 工作队列模式-生产者发送消息
*
* @param routingKey 路由键
* @param message 消息
*/
@GetMapping("/work")
public String send(String routingKey, String message) {
for (int i = 1; i <= 10; i++) {
rabbitTemplate.convertAndSend(routingKey, "第" + i + "条消息:" + message);
}
return "生产者发送消息成功";
}
}
消费者示例代码如下:
package com.example.rabbitmqtest.components;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author qx
* @date 2023/07/30
* @desc 消费者
*/
@Slf4j
@Component
public class Consumer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receiveOne(String message) {
log.info("receiveOne接收到的消息:{}", message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receiveTwo(String message) {
log.info("receiveTwo接收到的消息:{}", message);
}
}
启动项目,调用工作队列模式的测试请求
我们在控制台上看到消费者接收到的消息是均分获取的。
2023-07-30 10:49:44.805 INFO 8224 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer : receiveOne接收到的消息:第1条消息:hello
2023-07-30 10:49:44.805 INFO 8224 --- [ntContainer#1-1] c.e.rabbitmqtest.components.Consumer : receiveTwo接收到的消息:第2条消息:hello
2023-07-30 10:49:44.807 INFO 8224 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer : receiveOne接收到的消息:第3条消息:hello
2023-07-30 10:49:44.808 INFO 8224 --- [ntContainer#1-1] c.e.rabbitmqtest.components.Consumer : receiveTwo接收到的消息:第4条消息:hello
2023-07-30 10:49:44.808 INFO 8224 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer : receiveOne接收到的消息:第5条消息:hello
2023-07-30 10:49:44.808 INFO 8224 --- [ntContainer#1-1] c.e.rabbitmqtest.components.Consumer : receiveTwo接收到的消息:第6条消息:hello
2023-07-30 10:49:44.808 INFO 8224 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer : receiveOne接收到的消息:第7条消息:hello
2023-07-30 10:49:44.808 INFO 8224 --- [ntContainer#1-1] c.e.rabbitmqtest.components.Consumer : receiveTwo接收到的消息:第8条消息:hello
2023-07-30 10:49:44.808 INFO 8224 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer : receiveOne接收到的消息:第9条消息:hello
2023-07-30 10:49:44.809 INFO 8224 --- [ntContainer#1-1] c.e.rabbitmqtest.components.Consumer : receiveTwo接收到的消息:第10条消息:hello
5.广播模式测试
生产者示例代码如下:
/**
* 广播模式-生产者发送消息
*/
@GetMapping("/fanout")
public void sendFanout(String exchange, String message) {
rabbitTemplate.convertAndSend(exchange, "", message);
}
消费者示例代码如下:
package com.example.rabbitmqtest.components;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author qx
* @date 2023/07/30
* @desc 消费者
*/
@Component
@Slf4j
public class FanoutConsumer {
@RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "fanout", type = "fanout")))
public void reciveMessage(String message) {
log.info("receiveMessage:{}", message);
}
}
启动项目,进行广播模式的测试
控制台显示接收到的消息情况
2023-07-30 11:24:56.608 INFO 9036 --- [ntContainer#2-1] c.e.r.components.FanoutConsumer : reciveOne:hello
2023-07-30 11:24:56.608 INFO 9036 --- [ntContainer#3-1] c.e.r.components.FanoutConsumer : reciveTwo:hello
6.路由模式测试
/**
* 路由模式
*/
@GetMapping("/direct")
public String sendDirect(String exchange, String routingKey, String message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return "路由模式发送消息成功";
}
package com.example.rabbitmqtest.components;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author qx
* @date 2023/07/30
* @desc 路由模式消费者
*/
@Slf4j
@Component
public class DirectConsumer {
@RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa", "bb"}, exchange = @Exchange(name = "direct", type = "direct")))
public void reciveOne(String message) {
log.info("reciveOne:{}", message);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa"}, exchange = @Exchange(name = "direct", type = "direct")))
public void reciveTwo(String message) {
log.info("reciveTwo:{}", message);
}
}
启动项目根据路由键来发送消息
路由键为aa,所以两个消费者都接收到了消息
2023-07-30 11:51:20.254 INFO 3776 --- [ntContainer#2-1] c.e.r.components.DirectConsumer : reciveOne:hello
2023-07-30 11:51:20.254 INFO 3776 --- [ntContainer#3-1] c.e.r.components.DirectConsumer : reciveTwo:hello
路由键为bb,那么只有第一个消费者接收到了消息
2023-07-30 11:52:14.484 INFO 3776 --- [ntContainer#2-1] c.e.r.components.DirectConsumer : reciveOne:hello
7.动态路由模式测试
修改消费者代码
package com.example.rabbitmqtest.components;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author qx
* @date 2023/07/30
* @desc 动态路由模式消费者
*/
@Slf4j
@Component
public class TopicConsumer {
@RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa.bb.cc"}, exchange = @Exchange(name = "topic", type = "topic")))
public void reciveOne(String message) {
log.info("reciveOne:{}", message);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa.bb.*"}, exchange = @Exchange(name = "topic", type = "topic")))
public void reciveTwo(String message) {
log.info("reciveTwo:{}", message);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa.bb.#"}, exchange = @Exchange(name = "topic", type = "topic")))
public void reciveThree(String message) {
log.info("reciveThree:{}", message);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa.#"}, exchange = @Exchange(name = "topic", type = "topic")))
public void reciveFour(String message) {
log.info("reciveFour:{}", message);
}
}
启动项目,传入动态的键值数据
我们在控制台上显示的数据知道只有3和4符合我们的动态路由规则,所以只有3和4接收到了服务器的消息。
2023-07-30 11:57:54.554 INFO 1524 --- [ntContainer#9-1] c.e.r.components.TopicConsumer : reciveFour:hello
2023-07-30 11:57:54.556 INFO 1524 --- [ntContainer#8-1] c.e.r.components.TopicConsumer : reciveThree:hello