mandatory——处理不可路由消息
在使用Basic.Publish
发送一条消息并携带参数mandatory=True
时,当消息是不可路由的时,RabbitMQ会发回一个Basic.Return
方法帧。
不可路由消息是指交换机无法通过指定的路由键将消息映射到具体的队列上,也就是路由键不正确
import rabbitpy
conn = rabbitpy.Connection()
with conn.channel() as channel:
content = 'hello'
message = rabbitpy.Message(channel, content, {'content_type': 'text/plain'})
# 尝试通过`example.exchange`路由到`notexists.routingkey`,该路由将失败
message.publish('example.exchange', 'notexists.routingkey', mandatory=False)
目前,我们的mandatory=False
,运行代码,什么也不会发生。
将它改为True
,抛出了以下异常:
这是
rabbitpy
库的处理方式,该库自动监听Basic.Return
方法帧,当接收到后自动抛出一个异常,在使用其它的库时,Basic.Return
与Basic.Publish
的异步性会更好的体现
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionFactory.localGuestConnection();
Channel channel = connection.createChannel();
// 注册ReturnListener监听Basic.Return方法帧
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return aReturn) {
System.out.println("Basic.Return received!");
System.out.println(aReturn.getRoutingKey());
System.out.println(aReturn.getExchange());
System.out.println(aReturn.getBody());
}
});
// 发布,并设置mandatory=true
channel.basicPublish("example.exchange", "notexists.routingkey", true,
new AMQP.BasicProperties.Builder().contentType("text/plain").build(),
"hello".getBytes());
}
结果:
发布者确认
发布者确认(publisher-confirm)是RabbitMQ对AMQP规范的增强。
发布者先发送Confirm.Select
方法帧给MQ,MQ回复Confirm.SelectOk
方法帧。在那之后,服务器对于该发布者发布的每条消息都给予Basic.Ack
回复或Basic.Nack
回复。
官方文档里的说法
emmm,关于Basic.Ack
和Basic.Nack
什么情况下被RabbitMQ返回,如何确认确认信息属于哪个消息,这些问题在这本书上、各种帖子上、各种库的使用文档中、网课教程上有着各种各样的说法,不如我们来看看官方文档怎么说,然后再按照官方文档进行一下实践。为了避免翻译出现错误,我会在每一个翻译的段落下面贴上原文。
只要通道处于confirm
模式,broker和客户端就都开始对消息进行计数(首次confirm.select
时从1开始)。然后,当broker处理消息时,它就通过在同一个通道上发送一个basic.ack
来确认这条消息,delivery-tag
属性包含了被确认消息的序列号,broker也可以通过设置basic.ack
的multiple
属性来表示序列号以及之前的消息都已经被处理过了。
Once a channel is in confirm mode, both the broker and the client count messages (counting starts at 1 on the first confirm.select). The broker then confirms messages as it handles them by sending a basic.ack on the same channel. The delivery-tag field contains the sequence number of the confirmed message. The broker may also set the multiple field in basic.ack to indicate that all messages up to and including the one with the sequence number have been handled.
在broker无法成功的处理消息的特殊情况下,broker会发送一个basic.nack
来代替basic.ack
。在这个上下文中,basic.nack
的属性和basic.ack
中相应的属性有着相同的含义,并且requeue
属性应该被忽略。通过对一个或多个消息发送nack
,broker说明了它无法处理这些消息并且拒绝承担责任。这时,客户端可以选择重新发布这些消息。
In exceptional cases when the broker is unable to handle messages successfully, instead of a basic.ack, the broker will send a basic.nack. In this context, fields of the basic.nack have the same meaning as the corresponding ones in basic.ack and the requeue field should be ignored. By nack'ing one or more messages, the broker indicates that it was unable to process the messages and refuses responsibility for them; at that point, the client may choose to re-publish the messages.
在通道处于confirm
模式时,所有后续被发布的消息都会被确认(basic.ack
)或nack一次,不保证一个消息被确认的速度,不会有消息被确认并且被nack。
After a channel is put into confirm mode, all subsequently published messages will be confirmed or nack'd once. No guarantees are made as to how soon a message is confirmed. No message will be both confirmed and nack'd.
basic.nack
只会在负责一个队列的Erlang进程发生一个内部错误时被传送。
basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue.
对于无法路由的消息,一旦路由器发现一个消息无法路由到任何队列(返回一个空的队列列表)broker将提交一次确认。如果消息设置了mandatory
,basic.return
会在basic.ack
前发送到客户端。(这其中的顺序)对于否定确认(basic.nack
)也是一样的。
For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).
对于可路由的消息,basic.ack
将在一个消息已经被所有队列接收的情况下被发送到客户端。对于路由到持久队列的persistent
消息,意味着要将它持久化到硬盘(basic.ack
才会被发送);对于quorum
队列,这意味着一个quorum
副本已经接收并且对当前leader确认了这条消息(basic.ack
才会被发送)。
For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For quorum queues, this means that a quorum replicas have accepted and confirmed the message to the elected leader.
官方文档总结
从官方文档中,我们可以看到,不管各种各样的第三方库提供了怎样的API,对于RabbitMQ来说:
basic.nack
只会在RabbitMQ的Erlang进程出现内部错误时被传送basic.ack
会在其它所有情况下被传送,不管消息是否可路由basic.return
会比confirm消息更先被发送basic.ack
和basic.nack
使用delivery-tag
参数来标识该确认对应的消息
还有一点要提,就是在《深入RabbitMQ》这篇书上的一句:当消息传递到不存在的交换机时,RabbitMQ会直接关闭发布消息使用的通道
rabbitpy的测试
代码如下:
import rabbitpy
conn = rabbitpy.Connection()
try:
with conn.channel() as channel:
channel.enable_publisher_confirms()
content = 'hello'
message = rabbitpy.Message(channel, content, {'content_type': 'text/plain'})
acked = message.publish('example.exchange', 'example.routingkey', mandatory=True)
print('return [%s]' % 'ack' if acked else 'nack')
except rabbitpy.exceptions.MessageReturnedException as e:
print('Basic.Return')
正常发布时ack
:
路由失败时return
:
在路由失败时,理论来说是应该返回
Basic.Ack
,而Basic.Return
会比Basic.Ack
先到达,而rabbitpy
处理Basic.Return
的方式是抛出异常,所以我们无法看到下面对ack
还是nack
的打印。下面我们可以关掉mandatory
再来看下。
路由失败,关闭mandatory
时ack
:
尝试向不存在的交换机发布消息通道关闭
:
这里抛出了异常,通道被关闭,而且从RabbitMQ的日志也能看出
Java org.rabbitmq.amqp-client:5.2.0
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionFactory.localGuestConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.basicPublish("example.exchange", "example.routingkey",
true,
new AMQP.BasicProperties.Builder().contentType("text/plain").build(),
"hello".getBytes());
boolean acked = channel.waitForConfirms();
System.out.println(acked);
}
结果和rabbitpy
是一致的,不做分析。对于waitForConfirms
的性能问题以及怎么优化,超过了本篇文章的讨论范畴。
Spring-AMQP
令我最疑惑的就是这个库,因为前几天看黑马程序员的SpringCloud网课,其中讲到的publisher-confirm
和RabbitMQ的官方文档所述的完全不一致,可能是这个库进行了一些自定义的高度封装导致的。
这次测试咱不测Basic.Return
,因为这个库中对于这个东西的行为和官方的一致。
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
String message = "hello, spring amqp!";
String id = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(id);
correlationData.getFuture().addCallback(
(CorrelationData.Confirm result) -> {
System.out.println("Result " + ( result.isAck() ? "ACK" : "NACK"));
},
(Throwable ex) -> {
log.warn("发送时抛出异常...");
}
);
rabbitTemplate.convertAndSend("example.exchange", "example.routingkey", message, correlationData);
Thread.sleep(1000);
}
向正确的交换机和Routingkey发布消息:
向正确的交换机和错误的Routingkey发布消息:
向不存在的交换机发布消息:
问题出现了,SpringAMQP中返回Nack,但从RabbitMQ的日志来说,它和以前并没什么不同,也是错误并且关闭了通道连接。
这个库是咋做的?可以看到它在close
中调用了shutdownComplete
,然后又调用了processAck
,将ack
设为了false,也就是Nack。我们知道当往不存在的路由器中发送消息时,Channel是会被关闭的,所以,就是经过了这么一系列的调用,最终我们收到了Nack。
正是有了这个原因,SpringAMQP中的这个nack消息可以用来判定消息是否到达路由器。
标签:可靠性,ack,nack,Basic,RabbitMQ,深入,消息,basic,channel From: https://www.cnblogs.com/lilpig/p/16649709.html