首页 > 其他分享 >RabbitMQ

RabbitMQ

时间:2023-01-01 13:55:07浏览次数:59  
标签:队列 rabbitmq 交换机 消息 rabbit RabbitMQ 节点

介绍#

AMQP和JMS消息服务

  • 什么是JMS: Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口
    • JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API
    • 是由Sun公司早期提出的消息标准,旨在为java应用提供统一的消息操作,包括create、send、receive
    • JMS是针对java的,那微软开发了NMS(.NET消息传递服务)

特性

  • 面向Java平台的标准消息传递API
  • 在Java或JVM语言比如Scala、Groovy中具有互用性
  • 无需担心底层协议
  • 有queues和topics两种消息传递模型
  • 支持事务、能够定义消息格式(消息头、属性和内容)

常见概念

  • JMS提供者:连接面向消息中间件的,JMS接口的一个实现,RocketMQ,ActiveMQ,Kafka等等
  • JMS生产者(Message Producer):生产消息的服务
  • JMS消费者(Message Consumer):消费消息的服务
  • JMS消息:数据对象
  • JMS队列:存储待消费消息的区域
  • JMS主题:一种支持发送消息给多个订阅者的机制
  • JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)

mq需要的类

  • ConnectionFactory :连接工厂,JMS 用它创建连接
  • Connection :JMS 客户端到JMS Provider 的连接
  • Session: 一个发送或接收消息的线程
  • Destination :消息的目的地;消息发送给谁.
  • MessageConsumer / MessageProducer: 消息消费者,消息生产者

RabbitMQ消息队列和核⼼概念#

RabbitMQ:http://www.rabbitmq.com/

  • 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错,与SpringAMQP完美的整合、API丰富易用
  • 文档:https://www.rabbitmq.com/getstarted.html

核心概念, 了解了这些概念,是使用好RabbitMQ的基础

image-20220909071135102

  • Broker
    • RabbitMQ的服务端程序,可以认为一个mq节点就是一个broker
  • Producer生产者
    • 创建消息Message,然后发布到RabbitMQ中
  • Consumer消费者:
    • 消费队列里面的消息
  • Message 消息
    • 生产消费的内容,有消息头和消息体,也包括多个属性配置,比如routingKey路由键
  • Queue 队列
    • 是RabbitMQ 的内部对象,用于存储消息,消息都只能存储在队列中
  • Channel 信道
    • 一条支持多路复用的通道,独立的双向数据流通道,可以发布、订阅、接收消息。
    • 信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道
  • Connection连接
    • 是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑,一个连接上可以有多个channel进行通信
  • Exchange 交换器
    • 生产者将消息发送到 Exchange,交换器将消息路由到一个或者多个队列中,里面有多个类型,后续再一一介绍,队列和交换机是多对多的关系。
  • RoutingKey 路由键
    • 生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则
    • 最大长度255 字节
  • Binding 绑定
    • 通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键 ( BindingKey ),这样 RabbitMQ 就知道如何正确地将消息路由到队列了
生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和 RoutingKey相匹配时,消息会被路由到对应的队列中
  • Virtual host 虚拟主机
    • 用于不同业务模块的逻辑隔离,一个Virtual Host里面可以有若干个Exchange和Queue,同一个VirtualHost 里面不能有相同名称的Exchange或Queue
    • 默认是 /
      • /dev
      • /test
      • /pro

Docker安装RabbitMQ#

#拉取镜像
docker pull rabbitmq:management

docker run -d --hostname rabbit_host1 --name xd_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

#介绍
-d 以守护进程方式在后台运行
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
--name:指定容器名
--hostname:设定容器的主机名,它会被写到容器内的 /etc/hostname 和 /etc/hosts,作为容器主机IP的别名,并且将显示在容器的bash中

-e 参数
  RABBITMQ_DEFAULT_USER 用户名
  RABBITMQ_DEFAULT_PASS 密码

主要端口介绍

4369 erlang 发现口

5672 client 端通信口

15672 管理界面 ui 端口

25672 server 间内部通信口

访问管理界面

ip:15672

注意事项!!!!

