首页 > 编程语言 >RabbitMQ(python)

RabbitMQ(python)

时间:2024-05-27 11:32:14浏览次数:31  
标签:pika exchange python RabbitMQ queue 队列 connection channel

 一、认识MQ

MQ全称为Message Queue 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。这样发布者和使用者都不用知道对方的存在。

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

 如下图所示:

二、为什么要MQ 

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ。

接下来利用一个外卖系统的消息推送给大家解释下MQ的意义。

三、RabbitMQ 

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。

四、RabbitMQ的安装

【Windows安装RabbitMQ详细教程】_rabbitmq windows-CSDN博客

五、RabbitMQ的工作模型

5.1简单模式

#生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
#创建队列
channel.queue_declare(queue='hello')
#往队列插入数据
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')

print(" [x] Sent 'Hello World!'")

# 消费者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')


#回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

#确定监听队列
channel.basic_consume(queue='hello',
                      auto_ack=True,#默认应答
                      on_message_callback=callback)


print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

5.2RabbitMQ参数

应答参数

#生产者代码不变

#消费者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')


#回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag) #添加这一句

#确定监听队列
channel.basic_consume(queue='hello',
                      auto_ack=False,#改为False,手动应答
                      on_message_callback=callback)


print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

默认应答的意思是当生产者向消费者发送数据的时候,假如消费者在处理数据的过程中出现了一些错误,这时候生产者队列中的数据不会进行保留,当再次运行消费者的时候,数据已经丢失。手动应答就是在处理数据过程中,如果出现了错误,生产者队列的数据不会丢失,还保留在那,直到消费者向生产者传递了信号ch.basic_ack(delivery_tag=method.delivery_tag),这时候,生产者中的数据才会删除。

分发参数

有两个消费者同时监听一个的队列。其中一个线程sleep2秒,另一个消费者线程sleep1秒,但是处理的消息是一样多。这种方式叫轮询分发不管谁忙,都不会多给消息,总是你一个我一个。想要做到公平分发(根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;),必须关闭自动应答ack,改成手动应答。使用basicQos(perfetch=1)限制每次只发送不超过1条消息到同一个消费者,消费者必须手动反馈告知队列,才会发送下一个。

### 消费者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()


channel.queue_declare(queue='hello',durable=True)


#回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

#公平分发
channel.basic_qos(prefetch_count=1)

#确定监听队列
channel.basic_consume(queue='hello',
                      auto_ack=False,#手动应答
                      on_message_callback=callback)


print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

持久化参数

持久化的意思就是把数据写入磁盘;举个例子:当生产者往队列里面插入数据的时候i,此时RabbitMQ突然崩了,而消费者一开始没有去监听这个队列,当它崩掉的时候,消费者此时去监听这个队列,这个队列是没有信息了的,因为RabbitMQ往队列插入的数据是写到内存中的,而它已经崩掉了,那就意味着数据丢失了。要解决这个问题就需要把数据写入磁盘。

#只对生产者代码进行修改,这里注意,如果之前声明过一个队列,此时就不能再次把它重新定义为持久化队列
#持久化参数解决的是RabbitMQ在运行过程中突然宕机导致数据丢失的问题

#声明queue
channel.queue_declare(queue='hello2', durable=True)  # 若声hello2明过,则换一个名字
 
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                          )
                      )

5.3交换机模式

发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

# 生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

#声明交换机   名字为logs  模式为发布订阅(fanout)
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = "info: Hello World!"
#向logs交换机插入数据
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

# 消费者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

#声明交换机   名字为logs  模式为发布订阅(fanout)
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

#声明队列,名字是系统随机的,
result = channel.queue_declare("",exclusive=True)
#获取队列名字
queue_name = result.method.queue

#将指定队列绑定到logs交换机上
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()

连续启动同一份代码可能会报错,解决如下:

关键字模式

关键字模式的意思就是消费者通过绑定关键字来确定消息是否是自己需要的。之前的发布订阅模式是生产者往交换机中插入数据,而交换机则把数据插入到消费者生产的队列中,无论消息是否是消费者需要的,只要有数据都会往里面插入。而通过关键字绑定就能避开这个问题。一个消费者可以绑定多个关键字。而生产者在发送消息的时候需要指定消息类型,即routing_key。代码如下:

# 生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()


#声明交换机   名字为logs2  模式为关键字模式(direct)
channel.exchange_declare(exchange='logs2',
                         exchange_type='direct')

message = "info: Hello Yuan!"

#绑定关键字routing_key='info'
channel.basic_publish(exchange='logs2',
                      routing_key='info',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

# 消费者

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs2',
                         exchange_type='direct')

result = channel.queue_declare("",exclusive=True)
queue_name = result.method.queue


severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

#绑定多个关键字
for severity in severities:
    channel.queue_bind(exchange='logs2',
                       queue=queue_name,
                       routing_key=severity)


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()

通配符

通配符交换机”与之前的路由模式相比,它将信息的传输类型的key更加细化,以“key1.key2.keyN....”的模式来指定信息传输的key的大类型和大类型下面的小类型,让消费者可以更加精细的确认自己想要获取的信息类型。而在消费者一段,不用精确的指定具体到哪一个大类型下的小类型的key,而是可以使用类似正则表达式(但与正则表达式规则完全不同)的通配符在指定一定范围或符合某一个字符串匹配规则的key,来获取想要的信息。

