同步通讯和异步通讯
微服务基于Feign的调用就属于同步方式,存在一些问题
异步调用方案
异步调用常见实现就是事件驱动模式
优势一:服务解耦
优势二:性能提升,吞吐量提高
优势三:服务没有强依赖,不担心级联失败问题
优势四:流量削峰
什么是MQ
MQ的安装
docker在线拉取:
docker pull rabbitmq:3-management
执行下面的命令来运行MQ容器:
docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management2.集群部署
接下来,我们看看如何安装RabbitMQ的集群。
2.1.集群分类
在RabbitMQ的官方文档中,讲述了两种集群的配置方式:
- 普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。 - 镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。
我们先来看普通模式集群。
2.2.设置网络
首先,我们需要让3台MQ互相知道对方的存在。
分别在3台机器中,设置 /etc/hosts文件,添加如下内容:
``` 192.168.150.101 mq1 192.168.150.102 mq2 192.168.150.103 mq3 ```
并在每台机器上测试,是否可以ping通对方:
RabbitMQ的结构与概念
SpringAMQP
什么是SpringAMQP
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
利用SpringAMQP实现HelloWorld中的基础消息队列功能
1.引入AMQP依赖
因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中:
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.在publisher中编写测试方法,向simple.queue发送消息
2.1在publisher服务中编写application.yml,添加mq连接信息:
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码
2.2在publisher服务中新建一个测试类,编写测试方法:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; //类似于redisTemlate的命令工具类 @Test public void testSimpleQueue() { String queueName = "simple.queue"; //queueName 消息队列名称 String message = "hello, spring amqp!"; //向simple.queue这个消息队列发送的信息 rabbitTemplate.convertAndSend(queueName, message); //用RabbitTemplate.convertAndSend 发送
} }
3.在consumer中编写消费逻辑,监听simple.queue这个消息队列
3.1在consumer服务中编写application.yml,添加mq连接信息:
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码
3.2在consumer服务中新建一个类,编写消费逻辑:
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") //用RabbitListener注解监听指定消息队列 public void listenSimpleQueueMessage(String msg) throws InterruptedException { //msg 服务者发送的message消息 System.out.println("spring 消费者接收到消息 :【" + msg + "】"); } }
Work Queue 工作队列
Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积
模拟WorkQueue,实现一个队列绑定多个消费者
基本思路如下: 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue 在consumer服务中定义两个消息监听者,都监听simple.queue队列 消费者1每秒处理50条消息,消费者2每秒处理10条消息
步骤一:生产者循环发送消息到simple.queue
@Test public void testWorkQueue() throws InterruptedException { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, message__"; for (int i = 0; i < 50; i++) { // 发送消息 rabbitTemplate.convertAndSend(queueName, message + i); // 避免发送太快 Thread.sleep(20); } }
步骤二:编写两个消费者,都监听simple.queue
//在consumer服务中添加一个消费者,也监听simple.queue: @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者1接收到消息:【" + msg + "】"); Thread.sleep(25); } @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage2(String msg) throws InterruptedException { System.err.println("spring 消费者2接收到消息:【" + msg + "】"); Thread.sleep(100); }
修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限,从而避免效率低的服务器与效率高的服务器获得相同的工作量从而导致运行效率下降
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码 listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
发布( Publish )、订阅( Subscribe )
利用SpringAMQP演示FanoutExchange的使用
步骤1:在consumer服务声明Exchange、Queue、Binding
在consumer服务常见一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:
@Configuration public class FanoutConfig { // 声明FanoutExchange交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } // 声明第1个队列 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } //绑 定队列1和交换机 @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } // ... 略, 以相同方式声明第2个队列,并完成绑定 }
步骤2:在consumer服务声明两个消费者
在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2:
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }
步骤3:在publisher服务发送消息到FanoutExchange
@Test public void testFanoutExchange() { // 队列名称 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, everyone!"; // 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息 rabbitTemplate.convertAndSend(exchangeName, "", message); }
发布订阅-DirectExchange
步骤1:在consumer服务声明Exchange、Queue
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,
并利用@RabbitListener声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者1接收到Direct消息:【"+msg+"】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者2接收到Direct消息:【"+msg+"】 "); }
步骤2:在publisher服务发送消息到DirectExchange
@Test public void testDirectExchange() { // 队列名称 String exchangeName = "itcast.direct"; // 消息 String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!"; // 发送消息,参数依次为:交换机名称,RoutingKey,消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }
发布订阅-TopicExchange
步骤1:在consumer服务声明Exchange、Queue
在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,
并利用@RabbitListener声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到Topic消息:【"+msg+"】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到Topic消息:【"+msg+"】"); }
步骤2:在publisher服务发送消息到TopicExchange
@Test public void testTopicExchange() { // 队列名称 String exchangeName = "itcast.topic"; // 消息 String message = "喜报!孙悟空大战哥斯拉,胜!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
测试发送Object类型消息
说明:在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
我们在consumer中利用@Bean声明一个队列:
@Bean public Queue objectMessageQueue(){ return new Queue("object.queue"); }
在publisher中发送消息以测试:
@Test public void testSendMap() throws InterruptedException { // 准备消息 Map<String,Object> msg = new HashMap<>(); msg.put("name", "Jack"); msg.put("age", 21); // 发送消息 rabbitTemplate.convertAndSend("object.queue", msg); }
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:
我们在publisher服务引入依赖
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
我们在publisher服务声明MessageConverter:
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }
我们在consumer服务引入Jackson依赖:
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
我们在consumer服务定义MessageConverter:
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }
然后定义一个消费者,监听object.queue队列并消费消息:
@RabbitListener(queues = "object.queue") public void listenObjectQueue(Map<String, Object> msg) { System.out.println("收到消息:【" + msg + "】"); }
标签:服务,String,RabbitMQ,queue,消息,msg,consumer,public From: https://www.cnblogs.com/Yukino1903/p/16927345.html