- Linux服务器检查防火墙是否关闭
- 云服务器检查网络安全组是否开放端口

CentOS 7 以上默认使用的是firewall作为防火墙
查看防火墙状态
firewall-cmd --state
停止firewall
systemctl stop firewalld.service
禁止firewall开机启动
systemctl disable firewalld.service

管控台介绍#

image-20220909073520620

image-20220909073723103

队列与交换机概念#

队列#

简单队列#

Producer--->Queue --->Consumer
生产者发送消息到队列,监听该队列的消费者取出消息

image-20220909080907274

队列轮询#

image-20220911145822286

  • ​ 理解:
				           |--->Consumer(msg*10)
Producer(msg*30)--->Queue --(---->Consumer(msg*10)
				           |--->Consumer(msg*10)
生产者发送大量消息到队列,多个消费者监听该队列,均量取出消息			   
				   

队列公平#

  • ​ 理解:
轮询策略的优化,当多个消费者的处理数据能力不均时,轮询策略会导致资源调用不合理,比如A消费者5msg/秒,B消费者10msg/s。

此时如果投放30条msg,B:15msg1.5秒处理完,A:15msg3秒处理完。

应优化为,B:20msg2秒处理完,A:10msg2秒处理完。

交换机#

image-20220911150546783

  • RabbitMQ的Exchange 交换机
    • 生产者将消息发送到 Exchange,交换器将消息路由到一个或者多个队列中,交换机有多个类型,队列和交换机是多对多的关系。
    • 交换机只负责转发消息,不具备存储消息的能力,如果没有队列和exchange绑定,或者没有符合的路由规则,则消息会被丢失
    • RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后的基本不用
  • 交换机类型
    • Direct Exchange 定向
      • 将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配
      • 例子:如果一个队列绑定到该交换机上要求路由键 “aabb”,则只有被标记为“aabb”的消息才被转发,不会转发aabb.cc,也不会转发gg.aabb,只会转发aabb
      • 处理路由健
    • Fanout Exchange 广播
      • 只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息
      • Fanout交换机转发消息是最快的,用于发布订阅,广播形式,中文是扇形
      • 不处理路由健
    • Topic Exchange 通配符
      • 主题交换机是一种发布/订阅的模式,结合了直连交换机与扇形交换机的特点
      • 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上
      • 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词
      • 例子:因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”。
    • Headers Exchanges(少用)
      • 根据发送的消息内容中的headers属性进行匹配, 在绑定Queue与Exchange时指定一组键值对
      • 当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;
      • 如果完全匹配则消息会路由到该队列,否则不会路由到该队列
      • 不处理路由键

三种模式介绍#

发布订阅(FANOUT)

  • 什么是rabbitmq的发布订阅模式
    • 发布-订阅模型中,消息生产者不再是直接面对queue(队列名称),而是直面exchange,都需要经过exchange来进行消息的发送, 所有发往同一个fanout交换机的消息都会被所有监听这个交换机的消费者接收到
    • 发布订阅-消息模型引入fanout交换机
    • 文档:https://www.rabbitmq.com/tutorials/tutorial-three-java.html
  • 发布订阅模型应用场景
    • 微信公众号
    • 新浪微博关注

image-20220911150913025

  • rabbitmq发布订阅模型
    • 通过把消息发送给交换机,交互机转发给对应绑定的队列
    • 交换机绑定的队列是排它独占队列,自动删除

image-20220911150922160

路由模式(Direct)

  • 什么是rabbitmq的路由模式
    • 文档:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
    • 交换机类型是Direct
    • 队列和交换机绑定,需要指定一个路由key( 也叫Bingding Key)
    • 消息生产者发送消息给交换机,需要指定routingKey
    • 交换机根据消息的路由key,转发给对应的队列

image-20220911151301231

  • 例子:日志采集系统 ELK
    • 一个队列收集error信息-》告警
    • 一个队列收集全部信息-》日常使用

主题模式(Topic)

  • 背景:

    • 如果业务很多路由key,怎么维护??
    • topic交换机,支持通配符匹配模式,更加强大
    • 工作基本都是用这个topic模式
  • 什么是rabbitmq的主题模式

    • 文档 https://www.rabbitmq.com/tutorials/tutorial-five-java.html
    • 交换机是 topic, 可以实现发布订阅模式fanout和路由模式Direct 的功能,更加灵活,支持模式匹配,通配符等
    • 交换机同过通配符进行转发到对应的队列,* 代表一个词,#代表1个或多个词,一般用#作为通配符居多,比如 #.order, 会匹配 info.order 、sys.error.order, 而 *.order ,只会匹配 info.order, 之间是使用. 点进行分割多个词的; 如果是 ., 则info.order、error.order都会匹配
    • 注意
      • 交换机和队列绑定时用的binding使用通配符的路由健
      • 生产者发送消息时需要使用具体的路由健

    image-20220911151331597

  • 测试,下面的匹配规则是怎样的

quick.orange.rabbit 只会匹配  *.orange.* 和 *.*.rabbit ,进到Q1和Q2
lazy.orange.elephant 只会匹配 *.orange.* 和 lazy.#,进到Q1和Q2
quick.orange.fox 只会匹配 *.orange.*,进入Q1
lazy.brown.fox 只会匹配azy.#,进入Q2
lazy.pink.rabbit 只会匹配 lazy.#和*.*.rabbit ,同个队列进入Q2(消息只会发一次)

quick.brown.fox 没有匹配,默认会被丢弃,可以通过回调监听二次处理

lazy.orange.male.rabbit,只会匹配 lazy.#,进入Q2

三种模式总结#

FANOUT

image-20220911150913025

Producer--->Queue --(---->Consumer(msg*10)
                    (---->Consumer(msg*10)
生产者发送消息到交换机中,绑定了该叫交换机的所有队列能获取相同的信息

Direct

image-20220911151301231

         |->Queue(key(D))---->Consumer(A)
         |  
Producer--->Queue(key{A,B,C}) ------>Consumer(B)
↓
msg(key{A})
msg(key{B})
msg(key{C})
msg(key{D})



生产者发送4条信息附带4个key到交换机中
每一个队列都设置routingKey,每个队列只能获取自己对应routingKey的那条信息
如上逻辑:
Consumer(A)输出:msg(key{D})
Consumer(B)输出:msg(key{A})  msg(key{B})   msg(key{C})

Topic

image-20220911151331597

逻辑对比Direct新增了通配符,* 代表一个词,#代表1个或多个词,案例看下方

quick.orange.rabbit 只会匹配  *.orange.* 和 *.*.rabbit ,进到Q1和Q2
lazy.orange.elephant 只会匹配 *.orange.* 和 lazy.#,进到Q1和Q2
quick.orange.fox 只会匹配 *.orange.*,进入Q1
lazy.brown.fox 只会匹配azy.#,进入Q2
lazy.pink.rabbit 只会匹配 lazy.#和*.*.rabbit ,同个队列进入Q2(消息只会发一次)

quick.brown.fox 没有匹配,默认会被丢弃,可以通过回调监听二次处理

lazy.orange.male.rabbit,只会匹配 lazy.#,进入Q2
         |->Queue(key(*.orange.*))---->Consumer(A)
         |  
Producer--->Queue(key(*.*.rabbit,lazy.#))------>Consumer(B)
↓
msgA(key{quick.orange.rabbit})
msgB(key{lazy.brown.fox})



生产者发送4条信息附带4个key到交换机中
每一个队列都设置routingKey,每个队列只能获取自己对应routingKey的那条信息
如上逻辑:
Consumer(A)输出:msgA
Consumer(B)输出:msgA,msgB

整合AMQP#

  • 什么是Spring-AMQP

    • 官网:https://spring.io/projects/spring-amqp
    • Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等
    • 提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象,最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发
    • 总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter,
    • 我们使用 spring-boot-starter-amqp 进行开发
  • 注意: 有些包maven下载慢,等待下载如果失败

    • 删除本地仓库spring相关的包,重新执行 mvn install
    • 建议先使用默认的maven仓库,不用更换地址
    • 基于当前项目仓库地址修改
 <!--引入AMQP-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 代码库 -->
    <repositories>
        <repository>
            <id>maven-ali</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public//</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

yml配置文件修改

#消息队列
spring:
  rabbitmq:
    host: 101.33.219.225
    port: 5672
    virtual-host: /dev
    password: password
    username: admin

RabbitMQConfig文件

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "order_exchange";

    public static final String QUEUE = "order_queue";

    /**
     * topic 交换机
     * @return
     */
    @Bean
    public Exchange orderExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    /**
     * 队列
     * @return
     */
    @Bean
    public Queue orderQueue(){
        return QueueBuilder.durable(QUEUE).build();
    }

    /**
     * 交换机和队列绑定关系
     */
    @Bean
    public Binding orderBinding(Queue queue, Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}

消息消费者

@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {

    /**
     * RabbitHandler 会自动匹配 消息类型(消息自动确认)
     * @param msg
     * @param message
     * @throws IOException
     */
    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {

        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("监听到消息:消息内容:"+message.getBody());
    }
}

消息生产者-测试类

@SpringBootTest
class DemoApplicationTests {
    
  @Autowired
  private RabbitTemplate template;
    
  @Test
  void send() {
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦1");
  }
}

可靠性投递#

  • 什么是消息的可靠性投递
    • 保证消息百分百发送到消息队列中去
    • 详细
      • 保证mq节点成功接受消息
      • 消息发送端需要接受到mq服务端接受到消息的确认应答
  • RabbitMQ消息投递路径
    • 生产者-->交换机->队列->消费者
    • 通过两个的点控制消息的可靠性投递
      • 生产者到交换机
        • 通过confirmCallback
      • 交换机到队列
        • 通过returnCallback
  • 建议
    • 开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议用消息确认机制

confirmCallback(生产者到交换机)#

  • 通过confirmCallback
  • 生产者投递消息后,如果Broker收到消息后,会给生产者一个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心
#消息队列
spring:
  rabbitmq:
    host: 101.33.219.225
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    #开启二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated
    @Test
    void testConfirmCallback() {
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             *
             * @param correlationData 配置
             * @param ack 交换机是否收到消息,true是成功,false是失败
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback==============>");
                System.out.println("correlationData==============>correlationData=" + correlationData);
                System.out.println("ack==================>ack=" + ack);
                System.out.println("cause==============>cause=" + cause);

                if (ack) {
                    System.out.println("发送成功");
                } else {
                    System.out.println("发送失败,记录到日志或者数据库");
                }

            }
        });

        template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME + "ccl", "order.new", "新订单");
    }
发送失败,记录到日志或者数据库

returnCallback(交换机到队列)#

  • 通过returnCallback
  • 消息从交换器发送到对应队列失败时触发
  • 两种模式
    • 交换机到队列不成功,则丢弃消息(默认)
    • 交换机到队列不成功,返回给消息生产者,触发returnCallback
#消息队列
spring:
  rabbitmq:
    host: 101.33.219.225
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    #开启二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated

    #开启二次确认,交换机到队列的可靠性投递
    publisher-returns: true
    #true:如交换机处理消息到路由失败,则返回给生产者
    template:
      mandatory: true
  @Test
  void testReturnCallback() {
    //为true,则交换机处理消息到路由失败,则会返回给生产者
    //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
    template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
      @Override
      public void returnedMessage(ReturnedMessage returned) {
        int code = returned.getReplyCode();
        System.out.println("code="+code);
        System.out.println("returned="+returned.toString());
      }
    });
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"xxx.order.new","新订单来啦11");
  }
