首页 > 其他分享 >RabbitMQ

RabbitMQ

时间:2023-08-01 16:05:28浏览次数:24  
标签:NAME 队列 RabbitMQ 交换机 消息 channel 消费者

目录

  • 一. 使用场景
  • 1. 异步处理
  • 2. 应用解耦
  • 3. 流量削峰
  • 二. 五种消息模型
  • 一. HelloWorld
  • 消息确认ACK
  • 二. Worker模型
  • 订阅模型分类
  • 二. 广播模型--Fanout
  • 三 . 订阅模式--Direct定向
  • 三 . 订阅模式--Topic通配
  • 二. 持久化
  • 1. ACK机制
  • 2. 持久化交换机
  • 3. 持久化队列
  • 4. 持久化消息
  • 三. SpringAMQP
  • 1. 环境搭建

消息队列是典型的生产者-消费者模型,生产者只管发送消息,消费者只管监听队列取出消息,没有业务逻辑的侵入,实现了生产者和消费者的解耦,这篇博客主要从如下几个方面整理相关RabbitMQ的知识点

  1. 消息中间件在项目中的使用场景
  2. RabbitMQ常见的五种消息模型
  3. 如何避免消息的丢失
  4. SpringAMQP

目前比较主流的两种MQ分别是JMSAMQP

  • AMQP(Advance Message Queuing Protocol) 高级消息队列协议,他是一种协议,而不是具体的实现,换句话说,只要满足它的协议,使用什么客户端,什么语言实现都没关系!
  • JMS(Java MessageService) 实际上是JMS API ,是sun公司提出来的消息标准

相同点: 两个担任的角色差不多,都是依据接口实现服务间的调用
不同点: 前者跨平台,跨语言,支持五种消息模型,后者只适用于java,仅规定了两种消息模型

常见的MQ产品

  • ActiveMQ: 基于JMS
  • RabbitMQ: 基于AMQP协议,erlang语言开发,稳定性好
  • RockerMQ: 基于JMS,阿里巴巴产品,目前交由Apache基金会
  • Kafka:分布式消息系统,高吞吐量

一. 使用场景

1. 异步处理

假设这样一个应用场景:
新用户来注册了,我们要求他填写手机的验证码,前端异步把手机号发送到后台,调用短息微服务发送短信,这时,用户可以接着填写剩下的信息,等验证码来了,一并提交给后台

2. 应用解耦

假设这样一个应用场景:

商品微服务,调用A方法,修改了商品的信息,静态页面微服务得重新生成静态页,搜索微服务得重新创建新的文档,但是问题来了,有关静态页的生成的所有逻辑方法都在静态页微服务,搜索微服务雷同,我们总不至于在 A()里面去再写一遍关于生成静态页和创键新文档的逻辑吧,这是用RabbitMQ的作用来了

  • 不同的微服务之间使用RabbitMQ进行通信, 商品修改了,那我们就使用Dirct模式,并把商品id当作消息体,发送给静态页微服务以及搜微服务就好了,他只管发送,不管别的微服务怎么处理实现了 应用间的解耦

3. 流量削峰

流量削峰一般在秒杀活动中应用广泛

场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

作用:
1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
2.秒杀业务根据消息队列中的请求信息,再做后续处理.

二. 五种消息模型

点击进入官网教程

一. HelloWorld

通过这个模型,可以知道,生产者想发送消息要做哪些准备工作 消费者想接收消息需要哪些准备工作,以及如何接收

RabbitMQ_发送消息

  • P(producer/ publisher):生产者,发送消息的服务.
  • C(consumer):消费者,接收消息的服务,
  • 红色区域就是MQ中的Q,可以把它理解成一个邮箱
  • 首先信件来了不强求必须马上马去拿
  • 其次,它是有最大容量的(受主机和磁盘的限制,是一个缓存区)
  • 然后,不仅仅我们可以去拿新建,家人亲戚朋友,甚至是小偷,只要他有钥匙,就可以去拿(多个消费者监听同一个队列,争抢消息)

坐标

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

java代码,RabbitMQ-HelloWorld

生产者

private final static String QUEUE_NAME = "simple_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道,使用通道才能完成消息相关的操作
        Channel channel = connection.createChannel();
        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息内容
        String message = "Hello World!";
        // 向指定的队列中发送消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道和连接
        channel.close();
        connection.close();
    }

RabbitMQ_RabbitMQ_02

消费者

private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列, 队列可以重复声明,但是做好和生产者声明的参数保持一致
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {//这是个匿名内部类

            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用(回调函数)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
            }
        };
        // 监听队列,第二个参数:是否自动进行消息确认。 - 一般为了安全,我们不选择把第二个参数写成true   我们选择手动ack
        channel.basicConsume(QUEUE_NAME, true, consumer); //将上面的队列和消费者进行绑定,第二个字段设置为true,自动进行消息确认,一旦有消息就回调函数接收
        // 消费者类启动后,整个的过程不会停止,就像js的绑定事件,可以看到右上角和左下角的红点一直亮着
    }

