首页 > 其他分享 >微服务之RabbitMQ

微服务之RabbitMQ

时间:2022-11-26 14:45:00浏览次数:41  
标签:服务 String RabbitMQ queue 消息 msg consumer public

同步通讯和异步通讯

 

 

微服务基于Feign的调用就属于同步方式,存在一些问题

 

 

 

 

 

 

 

 

异步调用方案

异步调用常见实现就是事件驱动模式

优势一:服务解耦

 

 

 

优势二:性能提升,吞吐量提高

 

 

 

优势三:服务没有强依赖,不担心级联失败问题

 

 

 

 

优势四:流量削峰

 

 

 

 

 

 什么是MQ

 

 

 MQ的安装

 docker在线拉取:

docker pull rabbitmq:3-management

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management
2.集群部署
接下来,我们看看如何安装RabbitMQ的集群。
2.1.集群分类
在RabbitMQ的官方文档中,讲述了两种集群的配置方式:
- 普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。 - 镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。


我们先来看普通模式集群。
2.2.设置网络
首先,我们需要让3台MQ互相知道对方的存在。
分别在3台机器中,设置 /etc/hosts文件,添加如下内容:
``` 192.168.150.101 mq1 192.168.150.102 mq2 192.168.150.103 mq3 ```
并在每台机器上测试,是否可以ping通对方:    

RabbitMQ的结构与概念

 

 

 

 

 

 

 

 

 

 

SpringAMQP

什么是SpringAMQP

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

 

 

利用SpringAMQP实现HelloWorld中的基础消息队列功能

1.引入AMQP依赖

因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中:

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.在publisher中编写测试方法,向simple.queue发送消息

2.1在publisher服务中编写application.yml,添加mq连接信息:

spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机 
    username: itcast # 用户名
    password: 123321 # 密码

2.2在publisher服务中新建一个测试类,编写测试方法:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate; //类似于redisTemlate的命令工具类
    @Test
    public void testSimpleQueue() { 
        String queueName = "simple.queue";   //queueName 消息队列名称
        String message = "hello, spring amqp!"; //向simple.queue这个消息队列发送的信息
        rabbitTemplate.convertAndSend(queueName, message); //用RabbitTemplate.convertAndSend  发送 
} }

3.在consumer中编写消费逻辑,监听simple.queue这个消息队列

3.1在consumer服务中编写application.yml,添加mq连接信息:

spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机 
    username: itcast # 用户名
    password: 123321 # 密码

3.2在consumer服务中新建一个类,编写消费逻辑:

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")  //用RabbitListener注解监听指定消息队列
    public void listenSimpleQueueMessage(String msg) throws InterruptedException { //msg 服务者发送的message消息
        System.out.println("spring 消费者接收到消息 :【" + msg + "】");
    }
}

 

 

Work Queue 工作队列

 

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积

 

 

模拟WorkQueue,实现一个队列绑定多个消费者

基本思路如下: 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue 在consumer服务中定义两个消息监听者,都监听simple.queue队列 消费者1每秒处理50条消息,消费者2每秒处理10条消息

步骤一:生产者循环发送消息到simple.queue

@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message__";
    for (int i = 0; i < 50; i++) {
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        // 避免发送太快
        Thread.sleep(20);
    }
}

步骤二:编写两个消费者,都监听simple.queue

 

//在consumer服务中添加一个消费者,也监听simple.queue:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    System.out.println("spring 消费者1接收到消息:【" + msg + "】");
    Thread.sleep(25);
}

@RabbitListener(queues = "simple.queue") 
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
    System.err.println("spring 消费者2接收到消息:【" + msg + "】");
    Thread.sleep(100);
}

修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限,从而避免效率低的服务器与效率高的服务器获得相同的工作量从而导致运行效率下降

spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机 
    username: itcast # 用户名
    password: 123321 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

 

 

发布( Publish )、订阅( Subscribe )

 

 

 

 

利用SpringAMQP演示FanoutExchange的使用

 

 

步骤1:在consumer服务声明Exchange、Queue、Binding

 

 

在consumer服务常见一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:

@Configuration
public class FanoutConfig {
    // 声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    // 声明第1个队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    //绑 定队列1和交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //  ... 略, 以相同方式声明第2个队列,并完成绑定
}

步骤2:在consumer服务声明两个消费者

在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2") 
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

步骤3:在publisher服务发送消息到FanoutExchange

@Test
public void testFanoutExchange() {
    // 队列名称
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, everyone!";
    // 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息 
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

 

 

发布订阅-DirectExchange

 

 

 

 

步骤1:在consumer服务声明Exchange、Queue

在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,

并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到Direct消息:【"+msg+"】 ");
}

