首页 > 编程语言 >PHP与RabbitMQ消息队列简单示例

PHP与RabbitMQ消息队列简单示例

时间:2023-02-18 10:34:20浏览次数:36  
标签:false 消费者 示例 队列 RabbitMQ 消息 PHP channel


​RabbitMQ​​提供跨语言接口,我们可以使用主流编程语言Java,C,C++,Python,PHP等和RabbitMQ做对接。RabbitMQ有消息确认机制、灵活的路由控制、以及消息集群高可用,使得很多大型系统使用RabbitMQ做消息处理系统。

消息队列(​​Message Queue​​)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

简单的消息队列,我们完全可以使用Redis实现,而相对复杂的需求,比如消息确认、消息持久化、高可用等需要用RabbitMQ这样的大器来做比较合适。

安装php-amqplib

php-amqplib是一个纯PHP库,使用它,基于PHP的脚本客户端就可以轻松的连接和操作RabbitMQ。
我们使用在上个swoole项目里面composer来安装。

composer require php-amqplib/php-amqplib

示例说明
生产者(Producer)和消费者(Consumer)是消息队列的基本概念,生产者是指生产消息的一方,也是消息发送方,消费者就是消费消息的一方,也是消息接收方,队列就是存储消息的一个缓存区。本文实例将由生产者发送很多消息给消息队列,由多个消费者来消费队列中的消息。我们可以想象这样的场景:皮鞋生产打包打包车间,不断有成品鞋进入传送带(消息队列)等待操作工人(消费者)将皮鞋打包。因为等待打包的鞋子特别多,我们需要安排多个打包工人在传送带两边,及时从传送带取出成品鞋,然后装箱打包。我们要求是要确保工人最后打包好的皮鞋数量一双不少,不能因为打包工人操作慢或者个人原因暂时离开生产线,导致最终打包数不一致。

消息发送

生产者将消息发送给队列,至于谁来消费(处理)这些消息,生产者不管。

消息队列(MQ),用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

消息到达队列中后,如果没有一个消费者来处理消息的话,我们希望队列中的消息不要丢弃,也就是消息持久化。在生产者和消费者中都要将queue_declare第3个参数设置为true,表示让消息队列持久化。

$channel->queue_declare($queue, false, true, false, false);

此外,我们可以确保即使RabbitMQ重启了,消息队列不会丢失,在生产者端设置:

'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT。

现在我们建立生产者文件sender.php。

<?php
/**
* @sender.php
* @消息生产者-分发任务
*/

require_once dirname(__DIR__ ). '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$queue = 'worker';

//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection(
'192.168.3.106',
5672,
'admin', //user
'admin', //password
'admin' //vhost
);
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false); //第3个参数设置为true,表示让消息队列持久化

for ($i = 0; $i < 100; $i++) {
$arr = [
'id' => 'message_' . $i,
'order_id' => str_replace('.', '' , microtime(true)) . mt_rand(10, 99) . $i,
'content' => 'shanhubei-' . time()
];
$data = json_encode($arr);
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); 设置rabbitmq重启后也不会丢失队列,或者设置为'delivery_mode' => 2
$channel->basic_publish($msg, '', $queue);

echo 'Send message: ' . $data . PHP_EOL;
}

$channel->close();
$connection->close();

上述代码中,我们模拟了生产者向队列中发送了100条订单消息。

消息接收

消费者是指完成消息的接收和处理的客户端程序,消费者就如同生产线上的操作工人,他们按照操作规程从传送带上取出产品后有序的完成后续工作任务。

实际项目中,如果消费者处理消息能力不够时,就要开启多个消费者来消费队列中的消息。默认情况下,RabbitMQ将会把队列中的消息平均分配给每个消费者。如果消费者要对分配到的消息任务处理时间很长(耗时任务),那么处理消息任务的时候就有可能会遇到意外。比如某个消费者断电了,或者出故障了,那它正在处理的消息会怎么办?这里就是RabbitMQ的消息确认机制,为了保证数据不丢失,RabbitMQ会将未处理完的消息分配给下一个消费者处理。

