Publisher确认是一个RabbitMQ扩展,用于实现可靠的发布。当发布者确认在通道上启用时,客户端发布的消息将由代理异步确认,这意味着它们已在服务器端得到处理。
概述
在本教程中,我们将使用发布者确认来确保发布的消息已安全到达代理。我们将介绍使用出版商确认的几种策略,并解释它们的优缺点。
在频道上启用发布者确认
Publisher确认是AMQP 0.9.1协议的RabbitMQ扩展,因此默认情况下未启用。使用confirm_select方法在通道级别启用发布者确认:
$channel = $connection->channel(); $channel->confirm_select();
必须在您希望使用publisher confirms的每个通道上调用此方法。确认应该只启用一次,而不是对发布的每条消息都启用。
策略#1:单独发布消息
让我们从使用confirms发布的最简单方法开始,即发布消息并同步等待其确认:
在前面的示例中,我们像往常一样发布消息,并使用 $channel::wait_forpending_acks(int|float) 方法等待其确认。消息确认后,该方法立即返回。如果消息未在超时内得到确认,或者消息被发送(意味着代理因某种原因无法处理它),则该方法将抛出异常。异常的处理通常包括记录错误消息和/或重试发送消息。
不同的客户端库有不同的方式来同步处理发布者确认,因此请务必仔细阅读您正在使用的客户端的文档。
这种技术非常简单,但也有一个主要缺点:它会显著减慢发布速度,因为消息的确认会阻止所有后续消息的发布。这种方法不会提供每秒超过几百条已发布消息的吞吐量。然而,对于某些应用来说,这已经足够好了。
发布者确认异步吗?
我们在开始时提到,代理异步确认已发布的消息,但在第一个示例中,代码会同步等待,直到消息得到确认。客户端实际上异步接收确认,并相应地解除对 wait_for_pending_acks 的调用。将 wait_for_pending_acks 视为一个同步助手,它在后台依赖于异步通知
策略#2:批量发布消息
为了改进我们之前的示例,我们可以发布一批消息并等待整批消息得到确认。以下示例使用一批100:
$batch_size = 100; $outstanding_message_count = 0; while (thereAreMessagesToPublish()) { $data = ...; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'exchange'); $outstanding_message_count++; if ($outstanding_message_count === $batch_size) { $channel->wait_for_pending_acks(5.000); $outstanding_message_count = 0; } } if ($outstanding_message_count > 0) { $channel->wait_for_pending_acks(5.000); }
等待一批消息得到确认比等待单个消息的确认大大提高了吞吐量(对于远程RabbitMQ节点,可达20-30倍)。一个缺点是,我们不知道在失败的情况下到底出了什么问题,所以我们可能不得不在内存中保存一整批数据来记录有意义的内容或重新发布消息。这个解决方案仍然是同步的,因此它阻止了消息的发布。
策略3:异步处理发布者确认
代理异步确认已发布的消息,只需在客户端注册一个回调,即可收到这些确认的通知:
$channel = $connection->channel(); $channel->confirm_select(); $channel->set_ack_handler( function (AMQPMessage $message){ // code when message is confirmed } ); $channel->set_nack_handler( function (AMQPMessage $message){ // code when message is nack-ed } );
有2个回调:一个用于确认消息,一个用于nack-ed消息(可以被代理视为丢失的消息)。每个回调都有 AMQPMessage$message 参数和返回的消息,因此您不需要处理序列号(传递标签)来了解此回调属于哪个消息。
概述
在某些应用程序中,确保发布的消息到达代理可能是至关重要的。Publisher确认,RabbitMQ功能有助于满足这一要求。发布者确认本质上是异步的,但也可以同步处理。没有明确的方法来实现发布者确认,这通常归结为应用程序和整个系统中的约束。典型的技术有:
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,等待批量同步确认:简单、合理的吞吐量,但很难推断出什么时候出了问题。
- 异步处理:性能最佳,资源利用率高,发生错误时控制良好,但可以正确实施。
官方链接:https://www.rabbitmq.com/tutorials/tutorial-seven-php
标签:教程,队列,确认,RabbitMQ,发布,消息,发布者,message,channel From: https://www.cnblogs.com/dreamboycx/p/18644025