首页 > 其他分享 >RabbitMQ 可靠投递

RabbitMQ 可靠投递

时间:2023-03-27 14:11:07浏览次数:38  
标签:shovel RabbitMQ 可靠 queue 消息 rabbit 投递

RabbitMQ 可靠投递

标签: RabbitMQ shovel-plugin ConfirmCallback RabbitMQ消息投递


  • 背景
  • confirmCallback 确认模式
  • returnCallback 未投递到 queue 退回模式
  • shovel-plugin 跨机房可靠投递

背景

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两个选项用来控制消息的投递可靠性模式。

rabbitmq 整个消息投递的路径为:
producer->rabbitmq broker cluster->exchange->queue->consumer

message 从 producer 到 rabbitmq broker cluster 则会返回一个 confirmCallback 。
message 从 exchange->queue 投递失败则会返回一个 returnCallback 。我们将利用这两个 callback 控制消息的最终一致性和部分纠错能力。

confirmCallback 确认模式

在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);//开启confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
               log.error("消息发送失败!" + cause + data.toString());
        } else {
            log.info("消息发送成功,消息ID:" + (data != null ? data.getId() : null));
        }
    });

我们来看下 ConfirmCallback 接口。

public interface ConfirmCallback {

		/**
		 * Confirmation callback.
		 * @param correlationData correlation data for the callback.
		 * @param ack true for ack, false for nack
		 * @param cause An optional cause, for nack, when available, otherwise null.
		 */
		void confirm(CorrelationData correlationData, boolean ack, String cause);

	}

重点是 CorrelationData 对象,每个发送的消息都需要配备一个 CorrelationData 相关数据对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。

发送的时候创建一个 CorrelationData 对象。

User user = new User();
user.setID(1010101L);
user.setUserName("plen");

rabbitTemplate.convertAndSend(exchange, routing, user,
        message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        },
new CorrelationData(user.getID().toString()));

这里将 user ID 设置为当前消息 CorrelationData id 。当然这里是纯粹 demo,真实场景是需要做业务无关消息 ID 生成,同时要记录下这个 id 用来纠错和对账。

消息只要被 rabbitmq broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback

被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。

returnCallback 未投递到queue退回模式

confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式。

同样创建 ConnectionFactory 到时候需要设置 PublisherReturns(true) 选项。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//开启return模式
rabbitTemplate.setMandatory(true);//开启强制委托模式

rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                    exchange, routingKey) ->
    log.info(MessageFormat.format("消息发送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。

shovel-plugin 跨机房可靠投递

RabbitMQ 在跨机房集成提供了一个不错的插件 shovel 。使用 shovel-plugin 插件非常方便,shovel 可以接受机房之间的网络断开、机器下线等不稳定因素。

这里有两个 broker :

10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2

我们希望将发送给 rabbit_node1 plen.queue 的消息传输到 rabbit_node2 plen.queue 中。我们先开启 rabbit_node1 的 shovel-plugin

先看下当前 RabbitMQ 版本是否安装了 shovel-plugin,如果有的话直接开启。

rabbitmq-plugins  list
rabbitmq-plugins  enable rabbitmq_shovel
rabbitmq-plugins  enable rabbitmq_shovel_management

然后就可以在 Admin 面板里看到这个设置选项,怎么设置这里就不介绍了。主要就是配置下 amqp 协议地址,amqp://user:password@server-name/my-vhost 。

如果配置没有问题的话,应该是这样的一个状态,说明已经顺利连接到 rabbit_node2 broker 。


我们来看下 rabbit_node1 和 rabbit_node2 的 Connections 面板。
rabbit_node1(10.211.55.3):

rabbit_node2(10.211.55.4):

RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 创建了两个 tcp 连接,端口 39544 连接是用来消费 plen.queue 里的消息,端口 55706 连接是用来推送消息给 rabbit_node2 。

我们来看下 rabbit_node1 tcp 连接状态:

tcp6       0      0 10.211.55.3:5672        10.211.55.3:39544       ESTABLISHED
tcp        0      0 10.211.55.3:55706       10.211.55.4:5672        ESTABLISHED

rabbit_node2 tcp 连接状态:

tcp6       0      0 10.211.55.4:5672        10.211.55.3:55706       ESTABLISHED

为了验证 shovel-plugin 稳定性,我们将 rabbit_node2 下线。

然后再发送消息,发现消息会现在 rabbit_node1 plen.queue 里待着,一旦 shovel-plugin 连接恢复将消费 rabbit_node1 plen.queue 消息,然后投递给 rabbit_node2 plen.queue 。

 

标签:shovel,RabbitMQ,可靠,queue,消息,rabbit,投递
From: https://www.cnblogs.com/csnjava/p/17261372.html

相关文章

  • rabbitMq的简单统计
    1.TTL消息队列的超时时间设置使用场景介绍---购物付款在指定时间内进行,超过某个时间就会取消(取消后的队列就会加入到死信队列中)通过TTL特有的参数进行插入,设置x-mess......
  • RabbitMQ 04 直连模式-Java操作
    使用Java原生的方式使用RabbitMQ现在已经较少,但这是基础,还是有必要了解的。引入依赖。<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-clien......
  • RabbitMQ 05 直连模式-Spring Boot操作
    SpringBoot操作SpringBoot集成RabbitMQ是现在主流的操作RabbitMQ的方式。官方文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/引入依赖。<d......
  • 设计模式-用代理模式(Proxy Pattern)来拯救你的代码:打造可靠的程序设计
    前言设计模式是一种高级编程技巧,也是一种通用的解决方案。它能在不同的应用场景中使用,它可以提高代码的可读性、可复用性和可维护性。设计模式的学习能提高我们的编程能力......
  • RabbitMQ快速入门与详解
    一、RabbitMQ简介1.简介 RabbitMQ是一个开源的消息代理(MessageBroker)软件,实现了高级消息队列协议(AMQP),支持多种消息传递模式,例如点对点、订阅/发布等。 RabbitMQ的核......
  • 重置RabbitMQ用户密码
    在/usr/sbin下执行rabbitmqctlset_user_tags用户名用户权限[root@rzksbin]#pwd/usr/sbin[root@rzksbin]#rabbitmqctlset_user_tagsxxxadministratorSet......
  • 如何保障消息中间件100%消息投递成功?如何保证消息幂等性?
    我们应该都听说够消息中间件MQ,如:RabbitMQ,RocketMQ,Kafka等。引入中间件的好处可以起到抗高并发,削峰,业务解耦的作用。如图:(1)订单服务投递消息给MQ中间件(2)物流服务监听MQ中......
  • 全面了解 Redis 高级特性,实现高性能、高可靠的数据存储和处理
    目录高性能、高可用、高可扩展性的原理持久化RDB持久化AOF持久化持久化的配置RDB配置AOF配置持久化的恢复RDB的恢复AOF的恢复RDB和AOF的选择持久化对性能的影响数据的丢失......
  • RabbitMQ-消息丢失、重复、积压等解决方案
    高并发场景的分布式事务,我们采用柔性事务+可靠消息+最终一致性方案(异步确保型),可靠性是最重要的,那么如何保证消息的可靠性呢?一、消息丢失1、消息发送出去,由于网络问题没......
  • SD-WAN网络可靠性设计
    随着信息技术的快速发展和普及,企业对信息的依赖程度越来越高。网络作为信息传输的载体,企业对其可靠性的要求也越来越高,而建立一个可靠的网络系统是一项复杂且艰巨的工作。网......