此外RabbitMQ还可以设置公平分配消息任务,不会给某个消费者同时分配多个消息处理任务,因为消费者无法同时处理多个消息任务。换句话说,RabbitMQ在处理和确认消息之前,不会向消费者发送新的消息,而是将消息分发给下一个不忙的消费者。

$channel->basic_qos(null, 1, null); //处理和确认完消息后再消费新的消息

我们现在建立消费者文件​​receiver.php​​,代码如下:

<?php
/**
* @Author: Helloweba
* @receiver.php
* @消息消费者-接收端
*/

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$queue = 'worker';

//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection('192.168.0.100', 56720, 'helloweba', 'helloweba', 'test');
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C' . PHP_EOL;

$callback = function($msg){
echo " Received message:", $msg->body, PHP_EOL;
sleep(1); //模拟耗时执行
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null); //处理和确认完消息后再消费新的消息
$channel->basic_consume($queue, '', false, false, false, false, $callback); //第4个参数值为false表示启用消息确认

while(count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();

模拟测试

现在我们运行多个消费者终端,可以打开多个ssh客户端,​​client1​​​和​​client2​​运行:

php receive.php

然后再开个终端,运行生产者:

php sender.php

由于消费者是阻塞运行的,他们会一直等待队列中的消息,当有消息就会去取出来处理。我们可以模拟将其中某个客户端中断,即断开某个消费者。然后再看消息是不是被其他消费者接收处理了。同样我们可以模拟将客户端全部重启,看看队列中的消息是否没有丢失。

当client1中断连接RabbitMQ后,再次运行连接RabbitMQ,在client2中看到的消息处理情况,注意看消息id。

标签:false,消费者,示例,队列,RabbitMQ,消息,PHP,channel
From: https://blog.51cto.com/u_15967457/6065102

相关文章

  • 宝塔面板PHP7.3 安装mcrypt扩展
    mcrypt扩展从​​PHP7.1.0​​开始废弃,所以需要继续使用这个扩展的话需要自行编译mcrypt扩展或者采用pcel安装。yuminstalllibmcryptlibmcrypt-develmcryptmhashw......
  • PHP输出13位时间戳函数
    functiongetUnixTimestamp(){list($s1,$s2)=explode('',microtime());return(float)sprintf('%.0f',(floatval($s1)+floatval($s2))*1000);}......
  • php时间格式转换
    php时间格式的转换函数有date(),strtotime()函数,php原生的时间类也可以转换时间格式。1、Y-m-d转换为时间戳 例:2017-08-22转化为时间戳  strtotime(‘2017-08-22’......
  • 腾讯云视频点播示例
    <!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metahttp-equiv="X-UA-Compatible"content="IE=edge,chrome=1"><metaname="viewport"content="wi......
  • rabbitmq消息丢失
    发送消息后,队列中没有显示最后发现只能在flatMap中创建和关闭RpcClient。publicMono<ServerResponse>addCart(ServerRequestrequest){AtomicReference<Cart......
  • docker rabbitmq数据持久化
    RabbitMQ消息队列怎样做到服务宕机或重启消息不丢失RabbitMQ确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件。当发布一个持久性消息到......
  • PHP获取下个月今天时间
    获取php下个月当前时间publicstaticfunctionfq_time($time=""){if(!$time)$time=time();$arr=getdate($time);//判断月份$year=$arr['year']......
  • php连接数据库
    数据库查询分类名称php连接数据库输入网址http://myblog:8080,并显示连接成功Php连接数据库内容,读取myblog导航栏数据PHP查询MySQL数据库后返回中文为问号添加$conn->que......
  • PHP连接数据库
    在数据库中查询表格所有信息查询指定的分类名称在VScode中敲代码,并在网页中验证在PHP中连接数据库内容,并刷新网页查看输出导航栏在数据库中查询文章标题和文章内容利用<hr>......
  • PHP连接数据库、执行SELECT的SQL语句、在页面上输出
    数据库内查询文章分类php连接数据库,代码如下    页面显示连接成功 php读取导航栏数据,代码如下  页面显示如下数据库内查询文章  在原来代码上......