初识MQ
同步和异步通讯
微服务间通讯有同步和异步两种方式:
同步通讯:就像打电话,需要实时响应。
异步通讯:就像微信,不需要马上回复。
同步通讯
SpringCloud中Feign调用就属于同步方式,虽然调用可以实时得到结果,但存在下面的问题:
优点:
时效性较强,可以立即得到结果
缺点:
耦合度高:每次加入新的需求,都要修改原来的代码。
性能和吞吐能力下降:如果调用链过长则响应时间等于每次调用的时间之和。
有额外的资源消耗:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源。
有级联失败问题:如果服务提供者出现问题,所有调用方都会跟着出问题。如同多米诺骨牌一样,迅速导致整个微服务群故障。
异步通讯
Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。
好处:
吞吐量提升:无需等待接收方服务处理完成,响应更快速
故障隔离:服务没有直接调用,不存在级联失败问题
调用间没有阻塞,不会造成无效的资源占用
耦合度极低,每个服务都可以灵活插拔,可替换
流量削峰:不管发布事件的流量波动多大,都由Broker接收,接收方服务可以按照自己的速度去处理事件
缺点:
架构复杂了,业务没有明显的流程线,不好管理
需要依赖于Broker的可靠、安全、性能
技术对比
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
比较常见的MQ实现:RabbitMQ;ActiveMQ;RocketMQ;Kafka
几种常见MQ的对比:
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
快速入门
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址: https://www.rabbitmq.com/
安装RabbitMQ
下载镜像
方式一:在线拉取
docker pull rabbitmq:3.8-management
方式二:从本地加载,将镜像包上传到虚拟机中后,使用命令加载镜像即可:
docker load -i mq.tar
安装MQ
执行下面的命令来运行MQ容器:
docker run \ -e RABBITMQ_DEFAULT_USER=test \ -e RABBITMQ_DEFAULT_PASS=123 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3.8-management
查看
docker ps
MQ的基本结构
RabbitMQ中的一些角色:
publisher:生产者
consumer:消费者
exchange:交换机,负责消息路由
queue:队列,缓存消息
virtualHost:虚拟主机,隔离不同用户的exchange、queue、消息的隔离。
RabbitMQ消息模型
RabbitMQ的官方文档中给出了5个MQ的Demo示例,对应了几种不同的用法:
基本消息队列 (BasicQueue)
工作消息队列 (WorkQueue)
发布订阅 (Publish、Subscribe),又根据交换机类型不同分为三种:
Fanout Exchange:广播
Direct Exchange:路由
Topic Exchange:主题
入门案例
简单队列模式的模型图:
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接受并缓存消息
consumer:订阅队列,处理队列中的消息
publisher实现
基本消息队列的消息发送流程:
1. 建立connection
2. 创建channel
3. 利用channel声明队列
4. 利用channel向队列发送消息
5. 关闭连接和channel
1 public class PublisherTest { 2 @Test 3 public void testSendMessage() throws IOException, TimeoutException { 4 // 1.建立连接 5 ConnectionFactory factory = new ConnectionFactory(); 6 // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 7 factory.setHost("192.168.1.1"); 8 factory.setPort(5672); 9 factory.setVirtualHost("/"); 10 factory.setUsername("test"); 11 factory.setPassword("123"); 12 // 1.2.建立连接 13 Connection connection = factory.newConnection(); 14 15 // 2.创建通道Channel 16 Channel channel = connection.createChannel(); 17 18 // 3.创建队列 19 String queueName = "simple.queue"; 20 channel.queueDeclare(queueName, false, false, false, null); 21 22 // 4.发送消息 23 String message = "hello, rabbitmq!"; 24 channel.basicPublish("", queueName, null, message.getBytes()); 25 System.out.println("发送消息成功:【" + message + "】"); 26 27 // 5.关闭通道和连接 28 channel.close(); 29 connection.close(); 30 } 31 }
consumer实现
基本消息队列的消息接收流程:
1. 建立connection
2. 创建channel
3. 利用channel声明队列
4. 订阅消息:定义consumer的消费行为handleDelivery()
5. 利用channel将消费者与队列绑定
1 public class ConsumerTest { 2 public static void main(String[] args) throws IOException, TimeoutException { 3 // 1.建立连接 4 ConnectionFactory factory = new ConnectionFactory(); 5 // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 6 factory.setHost("192.168.150.101"); 7 factory.setPort(5672); 8 factory.setVirtualHost("/"); 9 factory.setUsername("itcast"); 10 factory.setPassword("123321"); 11 // 1.2.建立连接 12 Connection connection = factory.newConnection(); 13 14 // 2.创建通道Channel 15 Channel channel = connection.createChannel(); 16 17 // 3.创建队列 18 String queueName = "simple.queue"; 19 channel.queueDeclare(queueName, false, false, false, null); 20 21 // 4.订阅消息 22 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, 25 AMQP.BasicProperties properties, byte[] body) throws IOException { 26 // 5.处理消息 27 String message = new String(body); 28 System.out.println("接收到消息:【" + message + "】"); 29 } 30 }); 31 System.out.println("等待接收消息。。。。"); 32 } 33 }
标签:队列,factory,RabbitMQ,MQ,消息,channel From: https://www.cnblogs.com/sunny-sml/p/17644634.html