RabbitMQ关系说明:
交换机:一个交换机可以有多个路由(routeKey),一个路由(routeKey)可以绑定多个队列。在发布消息的时候指定交换机对应的路由(routeKey),即可对应到该路由(routeKey)对应的队列。
队列:常驻内存监听进行消费。
准备
- 安装RabbitMq,指定数据卷
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672
-v /data/server/rabbitmq/data:/var/lib/rabbitmq rabbitmq:3.13-management-alpine
- 安装延迟插件
延迟插件地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange?tab=readme-ov-file
1.下载.ez文件到映射目录
rabbitmq_delayed_message_exchange-3.13.0.ez
2.找到插件目录,将插件复制到该目录
执行获取插件目录位置命令
d88ef1c2d3aa:/# rabbitmq-plugins directories -s
插件目录:
Plugin archives directory: /opt/rabbitmq/plugins
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@d88ef1c2d3aa-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins
3.将.ez文件放入插件目录
mv /var/lib/rabbitmq/rabbitmq_delayed_message_exchange-3.13.0.ez /opt/rabbitmq/plugins/
4.启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
管理界面
- 登录地址:ip:15672
- 账号密码:guest
查看交换机类型中是否有延迟消息类型
使用
- 创建获取连接工具类
<?php
namespace app\extend;
use think\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
// RabbitMq工具类
class RabbitMqUtil
{
// 连接
private static $connection = null;
/**
* 获取连接
*
* @throws Exception
*/
public static function getConnection(): ?AMQPStreamConnection
{
if (self::$connection) {
return self::$connection;
}
try {
self::$connection = new AMQPStreamConnection('172.17.0.2', 5672, 'guest', 'guest');
} catch (\Exception $e) {
throw new Exception($e->getMessage());
}
return self::$connection;
}
// 关闭连接
public static function closeConnection()
{
if (self::$connection) {
self::$connection->close();
}
}
}
- 生产者
use app\extend\RabbitMqUtil;
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Message\AMQPMessage;
// 发送消息
public function sendMsg()
{
// 发消息
try {
// 获取连接
$connection = RabbitMqUtil::getConnection();
// 获取管道
$channel = $connection->channel();
// 延迟交换机名
$exchangeName = 'MyDelayedChange';
// 延迟交换机参数
$arguments = new AMQPTable([
// 内部使用的交换机类型 direct
'x-delayed-type' => 'direct',
]);
// 创建延迟交换机
$channel->exchange_declare($exchangeName, 'x-delayed-message', false, true, false, false, false, $arguments);
// 队列名
$queue = 'order_queue';
// 创建队列,幂等性,多次声明不会重复创建
$channel->queue_declare($queue, false, true, false, false);
// routeKey
$routeKey = 'order_queue';
// 绑定队列到交换机
$channel->queue_bind($queue, $exchangeName, $routeKey);
// 发送的消息
$messageBody = 'this is order';
$msg = new AMQPMessage($messageBody, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 设置5秒延迟
$headers = new AMQPTable(['x-delay' => 5000]);
// 设置消息头
$msg->set("application_headers", $headers);
// 发布消息
$channel->basic_publish($msg, $exchangeName, $routeKey);
// 关闭连接
$channel->close();
$connection->close();
} catch (\Exception $e) {
}
echo "发送OK";
}
- 消费
use think\Exception;
use app\extend\RabbitMqUtil;
// 接收消息
protected function execute()
{
$connection = null;
$channel = null;
try {
$connection = RabbitMqUtil::getConnection();
$channel = $connection->channel();
// 声明一个队列 幂等性,不会重复创建
$queue = 'order_queue';
$channel->queue_declare($queue, false, true, false, false);
$callback = function($msg) {
echo '订单数据';
echo $msg->getBody(), "\n";
// 消费完成ack
$msg->ack();
};
// 调度 不能都给你一个消费者
$channel->basic_qos(0, 1, false);
$channel->basic_consume($queue, '', false, false, false, false, $callback);
try {
$channel->consume();
} catch (\Throwable $exception) {
echo $exception->getMessage();
}
} catch (Exception $e) {
dump('连接异常');
dump($e->getMessage());
}
$channel->close();
$connection->close();
}
标签:插件,false,rabbitmq,queue,connection,RabbitMQ,使用,PHP,channel
From: https://www.cnblogs.com/zhuo512/p/18521253