首页 > 其他分享 >RabbitMQ的基本使用

RabbitMQ的基本使用

时间:2024-05-04 18:55:06浏览次数:21  
标签:基本 pika 队列 RabbitMQ QUEUE scrape 使用 channel NAME

在数据采集的过程中,可能需要一些进程间的通信,如

  1. 一个进程负责构造爬取请求,另一个负责执行这些请求;
  2. 某个数据爬取进程执行完毕,通知另一个负责数据处理的进程开始爬取数据;
  3. 某个进程新建了一个爬取任务,通知另一个负责数据爬取的进程开始爬取数据。

为了降低进程耦合度,需一个消息队列中间件来存储和转发消息,实现进程间通信,RabbitMQ 就是一个开源可靠灵活的消息队列中间件。

1. 特点

可靠性、路由灵活、消息集群、高可用、多协议支持、多语言客户端、管理界面、跟踪机制、插件机制等。

2. 准备

安装 RabbitMQ 以及操作它的 Python 库 pika(pip3 install pika)
RabbitMQ 安装包 参考链接:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.13.2
安装 RabbitMQ 的前置环境 Erlang,安装包参考链接:http://www.erlang.org/download

3. 基本使用

RabbitMQ作为消息队列,要实现进程间通信,本质上是一个生产者消费者模型,一个进程作为生产者往消息队列放入消息,另一个进程作为消费者监听并处理消息队列,有3点需要关注:

  1. 声明队列:通过指定参数,创建消息队列;
  2. 生产内容:生产者根据队列的连接信息连接队列,往队列放入消息;
  3. 消费内容:消费者根绝队列的连接信息连接队列,从队列去除消息。

声明队列并尝试往队列添加消息:

import pika

QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape
channel.queue_declare(queue=QUEUE_NAME)

# 向队列放入消息
channel.basic_publish(exchange='', routing_key=QUEUE_NAME, body='Hello World!'.encode())

以上就是一个简单的生产者,可以命名为 producer.py,接下来新建一个 consumer.py,写入如下代码:

import pika

QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape
channel.queue_declare(queue=QUEUE_NAME)

def callback(ch, method, properties, body):
    print(f'Get {body.decode()}')

# 从队列获取数据, auto_ack=True表示消费者获取消息之后会自动通知消息队列当前消息已被处理,可以移除这个消息。
channel.basic_consume(queue=QUEUE_NAME, auto_ack=True, on_message_callback=callback)
channel.start_consuming()

运行 consumer.py,然后每运行一次producer.py,consumer.py的控制台就会打印一次信息。

4. 随用随取

生产者如果往队列里放至过多请求导致消费者处理不过来,就会出现问题,因此消费者也应该有权控制取用消息的频率。
队列中的消息先用字符串表示,后面再更换为请求对象。修改之前代码如下:
producer.py

import pika

QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape
channel.queue_declare(queue=QUEUE_NAME)

while True:
    data = input()
    channel.basic_publish(exchange='', routing_key=QUEUE_NAME, body=data.encode())
    print(f'Put {data}')

consumer.py

import pika

QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()

while True:
    input()
    method_frame, header, body = channel.basic_get(queue=QUEUE_NAME, auto_ack=True)
    if body:
        print(f'Get {body.decode()}')

先运行生产者代码,随便输入一些内容,比如:

// 控制台输入及输出
foo
Put foo
bar
Put bar
baz 
Put baz

然后运行消费者代码,不断回车,可以看到对应的打印内容。

Get 'foo'
Get 'bar'
Get 'baz'

以上示例代码便通过input方法控制了生产者生产消息以及消费者何时获取消息,实现了消费者的随用随取。

5. 优先级队列

若想设置消息的优先级,只需在声明队列时增加一个属性即可, MAX_PRIORITY = 100, arguments={'x-max-priority': MAX_PRIORITY}:
producer.py

import pika

