在数据采集的过程中,可能需要一些进程间的通信,如
- 一个进程负责构造爬取请求,另一个负责执行这些请求;
- 某个数据爬取进程执行完毕,通知另一个负责数据处理的进程开始爬取数据;
- 某个进程新建了一个爬取任务,通知另一个负责数据爬取的进程开始爬取数据。
为了降低进程耦合度,需一个消息队列中间件来存储和转发消息,实现进程间通信,RabbitMQ 就是一个开源可靠灵活的消息队列中间件。
1. 特点
可靠性、路由灵活、消息集群、高可用、多协议支持、多语言客户端、管理界面、跟踪机制、插件机制等。
2. 准备
安装 RabbitMQ 以及操作它的 Python 库 pika(pip3 install pika)
RabbitMQ 安装包 参考链接:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.13.2
安装 RabbitMQ 的前置环境 Erlang,安装包参考链接:http://www.erlang.org/download
3. 基本使用
RabbitMQ作为消息队列,要实现进程间通信,本质上是一个生产者消费者模型,一个进程作为生产者往消息队列放入消息,另一个进程作为消费者监听并处理消息队列,有3点需要关注:
- 声明队列:通过指定参数,创建消息队列;
- 生产内容:生产者根据队列的连接信息连接队列,往队列放入消息;
- 消费内容:消费者根绝队列的连接信息连接队列,从队列去除消息。
声明队列并尝试往队列添加消息:
import pika
QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape
channel.queue_declare(queue=QUEUE_NAME)
# 向队列放入消息
channel.basic_publish(exchange='', routing_key=QUEUE_NAME, body='Hello World!'.encode())
以上就是一个简单的生产者,可以命名为 producer.py,接下来新建一个 consumer.py,写入如下代码:
import pika
QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape
channel.queue_declare(queue=QUEUE_NAME)
def callback(ch, method, properties, body):
print(f'Get {body.decode()}')
# 从队列获取数据, auto_ack=True表示消费者获取消息之后会自动通知消息队列当前消息已被处理,可以移除这个消息。
channel.basic_consume(queue=QUEUE_NAME, auto_ack=True, on_message_callback=callback)
channel.start_consuming()
运行 consumer.py,然后每运行一次producer.py,consumer.py的控制台就会打印一次信息。
4. 随用随取
生产者如果往队列里放至过多请求导致消费者处理不过来,就会出现问题,因此消费者也应该有权控制取用消息的频率。
队列中的消息先用字符串表示,后面再更换为请求对象。修改之前代码如下:
producer.py
import pika
QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape
channel.queue_declare(queue=QUEUE_NAME)
while True:
data = input()
channel.basic_publish(exchange='', routing_key=QUEUE_NAME, body=data.encode())
print(f'Put {data}')
consumer.py
import pika
QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
while True:
input()
method_frame, header, body = channel.basic_get(queue=QUEUE_NAME, auto_ack=True)
if body:
print(f'Get {body.decode()}')
先运行生产者代码,随便输入一些内容,比如:
// 控制台输入及输出
foo
Put foo
bar
Put bar
baz
Put baz
然后运行消费者代码,不断回车,可以看到对应的打印内容。
Get 'foo'
Get 'bar'
Get 'baz'
以上示例代码便通过input方法控制了生产者生产消息以及消费者何时获取消息,实现了消费者的随用随取。
5. 优先级队列
若想设置消息的优先级,只需在声明队列时增加一个属性即可, MAX_PRIORITY = 100, arguments={'x-max-priority': MAX_PRIORITY}:
producer.py
import pika
MAX_PRIORITY = 100
QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape,新增arguments - x-max-priority设置最大优先级
channel.queue_declare(queue=QUEUE_NAME, arguments={
'x-max-priority': MAX_PRIORITY
})
while True:
data, priority = input().split()
channel.basic_publish(exchange='', routing_key=QUEUE_NAME, properties=pika.BasicProperties(priority=int(priority), ), body=data.encode())
print(f'Put {data}')
运行此文件,在控制台输入如下内容:
// 控制台输入及输出
foo 40
Put foo
bar 20
Put bar
baz 50
Put baz
再次运行之前的consumer.py,不断回车可以发现,输出的顺序为 baz, foo, baz,按照我们规定的优先级输出了。
6. 队列持久化
不设置队列持久化,数据在RabbitMQ重启后就没有了。声明队列时指定durable=True即可开启持久化。同时在添加消息的时候需指定BasicProperties对象的delivery_mode为2
procuder.py
import pika
MAX_PRIORITY = 100
QUEUE_NAME = 'scrape'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape
channel.queue_declare(queue=QUEUE_NAME, arguments={'x-max-priority': MAX_PRIORITY}, durable=True)
while True:
data, priority = input().split()
channel.basic_publish(exchange='', routing_key=QUEUE_NAME, properties=pika.BasicProperties(priority=int(priority), delivery_mode=2,), body=data.encode())
print(f'Put {data}')
这样就可以持久化存储队列了。
7. 实战
将字符串消息改写为请求对象
构造请求对象时传入请求方法和URL即可:request = requests.Request('GET', url)
然后可以通过 pickle 工具进行序列化,发送到 RabbitMQ 中。
producer.py
import pika
import requests
import pickle
MAX_PRIORITY = 100
TOTAL = 100
QUEUE_NAME = 'scrape_queue'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
# 声明队列,名称叫 scrape
# channel.queue_declare(queue=QUEUE_NAME, arguments={'x-max-priority': MAX_PRIORITY}, durable=True)
channel.queue_declare(queue=QUEUE_NAME, durable=True)
for i in range(1, TOTAL + 1):
url = f'https://ssr1.scrape.center/detail/{i}'
request = requests.Request('GET', url)
channel.basic_publish(exchange='', routing_key=QUEUE_NAME, properties=pika.BasicProperties(delivery_mode=2,), body=pickle.dumps(request))
print(f'Put request of {url}')
consumer.py
import pika
import pickle
import requests
MAX_PRORITY = 100
QUEUE_NAME = 'scrape_queue'
# 连接 RabbitMQ 服务
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明频道对象,用以操作队列内消息的生产和消费
channel = connection.channel()
session = requests.Session()
def scrape(request):
try:
response = session.send(request.prepare())
print(f'success scraped {response.url}')
except requests.RequestException:
print(f'error occured when scraping {request.url}')
while True:
# input()
method_frame, header, body = channel.basic_get(queue=QUEUE_NAME, auto_ack=True)
if body:
request = pickle.loads(body)
print(f'Get {request}')
scrape(request)
运行生产者代码后即构造了100个请求对象发送到了 RabbitMQ 中。
运行消费者代码可以看到,依次进行请求并打印了成功的信息。
Get <Request [GET]>
success scraped https://ssr1.scrape.center/detail/1
Get <Request [GET]>
success scraped https://ssr1.scrape.center/detail/2
Get <Request [GET]>
......
Get <Request [GET]>
success scraped https://ssr1.scrape.center/detail/100
标签:基本,pika,队列,RabbitMQ,QUEUE,scrape,使用,channel,NAME
From: https://www.cnblogs.com/achangblog/p/18172542