code = 312
returned.toString() = ReturnedMessage [message=(Body:'新订单ReturnCallback' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=order_exchange, routingKey=xdclass.order.new]

ACK#

  • 背景:消费者从broker中监听消息,需要确保消息被合理处理

image-20220911161423207

  • RabbitMQ的ACK介绍
    • 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除
    • 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
    • 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
    • 消息的ACK确认机制默认是打开的,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked
  • 确认方式
    • 自动确认(默认)
    • 手动确认 manual
#消息队列
spring:
  rabbitmq:
    host: 101.33.219.225
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    #开启二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated

    #开启二次确认,交换机到队列的可靠性投递
    publisher-returns: true
    #true:如交换机处理消息到路由失败,则返回给生产者
    template:
      mandatory: true

    #消息手工确认ACK
    listener:
      simple:
        acknowledge-mode: manual

ACK + DeliveryTag

  • deliveryTag介绍
  • 表示消息投递序号,每次消费消息或者消息重新投递后, deliveryTag都会增加
  • basicNack和basicReject介绍
    • basicReject一次只能拒绝接收一个消息,可以设置是否requeue。
    • basicNack方法可以支持一次0个或多个消息的拒收,可以设置是否requeue。
   @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {

        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("body="+body);

        //告诉broker,消息已经被确认
//        channel.basicAck(msgTag,false);

        //告诉broker,消息拒绝确认
        channel.basicNack(msgTag,false,true);

        channel.basicReject(msgTag,true);
    }

TTL死信+延迟队列#

TTL#

TTL介绍#

  • time to live 消息存活时间

  • 如果消息在存活时间内未被消费,则会别清除

  • RabbitMQ支持两种ttl设置

    • 单独消息进行配置ttl
    • 整个队列进行配置ttl(居多)
  • 什么是rabbitmq的死信队列

    • 没有被及时消费的消息存放的队列
  • 什么是rabbitmq的死信交换机

    • Dead Letter Exchange(死信交换机,缩写:DLX)当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是DLX死信交换机。

image-20220911165613343

  • 消息有哪几种情况成为死信
    • 消费者拒收消息(basic.reject/ basic.nack),并且没有重新入队 requeue=false
    • 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
    • 队列的消息长度达到极限
    • 结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

image-20220911165636472

管控台消息TTL测试#

  • 队列过期时间使用参数,对整个队列消息统一过期

    • x-message-ttl
    • 单位ms(毫秒)
  • 消息过期时间使用参数(如果队列头部消息未过期,队列中级消息已经过期,已经还在队列里面)

    • expiration
    • 单位ms(毫秒)
  • 两者都配置的话,时间短的先触发

  • RabbitMQ Web控制台测试

    • 新建死信交换机(和普通没区别)

    image-20220911165734381

    • 新建死信队列 (和普通没区别)

    • 死信交换机和队列绑定

      image-20220911170045765

    • 新建普通队列,设置过期时间、指定死信交换机

      image-20220911165921951

    • 测试:直接web控制台往product_qeueu发送消息即可

延迟队列介绍和应用场景#

  • 什么是延迟队列

    • 一种带有延迟功能的消息队列,Producer 将消息发送到消息队列 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息s
  • 使用场景

    • 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
    • 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送;
    • 消息生产和消费有时间窗口要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条 延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略

高可用集群#

普通集群#

默认的集群模式, 比如有节点 node1和node2、node3,三个节点是普通集群,但是他们仅有相同的元数据,即交换机、队列的结构;

案例:
消息只存在其中的一个节点里面,假如消息A,存储在node1节点,
消费者连接node1个节点消费消息时,可以直接取出来;

但如果 消费者是连接的是其他节点
那rabbitmq会把 queue 中的消息从存储它的节点中取出,并经过连接节点转发后再发送给消费者

问题:
假如node1故障,那node2无法获取node1存储未被消费的消息;
如果node1持久化后故障,那需要等node1恢复后才可以正常消费
如果ndoe1没做持久化后故障,那消息将会丢失

这个情况无法实现高可用性,且节点间会增加通讯获取消息,性能存在瓶颈

项目中springboot+amqp里面需要写多个节点的配置,比如下面

spring.rabbitmq.addresses = 192.168.1.1:5672,192.168.1.2:5672,192.168.1.3:5672

该模式更适合于消息无需持久化的场景,如日志传输的队列
  • 注意:集群需要保证各个节点有相同的token令牌
erlang.cookie是erlang的分布式token文件,集群内各个节点的erlang.cookie需要相同,才可以互相通信

image-20220911184213610

image-20220911184219807

集群搭建#

  • 清理单机和网络开发
    • 关闭原先的单节点
    • 阿里网络安全组开放对应的端口
    • 防火墙一定要关闭
  • 准备3个节点安装好rabbitmq,形成集群 (记得每个节点间隔几十秒再启动,如果失败删除宿主机文件重新搭建)
#节点一,主节点,创建-v映射目录
docker run -d --hostname rabbit_host1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168  -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management

#节点二,创建-v映射目录
docker run -d --hostname rabbit_host2 --name rabbitmq2  -p 15673:15672 -p 5673:5672 --link rabbitmq1:rabbit_host1 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/2/log:/var/log/rabbitmq rabbitmq:management

#节点三,创建-v映射目录
docker run -d --hostname rabbit_host3 --name rabbitmq3 -p 15674:15672 -p 5674:5672 --link rabbitmq1:rabbit_host1 --link rabbitmq2:rabbit_host2 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/3/log:/var/log/rabbitmq rabbitmq:management
  • 参数说明
--hostname 自定义Docker容器的 hostname

--link 容器之间连接,link不可或缺,使得三个容器能互相通信

--privileged=true 使用该参数,container内的root拥有真正的root权限,否则容器出现permission denied

-v 宿主机和容器路径映射

参数 RABBITMQ_NODENAME,缺省 Unix*: rabbit@$HOSTNAME
参数 RABBITMQ_DEFAULT_USER=admin
参数 RABBITMQ_DEFAULT_PASS=xdclass.net168

Erlang Cookie 值必须相同,也就是一个集群内 RABBITMQ_ERLANG_COOKIE 参数的值必须相同, 相当于不同节点之间通讯的密钥,erlang.cookie是erlang的分布式token文件,集群内各个节点的erlang.cookie需要相同,才可以互相通信

配置集群#

节点一配置集群
docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

节点二加入集群,--ram是以内存方式加入,忽略该参数默认为磁盘节点。
docker exec -it rabbitmq2 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
rabbitmqctl start_app
exit

节点三加入集群,--ram是以内存方式加入,忽略该参数默认为磁盘节点。
docker exec -it rabbitmq3 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
rabbitmqctl start_app
exit

#查看集群节点状态,配置启动了3个节点,1个磁盘节点和2个内存节点

rabbitmqctl cluster_status

配置文件#

#配置文件修改
#消息队列
spring:
  rabbitmq:
    addresses: 101.33.219.225:5672,101.33.219.225:5673,101.33.219.225:5674
    virtual-host: /dev
    password: xdclass.net168
    username: admin
    #开启消息二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated


    #开启消息二次确认,交换机到队列的可靠性投递
    publisher-returns: true
    #为true,则交换机处理消息到路由失败,则会返回给生产者
    template:
      mandatory: true

    #消息手工确认ACK
    listener:
      simple:
        acknowledge-mode: manual

mirror镜像集群#

队列做成镜像队列,让各队列存在于多个节点中
和普通集群比较大的区别就是【队列queue的消息message 】会在集群各节点之间同步,且并不是在 consumer 获取数据时临时拉取,而普通集群则是临时从存储的节点里面拉取对应的数据

结论:
实现了高可用性,部分节点挂掉后,不影响正常的消费
可以保证100%消息不丢失,推荐3个奇数节点,结合LVS+Keepalive进行IP漂移,防止单点故障

缺点:由于镜像队列模式下,消息数量过多,大量的消息同步也会加大网络带宽开销,适合高可用要求比较高的项目
过多节点的话,性能则更加受影响
  • 注意:集群需要保证各个节点有相同的token令牌
erlang.cookie是erlang的分布式token文件,集群内各个节点的erlang.cookie需要相同,才可以互相通信

策略policy介绍#

rabbitmq的策略policy是用来控制和修改集群的vhost队列和Exchange复制行为
就是要设置哪些Exchange或者queue的数据需要复制、同步,以及如何复制同步
  • 创建一个策略来匹配队列
    • 路径:rabbitmq管理页面 —> Admin —> Policies —> Add / update a policy
    • 参数: 策略会同步同一个VirtualHost中的交换器和队列数据
      • name:自定义策略名称
      • Pattern:^ 匹配符,代表匹配所有
      • Definition:ha-mode=all 为匹配类型,分为3种模式:all(表示所有的queue)
ha-mode: 指明镜像队列的模式,可选下面的其中一个
  all:表示在集群中所有的节点上进行镜像同步(一般都用这个参数)
  exactly:表示在指定个数的节点上进行镜像同步,节点的个数由ha-params指定
  nodes:表示在指定的节点上进行镜像同步,节点名称通过ha-params指定
  
ha-sync-mode:镜像消息同步方式 automatic(自动),manually(手动)

image-20220911184835084

  • 配置好后,+2的意思是有三个节点,一个节点本身和两个镜像节点, 且可以看到策略名称 xdclass_mirror

image-20220911184858971

  • 集群重启顺序
    • 集群重启的顺序是固定的,并且是相反的
    • 启动顺序:磁盘节点 => 内存节点
    • 关闭顺序:内存节点 => 磁盘节点
    • 最后关闭必须是磁盘节点,否则容易造成集群启动失败、数据丢失等异常情况

配置文件#

#消息队列
spring:
  rabbitmq:
    addresses: 101.33.219.225:5672,101.33.219.225:5673,101.33.219.225:5674
    virtual-host: /dev
    password: xdclass.net168
    username: admin
    #开启消息二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated


    #开启消息二次确认,交换机到队列的可靠性投递
    publisher-returns: true
    #为true,则交换机处理消息到路由失败,则会返回给生产者
    template:
      mandatory: true

    #消息手工确认ACK
    listener:
      simple:
        acknowledge-mode: manual

标签:队列,rabbitmq,交换机,消息,rabbit,RabbitMQ,节点
From: https://www.cnblogs.com/LoveShare/p/17018023.html

相关文章

  • RabbitMQ从入门到精通-消息应答
    一、概念消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消......
  • RabbitMQ脑裂处理
    脑裂现象:NetworkpartitiondetectedMnesiareportsthatthisRabbitMQclusterhasexperiencedanetworkpartition.Thereisariskoflosingdata.Pleaseread......
  • RabbitMQ在centos中docker中的下载,安装,使用。RabbitMQ的基础讲解(全面详细)
    一、消息介绍二、rabbitMQ介绍RabbitMQ简介:RabbitMQ是一个由erlang开发的AMQP(AdvanvedMessageQueueProtocol)的开源实现。核心概念Message消息消息是不具名的,它由消息......
  • 什么是Rabbitmq消息队列? (安装Rabbitmq,通过Rabbitmq实现RPC全面了解,从入门到精通)
    目录Rabbitmq一:消息队列介绍1.介绍2.MQ解决了什么问题1.应用的解耦2.流量削峰3.消息分发(发布订阅:观察者模式)4.异步消息(celery就是对消息队列的封装)3.常见消息队列......
  • RabbitMQ学习笔记02:Hello World!
    参考资料:RabbitMQtutorial-"Helloworld!"—RabbitMQ   前言RabbitMQ是一个中间人,它接受和转发消息。我们可以把它想象成一个邮局:当你把邮件投入邮箱的时候,你......
  • RabbitMQ
    简介安装配置下文安装是在虚拟机中,操作系统为Centos8erlang参考自官网https://www.rabbitmq.com/download.html注意erlang版本匹配有多种安装源,这里使用匹配rabbitm......
  • rabbitmq的安装记录
    安装Erlang,Socat,RabbitMQ###顺序不能变rpm-ivherlang-21.3.8.9-1.el7.x86_64.rpmrpm-ivhsocat-1.7.3.2-1.el6.lux.x86_64.rpmrpm-ivhrabbitmq-server-3.8.1-1......
  • RabbitMQ从入门到精通-工作队列-Work Queues
         工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队......
  • Docker下RabbitMQ下载插件并且安装
    1.先去github下载到对应插件:​​延迟信息插件​​如需其它请访问:​​rabbitmq其它插件下载​​2.自己启动的rabbitmq容器号是多少dockerps或者dockerps-aqf"name=rabb......
  • RabbitMQ
                         ......