首页 > 编程语言 >Python全栈工程师(23:消息队列RabbitMQ)

Python全栈工程师(23:消息队列RabbitMQ)

时间:2023-06-27 23:32:51浏览次数:70  
标签:body pika 23 Python exchange RabbitMQ queue connection channel

谁能用通俗的语言解释一下什么是 RPC 框架?

深入浅出 RPC - 浅出篇

深入浅出 RPC - 深入篇


1小时写一个分布式系统基础框架(一个java实现帮助理解RPC)


RabbitMQ 消息队列

pip install pika
or
easy_install pika
or
源码
https://pypi.python.org/pypi/pika

实现最简单的队列通信

Python全栈工程师(23:消息队列RabbitMQ)_python

 send端:

#!/usr/bin/env python
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost')) #建立一个socket
channel = connection.channel()  #声明一个管道
 
#声明queue
channel.queue_declare(queue='hello') 
 
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',  #queue名字
                      body='Hello World!')   #消息内容
print(" [x] Sent 'Hello World!'")
connection.close()

receive端:

#_*_coding:utf-8_*_
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost')) #建立一个socket
channel = connection.channel()  #建立一个管道
  
# 避免queue已经存在的情况. 
channel.queue_declare(queue='hello')  #声明queue
 
def callback(ch, method, properties, body):#ch:管道的内存对象;method:发送方法包含内容;
    print(" [x] Received %r" % body)
 
channel.basic_consume(callback,   #如果收到消息就调用callback
                      queue='hello', #声明从哪个队列收消息
                      no_ack=True) #no acknowledgement消息处理完自动确认
 
print(' [*] Waiting for messages. To exit press CTRL+C')  #均为byte格式
channel.start_consuming()  #开始收消息

远程连接rabbitmq server的话,需要配置权限 

1、首先在rabbitmq server上创建一个用户

sudo rabbitmqctl  add_user root admin

2、同时还要配置权限,允许从外面访问

sudo rabbitmqctl set_permissions -p / root".*" ".*" ".*"


set_permissions [-p vhost] {user} {conf} {write} {read}




vhost:授权用户使用的虚拟主机的名字,默认 /. user:授权可以使用指定虚拟主机的用户名称. conf:授权配置. write:授权写. read:授权读

客户端连接的时候需要配置认证参数

credentials = pika.PlainCredentials('root', 'admin')
 
connection =pika.BlockingConnection(pika.ConnectionParameters(
    '10.211.55.5',5672,'/',credentials))
channel = connection.channel()

消息分发轮询

Python全栈工程师(23:消息队列RabbitMQ)_RPC_02

在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多

消息提供者代码

import pika
import time
connection =pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
 
# 声明queue
channel.queue_declare(queue='task_queue')
 
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent %r" % message)
connection.close()

消费者代码

#_*_coding:utf-8_*_
 
import pika, time
 
connection =pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
 
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag",method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)
 
 
channel.basic_consume(callback,
                      queue='task_queue',
                      no_ack=True
                      )
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

此时,先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上。

为了确保消息永远不会丢失,RabByMQ支持消息确认。ACK(NoDeLeGeMeST)从消费者发送回告诉RabByMQ一个特定的消息已经被接收、处理,并且RabBMQ可以自由地删除它。

如果用户死亡(它的信道被关闭,连接被关闭,或者TCP连接丢失)而不发送ACK,RabBMQ将理解消息没有被完全处理并且将重新排队。如果同时有其他在线消费者,那么它将很快重新交付给另一个消费者。这样,你就可以确保没有消息丢失,即使工人偶尔死亡。


没有任何消息超时;当消费者死亡时,RabBMQ将重新传递消息。即使处理消息需要很长很长的时间也很好。

默认情况下,消息确认打开。在前面的示例中,我们通过no_ack=true标记显式地关闭它们。并在完成任务后删除此标志并向工人发送适当的确认。

