首页 > 其他分享 >RabbitMQ

RabbitMQ

时间:2023-06-04 23:23:37浏览次数:48  
标签:pika exchange 队列 RabbitMQ queue connection channel

一、消息队列介绍

MQ的全称是Message Queue——消息队列。

MQ是一种应用程序对应用程序的通信方法。

MQ是消费者-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。

这样发布者和使用者都不用知道对方的存在。

生产者-消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,

消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

我们先不管消息(Message)这个词,来看看队列(Queue)。这一看,队列大家应该都熟悉吧。

队列是一种先进先出的数据结构。

image-20230331204902275

消息队列可以简单理解为:把要传输的数据放在队列中。

image-20230331204852737

二、为什么需要MQ

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦、异步消息、流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

接下来利用一个外卖系统的消息推送给大家解释下MQ的意义。

三、RabbitMQ

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。

使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。

而且两端可以使用不同的语言编写,大大提供了灵活性。

image-20230331204915497

简单模式

生产者:

import pika  # pika是python用于连接RabbitMQ的第三方包

# 先连接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建消息队列
channel.queue_declare(queue='hello')

# 简单发布内容
channel.basic_publish(exchange='',  # 简单模式,此处值为空
                      routing_key='hello',  # 指定使用的队列
                      body='Hello World!')  # 内容

print(" [x] Sent 'Hello World!'")

消费者:由于不确定是生产者还是消费者先启动,所以都需要创建消息队列。

import pika

# 消费者第一步也是先连接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建消息队列
channel.queue_declare(queue='hello')

# 回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 指定监听内容
channel.basic_consume(queue='hello',  # 指定队列
                      auto_ack=True,  # 默认应答
                      on_message_callback=callback)  # 指定回调函数


print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

应答参数

以上使用的是默认应答(auto_ack=True)。

消费者到队列中取出数据时,无论消费者后续是否正确完成了数据处理,队列中的数据一旦被取走就会删除。

但这种方式是有问题的,如果消费者在处理数据时出错了,而队列中的数据又已经删除,就会造成数据丢失。


手动应答模式:

设置auto_ack=False则为手动应答模式,此时回调函数需要在逻辑处理完成之后,给队列发送信号,队列接受到信号后删除数据。

import pika

# 消费者第一步也是先连接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建消息队列
channel.queue_declare(queue='hello')

# 回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 给队列发送信号:数据已完成处理。

# 指定监听内容
channel.basic_consume(queue='hello',  # 指定队列
                      auto_ack=False,  # 默认应答
                      on_message_callback=callback)  # 指定回调函数


print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

持久化参数

RabbitMQ默认是将数据保存在内存中的,系统重启数据就会丢失。

如果需要对数据进行持久化,可以如下处理:

  1. 创建消息队列时,指定durable=True,表示此队列中的数据支持持久化;
  2. 发布消息时,指定该消息需进行持久化。
#声明queue
channel.queue_declare(queue='hello2', durable=True)  # 若声明过,则换一个名字
 
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                          )
                      )

分发参数

如果有两个消费者同时监听一个队列。

其中一个线程sleep2秒,另一个消费者线程sleep1秒,无论数据处理速度差异有多大,它们处理的消息是一样多。

这种方式叫轮询分发(round-robin)。谁都不会多给消息,总是你一个我一个。


另一种分发方式叫公平分发(fair dispatch)

想要做到公平分发,即谁先处理完数据谁先获得下一个数据。

必须关闭自动应答ack,改成手动应答。

使用basicQos(perfetch=1)限制每次只发送不超过1条消息到同一个消费者,消费者必须手动反馈告知队列,才会发送下一个。

channel.basic_qos(prefetch_count=1)

交换机之发布订阅

实际开发中,往往存在多个消费者,此时可以使用交换机模式。

在交换机模式中,生产者与交换机通信;

每个消费者有对应的消息队列,该消息队列与交换机进行绑定;

交换机一接收到消息,将会往各个队列发送。

生产者:

import pika

# 连接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 生命一个名为logs、类型为fanout的交换机
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = "info: Hello World!"

channel.basic_publish(exchange='logs', # 指定声明的交换机
                      routing_key='', # 不用指定
                      body=message)

print(" [x] Sent %r" % message)
connection.close()

消费者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 生命一个名为logs、类型为fanout的交换机
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# 创建消息队列
result = channel.queue_declare("",exclusive=True)  # 不指定队列名称,系统将会自动生成唯一的名称
queue_name = result.method.queue  # 取得消息队列的名称

# 将指定队列绑定到交换机上
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()

交换机之关键字模式

上面发布订阅模式中,没有指定关键字,即交换机每一次获取消息,都会给每一个绑定它的队列传送。

交换机提供了一个关键字模式,队列在绑定交换机时,可以指定一个关键字,

而生产者往交换机传送数据时,也需要给每一条数据指定一个关键字,

只有关键字与队列匹配,该消息才会被传送。

# 生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 注意这里的exchange_type参数,
channel.exchange_declare(exchange='logs2',
                         exchange_type='direct')

message = "info: Hello Yuan!"
channel.basic_publish(exchange='logs2',
                      routing_key='info',  # 指定此次数据的关键字
                      body=message)

