简介
1.本教程假设RabbitMQ已安装并在本地主机的标准端口(5672)上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
2.如果你在学习本教程时遇到困难,可以通过 GitHub Discussions或者RabbitMQ community Discord与我们联系
RabbitMQ是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局:当你把想要邮寄的邮件放在邮箱里时,你可以确信邮递员最终会把邮件送到你的收件人那里。在这个类比中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。
RabbitMQ和邮局的主要区别在于,它不处理纸张,而是接受、存储和转发二进制数据块——消息。
RabbitMQ和一般的消息传递都使用了一些行话
- 生产只意味着发送。发送消息的程序是生产者:P
- 队列是RabbitMQ中邮箱的名称。虽然消息流经RabbitMQ和您的应用程序,但它们只能存储在队列中。队列只受主机的内存和磁盘限制,它本质上是一个大的消息缓冲区。
许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。
这就是我们表示队列的方式:queue_name - 消费与接受具有相似的含义。消费者是一个主要等待接收消息的程序:C
请注意,生产者、消费者和经纪人不必驻留在同一主机上;事实上,在大多数应用程序中,它们都不是。应用程序也可以既是生产者又是消费者。
Hello World
使用php-amqplib客户端
在本教程的这一部分中,我们将用PHP编写两个使用RabbitMQ进行通信的程序。本教程使用需要PHP 7.x或8.x的客户端库。
第一个程序是发送单个消息的生产者,第二个程序是接收消息并将其打印出来的消费者。我们将介绍php-amqlib API中的一些细节,重点介绍这件非常简单的事情。这是消息传递的“Hello World”
在下面的图表中,“P”是我们的生产者,“C”是消费者。中间的框是一个队列——RabbitMQ代表消费者保存的消息缓冲区
php-amqplib客户端库
RabbitMQ讲多种协议。本教程介绍AMQP 0-9-1,这是一种用于消息传递的开放通用协议。RabbitMQ有许多不同语言的客户端。在本教程中,我们将使用php-amqplib和Composer进行依赖关系管理
将composer.json文件添加到您的项目中:
{ "require": { "php-amqplib/php-amqplib": "^3.2" } }只要您安装了Composer并正常工作,就可以运行以下命令:
php composer.phar install
现在我们已经安装了php-amqplib库,我们可以编写一些代码。
发送消息
我们将调用消息发布者(发送者) send.php 和消息接收者 receive.php 。发布者将连接到RabbitMQ,发送一条消息,然后退出。
在 send.php 里,我们需要包括库并使用必要的类:
1 require_once __DIR__ . '/vendor/autoload.php'; 2 use PhpAmqpLib\Connection\AMQPStreamConnection; 3 use PhpAmqpLib\Message\AMQPMessage;
然后我们可以创建到服务器的连接:
1 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 2 $channel = $connection->channel();
连接抽象了套接字连接,并为我们处理协议版本协商和身份验证等。在这里,我们连接到本地机器上的RabbitMQ节点,即本地主机。如果我们想连接到不同机器上的节点或托管PHP客户端推荐的代理的主机,我们只需在此处指定其主机名或IP地址。
接下来,我们创建一个通道,这是大多数用于完成任务的API所在的位置
要发送消息,我们必须声明一个队列供我们发送;然后我们可以将消息发布到队列:
1 $channel->queue_declare('hello', false, false, false, false); 2 3 $msg = new AMQPMessage('Hello World!'); 4 $channel->basic_publish($msg, '', 'hello'); 5 6 echo " [x] Sent 'Hello World!'\n";
声明队列是幂等的——只有当它不存在时才会创建。消息内容是一个字节数组,因此您可以在那里对任何您喜欢的内容进行编码。
最后,我们关闭通道和连接。
$channel->close(); $connection->close();
这是整个send.php类。
1 <?php 2 3 require_once __DIR__ . '/vendor/autoload.php'; 4 use PhpAmqpLib\Connection\AMQPStreamConnection; 5 use PhpAmqpLib\Message\AMQPMessage; 6 7 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 8 $channel = $connection->channel(); 9 10 $channel->queue_declare('hello', false, false, false, false); 11 12 $msg = new AMQPMessage('Hello World!'); 13 $channel->basic_publish($msg, '', 'hello'); 14 15 echo " [x] Sent 'Hello World!'\n"; 16 17 $channel->close(); 18 $connection->close(); 19 ?>
发送不起作用!
如果这是你第一次使用RabbitMQ,而你没有看到“已发送”的消息,那么你可能会挠头想知道出了什么问题。也许代理启动时没有足够的可用磁盘空间(默认情况下,它需要至少50 MB的可用空间),因此拒绝接受消息。检查代理日志文件,查看是否记录了资源警报,必要时降低可用磁盘空间阈值。配置指南将向您展示如何设置disk_free_limit。
接收消息
这就是我们的出版商。我们的接收者监听来自RabbitMQ的消息,因此与发布单个消息的发布者不同,我们将保持接收者运行以监听消息并将其打印出来。
代码在 receive.php 里具有与 send.php 几乎相同的include和用法:
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection;
设置与出版商相同;我们打开一个连接和一个通道,并声明我们将从中使用的队列。请注意,这与向其发送发布的队列相匹配。
1 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 2 $channel = $connection->channel(); 3 4 $channel->queue_declare('hello', false, false, false, false); 5 6 echo " [*] Waiting for messages. To exit press CTRL+C\n";
请注意,我们在这里也声明了队列。因为我们可能会在发布者之前启动消费者,所以我们希望在尝试消费来自队列的消息之前确保队列存在。
我们即将告诉服务器将队列中的消息传递给我们。我们将定义一个PHP可调用函数,用于接收服务器发送的消息。请记住,消息是从服务器异步发送到客户端的。
1 $callback = function ($msg) { 2 echo ' [x] Received ', $msg->body, "\n"; 3 }; 4 5 $channel->basic_consume('hello', '', false, true, false, false, $callback); 6 7 try { 8 $channel->consume(); 9 } catch (\Throwable $exception) { 10 echo $exception->getMessage(); 11 }
当我们的$channel有回调时,我们的代码将被阻塞。每当我们收到消息时,我们的$callback函数都会收到收到的消息
这是整个receive.php类
1 <?php 2 3 require_once __DIR__ . '/vendor/autoload.php'; 4 use PhpAmqpLib\Connection\AMQPStreamConnection; 5 6 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 7 $channel = $connection->channel(); 8 9 $channel->queue_declare('hello', false, false, false, false); 10 11 echo " [*] Waiting for messages. To exit press CTRL+C\n"; 12 13 $callback = function ($msg) { 14 echo ' [x] Received ', $msg->getBody(), "\n"; 15 }; 16 17 $channel->basic_consume('hello', '', false, true, false, false, $callback); 18 19 try { 20 $channel->consume(); 21 } catch (\Throwable $exception) { 22 echo $exception->getMessage(); 23 } 24 25 $channel->close(); 26 $connection->close();
一起执行
现在我们可以运行这两个脚本了。在终端中,运行消费者(接收者):
php receive.php
然后,运行发布者(发送者):
php send.php
消费者将通过RabbitMQ打印从发送者那里收到的消息。接收器将继续运行,等待消息(使用Ctrl-C停止),因此请尝试从另一个终端运行发送器
列出队列
您可能希望查看RabbitMQ有哪些队列以及其中有多少消息。您可以使用rabbitmqtl工具(作为特权用户)完成此操作:
sudo rabbitmqctl list_queues在Windows上,省略sudo
rabbitmqctl.bat list_queues
PHP连接代理
虽然本教程努力保持简单,并专注于解释RabbitMQ概念,但重要的是要指出PHP应用程序特有的东西。在许多情况下,PHP应用程序将无法使用RabbitMQ假定的长期连接,从而产生了一种称为高连接流失的情况
为了避免这种情况,建议PHP用户尽可能在生产环境中使用特殊的代理。代理避免了连接混乱,或者至少显著减少了连接混乱。
现在,是时候继续第2部分并构建一个简单的工作队列了。
标签:教程,false,队列,HelloWorld,RabbitMQ,php,channel,消息 From: https://www.cnblogs.com/dreamboycx/p/18641600