首页 > 其他分享 >Day10_09_消息队列之RabbitMQ消息可靠性详解

Day10_09_消息队列之RabbitMQ消息可靠性详解

时间:2022-12-23 15:37:20浏览次数:39  
标签:09 确认 System RabbitMQ Day10 消息 println message out


RabbitMQ消息可靠性详解

一. 消息发送确认 与 消息接收确认(ACK)

默认情况下如果一个 Message 被消费者正确接收则会被从 Queue 中移除.如果一个 Queue 没被任何消费者订阅消费,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则会立即发送,当 Message 被消费者正确接收时,就会被从 Queue 中移除.

二. 消息发送确认

1. 发送的消息怎么样才算失败或成功?如何确认?

当消息无法发送到交换机,或者从交换机无法发送到队列,确认消息路由失败.对于持久化队列意味着写入磁盘,对于镜像队列意味着所有镜像接收成功.

1.1 ConfirmCallback

通过实现 ConfirmCallback 接口,消息发送到 交换机 后触发回调.

@Component
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
}

@Override
public void confirm(CorrelationData correlationData, boolean ack,String cause) {
System.out.println("消息唯一标识:"+correlationData);
System.out.println("确认结果:"+ack);
System.out.println("失败原因:"+cause);
}
}

1.2 在配置文件添加配置:

spring:
rabbitmq:
publisher-confirms: true

1.3 ReturnCallback

通过实现 ReturnCallback 接口,当消息从交换机无法路由到队列时,启动消息失败返回,触发该回调方法.

@Component
public class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback{

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init(){
rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback
}

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息主体 message : "+message);
System.out.println("消息主体 message : "+replyCode);
System.out.println("描述:"+replyText);
System.out.println("消息使用的交换器 exchange : "+exchange);
System.out.println("消息使用的路由键 routing : "+routingKey);
}
}

1.4 在配置文件添加配置:

spring:
rabbitmq:
publisher-returns: true

三. 消息接收确认

1. 消息消费者如何通知 RabbitMQ 消息消费成功?

  • 消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK.
  • 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能.如果消费端消费逻辑抛出异常,也就是消费端没有成功处理这条消息,那么就相当于丢失了消息.
  • 如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失.
  • 手动确认是消费者调用 ack、nack、reject 几种方法进行确认.手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者.
  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限.
  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟.

2. 消息确认模式

AcknowledgeMode.NONE: 自动确认;
AcknowledgeMode.AUTO: 根据情况确认;
AcknowledgeMode.MANUAL:手动确认.

2.1 手动确认消息(局部方法处理消息)

默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual.

spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual

或在 RabbitListenerContainerFactory 中进行开启手动 ack.

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
return factory;
}

2.2 确认消息

@RabbitHandler
public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
System.out.println(message);
try {
channel.basicAck(tag,false); // 确认消息
} catch (IOException e) {
e.printStackTrace();
}
}

需要注意的 basicAck 方法需要传递两个参数:

1️⃣.deliveryTag(唯一标识 ID): 当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel , RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag,它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel.

2️⃣.multiple: 为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息.

2.3 手动否认、拒绝消息

发送一个 header 中包含 error 属性的消息.


Day10_09_消息队列之RabbitMQ消息可靠性详解_抛出异常

消费者获取消息时检查到头部包含 error 则 nack 消息.

@RabbitHandler
public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) {
System.out.println(message);
if (map.get("error")!= null){
System.out.println("错误的消息");
try {
channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); //否认消息
return;
} catch (IOException e) {
e.printStackTrace();
}
}
try {
channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //确认消息
} catch (IOException e) {
e.printStackTrace();
}
}

此时控制台重复打印,说明该消息被 nack 后一直重新进入队列然后一直重新消费.

hello
错误的消息
hello
错误的消息
hello
错误的消息
hello
错误的消息

也可以拒绝该消息,消息会被丢弃,不会重回队列.

channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);        //拒绝消息

2.4 确认消息(全局处理消息)

自动确认涉及到一个问题就是如果在处理消息的时候抛出异常,消息处理失败,但是因为自动确认而导致 RabbitMQ 将该消息删除了,造成消息丢失.

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("consumer_queue"); // 监听的队列
container.setAcknowledgeMode(AcknowledgeMode.NONE);// NONE 代表自动确认
container.setMessageListener((MessageListener) message -> { //消息监听处理
System.out.println("====接收到消息=====");
System.out.println(new String(message.getBody()));
//相当于自己的一些消费逻辑抛错误
throw new NullPointerException("consumer fail");
});
return container;
}

