首页 > 其他分享 >RabbitMQ基础知识

RabbitMQ基础知识

时间:2024-09-26 20:18:48浏览次数:3  
标签:交换器 队列 RabbitMQ 基础知识 交换机 消息 连接 路由

1.1 什么是MQ?

消息队列(Message Queue),是基础数据结构中 “先进先出” 的一种数据结构。
一般用来解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。

RabbitMQ可以理解为一个邮箱,或者一个邮局,或者是一个邮递员,保证 “张三” 的信件最终传递给 “李四”。

RabbitMQ与上述所描述的邮局(邮箱、邮递员)的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块消息。

1.2.MQ的应用场景

  • 跨系统间的调用
    比如说发送短息的功能,信息在自己应用中处理完成之后,调用第三方发送短信接口,假如第三方接口有问题或者延迟比较大,就会影响自己接口的响应时间,用户的体验就会很差,并且第三方也是不可控的,这时候就可以把发送短信的操作放到队列中执行,如果第一次发送消息失败,还可以重复执行, 这些操作对用户都是无感知的,避免因为第三方系统出现的问题导致自己系统出现问题
  • 系统内的异步调用
    比如发送评论后 异步进行更新排行榜的功能
    比如批量发送消息的功能,加入给100w用户发送消息,时间肯定会很长,改为异步在后台慢慢消费,用户就会无感知,并且很快的通知发送消息方已经发送完成了
  • 消息驱动的场景
    比如当满足一个条件以后,触发后面的一系列操作,这个时候用程序实现起来比较麻烦,这个时候使用消息队列来实现就会相当的简单
  • 跨语言之间的调用
    因为消息队列是和语言无关的,也不是函数之间的调用,而且消息队列也不要求生产端和消费端同时在线,所以很轻松的实现跨语言间的调用
1.3 MQ是怎么实现消息传递的?

  1. 生产者产生消息并把传输的数据(消息)放在队列中,用队列机制来实现消息传递。
  2. 消费者可以到指定的队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。
1.4 MQ的几个主要特性

  • 解耦:一个业务需要多个模块共同实现,或一条消息有多个系统对应处理,只需要在主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
  • 异步:主业务执行结束后,从属业务通过MQ异步处理,减少业务的响应时间,提高用户体验。
  • 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。
1.5.MQ的缺点

  • 系统可用性降低。依赖服务越多,服务越容易挂掉。需要考虑MQ瘫痪的情况。
  • 系统的复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性。
  • 业务一致性。主业务和从属业务一致性的处理。
1.6RabbitMQ的相关术语术语

  • Producer生产者:用来发送消息到队列
  • Consumer消费者:从队列中取出消息消费掉
  • Queue存储消息的容器:它就是队列,它就是消息的载体,每个消息都会通过队列让消费者来消费
  • Channel消息通道:可以和rabbitmq建立一个链接,一个链接中也可以有多个的通道
  • Exchange交换机:决定交换机以什么规则发送到队列中
  • Routing Key:路由的关键字,交换机就是通过它来决定发送到哪个队列中,就是通过它来路由的
1.7RabbitMQ的工作模式(5种)
1.7.1简单工作模式


这里不需要交换机,应用场景:1对1的聊天

1.7.2 work工作模式


左边是一个生产者发送消息到队列中,多右边是多个消费者竞争消费消息,在高并发场景下,容易出现同一个消息被多个消费者消费的问题,需要注意(可以在业务上增加唯一的键值来避免)

应用场景:红包

1.7.3 订阅模式,每个队列的消息都是一样的


左边是一个生产者,把消息发送给交换机,交换机分别把消息发送到多个队列中,最后由消费者来消费

1.7.4 路由模式,根据routing key发送到不同的消息队列中(使用的是定向类型的交换机)


左边是一个生产者发送了一条消息,交换机根据发送的路由key,发送到相匹配的队列中,由消费者来消费

应用场景:日志(不同等级的日志有不同的方法,所以会放到不同的队列中,error走上吗的队列,info走下面的队列)

1.7.5 主题模式,根据routing key分类,发送到不同的消息队列中

主题模式使用的topic类型的交换机,和上面的路由模式类似,主要通过通配符和#来判断消息发送到哪个队列中,发送到哪个队列中,是通过和#来通过routing key 来决定的,其中#匹配一个或多个单词,*匹配一个单词

2. go使用RabbitMQ