def callback(ch, method, properties, body):
    print " [x] Received %r"% (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_consume(callback,
                      queue='hello')

使用此代码,我们可以确保即使在处理消息时使用CTRL+C杀死一个工人,也不会丢失任何东西。工人死后不久,所有未确认的信息将被重发。

消息持久化

我们已经学会了如何确保即使消费者死亡,任务也不会丢失(默认情况下,如果想禁用NoIAkAC= true)。但是如果RabByMQ服务器停止,我们的任务仍然会丢失。
当RabBMQ退出或崩溃时,除非你告诉它不要,否则它将忘记队列和消息。需要两件事来确保消息不会丢失:我们需要将队列和消息标记为持久的。
首先,我们必须确保RabBMQ永远不会失去我们的队列。为了做到这一点,我们需要声明它是durable=true:

channel.queue_declare(queue='hello', durable=True)

虽然这个命令本身是正确的,但它在我们的设置中是行不通的。这是因为我们已经定义了一个名为hello的队列,这是不持久的。RabBMQ不允许您重新定义具有不同参数的现有队列,并将返回任何试图尝试该程序的错误。但是有一个很快的解决办法——让我们声明一个名称不同的队列,例如task_queue:

channel.queue_declare(queue='task_queue', durable=True)  #队列持久化

这个队列声明更改需要应用于生产者和消费者代码。在这一点上,我们确信即使在RabByMQ重新启动时,task_queue2队列也不会丢失。现在我们需要将消息标记为持久的——通过提供一个值为2的传递模式。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # 消息持久化
                      ))

消息公平分发

如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

Python全栈工程师(23:消息队列RabbitMQ)_RPC_03

 

channel.basic_qos(prefetch_count=1)

带消息持久化+公平分发的完整代码

生产者端

#!/usr/bin/env python
import pika
import sys
 
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # 消息持久化
                      ))
print(" [x] Sent %r" % message)
connection.close()

消费者端

#!/usr/bin/env python
import pika
import time
 
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag =method.delivery_tag)
 
channel.basic_qos(prefetch_count=1) #公平分发
channel.basic_consume(callback,
                      queue='task_queue')
 
channel.start_consuming()

广播:

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,

Exchange 

1、接收从生产者发来的消息;

2、将消息放进队列.

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息

  • fanout: 所有bind到此exchange的queue都可以接收消息;
  • direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息;
  • topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息;
表达式符号说明:
#代表一个或多个字符,*代表任何字符
      例:#.a会匹配a.a,aa.a,aaa.a等
          *.a会匹配a.a,b.a,c.a等
 注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout


fanout广播模式

headers: 通过headers 来决定把消息发给哪些queue。

Python全栈工程师(23:消息队列RabbitMQ)_持久化_04

消息publisher

import pika
import sys
 
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')  #1、fanout
 
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',#2
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消息subscriber

#_*_coding:utf-8_*_
import pika
 
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue
 
channel.queue_bind(exchange='logs',
                   queue=queue_name) #绑定到转换器上
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

direct广播模式

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。



消息publisher

import pika
import sys
 
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
severity = sys.argv[1] if len(sys.argv) > 1 else'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

subscriber 

import pika
import sys
 
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

topic广播模式

不仅订阅基于severity的例子, 而且订阅基于发出日志的源. 更加灵活,可以只监听从 cron'收到的典型错误也可以收到从'kern'收到的所有例子;

Python全栈工程师(23:消息队列RabbitMQ)_持久化_05

publisher

import pika
import sys
 
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

subscriber

import pika
import sys
 
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" %sys.argv[0])
    sys.exit(1)
 
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()
获取所有日志:
python receive_logs_topic.py "#"
从 facility "kern"获取所有日志:
python receive_logs_topic.py "kern.*"
只收取 "critical" 日志:
python receive_logs_topic.py "*.critical"
 创建多种绑定:
python receive_logs_topic.py "kern.*" "*.critical"
用一个routing key "kern.critical" 类型发出日志:
python emit_log_topic.py "kern.critical" "A critical kernel error"

Remote procedure call (RPC)

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

Python全栈工程师(23:消息队列RabbitMQ)_RPC_06

RPC server

#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika
import time
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
 
channel = connection.channel()
 
channel.queue_declare(queue='rpc_queue')
 
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
 
def on_request(ch, method, props, body):
    n = int(body)
 
    print(" [.] fib(%s)" % n)
    response = fib(n)
 
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag =method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
 