“通配符交换机”(Topic Exchange)将路由键和某模式进行匹配。此时队列需要绑定在一个模式上。符号“#”匹配一个或多个词,符号“*”仅匹配一个词因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。(这里与我们一般的正则表达式的“*”和“#”刚好相反,这里我们需要注意一下。)
下面是一个解释通配符模式交换机工作的一个样例

# 生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

#声明交换机   名字为logs3  模式为通配符模式(topic)
channel.exchange_declare(exchange='logs3',
                         exchange_type='topic')

message = "info: Hello ERU!"
channel.basic_publish(exchange='logs3',
                      routing_key='europe.news',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

# 消费者

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

#声明交换机   名字为logs3  模式为通配符模式(topic)
channel.exchange_declare(exchange='logs3',
                         exchange_type='topic')

result = channel.queue_declare("",exclusive=True)
queue_name = result.method.queue

#通配符“#”匹配一个或多个词,符号“*”仅匹配一个词,因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。(这里与我们一般的正则表达式的“*”和“#”刚好相反,这里我们需要注意一下。)
channel.queue_bind(exchange='logs3',
                   queue=queue_name,
                   routing_key="#.news")

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()

标签:pika,exchange,python,RabbitMQ,queue,队列,connection,channel
From: https://blog.csdn.net/m0_71660867/article/details/139231927

相关文章

  • Python基于微信小程序的农产品溯源平台论文(1)
    目录1绪论41.1项目研究的背景41.2开发意义41.3项目研究内容与结构42开发技术介绍52.1B/S架构52.2小程序平台52.3python语言简介52.4MySQL介绍62.5MySQL环境配置82.6Django框架83系统分析93.1可行性分析93.1.1技术可行性93.1.2经济可行性......
  • python 批导
    安装官网https://www.python.org/ 安装教程https://blog.csdn.net/weixin_42212924/article/details/124979123 https://www.cnblogs.com/missjade/p/12992038.htmlPip源设置(使用清华源) 1、临时使用1pipinstall-ihttps://pypi.tuna.tsinghua.edu.cn/simpleso......
  • 解读注意力机制原理,教你使用Python实现深度学习模型
    本文分享自华为云社区《使用Python实现深度学习模型:注意力机制(Attention)》,作者:Echo_Wish。在深度学习的世界里,注意力机制(AttentionMechanism)是一种强大的技术,被广泛应用于自然语言处理(NLP)和计算机视觉(CV)领域。它可以帮助模型在处理复杂任务时更加关注重要信息,从而提高性能。在本......
  • Python筑基之旅-文件(夹)和流
    目录一、文件操作1、文件打开与关闭2、文件读写3、文件操作模式4、文件编码二、文件夹操作1、创建文件夹2、删除文件夹3、改变当前工作目录4、获取当前工作目录5、检查文件/文件夹是否存在6、遍历文件夹三、文件路径操作1、获取绝对路径2、构建完整路径3、检查......
  • Python数据处理训练
    (一)、中国大学排名数据分析与可视化;(写到实验报告中)【源代码程序】importrequestsfrombs4importBeautifulSoupimportmatplotlib.pyplotasplt #URL模板,按年份爬取数据URL_TEMPLATE="https://www.shanghairanking.cn/rankings/bcur/{}"  #爬取数据函数deff......
  • python04
    Python数据处理训练 班级:信2205-2班         学号:20224082        姓名:艾鑫一实验目的l 使学生熟练安装扩展库numpy、requests、bs4、pandas、seaborn、matplotlib等;l 使学生熟悉使用标准库cvs操作文件;l 使学生熟悉使用pandas进行数据分析的基本......
  • MQ和RabbitMQ
    一、微服务间通讯有同步和异步两种方式:同步通讯:就像打电话,需要实时响应。异步通讯:就像发邮件,不需要马上回复。Feign调用就属于同步方式,虽然调用可以实时得到结果,但存在下面的问题:1.耦合度高2.性能下降3.浪费资源4.级联失败.总结:同步调......
  • python venv
    venv虚拟环境作用Python虚拟环境主要是为不同Python项目创建一个隔离的环境,每个项目都可以拥有独立的依赖包环境,而项目间的依赖包互不影响;venv环境下,⽤pip安装的包都在安装到了venv这个环境下,系统python环境不受任何影响,也就是说,venv环境是专门针对当前项⽬创建的。......
  • 7-Python中的函数
    一、定义函数1.定义defgreat_user():   """显示简单的问候语"""     (函数的描述,可以自动生成一个说明文档)   print("Hello")二、传递实参(位置实参+关键字实参)1.位置实参(1)基于实参的顺序,在函数调用时把每个实参关联到函数定义中的形参(2)位置实......
  • wxpython开发gui界面基础
    wxpython开发gui基础知识一、前言记录使用wxpython开发gui工具吧。gui界面主要就是先布局,每个模块都是一个对象。二、基础知识importwxclassMyFrame(wx.Frame):def__int__(self):super(MyFrame,self).__int__()这里定义了一个主窗口为MyFrame的主窗口......