服务异步通讯——RabbitMQ复习随笔
微服务间通讯有同步和异步两种方式:
同步通讯:就像打电话,需要实时响应。
异步通讯:就像发邮件,不需要马上回复。
两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。
而上文我们通过 Feign 方式进行的调用则属于同步通讯方式,虽然调用可以实时得到结果,但存在下面的问题:
使用异步通讯则可以解决耦合度高、级联失败等问题,提高性能,减少资源的浪费。
认识消息队列MQ
异步调用常见实现就是事件驱动模式:
我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响应的库存并准备发货。
在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。
订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。
其中的关键部分——Broker 是一种中间件,它负责在事件驱动模式中的各个部分之间传递消息。在消息发送方发送消息后,Broker 会将消息存储在一个消息队列中,然后等待接收消息的一方来取出消息。这样,Broker 就可以缓解各个部分之间的通讯压力,同时也可以保证消息的可靠传递。
目前主流的MQ(消息队列)中间件有:RabbitMQ、ActiveMQ、RocketMQ、Kafka。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
- 追求可用性:Kafka、 RocketMQ 、RabbitMQ
- 追求可靠性:RabbitMQ、RocketMQ
- 追求吞吐能力:RocketMQ、Kafka
- 追求消息低延迟:RabbitMQ、Kafka
一、RabbitMQ 快速入门
我们可以通过 Docker 完成 RabbitMQ 的快速部署。
- 从镜像仓库中拉取镜像:
docker pull rabbitmq:3-management
- 执行 run 命令运行容器:
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
RabbitMQ 启动后,输入 ip:15672 进行登录,随后输入设定的账号密码进入管理面板的UI界面。
RabbitMQ 中的一些概念:
- publisher:生产者
- consumer:消费者
- exchange:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
我们新建一个演示项目尝试着用 java 发送一个消息并接收它:
- 在父工程中导入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 下面首先提供了一个发送消息的测试方法作为 Publisher(生产者):
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("username");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
- 用一个主方法模拟 Consumer(消费者)接收消息:
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("username");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息...");
}
在消费者接收数据前,我们可以在 RabbitMQ 的界面的 Queue 中看到名为 simple.queue
的队列,其中包含了“hello, rabbitmq!”这条消息,consumer 方法运行后便将它打印在控制台上了。
显然,上述代码重复臃肿而过于冗长。好在,Spring 为我们操作 RabbitMQ 和其他 MQ 提供了封装好的模块——SpringAMQP。
二、SpringAMPQ 操作 RabbitMQ
SpringAMQP 是基于 AMQP 协议定义的一套 API 规范,提供了模板来发送和接收消息。包含两部分,其中 spring-amqp 是基础抽象,spring-rabbit 是底层的默认实现。
Ⅰ、BasicQueue
以下步骤用于构建一个基于 SpringAMQP 实现的基本消息队列(BasicQueue):
- 引入 AMQP 依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 在 publisher 中编写测试方法,向 simple.queue 发送消息:
# 在配置文件添加 SpringAMQP 的连接信息配置
spring:
rabbitmq:
host: 127.0.0.1 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: username # 用户名
password: 123321 # 密码
//注入对象
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 测试 SpringAMQP 封装的基本消息队列API
*/
@Test
public void testSendMessage2SimpleQueue() {
String queueName = "simple.queue";
String message = "hello, SpringAMQP!";
rabbitTemplate.convertAndSend(queueName, message);
}
- 在 consumer 中编写消费逻辑,监听 simple.queue:
# 配置信息,下略
spring:
# ...
//监听基本消息队列(BasicQueue)
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.err.println("消费者接收到simple.queue的消息是:" + msg);
}
Ⅱ、WorkQueue
RabbitMQ 提供了五种不同的消息队列模型,对应了五种不同的做法。除了上方测试过的BasicQueue(基本队列) 外,还有WorkQueue(工作队列)、FanoutQueue(广播队列)、DirectQueue(路由队列)、TopicQueue(话题队列)。
WorkQueue——工作队列模型,多个消费者共同接收处理消息,每条消息仅一个消费者处理,结束后销毁。可以提高消息处理速度,避免队列消息堆积。
- 在 publisher 服务中添加一个测试方法,循环发送50条消息到 simple.queue 队列:
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
- 编写两个消费者,都监听 simple.queue:
//用工作队列互斥接收消息
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue01(String msg) throws InterruptedException {
System.out.println("消费者01接收到work.queue的消息是:[" + msg + "]" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue02(String msg) throws InterruptedException {
System.err.println("消费者02接收到work.queue的消息是:[" + msg + "]" + LocalTime.now());
Thread.sleep(20);
}
可以看到俩个消费者分别各处理了部分消息,打印在了控制台上。
另外,我们通过设置 prefetch 来控制消费者预取的消息数量:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
Ⅲ、FanoutQueue
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了 exchange(交换机)。
常见 exchange 类型包括:
- Fanout:广播
- Direct:路由
- Topic:话题
其中,Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue:
- 在 consumer 服务声明 Exchange、Queue、Binding;
在 consumer 服务创建一个类,添加@Configuration
注解,并声明 FanoutExchange、Queue 和绑定关系对象 Binding,代码如下:
/**
* 使用@Bean注入配置FanoutExchange
* 为广播交换机绑定队列:发送消息时将向全部队列发布相同消息
*/
@Configuration
public class FanoutConfig {
//注入交换机对象
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange");
}
//注入第一个广播队列
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
//为第一个队列绑定交换机
@Bean
public Binding fanoutBinging1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {//参数的类型和名称须于上面定义的方法相同
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
@Bean
public Binding fanoutBinging2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}
- 在 consumer 服务声明两个消费者:
//用广播交换机同步接收消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.err.println("消费者接收到fanout.queue1的消息是:" + msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.err.println("消费者接收到fanout.queue2的消息是:" + msg);
}
- 在 publisher 服务发送消息到 FanoutExchange:
@Test
public void testSendFanoutExchange() {
String exchangeName = "fanout.exchange";
String message ="hello, every one!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
结果自然是两个消费者能同时收到消息。
Ⅳ、DirectQueue
Direct Exchange 会将接收到的消息根据规则路由到指定的 Queue,因此称为路由模式(routes)。
- 每一个 Queue 都与 Exchange 设置一个
BindingKey
- 发布者发送消息时,指定消息的
RoutingKey
- Exchange 将消息路由到
BindingKey
与消息RoutingKey
一致的队列
- 在 consumer 服务声明 Exchange、Queue;
但这次我们不使用配置类@Bean
注入方式配置 Exchange,而是利用@RabbitListener
声明Exchange、Queue、RoutingKey。
//使用路径交换机指定接收消息的队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) {
System.out.println("消费者接收到direct.queue1的消息是:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}//路由标识
))
public void listenDirectQueue2(String msg) {
System.out.println("消费者接收到direct.queue2的消息是:" + msg);
}
- 在 publisher 服务发送消息到 DirectExchange:
@Test
public void testSendDirectExchange() {
String exchangeName = "direct.exchange";
String message ="hello, blue!";
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
publisher 发送消息后,只有 routingKey
是 "blue" 的监听队列 direct.queue1
的 consumer 能够收到消息。
Ⅴ、TopicQueue
TopicExchange 与DirectExchange类似,区别在于 routingKey 必须是多个单词的列表,并且以 . 分割。
Queue 与 Exchange 指定 BindingKey 时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
- 在 consumer 服务声明 Exchange、Queue:
//使用话题交换机指定接收消息的队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
key = {"china.#"}
))
public void listenTopicQueue1(String msg) {
System.out.println("消费者接收到topic.queue1的消息是:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
key = {"#.news"}//路由标识
))
public void listenTopicQueue2(String msg) {
System.out.println("消费者接收到topic.queue2的消息是:" + msg);
}
- 在 publisher 服务发送消息到 TopicExchange:
@Test
public void testSendTopicExchange() {
String exchangeName = "topic.exchange";
rabbitTemplate.convertAndSend(exchangeName, "china.news", "二十大于近日正式闭幕。");
rabbitTemplate.convertAndSend(exchangeName, "china.stock.market", "股市状况良好。");
rabbitTemplate.convertAndSend(exchangeName, "japan.news", "安倍晋三于今日被刺杀。");
}
其中,消息 "china.news" 同时符合 "china.#" 和 "#.news";"china.stock.market" 符合关键字 "china.#";"japen.news" 则符合 "#.news"。它们将被合适的 consumer 接收并打印。
三、消息转换器
在 SpringAMQP 的发送方法中,接收消息的类型是 Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
为了测试,我们在 publisher 中发送 Map 对象的消息:
@Test
public void testSendMap() throws InterruptedException {
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "Jack");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend("simple.queue","", msg);
}
可队列的结果并不是原本的字符串数据。
事实上,Spring 的对消息对象的处理是由 org.springframework.amqp.support.converter.MessageConverter 来处理的。而默认实现是 SimpleMessageConverter,基于JDK的 ObjectOutputStream 完成序列化。
显然,JDK 序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用 JSON 方式来做序列化和反序列化。
如果要修改只需要定义一个 MessageConverter 类型的 Bean 即可:
- 在 publisher 和 consumer 两个服务中都引入依赖:
<!-- jackson依赖 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
- 我们在 publisher 服务声明 MessageConverter:
/**
* 注入消息转换器
*
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}