消息确认ACK

如果上面的消费者出现异常的话,程序也就停止了,那我们的业务逻辑就没办法执行,因此我们禁用autoACK,选择手动ack

/*@param queue the name of the queue
* @param autoAck true if the server should consider messages
* @param callback an interface to the consumer object
*/
  channel.basicConsume(QUEUE_NAME, false, consumer);

在回调函数中添加:

...业务逻辑
// 手动进行ACK
channel.basicAck(envelope.getDeliveryTag(), false);

这样,在RabbitMQ中的消息状态是这样的 Ready-->Uacked-- 出现异常 --> Ready

二. Worker模型

RabbitMQ_微服务_03

工作队列,竞争消费模式,可以看到,通同一个队列我们绑定上了多个消费者,消费者争抢着消费消息,这可以有效的避免消息堆积,比如对于短信微服务集群来说就可以使用这种消息模型,来了请求,大家抢着消费掉,别等着

  • 如何实现?
  • 相对于上面的HelloWorld这其实就是相同的服务我们启动了多次罢了,自然就是这种架构
  • 能者多劳?

给队列添加一条属性,不再是队列把任务平均分配开给消费者,而是让消费者,消费完了后,问队列要新的任务,这样能者多劳

// 设置每个消费者同时只能处理一条消息
channel.basicQos(1);

订阅模型分类

  1. 不同的订阅模型是根据交换机(Exchange)的类型划分的
  2. 订阅模型有三种
  • Fanout: 广播模型,将消息发送给绑定给交换机的所有队列(因为他们使用的是同一个RoutingKey)
  • Direct: 定向:把消息发送给拥有指定Routing Key (路由键)的队列
  • Topic: 通配符,把消息传递给拥有 符合Routing Patten(路由模式)的队列

其中每个消费者拥有属于自己的队列,生产者直接把消息发送给交换机,由交换机决定到底把消息发送给谁

二. 广播模型--Fanout

RabbitMQ_持久化_04

我们看一下,如何做到,一条消息被多个消费者消费

  • 这个模型的特点就是它在发送消息的时候,并没有指明Rounting Key , 或者说他指定了Routing Key,但是所有的消费者都知道,大家都能接收到消息,就像听广播

生产者

public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明exchange,指定类型为fanout
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // 消息内容
        String message = "Hello everyone";
        // 发布消息到Exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [生产者] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}

RabbitMQ_发送消息_05

对于生产者来说,他不在去声明队列了, 获取完Channel之后,直接去创建交换机,然后发送消息


消费者:

// 绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

对于消费者,现在没有和它的队列直接绑定的生产者了,它要多做一件事,就是把自己的队列绑定到交换机上Exchange,当它们做完这件事之后,他们都会收到相应的消息

三 . 订阅模式--Direct定向

RabbitMQ_持久化_06

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

拥有不同的RoutingKey的消费者,会收到来自交换机的不同信息,而不是大家都使用同一个Routing Key ,和广播模型区分开来

生产者

// 声明exchange,指定类型为direct
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    // 发送消息,并且指定routing key 为:insert ,代表新增商品
    channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());

消费者,可以绑定多个RoutingKey收到不同的消息

// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

三 . 订阅模式--Topic通配


这个模式支持使用通配符

如: routingkey 为  api.*
那么 api.XXX 能匹配上,但是api.xxx.yyy 匹配不上
  • 区分开和#, #要比更自由
// 声明exchange,指定类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);

// 发送消息,并且指定routing key 为:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());

消费者

// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");

二. 持久化

1. ACK机制

  • 上面的消费者的ACK机制可以有效的避免因为消费者端的出现异常而导致信息丢失

但是如果MQ挂了呢?

2. 持久化交换机

3. 持久化队列

4. 持久化消息


三. SpringAMQP

SpringAMQP帮我们实现了--生产者确认机制,对于不可路由的消息交换机会告诉生产者,使其重新发送

1. 环境搭建

坐标

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

生产者

rabbitmq:
    host: 192.168.43.150
    username: changwu..
    password: 2424zcw......
    virtual-host: /test
    template:  # 配置模板,给amqp template使用
      retry: # 发送失败,重试的信息
        enabled: true  # 开启失败重试
        initial-interval: 10000ms # 第一次重试的时间长
        max-interval: 30000ms   # 最长的重试时间间隔
        multiplier: 2       # 下次重试的时间的倍数,这里的2,说明,下次重试的时间,是上次的两倍
      exchange: ly.item.exchange # 交换机,这配置的是默认的交换机,rabbitmq发送往交换机发送消息,一会,不写交换机的话,默认创建这个交换机
    publisher-confirms: true # 开启生产者确认,消息发送失败会重试

生产者使用AmqpTemplate模板发送消息

try{     // 新增商品后, 发送消息, 路由键 消息体
    amqpTemplate.convertAndSend("item.update",1);
}catch (Exception e
){
    System.out.println("修改结束,发送消息失败=="+e);
}

消费端