安装RabbitMQ扩展包

go get github.com/streadway/amqp

首先编写demo,简单模式和工作模式

RabbitMq其本质是tcp链接,并且是基于内部通道进行的通信,所以一个完整的连接分为连接与创建通道两部分。

连接地址的格式是这种形式:

amqp://admin:[email protected]:5672/

services创建mq.go,实现封装

生产者流程

在 Golang 中创建 rabbitmq 生产者基本步骤是:

  1. 连接 Connection

  2. 创建 Channel

  3. 创建或连接一个交换器

  4. 创建或连接一个队列

  5. 交换器绑定队列

  6. 投递消息

  7. 关闭 Channel

  8. 关闭 Connection

创建连接

// connection
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

创建通道

// channel
channel, err := connection.Channel()

创建交换器

err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil)

参数依次说明:

  • name 交换机名称

  • kind 交换机类型

  • durable 持久化标识

  • autoDelete 是否自动删除

  • internal 是否是内置交换机

  • noWait 是否等待服务器确认

  • args 其它配置

参数说明要点:

autoDelete :

自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下,处于不再使用的时候才会自动删除,如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。

internal :

内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,只能作为类似于队列的方式绑定到另一个交换器,来接收这个交换器中路由的消息,内置交换器同样可以绑定队列和路由消息,只是其接收消息的来源与普通交换器不同。

noWait

当 noWait 为 true 时,声明时无需等待服务器的确认。

该通道可能由于错误而关闭。 添加一个 NotifyClose 侦听器应对任何异常。创建交换器还有一个差不多的方法( ExchangeDeclarePassive ),他主要是假定交换已存在,并尝试连接到不存在的交换将导致 RabbitMQ 引发异常,可用于检测交换器的存在。

创建队列

q, err := channel.QueueDeclare("q1", true, false, false, true, nil)

参数说明:

  • name 队列名称

  • durable 持久化

  • autoDelete 自动删除

  • exclusive 排他

  • noWait 是否等待服务器确认

  • args Table 其它配置

参数说明要点:

exclusive 排他
排他队列只对首次创建它的连接可见,排他队列是基于连接( Connection )可见的,并且该连接内的所有信道( Channel)都可以访问这个排他队列,在这个连接断开之后,该队列自动删除,由此可见这个队列可以说是绑到连接上的,对同一服务器的其他连接不可见。

同一连接中不允许建立同名的排他队列的这种排他优先于持久化,即使设置了队列持久化,在连接断开后,该队列也会自动删除。

非排他队列不依附于连接而存在,同一服务器上的多个连接都可以访问这个队列。

autoDelete 设置是否自动删除。为 true 则设置队列为自动删除。

自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。

不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。

创建队列还有一个差不多的方法( QueueDeclarePassive ),他主要是假定队列已存在,并尝试连接到不存在的队列将导致 RabbitMQ 引发异常,可用于检测队列的存在。

绑定交换器和队列

err = channel.QueueBind("q1", "q1Key", "e1", true, nil)

参数解析:

  • name 队列名称

  • key BindingKey 根据交换机类型来设定

  • exchange 交换机名称

  • noWait 是否等待服务器确认

  • args Table 其它配置

绑定交换器(可选)

err = channel.ExchangeBind("dest", "q1Key", "src", false, nil)

参数解析:

  • destination 目的交换器

  • key RoutingKey 路由键

  • source 源交换器

  • noWait 是否等待服务器确认

  • args Table 其它参数

生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换器 destination ,井把消息转发到 destination 中,进而存储在 destination 绑定的队列 queue 中,某种程度上来说 destination 交换器可以看作一个队列。

投递消息

err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
    Timestamp:   time.Now(),
    DeliveryMode: amqp.Persistent, //Msg set as persistent
    ContentType: "text/plain",
    Body:        []byte("Hello Golang and AMQP(Rabbitmq)!"),
})

参数解析:

  • exchange 交换器名称

  • key RouterKey 路由键

  • mandatory 是否为无法路由的消息进行返回处理

  • immediate 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃

  • msg 消息体

参数说明要点:

mandatory

消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式,设置为 true 表示将消息返回到生产者,否则直接丢弃消息。

immediate

参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。 imrnediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

RabbitMQ 3.0版本开始去掉了对 imrnediate 参数的支持。