print(" [x] Awaiting RPC requests")
channel.start_consuming()

RPC client

import pika
import uuid
 
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection =pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
 
        self.channel =self.connection.channel()
 
        result =self.channel.queue_declare(exclusive=True)
        self.callback_queue =result.method.queue
 
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
 
    def on_response(self, ch, method, props, body):
        if self.corr_id ==props.correlation_id:
            self.response = body
 
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
 
fibonacci_rpc = FibonacciRpcClient()
 
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)


标签:body,pika,23,Python,exchange,RabbitMQ,queue,connection,channel
From: https://blog.51cto.com/u_12667998/6567775

相关文章

  • 阿里云dns解析python脚本
    用于阿里云dns解析的脚本#-*-coding:utf-8-*-#Thisfileisauto-generated,don'teditit.Thanks.importsysfromtypingimportListfromalibabacloud_alidns20150109.clientimportClientasAlidns20150109Clientfromalibabacloud_tea_openapiimportmodels......
  • python: pyQt5
    pipinstallPyQt5pipinstallPyQt5-toolshttps://www.w3schools.cn/pyqt5/pyqt5_hello_world.htmlhttps://doc.qt.io/qtforpython-5/contents.htmlhttps://zhuanlan.zhihu.com/p/162866700https://blog.csdn.net/youcans/article/details/120925109https://zhuanlan.zhihu.......
  • 2023年VSCode插件
    第一推动|2023年VSCode插件最新推荐(54款) 本文介绍前端开发领域常用的一些VSCode插件,插件是VSCode最重要的组成部分之一,本文列出了我自己在以往工作经验中积累的54款插件,个人觉得这些插件是有用或有趣的,根据它们的作用,我粗略的把它们分成了代码管理、文本和图片处理、前端框架......
  • Python3.7源码编译
    1.下载Python3.7.0源码gitclonehttps://github.com/python/cpython.gitgitcheckoutv3.7.0wgethttps://www.python.org/ftp/python/3.7.0/Python-3.7.0.tar.xz源码目录结构如下所示:(1)Include目录:包含Python提供的所有头文件,如果用户需要自己用C或C++来编写自定义模块扩展Python......
  • Python3.7源码编译
    1.下载Python3.7.0源码git clone https://github.com/python/cpython.gitgit checkout v3.7.0wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tar.xz源码目录结构如下所示:(1)Include目录:包含Python提供的所有头文件,如果用户需要自己用C或C++来编写自定义模......
  • python闭包与装饰器
    1.  闭包闭包定义:在函数嵌套的前提下;内部函数使用了外部函数的变量;并且外部函数返回了内部函数;我们把这个使用外部函数变量的内部函数称为闭包。闭包有三大特点:1.有内函数与外函数,即函数是嵌套的。2.内函数使用了外函数的变量与参数。3.外部函数的......
  • Python一个有趣的彩蛋
    上周组内技术分享会,朋友介绍了Python语言有趣的历史,其中一个有意思的环节就是Python之禅,或者叫Python的彩蛋-this.py,命令行执行python-c"importthis"或者在python解释器中执行importthis,会打印出如下的一段英文,TheZenofPython,byTimPetersBeautifulisbetterthanug......
  • Python | os.path库的用法
    os.path是Python标准库中的一个模块,提供了一些用于处理文件路径的函数和变量。它可以跨平台地处理不同操作系统下的路径问题,包括Windows、Linux、Unix等。os.path模块中的函数和变量可以用于处理路径字符串,并返回路径的各种组成部分,如文件名、目录名、扩展名等。同时,它也提供了一......
  • 自学C语言2023_6_27
    注释:快捷键:ctrl+k+c(先按k再按c)将选中行注释ctrl+k+u(先按k再按u)取消注释注释的作用是解释代码,注释内容不会运行选择语句:循环语句:  ......
  • Python之文档测试模块——doctest(转载)
    doctest是python自带的一个模块。doctest有两种使用方式:一种是嵌入到python源码中,另外一种是放到一个独立文件。doctest模块会搜索那些看起来像是python交互式会话中的代码片段,然后尝试执行并验证结果。 1doctest嵌入源码中下面的代码只有一个函数,里面嵌入了两个doctest测试......