RabbitMQ tutorial - "Hello world!"
本例
- 阻塞线程方式
- 一生产者一消费者
依赖项
- abbitMQ is installed
- running on localhost on the standard port (5672).
理解
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.
The major difference between RabbitMQ and the post office is that it doesn't deal with paper, instead it accepts, stores, and forwards binary blobs of data ‒ messages.
RabbitMQ 术语(用于节点描述)
- producer / Producing
Producing means nothing more than sending. A program that sends messages is a producer :
(P)
- queue
A queue is the name for the post box in RabbitMQ. Although messages flow through RabbitMQ and your applications, they can only be stored inside a queue. A queue is only bound by the host's memory & disk limits, it's essentially a large message buffer.
Many producers can send messages that go to one queue, and many consumers can try to receive data from one queue.
This is how we represent a queue:
||queue_name||
- consumer / Consuming
Consuming has a similar meaning to receiving. A consumer is a program that mostly waits to receive messages:
(C)
producer, consumer, broker 和 application, host
Note that the producer, consumer, and broker do not have to reside on the same host; indeed in most applications they don't. An application can be both a producer and consumer, too.
Hello World! 示例(Pika库)
一个producer (sender)发,一个consumer (receiver)收,收了之后打印。
节点说明
In the diagram below, "P" is our producer and "C" is our consumer. The box in the middle is a queue - a message buffer that RabbitMQ keeps on behalf of the consumer.
完整设计如下:
(P) -> ||hello|| -> (C)
Producer sends messages to the "hello" queue. The consumer receives messages from that queue.
协议
RabbitMQ speaks multiple protocols. This tutorial uses AMQP 0-9-1, which is an open, general-purpose protocol for messaging.
安装pika
代码(命令行)
python -m pip install pika --upgrade
代码说明
发送
P -> ||hello||
发送代码(send.py)发送一个简单消息到队列中。
首先是连上服务器
The first thing we need to do is to establish a connection with RabbitMQ server.
代码(py)
!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
如果不是本地机器
We're connected now, to a broker on the local machine - hence the localhost. If we wanted to connect to a broker on a different machine we'd simply specify its name or IP address here.
创建名为hello的队列
Next, before sending we need to make sure the recipient queue exists. If we send a message to non-existing location, RabbitMQ will just drop the message. Let's create a hello queue to which the message will be delivered:
代码(py)
channel.queue_declare(queue='hello')
发送消息
At this point we're ready to send a message. Our first message will just contain a string Hello World! and we want to send it to our hello queue.
In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. But let's not get dragged down by the details ‒ you can read more about exchanges in the third part of this tutorial. All we need to know now is how to use a default exchange identified by an empty string. This exchange is special ‒ it allows us to specify exactly to which queue the message should go. The queue name needs to be specified in the routing_key parameter:
代码(py)
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
Before exiting the program we need to make sure the network buffers were flushed and our message was actually delivered to RabbitMQ. We can do it by gently closing the connection.
代码(py)
connection.close()
如果发送不成功
If this is your first time using RabbitMQ and you don't see the "Sent" message then you may be left scratching your head wondering what could be wrong. Maybe the broker was started without enough free disk space (by default it needs at least 200 MB free) and is therefore refusing to accept messages. Check the broker logfile to confirm and reduce the limit if necessary. The configuration file documentation will show you how to set disk_free_limit.
接收
||hello|| -> (C)
接收代码(receive.py)从队列中接收消息,输出到屏幕。
连接到服务器的代码和发送代码相同。
queue_declare 对于同一个名称的队列可以调用多次,只有第一次调用执行创建队列的操作。
代码(py)
channel.queue_declare(queue='hello')
You may ask why we declare the queue again ‒ we have already declared it in our previous code. We could avoid that if we were sure that the queue already exists. For example if send.py program was run before. But we're not yet sure which program to run first. In such cases it's a good practice to repeat declaring the queue in both programs.
查看所有队列(以及多少消息在里面)
linux系统(as a privileged user)
代码(命令行)
sudo rabbitmqctl list_queues
Windows系统
代码(命令行)
rabbitmqctl.bat list_queues
接收消息需要用到回调函数
代码(py)
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
回调函数指定处理队列
代码(py)
channel.basic_consume(queue='hello',
auto_ack=True, #
on_message_callback=callback)
上面代码执行成功的前提是'hello'队列已经创建。
添加按键(Ctrl+C)捕捉
And finally, we enter a never-ending loop that waits for data and runs callbacks whenever necessary, and catch KeyboardInterrupt during program shutdown.
代码(py)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if name == 'main':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
py文件方式的代码
send.py (source)
代码(py)
!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
receive.py (source)
代码(py)
!/usr/bin/env python
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if name == 'main':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
先运行接收程序然后运行发送程序
Now we can try out our programs in a terminal. First, let's start a consumer, which will run continuously waiting for deliveries:
运行接收程序(会一直运行,可按Ctrl+C退出执行)
python receive.py
接收端输出
=> [*] Waiting for messages. To exit press CTRL+C
运行发送程序(每执行一次仅发送一条消息后退出)
python send.py
发送端输出
=> [x] Sent 'Hello World!'
接收端输出
=> [*] Waiting for messages. To exit press CTRL+C
=> [x] Received 'Hello World!'
动手:多运行几次发送程序,观察接收端的输出。
命令行方式的代码:Producer + Consumer
接收消息的示例(Consumer)
代码(py)
import pika
连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
声明队列
channel.queue_declare(queue='hello')
定义回调函数处理接收到的消息
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
设置消费者
channel.basic_consume(queue='hello',
on_message_callback=callback,
auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
发送消息的示例(Producer)
代码(py)
import pika
连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
声明一个队列
channel.queue_declare(queue='hello')
发送消息
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
来源
来自系列
https://www.rabbitmq.com/tutorials
中的下文删减
https://www.rabbitmq.com/tutorials/tutorial-one-python
其他参考
https://www.rabbitmq.com/
https://www.rabbitmq.com/tutorials
获取帮助
GitHub Discussions or RabbitMQ community Discord.
安装说明和下载地址
https://www.rabbitmq.com/docs/install-windows
https://www.erlang.org/downloads
https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.13.3