他不需要AmqpTemplate模板发送消息,因此不配置

rabbitmq:
    host: 192.168.43.150
    username: changwu
    password: 2424zcw..
    virtual-host: /XXX

virtual-host,和当前用户绑定的虚拟主机名, 这就Oralce里面,不同限权的用户可以看到的界面,拥有的能力是不用的,在RabbitMQ中,用户只能看到和它相关的虚拟主机下面的信息

@Component
public class Listener {

 /**
     * 注意这里面的异常我门自己不处理,交给springmvc,这样,一旦有异常,触发ack 消息回滚
     * 监听 新增和修改(ES里,他俩是一个方法)
     *
     *  广播模型,消费者直接相关的是队列,
     *  1. 它要把自己绑定到队列上,
     *  2. 声明队列的名字,是否持久化
     *  3. 声明交换机的名字,类型
     *  4. 声明自己监听的路有键
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "search.item.insert.queue",durable = "ture"),
            exchange = @Exchange(name = "ly.item.exchange",type = ExchangeTypes.TOPIC),
            key = {"item.insert","item.update"}
    ))
    public void listenInsertOrUpdate(Long spuId){
        if(spuId==null){
            return;
        }
        // dosomething...
    }



标签:NAME,队列,RabbitMQ,交换机,消息,channel,消费者
From: https://blog.51cto.com/u_15311508/6923309

相关文章

  • RabbitMQ配置和实战
    RabbitMQ安装dockerrun-d--namexd_rabbit-eRABBITMQ_DEFAULT_USER=admin-eRABBITMQ_DEFAULT_PASS=password-p15672:15672-p5672:5672rabbitmq:3.8.15-management#网络安全组记得开放端口4369erlang发现口5672client端通信口15672管理界面ui端口25672......
  • RabbitMQ - MQ幂等、去重的解决方案
    1.场景描述消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件......
  • rabbitmq学习
    rabbitmq学习,rabbitmq教程,rabbitmq安装作用:1、削蜂,2、解耦3、异步处理核心概念:交换机、队列、信道 官网网址:https://www.rabbitmq.com/download.htmldocker安装:dockerrun-it--rm--namerabbitmq-p5672:5672-p15672:15672rabbitmq:3.12-management 默认用......
  • RabbitMQ实战入门
    一、概念RabbitMQ架构模型分为客户端和服务端两部分,客户端包括生产者和消费者,服务端包括虚拟主机、交换器和队列。两者通过连接和信道进行通信。整体的流程是生产者将消息发送到服务器,消费者从服务器中获取对应的消息。具体流程是生产者在发送消息前小确定发送给哪个虚拟机的哪个交......
  • RabbitMQ Exception (403) Reason: "no access to this vhost"
    可能原因:1)没有配置该用户的访问权限,可以通过rabbitmqctladd_vhostadmin来添加,并赋予权限:rabbitmqctlset_permissions-p用户名admin".""."".*"代码在连接的时候,必须制定对应的vhost,否则是没有访问权限:conn,err:=amqp.Dial("amqp://sky:password@ip:5672/admin”)2)M......
  • rabbitmq php 发送延迟消息 java 进行消费
      //-------延迟队列php需要安装好几个扩展 php生产者$connection=newAMQPStreamConnection('xxxxx',5672,'guest','guest');$channel=$connection->channel();//发送消息到交换机,并设置x-delay属性$messageData=['msg'=>json_e......
  • java 项目整合rabbitmq后内存飙高
    Java项目整合RabbitMQ后内存飙高的原因及解决办法在Java项目中使用RabbitMQ作为消息队列的时候,有时候会出现内存飙高的问题,特别是在消息量较大的情况下。本文将探讨这个问题的原因,并提供解决办法。问题分析当Java项目整合RabbitMQ后,内存飙高的原因通常是由于消息的生......
  • rabbitmq告警Error: health check failed. Message: resource alarm(s) in effect:[{r
     这个错误信息表示RabbitMQ的内存使用量超出了预设的阈值,导致触发了RabbitMQ的内存警告阈值报警。RabbitMQ在运行过程中会不断地将消息存储在内存中,如果内存使用量超出了预设的阈值,就会触发内存警告阈值报警。这个阈值的默认值是40%的物理内存,可以通过修改Rabbi......
  • RabbitMQ由于x-message-ttl设置与代码不一致导致declareQueue时IO异常
    具体异常:Causedby:com.rabbitmq.client.ShutdownSignalException:channelerror;protocolmethod:#method<channel.close>(reply-code=406,reply-text=PRECONDITION_FAILED-inequivalentarg'x-message-ttl'forqueue'queryResult'invho......
  • RabbitMQ(二)Exchange
    RabbitMQ(二)Exchange1RabbitMQ的运行机制AMQP中消息的路由过程和JMS存在一些差别,AMQP中增加了Exchange和Binding的角色生产者把消息发布到Exchange上,消息最终到达队列而被消费者接受,而Binding决定交换器的消息应该被发送到哪个队列​ Exchange分发消息的时候根据类型的不同......