MAX_PRIORITY = 100
QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape,新增arguments - x-max-priority设置最大优先级
channel.queue_declare(queue=QUEUE_NAME, arguments={
    'x-max-priority': MAX_PRIORITY
})

while True:
    data, priority = input().split()
    channel.basic_publish(exchange='', routing_key=QUEUE_NAME, properties=pika.BasicProperties(priority=int(priority), ), body=data.encode())
    print(f'Put {data}')

运行此文件,在控制台输入如下内容:

// 控制台输入及输出
foo 40
Put foo
bar 20
Put bar
baz 50
Put baz

再次运行之前的consumer.py,不断回车可以发现,输出的顺序为 baz, foo, baz,按照我们规定的优先级输出了。

6. 队列持久化

不设置队列持久化,数据在RabbitMQ重启后就没有了。声明队列时指定durable=True即可开启持久化。同时在添加消息的时候需指定BasicProperties对象的delivery_mode为2
procuder.py

import pika

MAX_PRIORITY = 100
QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape
channel.queue_declare(queue=QUEUE_NAME, arguments={'x-max-priority': MAX_PRIORITY}, durable=True)

while True:
    data, priority = input().split()
    channel.basic_publish(exchange='', routing_key=QUEUE_NAME, properties=pika.BasicProperties(priority=int(priority), delivery_mode=2,), body=data.encode())
    print(f'Put {data}')

这样就可以持久化存储队列了。

7. 实战

将字符串消息改写为请求对象
构造请求对象时传入请求方法和URL即可:request = requests.Request('GET', url)
然后可以通过 pickle 工具进行序列化,发送到 RabbitMQ 中。
producer.py

import pika
import requests
import pickle

MAX_PRIORITY = 100
TOTAL = 100
QUEUE_NAME = 'scrape_queue'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape
# channel.queue_declare(queue=QUEUE_NAME, arguments={'x-max-priority': MAX_PRIORITY}, durable=True)
channel.queue_declare(queue=QUEUE_NAME, durable=True)


for i in range(1, TOTAL + 1):
    url = f'https://ssr1.scrape.center/detail/{i}'
    request = requests.Request('GET', url)
    channel.basic_publish(exchange='', routing_key=QUEUE_NAME, properties=pika.BasicProperties(delivery_mode=2,), body=pickle.dumps(request))
    print(f'Put request of {url}')

consumer.py

import pika
import pickle
import requests

MAX_PRORITY = 100
QUEUE_NAME = 'scrape_queue'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
session = requests.Session()

def scrape(request):
    try:
        response = session.send(request.prepare())
        print(f'success scraped {response.url}')
    except requests.RequestException:
        print(f'error occured when scraping {request.url}')

while True:
    # input()
    method_frame, header, body = channel.basic_get(queue=QUEUE_NAME, auto_ack=True)
    if body:
        request = pickle.loads(body)
        print(f'Get {request}')
        scrape(request)

运行生产者代码后即构造了100个请求对象发送到了 RabbitMQ 中。
运行消费者代码可以看到,依次进行请求并打印了成功的信息。

Get <Request [GET]>
success scraped https://ssr1.scrape.center/detail/1
Get <Request [GET]>
success scraped https://ssr1.scrape.center/detail/2
Get <Request [GET]>
......
Get <Request [GET]>
success scraped https://ssr1.scrape.center/detail/100

标签:基本,pika,队列,RabbitMQ,QUEUE,scrape,使用,channel,NAME
From: https://www.cnblogs.com/achangblog/p/18172542