print(" [x] Sent %r" % message)
connection.close()

消费者

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs2',
                         exchange_type='direct')

result = channel.queue_declare("",exclusive=True)
queue_name = result.method.queue


severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='logs2',
                       queue=queue_name,
                       routing_key=severity)  # 指定关键字

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()

交换机之通配符模式

通配符交换机与之前的路由模式相比,它将信息的传输类型的key更加细化,以“key1.key2.keyN….”的模式来指定信息传输的key的大类型和大类型下面的小类型,让消费者可以更加精细的确认自己想要获取的信息类型。

而在消费者一端,不用精确指定具体到哪一个大类型下的小类型的key,而是可以使用类似正则表达式(但与正则表达式规则完全不同)的通配符在指定一定范围或符合某一个字符串匹配规则的key,来获取想要的信息。

“通配符交换机”(Topic Exchange)将路由键和某模式进行匹配。

此时队列需要绑定在一个模式上。

符号“#”匹配一个或多个词,符号“*”仅匹配一个词。

因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。

(这里与我们一般的正则表达式的“*”和“#”刚好相反,这里我们需要注意一下。)

image-20230331204828981

生产者:

# 生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs3',
                         exchange_type='topic')

message = "info: Hello ERU!"
channel.basic_publish(exchange='logs3',
                      routing_key='europe.weather',
                      body=message)

print(" [x] Sent %r" % message)
connection.close()

消费者:

# 消费者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs3',
                         exchange_type='topic')

result = channel.queue_declare("",exclusive=True)
queue_name = result.method.queue


channel.queue_bind(exchange='logs3',
                   queue=queue_name,
                   routing_key="#.news")

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()

标签:pika,exchange,队列,RabbitMQ,queue,connection,channel
From: https://www.cnblogs.com/zibuyu2015831/p/17456685.html

相关文章

  • RabbitMQ
    https://blog.csdn.net/qq_35387940/article/details/100514134RabbitMQ概简介大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,知道接收者取回它。Producer:消息生产者,负责生产和发送消......
  • php rabbitmq队列的几种管理方案
     这里就懒得记录了,直接放上一篇还不错的知乎博主的博客吧。点击前往  ......
  • Rabbitmq在linux服务器的安装步骤
    Linux系统:CentOS7.x(如果是CentOS8.x的话,需要修改下面两个环境版本号中的el7为el8)Erlang:erlang-22.3.4.12-1.el7.x86_64.rpmRabbitMQ:rabbitmq-server-3.8.13-1.el7.noarch.rpm 1安装erlang Linux系统:CentOS7.x(如果是CentOS8.x的话,需要修改下面两个环境版本号中的el7为e......
  • springboot项目rabbitmq消费者消费json格式的String,出现无限循环抛出No method found
    转:springboot项目rabbitmq消费者消费json格式的String,出现无限循环抛出Nomethodfoundforclass[B     ......
  • 深入学习RabbitMQ五种模式(一)
    1.安装erlang下载otp_win64_25.3.exehttps://www.erlang.org/downloadserlang安装完成,需要配置erlang环境变量ERLANG_HOME=E:\software\ErlangOTPPATH=%PATH%;%ERLANG_HOME%\bin;2.安装RabbitMQ下载rabbitmq-server-3.11.13.exehttps://www.rabbitmq.com/download.html进入安装......
  • windows10环境下安装RabbitMQ以及延时插件(图文)
    安装转载:https://www.cnblogs.com/saryli/p/9729591.html插件转载:https://blog.csdn.net/nbdclw/article/details/107441772安装及配置环境第一步:下载并安装erlang原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装RabbitMQ的前提是安装Erlang。下载地址:http://ww......
  • RabbitMq镜像策略模式
    镜像策略ha-modeha-params说明exactlycount集群中队列副本的数量(主队列加上镜像)。count值为1表示一个副本:只有主节点。如果主节点不可用,则其行为取决于队列是否持久化。count值为2表示两个副本:一个队列主队列和一个队列镜像。换句话说......
  • RabbitMQ - 使用amqp库连接RabbitMQ(实例使用)
    1、发送端步骤分解如下:(1)建立连接conn,err:=amqp.Dial("amqp://admin:[email protected]:5672/")(2)打开channel这里的channel是AMQP里的概念,可以理解为多路复用的一个tcp长连接。(3)声明一个队列q,err:=ch.QueueDeclare(...)(4)创建消息msg:=amqp.Publishing{...}(5)发布......
  • RabbitMQ在Windows下设置服务启动
    1.管理员模式运行  cmd 2.进入RabbitMQ安装目录下的sbin目录   输入命令: cdrabbitMQ的sbin路径,进入sbin目录输入命令:rabbitmq-service.batinstall进入服务,开启rabbitMQ服务 ......
  • go-RabbitMQ
    erlang安装编译依赖:yuminstallmakegccgcc-c++build-essentialopensslopenssl-develunixODBCunixODBC-develkernel-develm4ncurses-devel解压:tar-zxvf创建存放环境目录:mkdir/opt/rabbitMq/erlang进入erlang解压目录执行命令:./configure--prefix=/opt/rabbit......