RabbitMQ(四)RabbitMQ的确认机制
- 保证消息不丢失、可靠抵达,可以使用
事务消息
,但性能会下降250倍,因此引入确认机制
:publisher
:Confirm Callback确认模式
publisher
:return CallBack
未投递到queue退回模式consumer
:ack机制
1 发送端的确认机制
Confirm Callback
-
Confirm Callback
是RabbitMQ
的发送端确认机制,主要是在Broker
(消息代理)收到消息的时候执行的回调 -
Spring.rabbitmq.publisher-confirms=true
- 编码是通过在创建
connectionFactory
的时候设置PublisherConfirms(true)
选项开启confirmcallback
CorrelationData
:用来表示当前消息的唯一性
- 编码是通过在创建
首先配置RabbitMQ:
spring:
datasource:
username: root
password: root
url: jdbc:mysql://192.168.60.3:3306/grainmall_oms
driver-class-name: com.mysql.cj.jdbc.Driver
rabbitmq:
# 开启发送端消息抵达队列的回调
publisher-returns: true
# 抵达队列后以异步方式优先回调
template:
mandatory: true
# 开启发送端消息抵达broker的通知
publisher-confirm-type: correlated
在配置类自定义Confirm CallBack
消息回调:
@PostConstruct
标识在配置类对象创建成功的时候执行此注解标注的方法setConfirmCallback
的参数主要有:correlationData
当前消息的唯一关联数据(消息的唯一id)ack
消息是否被broker接收到s
失败的原因
@Configuration
public class RabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@PostConstruct
public void initRabbitConfirmCallBack() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 只要broker收到消息,就会触发这个回调
* @param correlationData 当前消息的唯一关联数据(消息的唯一id)
* @param ack 消息是否被broker接收到
* @param s 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
System.out.println("confirm: " + correlationData + " " + ack + " " + s);
}
});
}
}
测试成功和失败的回调分别为:
confirm: null true null
confirm: null false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'java-exchange1' in vhost '/', class-id=60, method-id=40)
Return Callback
-
配置
rabbitmq-returns:true
spring: datasource: username: root password: root url: jdbc:mysql://192.168.60.3:3306/grainmall_oms driver-class-name: com.mysql.cj.jdbc.Driver rabbitmq: # 开启发送端消息抵达队列的回调 publisher-returns: true # 抵达队列后以异步方式优先回调 template: mandatory: true
-
Confirm模式只能保证消息到达broker,不能保证消息准确投递到目标queue中,这需要使用return模式实现
-
即如果消息未能够投递到
目标queue
里将调用returnCallBack
,可以记录下详细的投递记录、定期的巡检或者自动纠错都需要的数据
@Configuration
public class RabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@PostConstruct
public void initRabbitConfirmCallBack() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 只要broker收到消息,就会触发这个回调
* @param correlationData 当前消息的唯一关联数据(消息的唯一id)
* @param ack 消息是否被broker接收到
* @param s 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
System.out.println("confirm: " + correlationData + " " + ack + " " + s);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有传递给指定的队列,就会触发这个回调
* @param message 投递失败的消息的详细信息
* @param i 回复的状态码
* @param s 回复的消息文本
* @param s1 这个消息是交给哪个交换机
* @param s2 用的交换机的那个route-key
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("return: " + message + " " + i + " " + s + " " + s1 + " " + s2);
}
});
}
}
测试写错route-key的情况下的return callback
return: (Body:'{"id":null,"memberId":null,"orderSn":"94c9d88c-9ffb-41fd-8071-4f85fe16fc0e","couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":null,"billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={__TypeId__=com.hikaru.grainmall.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 312 NO_ROUTE java-exchange java-binding1
为消息添加唯一Id
- rabbitTemplate的最后一个参数可以为:
CorrelationData
:标识其唯一性id
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "num", defaultValue = "10") Integer num) {
for(int i = 0; i < num; i++) {
if(i % 2 == 0) {
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setName("Utada Hikaru" + i);
rabbitTemplate.convertAndSend("java-exchange",
"java-binding",
orderReturnReasonEntity,
new CorrelationData(UUID.randomUUID().toString()));
} else {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("java-exchange",
"java-binding1",
orderEntity,
new CorrelationData(UUID.randomUUID().toString()));
}
}
return "ok";
}
本地事务表保证事务的可靠性抵达(怎么判断哪些消息没有抵达)
-
在向MQ发送消息的时候,还可以将消息保存到数据库中,使用其唯一id为主键进行记录
-
如果服务端收到了
ConfirmCallBack
,则表明消息被收到 -
则只需要遍历数据库表即可知道哪些没有收到了
2 接受端的ack确认机制
消费者获取到消息,成功处理后可以回复ack给broker,方式有:
basic.ack
用于肯定确认,broker将移除此消息basic.nack
用于否定确认,可以指定broker是否丢弃该消息(即重新入队),可以批量basic.reject
用于否定确认,可以指定broker是否丢弃该消息,但不可以批量
默认情况下位自动ack,即消息被消费者收到之后,就会从broker中移除,会造成消息的丢失,而上述的手动ack在队列没有消费者的情况下,仍然会存储消息直到出现消费者进行消费
2.1 自动消息签收
一种是自动接收的(auto),也是默认的处理方式:即只要消息接收到,客户端消息处理的方法执行完毕就会自动确认,服务端就会移除这个消息。但是这种模式存在的问题是会导致消息的丢失,比如我们向消息队列发送五个消息,在消息处理的地方加一个端点:
然后放行一个消息:
最后终止客户端,模拟宕机,然后发现消息丢失
2.2 手动消息签收
另一种是手动处理(manual),即消息队列接收到消息后,只要没有明确告诉MQ消息被签收,也就是没有ack
,消息就一直是unacked
的状态,当接收方宕机重启后,就会重新开始处理这些消息,因此不会造成消息的丢失
此时中断服务器,接收端宕机,消息均变为Ready状态,下次宕机重启后会继续执行:
那么接收端如何手动签收消息?
接收方通过向MQ发送ack
表示手动签收了消息,这个ack包括:
deliveryTag
:存在于接收方收到的消息的消息头中,表示消息的唯一标识multiple
:是否开启批量签收
代码为:
@RabbitHandler
public void receiveMessage1(Message message,
OrderReturnReasonEntity returnReasonEntity,
Channel channel) throws InterruptedException {
System.out.println("收到消息:" + returnReasonEntity.getName());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("签收了" + deliveryTag);
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
e.printStackTrace();
}
}
2.3 手动消息拒签
-
channel.basicNack(deliveryTag, multiple, requeue)
表示拒签消息,其中参数:deliveryTag
:存在于接收方收到的消息的消息头中,表示消息的唯一标识multiple
:是否开启批量签收requeue
:签收后是否重新入队
requeue为true的话,等同于一直既不签收也不拒签的情况
@RabbitHandler
public void receiveMessage1(Message message,
OrderReturnReasonEntity returnReasonEntity,
Channel channel) throws InterruptedException {
System.out.println("收到消息:" + returnReasonEntity.getName());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
if(deliveryTag % 2 == 0) {
System.out.println("签收了" + deliveryTag);
channel.basicAck(deliveryTag, false);
} else {
System.out.println("没有签收" + deliveryTag);
channel.basicNack(deliveryTag, false, false);
}
} catch (IOException e) {
e.printStackTrace();
}
}
标签:ack,确认,broker,RabbitMQ,deliveryTag,消息,机制,null,public
From: https://www.cnblogs.com/tod4/p/17574414.html