相关文章

  • dotnet 委托delegate的使用 定义和使用
    voidMain(){//委托-初级和高级的分水岭//1.委托的初体验//委托是一个引用类型,其实是一个类型,保存方法的指针(地址)(变量名字都是地址都是指针)//是一个数据类型实际是一个对象(委托对象,函数对象,列表对象)万物都是对象//指针指向一个方法当我......
  • 在Docker内部使用gdb调试器报错-Operation not permitted
    在docker内部使用gdb调试时刻遇到了gdb如下报错信息:warning:Errordisablingaddressspacerandomization:Operationnotpermitted原因地址随机化是linux一项安全特性,它允许内核进程启动每次加载库的时候都在随机化的分布在进程虚拟内存地址空间上(早期固定的库要加载......
  • 在Linux中,如何使用logrotate命令管理日志文件?
    logrotate是一个在Linux系统中用来管理和维护日志文件的工具。它可以自动地对日志文件进行压缩、删除旧的日志文件、创建新的日志文件,以及在日志轮换时运行指定的脚本。以下是如何使用logrotate命令的一些基本步骤和配置方法:1.安装logrotate在大多数Linux发行版中,logro......
  • 窗口的扩展风格和基本风格
    1.DWORDdwExStyle//窗口的扩展风格(加强版专有)指定扩展的窗口样式。为以下值中的一个或多个(devc++用“|”连接):WS_EX_ACCEPTFILES 指定此样式创建一个窗口接受拖放文件。WS_EX_APPWINDOW 当窗口可见时布置一个顶级窗口到任务栏上。WS_EX_CLIENTEDGE 指定窗口具有三维外观——......
  • 窗口的扩展风格和基本风格
    1.DWORDdwExStyle//窗口的扩展风格(加强版专有)指定扩展的窗口样式。为以下值中的一个或多个(devc++用“|”连接):WS_EX_ACCEPTFILES 指定此样式创建一个窗口接受拖放文件。WS_EX_APPWINDOW 当窗口可见时布置一个顶级窗口到任务栏上。WS_EX_CLIENTEDGE 指定窗口具有三维外观——......
  • 窗口的扩展风格和基本风格
    1.DWORDdwExStyle//窗口的扩展风格(加强版专有)指定扩展的窗口样式。为以下值中的一个或多个(devc++用“|”连接):WS_EX_ACCEPTFILES 指定此样式创建一个窗口接受拖放文件。WS_EX_APPWINDOW 当窗口可见时布置一个顶级窗口到任务栏上。WS_EX_CLIENTEDGE 指定窗口具有三维外观——......
  • 窗口的扩展风格和基本风格
    1.DWORDdwExStyle//窗口的扩展风格(加强版专有)指定扩展的窗口样式。为以下值中的一个或多个(devc++用“|”连接):WS_EX_ACCEPTFILES 指定此样式创建一个窗口接受拖放文件。WS_EX_APPWINDOW 当窗口可见时布置一个顶级窗口到任务栏上。WS_EX_CLIENTEDGE 指定窗口具有三维外观——......
  • 窗口的扩展风格和基本风格
    1.DWORDdwExStyle//窗口的扩展风格(加强版专有)指定扩展的窗口样式。为以下值中的一个或多个(devc++用“|”连接):WS_EX_ACCEPTFILES 指定此样式创建一个窗口接受拖放文件。WS_EX_APPWINDOW 当窗口可见时布置一个顶级窗口到任务栏上。WS_EX_CLIENTEDGE 指定窗口具有三维外观——......
  • 华为USG6000防火墙WEB基本配置
    第一步:浏览器登录https://192.168.0.1 (缺省状态下WEB登录地址)进入防火墙WEB配置界面。 第二步:输入用户名和密码登录,用户名:admin密码:Admin@123(缺省状态下)。 第三步:进入快速向导,根据指引进行配置,然后点击【下一步】。 第四步:配置基本信息。(建......
  • CMakeLists.txt --- install使用
    例:cmake_minimum_required(VERSION3.9)project(test)set(CMAKE_BUILD_TYPEDebug)add_library(hahatest.cpp)install(TARGEThahaDESTINATION/home/linxisuo/project/test)install(DIRECTORY${CMAKE_SOURCE_DIR}/testDESTINATION/home/linxisuo)说明:1.安装......