其中 amqp.Publishing 的 DeliveryMode 如果设为 amqp.Persistent 则消息会持久化。需要注意的是如果需要消息持久化 Queue 也是需要设定为持久化才有效。

消费者流程

在 Golang 中创建 rabbitmq 消费者基本步骤是:

  1. 连接 Connection

  2. 创建 Channel

  3. 创建或连接一个交换器

  4. 创建或连接一个队列

  5. 交换器绑定队列

  6. 消费消息

  7. 关闭 Channel

  8. 关闭 Connection

消费者的步骤和生产者流程基本类似,只是将生产者流程中的投递消息变为消费消息。

标签:交换器,队列,RabbitMQ,基础知识,交换机,消息,连接,路由
From: https://blog.csdn.net/zwl153910/article/details/142577126

相关文章

  • 开发人员人工智能入门:揭秘基础知识部分
    开发者们大家好!人工智能不再只是一个梦想。它就在这里并改变我们构建软件的方式。它可以使应用程序更好、更有用。但如何开始在项目中使用人工智能呢?本系列旨在为您提供踏上人工智能开发之旅的基础知识。在第一部分中,我们将深入研究核心概念并提供使用langchain和openai的实践......
  • RabbitMq 入门应用 提升性能 : 算法多阶段并行 (Python)
    大问题:我们有一个算法,它可以被分为多个阶段进行(顺序不可颠倒),每个阶段的性能和资源要求不同(且不均衡程度比较高);假设我们现在可以堆资源(较多的CPU和内存),如何将算法各个步骤拆分并进行性能均衡和实现,使得算法性能最大化以满足生产要求?多进程:由于算法有严格的顺序要求,如果是......
  • RabbitMQ(兔子队列入门/消息队列)
    介绍(本笔记不涉及RabbitMQ的环境搭建,主要用于了解和上手使用RabbitMQ)RabbitMQ是一种消息队列,什么是消息队列?消息(Message):是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。**队列:**可以说是一个数据结构,可以存储数据,如下图,我们从右侧(队......
  • 【C++】C++基础知识
    C++基础1.指针1.1定义与使用指针在内存中占多少字节?指针在32位操作系统中占4个字节,在64位操作系统中占8个字节。定义指针的两种方式如下/***定义指针的两种形式*///1.inta=10;int*p;p=&a;//2.int*p2=&a;1.2空指针与野指针空指针空指针......
  • docker简介、安装、基础知识
    基础知识Docker简介:1.Docker是一种用于构建、发布及运行应用程序的开源项目,他通过容器化技术简化了应用程序的部署和管理2.Docker是一个开源的应用容器引擎,基于go语言开发,为应用打包、部署平台,而非单纯的虚拟化技术3.Docker类似于集装箱,各式各样的货物,经过集装箱的标准化进......
  • 【MySQL】基础知识Day1
    博客主页:小蜗系列专栏:MySQL参考教程:菜鸟教程/黑马关注博主,后期持续更新系列文章如果有错误请大家批评指出,我会及时修改感谢大家点赞......
  • RabbitMQ——消息的可靠性处理
    1.业务分析    在业务的开发中,我们通常将业务的非核心业务交给MQ来处理,比如支付,在支付过后,我们需要扣减余额,修改支付单状态,修改订单状态,发送短信提醒用户,给用户增加积分等等(可能真是场景并非这么简单,这里举个例子),在这套业务中,修改订单状态,发送短信提醒用户,给用户增加......
  • RabbitMQ通讯方式第二讲:Work Queues
    了解WorkQueues  1.1官网中的图片:通过官网里的图片,我们可以看到WordQueues与HelloWorld的区别,这里的消费者增加,但是时多个消费者消费单个队列,在这里我们依然要注意,这里面使用的是默认的交换机,并不是直接连接的队列。  1.2直观的图片:更好的理解每次的连接都是......
  • 【C++基础知识——迭代器 引入】
    问题引入#include<iostream>#include<map>#include<string>intmain(){//定义一个std::map容器std::map<std::string,int>ageMap;ageMap["Alice"]=30;ageMap["Bob"]=25;ageMap["Charlie&q......
  • 【入门岛·第1关】linux 基础知识
    目录闯关任务完成SSH连接与端口映射并运行hello_world.py闯关任务完成SSH连接与端口映射并运行hello_world.py1在远程主机上建立hello_python.py程序并运行,查看程序运行的端口:importsocketimportreimportgradioasgr#获取主机名defget_hostname():hostname......