步骤2:在publisher服务发送消息到DirectExchange

@Test
public void testDirectExchange() {
    // 队列名称
    String exchangeName = "itcast.direct";
    // 消息 
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息,参数依次为:交换机名称,RoutingKey,消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

 

 

发布订阅-TopicExchange

 

 

 

 

步骤1:在consumer服务声明Exchange、Queue

在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,

并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到Topic消息:【"+msg+"】");
}
 
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
        key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到Topic消息:【"+msg+"】");
}

步骤2:在publisher服务发送消息到TopicExchange

@Test
public void testTopicExchange() {
    // 队列名称
    String exchangeName = "itcast.topic";
    // 消息 
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

 

 

 

测试发送Object类型消息

说明:在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

我们在consumer中利用@Bean声明一个队列:

@Bean
public Queue objectMessageQueue(){
    return new Queue("object.queue");
}

在publisher中发送消息以测试:

@Test
public void testSendMap() throws InterruptedException {
    // 准备消息
    Map<String,Object> msg = new HashMap<>();
    msg.put("name", "Jack"); 
    msg.put("age", 21);
    // 发送消息
    rabbitTemplate.convertAndSend("object.queue", msg);
}

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

我们在publisher服务引入依赖

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

我们在publisher服务声明MessageConverter:

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter(); 
}

我们在consumer服务引入Jackson依赖:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

我们在consumer服务定义MessageConverter:

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter(); 
}

然后定义一个消费者,监听object.queue队列并消费消息:

@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> msg) {
    System.out.println("收到消息:【" + msg + "】"); 
}

 

 

标签:服务,String,RabbitMQ,queue,消息,msg,consumer,public
From: https://www.cnblogs.com/Yukino1903/p/16927345.html

相关文章

  • 服务器配置怎么查看
    服务器配置怎么查看​在我们找服务器商买服务器时,一般都是根据自己需求来选择需要什么配置的服务器。​选服务器时主要看CPU、内存、硬盘、带宽、这几个主要配置今天艾西就......
  • 微服务之Docker
    大型项目组件较多,运行环境也较为复杂,部署时会碰到一些问题:依赖关系复杂,容易出现兼容性问题开发、测试、生产环境有差异 Docker如何解决大型项目依赖关系复杂,不同组件......
  • knock端口敲门服务
    端口敲门服务,即:knockd服务。该服务通过动态的添加iptables规则来隐藏系统开启的服务,使用自定义的一系列序列号来“敲门”,使系统开启需要访问的服务端口,才能对外访问。不使......
  • [Bug0061] RabbitMQ 报错 An unexpected connection driver error occured
    问题Java项目连接RabbitMQ报错Anunexpectedconnectiondrivererroroccured场景集成RabbitMQ的开源项目启动报错原因yml中端口配置错误,也是小白很容易犯错之一,配......
  • Windows操作系统下RabbitMQ下载安装
    前言Rabbitmq是一个消息队列Erlang可视化工具注意Rabbitmq和Erlang是有版本对应关系的查看地址:https://www.rabbitmq.com/which-erlang.html下文安装使用24.2.2+v3.......
  • 兆骑科创,海外高层次人才引进服务平台
    兆骑科创,海外高层次人才引进服务平台推动国家创新驱动高质量现代化发展是我们当前建设社会主义现代化强国的重要战略举措,随着当今科技水平不断向上攀升,技术发展日新月异,经济......
  • 搭建NFS文件服务器
    一、服务器端安装yuminstallrpcbindnfs-utils立即启动,配置开机自动启动systemctlstartrpcbind&&systemctlenablerpcbindsystemctlstartnfs-server&&s......
  • 微服务、gGRPC、protobuf、rest和json
    微服务、gGRPC、protobuf、rest和json到目前为止,基于REST的API已经成为大多数服务间通信的首选架构。虽然基于REST/JSON的通信有几个好处,并且得到跨语言和提供......
  • 【Azure 应用服务】Azure Powershell Function 出错 The term 'Connect-AzAccount' is
    问题描述在AzureFunction中,执行Powershell的Function脚本时,先后出现1:[Error]ERROR:Theterm'Connect-AzAccount'isnotrecognizedasanameofacmdlet,function,......
  • ssh连远程centos服务器时,提示报错Unable to negotiate with **** port 22: no matchin
    解决:~% cd.ssh进入.ssh目录  会看到一个config,然后打开加入这段代码:Host*HostkeyAlgorithms+ssh-rsaPubkeyAcceptedKeyTypes+ssh-rsa这样就可以了。......