在前面的教程中,我们构建了一个简单的日志系统。我们能够向许多接收器广播日志消息。
在本教程中,我们将为其添加一个功能——我们将使仅订阅消息的一个子集成为可能。例如,我们将能够仅将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
绑定
在前面的示例中,我们已经创建了绑定。你可能会想起这样的代码:
$channel->queue_bind($queue_name, 'logs');
绑定是交换和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。
绑定可以接受额外的routing_key参数。为了避免与$channel::basic_publish参数混淆,我们将其称为绑定键。这就是我们如何使用密钥创建绑定:
$binding_key = 'black'; $channel->queue_bind($queue_name, $exchange_name, $binding_key);
绑定密钥的含义取决于交换类型。我们之前使用的扇出交易所完全忽略了它的价值。
直接交换
我们上一教程中的日志系统将所有消息广播给所有消费者。我们希望扩展该功能,以允许根据消息的严重性对其进行过滤。例如,我们可能希望将日志消息写入磁盘的脚本只接收严重错误,而不会在警告或信息日志消息上浪费磁盘空间。
我们使用的是扇出交换,这并没有给我们太多的灵活性——它只能进行无意识的广播。
我们将使用直接交换。直接交换背后的路由算法很简单——消息会进入队列,队列的绑定密钥与消息的路由密钥完全匹配。
为了说明这一点,请考虑以下设置:
在这个设置中,我们可以看到直接交换X绑定了两个队列。第一个队列用绑定键橙色绑定,第二个队列有两个绑定,一个绑定键黑色,另一个绑定绿色。
在这种设置中,使用橙色路由密钥发布到交换机的消息将被路由到队列Q1。路由键为黑色或绿色的邮件将转到Q2。所有其他消息都将被丢弃。
多重绑定
使用相同的绑定密钥绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键black在X和Q1之间添加绑定。在这种情况下,直接交换将表现得像扇出,并将消息广播到所有匹配的队列。路由密钥为黑色的消息将同时发送到Q1和Q2。
排放日志
我们将把这个模型用于我们的日志系统。我们将向直接交换机发送消息,而不是扇出。我们将提供日志严重性作为路由密钥。这样,接收脚本将能够选择它想要接收的严重性。让我们先关注日志的发布。
一如既往,我们需要先创建一个交易所:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
我们已准备好发送信息:
$channel->exchange_declare('direct_logs', 'direct', false, false, false); $channel->basic_publish($msg, 'direct_logs', $severity);
为了简化,我们假设“严重性”可以是信息、警告或错误之一
订阅
接收消息的工作方式与上一教程相同,但有一个例外——我们将为我们感兴趣的每个严重性创建一个新的绑定。
foreach ($severities as $severity) { $channel->queue_bind($queue_name, 'direct_logs', $severity); }
一起执行
emit_log_direct.php类的代码:
1 <?php 2 3 require_once __DIR__ . '/vendor/autoload.php'; 4 use PhpAmqpLib\Connection\AMQPStreamConnection; 5 use PhpAmqpLib\Message\AMQPMessage; 6 7 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 8 $channel = $connection->channel(); 9 10 $channel->exchange_declare('direct_logs', 'direct', false, false, false); 11 12 $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info'; 13 14 $data = implode(' ', array_slice($argv, 2)); 15 if (empty($data)) { 16 $data = "Hello World!"; 17 } 18 19 $msg = new AMQPMessage($data); 20 21 $channel->basic_publish($msg, 'direct_logs', $severity); 22 23 echo ' [x] Sent ', $severity, ':', $data, "\n"; 24 25 $channel->close(); 26 $connection->close();
receive_logs_direct.php的代码:
1 <?php 2 3 require_once __DIR__ . '/vendor/autoload.php'; 4 use PhpAmqpLib\Connection\AMQPStreamConnection; 5 6 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 7 $channel = $connection->channel(); 8 9 $channel->exchange_declare('direct_logs', 'direct', false, false, false); 10 11 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); 12 13 $severities = array_slice($argv, 1); 14 if (empty($severities)) { 15 file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n"); 16 exit(1); 17 } 18 19 foreach ($severities as $severity) { 20 $channel->queue_bind($queue_name, 'direct_logs', $severity); 21 } 22 23 echo " [*] Waiting for logs. To exit press CTRL+C\n"; 24 25 $callback = function ($msg) { 26 echo ' [x] ', $msg->getRoutingKey(), ':', $msg->getBody(), "\n"; 27 }; 28 29 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); 30 31 try { 32 $channel->consume(); 33 } catch (\Throwable $exception) { 34 echo $exception->getMessage(); 35 } 36 37 $channel->close(); 38 $connection->close();
如果你只想将“警告”和“错误”(而不是“信息”)日志消息保存到文件中,只需打开控制台并键入:
php receive_logs_direct.php warning error > logs_from_rabbit.log
如果您想在屏幕上看到所有日志消息,请打开一个新的终端并执行以下操作:
php receive_logs_direct.php info warning error # => [*] Waiting for logs. To exit press CTRL+C
例如,要发出错误日志消息,只需键入:
php emit_log_direct.php error "Run. Run. Or it will explode." # => [x] Sent 'error':'Run. Run. Or it will explode.'
继续学习教程5,了解如何根据模式监听消息
官方链接:https://www.rabbitmq.com/tutorials/tutorial-four-php
标签:教程,false,logs,绑定,direct,RabbitMQ,channel,日志,路由 From: https://www.cnblogs.com/dreamboycx/p/18643619