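Introduction
RabbitMQ is a message broker that implements AMQP (Advanced Message Queuing Protocol). This tutorial walks through a few simple steps for sending and receiving messages with RabbitMQ, using Python for the examples.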
Environment setup
1. Install RabbitMQ
- Install RabbitMQ on your system: https://www.rabbitmq.com/download.html
- Start the server: `sudo rabbitmq-server`
2. Install the client library
- The Python examples use the `pika` library: `pip install pika`
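Before running the examples, it can help to confirm that something is actually listening on the broker port. The following is a minimal sketch using only the standard library. It assumes the broker runs on `localhost` and uses the default AMQP port 5672. The helper name `broker_reachable` is made up for this illustration and is not part of `pika`.

```python
import socket


def broker_reachable(host="localhost", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


if __name__ == "__main__":
    if broker_reachable():
        print("Port 5672 is accepting connections")
    else:
        print("Nothing is listening on localhost:5672; is rabbitmq-server running?")
```

This only checks that the TCP port is open. It does not verify credentials or that the listener is really RabbitMQ.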
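Step 1: Create a producer
Create a simple producer that sends a message to the RabbitMQ server. Publishing to the default exchange (`exchange=''`) delivers the message to the queue whose name matches `routing_key`.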
```python
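import pika


def main():
    # Open a blocking connection to the broker running on localhost.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Declare the queue; it is created only if it does not exist yet.
    channel.queue_declare(queue='hello')

    message = "Hello World!"
    # The default exchange ('') routes to the queue named by routing_key.
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()


if __name__ == '__main__':
    main()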
```
Step 2: Create a consumer
Create a simple consumer that receives messages from the RabbitMQ server.
```python
import pika


def callback(ch, method, properties, body):
    # body arrives as bytes, so this prints b'Hello World!'.
    print(" [x] Received %r" % body)


def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Declare the queue here as well, in case the consumer starts first.
    channel.queue_declare(queue='hello')

    # auto_ack=True: messages count as acknowledged as soon as they are delivered.
    channel.basic_consume(queue='hello',
                          on_message_callback=callback,
                          auto_ack=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


if __name__ == '__main__':
    main()
```
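The callback receives `body` as bytes, which is why the consumer prints `b'Hello World!'` rather than a plain string. For structured payloads, one common approach is to serialize on the producer side and parse on the consumer side. The sketch below assumes JSON encoded as UTF-8. The helper names `encode_message` and `decode_message` are made up for this illustration and are not part of `pika`.

```python
import json


def encode_message(payload):
    """Serialize a dict to UTF-8 JSON bytes suitable for basic_publish(body=...)."""
    return json.dumps(payload).encode("utf-8")


def decode_message(body):
    """Parse the bytes handed to the consumer callback back into a dict."""
    return json.loads(body.decode("utf-8"))


if __name__ == "__main__":
    body = encode_message({"task": "hello", "attempt": 1})
    print(body)                  # the bytes that would travel through the queue
    print(decode_message(body))  # the dict the consumer would work with
```

In the producer, the result of `encode_message(...)` would be passed as `body`; in the consumer, `decode_message(body)` would be called at the top of `callback`.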
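Step 3: Use persistent messages
Make sure messages survive a RabbitMQ restart. This takes two things: the queue must be declared durable, and each message must be published with a persistent delivery mode.
RabbitMQ does not allow an existing queue to be redeclared with different parameters, so if the non-durable `hello` queue from the previous steps still exists, delete it first or use a different queue name.
Producer changes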
```python
import pika


def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # durable=True: the queue definition survives a broker restart.
    channel.queue_declare(queue='hello', durable=True)

    message = "Hello World!"
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # mark the message as persistent
                          ))
    print(" [x] Sent %r" % message)
    connection.close()


if __name__ == '__main__':
    main()
```
Consumer changes
```python
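import pika


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # With auto_ack=False the message must be acknowledged explicitly;
    # otherwise it stays unacknowledged and prefetch_count=1 blocks further deliveries.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Must match the durable declaration used by the producer.
    channel.queue_declare(queue='hello', durable=True)
    # Deliver at most one unacknowledged message to this consumer at a time.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='hello',
                          on_message_callback=callback,
                          auto_ack=False)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


if __name__ == '__main__':
    main()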
```
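With `auto_ack=False`, the broker treats each delivered message as unacknowledged until the consumer confirms it, and redelivers it if the consumer disconnects first. One possible way to handle this is to acknowledge after successful processing and hand the message back on failure. The sketch below assumes a processing function `handler(body)` that raises on error. The wrapper name `make_acking_callback` is made up for this illustration, while `basic_ack` and `basic_nack` are the channel methods `pika` provides.

```python
def make_acking_callback(handler):
    """Wrap handler(body) so the message is acked on success and requeued on failure."""
    def callback(ch, method, properties, body):
        try:
            handler(body)
        except Exception:
            # Processing failed: return the message to the queue for redelivery.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            # Processing succeeded: the broker may now discard the message.
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback
```

It would be registered with `channel.basic_consume(queue='hello', on_message_callback=make_acking_callback(handler), auto_ack=False)`. Note that requeueing unconditionally means a message that always fails will be redelivered indefinitely, so a real consumer would likely need a retry limit or `requeue=False` for such cases.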
Step 4: Use a work queue
Implement a simple work queue that distributes tasks among multiple workers.
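The worker in this step calls `basic_qos(prefetch_count=1)`, which tells RabbitMQ not to send a worker a new message until it has acknowledged the previous one. Without that limit, messages are handed out to consumers in turn regardless of how busy each one is. The following is a rough simulation of the difference, not RabbitMQ's actual scheduler. It assumes task durations are known in advance, and the function names are made up for this illustration.

```python
import heapq


def round_robin_makespan(durations, workers):
    """Total time when task i is always handed to worker i % workers."""
    busy_until = [0] * workers
    for i, d in enumerate(durations):
        busy_until[i % workers] += d
    return max(busy_until)


def fair_dispatch_makespan(durations, workers):
    """Total time when each task goes to whichever worker frees up first."""
    busy_until = [0] * workers
    heapq.heapify(busy_until)
    for d in durations:
        free_at = heapq.heappop(busy_until)  # earliest available worker
        heapq.heappush(busy_until, free_at + d)
    return max(busy_until)


if __name__ == "__main__":
    tasks = [9, 1, 9, 1, 9, 1]  # alternating heavy and light tasks, in seconds
    print(round_robin_makespan(tasks, 2))    # 27: one worker gets every heavy task
    print(fair_dispatch_makespan(tasks, 2))  # 19: idle workers pick up the next task
```

With alternating heavy and light tasks, strict turn-taking leaves one worker overloaded while the other sits idle, which is the situation `prefetch_count=1` is meant to avoid.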
Producer
```python
import pika
import sys
import random


def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Durable queue so queued tasks survive a broker restart.
    channel.queue_declare(queue='task_queue', durable=True)

    # Take the message from the command line, falling back to a default.
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    # Append a random integer between 1 and 10 to the message text.
    message += f" {random.randint(1, 10)}"
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2))  # mark the message as persistent
    print(" [x] Sent %r" % message)
    connection.close()


if __name__ == '__main__':
    main()
```
Worker
```python
import pika
import time


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # Simulate work: sleep one second for every '.' in the message body.
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # Acknowledge only after the work is finished, so an interrupted task is redelivered.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # Do not give this worker a new task until it has acknowledged the previous one.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='task_queue',
                          on_message_callback=callback)
    channel.start_consuming()


if __name__ == '__main__':
    main()
```
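The consumers in this tutorial tell the user to press CTRL+C, which ends `start_consuming()` with a `KeyboardInterrupt` traceback and leaves the connection to be torn down by the interpreter. A minimal sketch of a cleaner shutdown follows. It assumes `channel` and `connection` are the objects created in `main()`, and the helper name `consume_until_interrupted` is made up for this illustration.

```python
def consume_until_interrupted(channel, connection):
    """Run the consume loop and shut down cleanly when CTRL+C is pressed."""
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # Cancel the consumer so no further messages are delivered.
        channel.stop_consuming()
    finally:
        # Closing the connection returns any unacknowledged messages to the queue.
        connection.close()
```

In the worker, the call `channel.start_consuming()` would be replaced with `consume_until_interrupted(channel, connection)`.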
From: https://blog.csdn.net/qq_40698086/article/details/141804708