2.5 手动确认消息

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("consumer_queue"); // 监听的队列
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { //消息处理
System.out.println("====接收到消息=====");
System.out.println(new String(message.getBody()));
if(message.getMessageProperties().getHeaders().get("error") == null){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息已经确认");
}else {
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息拒绝");
}

});
return container;
}

AcknowledgeMode 除了 NONE 和 MANUAL 之外还有 AUTO,它会根据方法的执行情况来决定是否确认还是拒绝(是否重新入queue).

如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认;

当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列);

当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认;

其他的异常,则消息会被拒绝,且 requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的).可以通过 setDefaultRequeueRejected(默认是true)去设置.

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("consumer_queue"); // 监听的队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 根据情况确认消息
container.setMessageListener((MessageListener) (message) -> {
System.out.println("====接收到消息=====");
System.out.println(new String(message.getBody()));
//抛出NullPointerException异常则重新入队列
//throw new NullPointerException("消息消费失败");
//当抛出的异常是AmqpRejectAndDontRequeueException异常的时候,
//则消息会被拒绝,且requeue=false
//throw new AmqpRejectAndDontRequeueException("消息消费失败");
//当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认
throw new ImmediateAcknowledgeAmqpException("消息消费失败");
});
return container;
}

四. 消息可靠性总结

1. 持久化

1️⃣.exchange要持久化;

2️⃣.queue要持久化;

3️⃣.message要持久化.

2. 消息确认

1️⃣.启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去);

2️⃣.生产者和Server(broker)之间的消息确认;

3️⃣.消费者和Server(broker)之间的消息确认.

 

标签:09,确认,System,RabbitMQ,Day10,消息,println,message,out
From: https://blog.51cto.com/u_7044146/5965783

相关文章

  • Day10_11_消息队列之保证RabbitMQ全链路数据完全不丢失
    保证RabbitMQ全链路数据完全不丢失一.消息可靠性概述1.消息生命周期过程一条消息从创建到最终被消费掉,也就是从生产端到消费端最终被消费掉大致上要经过3个步骤:1️⃣.生产......
  • Day10_16_消息队列之RocketMQ
    RocketMQ一.RocketMQ简介1.概述消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:削峰填谷: 主要解决瞬时写压力大......
  • SpringBoot2.x系列教程09--新纪元之SpringBoot原理探究(重点)
    SpringBoot系列教程09--新纪元之SpringBoot原理探究(重点)作者:一一哥一.SpringBoot工作原理概述Springboot应用程序采用各种Starters启动器,入口类是包含​​@SpringBootA......
  • Spring Security系列教程09--基于默认数据库模型实现授权
    前言在上一个章节中,一一哥给大家讲解了如何基于内存模型来实现授权,在这种模型里,用户的信息是保存在内存中的。你知道,保存在内存中的信息,是无法持久化的,也就是程序一旦关闭,......
  • AcWing341. 洛谷P1073, NOIP2009 最优贸易
    AcWing题目传送门洛谷题目传送门题目大意\(~~~~~~\)一个投机倒把的奸商想要通过城市不太健全的贸易系统坑点钱,任意城市都可以买入或者卖出水晶球,他想尽量在便宜的城市买......
  • day39_0098.验证二叉搜索树
    0098.验证二叉搜索树classSolution{public:boolisValidBST(TreeNode*root){if(root==NULL)returnfalse;if(root->left){......
  • RabbitMQ、RocketMQ、Kafka延迟队列实现
    延迟队列在实际项目中有非常多的应用场景,最常见的比如订单未支付,超时取消订单,在创建订单的时候发送一条延迟消息,达到延迟时间之后消费者收到消息,如果订单没有支付的话,那么......
  • 【《硬件架构的艺术》读书笔记】09 电磁兼容性能设计指南
     9.1简介电子线路易于接收来自其他发射器的辐射信号,这些EMI(电磁干扰)使得设备内毗邻的元件不能同时工作。这就有必要进行电磁兼容设计以避免系统内有害的电磁干扰。确保......
  • 设置发布和订阅消息的 RabbitMQ AMQP 服务器
    本指南将引导您完成设置发布和订阅消息的RabbitMQAMQP服务器的过程,并创建一个Spring引导应用程序以与该RabbitMQ服务器进行交互。您将构建什么您将构建一个应用程序,......
  • 微服务异步通讯——RabbitMQ消息队列复习笔记
    服务异步通讯——RabbitMQ复习随笔微服务间通讯有同步和异步两种方式:同步通讯:就像打电话,需要实时响应。异步通讯:就像发邮件,不需要马上回复。两种方式各有优劣,打电话......