首页 > 其他分享 >rabbitmq消息持久化

rabbitmq消息持久化

时间:2023-09-01 23:24:25浏览次数:32  
标签:持久 队列 确认 rabbitmq 发布 deliveryTag 消息 channel

概念

消息是可以持久化保存的,持久的目的是为了处理任务丢失情况的,采用持久化可以保证消息存储,且消息不被丢失。
默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:

我们需要将队列和消息都标 记为持久化。

代码实现

在声明队列的时候通过参数的方式将其队列声明为可持久化。
在发送消息时传入:MessageProperties.MINIMAL_PERSISTENT_BASIC 参数标识为持久化消息

  /**
         * 生成一个队列
         * 1、队列名称
         * 2、队列里面的消息是否持久化  默认情况下 消息存储在内存中。
         * 3、该队列是否只供一个消费者进行消费,是否进行消息的共享,true--可以多个消费者消费。反之不可(默认)
         * 4、表示是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除,true----自动删除,反之不删除
         * 5、其他参数;例如,延迟消息,死期消息等
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

# 发送消息时,通过参数MessageProperties.PERSISTENT_TEXT_PLAIN标识为持久化消息
//发消息
String message = " hello world";
/**
 * 发送一个消息
 * 1、发送到那个交换机
 * 2、路由的key值是哪个,本次是队列的名称
 * 3、其他参数信息  MessageProperties.PERSISTENT_TEXT_PLAIN  :表示消息实现持久化
 * 4、发送消息的消息体
 */
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));

image

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没 有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要 更强有力的持久化策略,就需要用到发布确认模式。

发布确认

  发布确认是一个保证RabbitMQ 可靠性的一个机制
  保证生产者将信息成功的发送到 RabbitMQ的 server端了,那么broker就会回一个确认,如果没有收到或者收到拒绝信息,那么说明可能网络不好没有发送成功,server端宕机了,broker拒绝接收等情况,如果不进行后续处理,那么信息就会丢失,生产者收到失败的消息使用回调函数在进行处理。
  生产者将信道设置成 confirm 模式,所有在该信道上发布的消息都会指定一个唯一的ID,一旦消息投递到队列中,就是发送成功了,broker会立刻发送一个确认ack 给生产者,这个时候,生产者就知道消息已经发送成功了。
  如果队列和信息是持久化的,那么确认消息会在将消息写入磁盘之后再发出,broker返回的确认包含 确认消息的序列号,还可以设置 multiple,表示此序号前的所有消息都得到了处理。
  一旦发布消息,生产者等待确认的同时继续发送下一条消息,如果rabbitMq自身内部错误导致消息为发送成功,生产者就可以再回调方法中继续处理。

保证消息不丢失:

  • 设置要求队列必须持久化 channel.queueDeclare(QUEUE_NAME,isLasting,false,false,null);
  • 设置要求队列中的消息必须持久化 channel.basicPublish("",QUEUE_NAME, MessageProperties.MINIMAL_PERSISTENT_BASIC,strMessage.getBytes());
  • 发布确认
    channel.confirmSelect();

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布 确认,都需要在 channel 上调用该方法
在mq的发布确认当中有三种策略

  • 单个确认发布
  • 批量确认发布
  • 异步确认发布

单个确认发布

  这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它 被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认 的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

  这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某 些应用程序来说这可能已经足够了。

/**
 * 发布确认模式----单个发布确认
 */
public class ConfirmMessageProducre {
    public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = RabbitMQUtil.getChannel();
        // 开启发布确认
        channel.confirmSelect();
        String queueName = UUID.randomUUID().toString();
        // 构建队列,表示构建一个queueName名字的队列,不持久化,排他队列,自动删除,不携带参数
        channel.queueDeclare(queueName,false,false,true,null);

        // 开始时间
        Long beginTime = System.currentTimeMillis();

        // 发送消息
        for (int i = 0 ; i < 1000 ; i ++){
            String strString = String.format("%s", i);
            channel.basicPublish("",queueName,false,null,strString.getBytes(StandardCharsets.UTF_8));
            // 等待确认: 表示确认存储到磁盘
            boolean isSuccess = channel.waitForConfirms();
            if (isSuccess){
                System.out.println("发送消息成功,消息为:"+strString);
            }
        }
        // 结束时间
        Long endTime = System.currentTimeMillis();
        System.out.println("发布"+1000+"单独确认消息,耗时"+(endTime-beginTime)+"ms"); // 用时102707ms
    }
}

批量确认发布

  先发布一批消息然后一起确认可以极大地 提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种 方案仍然是同步的,也一样阻塞消息的发布。

发布1000条消息,每发送100条确认一次

/**
 * 批量确认
 */
public class BatchProducre {

    public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = RabbitMQUtil.getChannel();
        // 开启发布确认
        channel.confirmSelect();
        String queueName = UUID.randomUUID().toString();
        // 构建队列,表示构建一个queueName名字的队列,不持久化,排他队列,自动删除,不携带参数
        channel.queueDeclare(queueName,false,true,true,null);

        // 开始时间
        Long beginTime = System.currentTimeMillis();

        // 发送消息
        for (int i = 0 ; i < 1000 ; i ++){
            String strString = String.format("%s", i);
            channel.basicPublish("",queueName,false,null,strString.getBytes(StandardCharsets.UTF_8));
            // 每次发布100次消息进行依次确认
            if (i%100==0){
                boolean isSuccess = channel.waitForConfirms();
                if (isSuccess){
                    System.out.println("发送消息成功");
                }
            }
        }
        // 结束时间
        Long endTime = System.currentTimeMillis();
        System.out.println("发布"+1000+"单独确认消息,耗时"+(endTime-beginTime)+"ms"); // 用时 1235 ms
    }
}

异步发布确认

生产者发送消息与 接收确认这两个步骤不是同步的,是异步的,生产者只管发送,同时使用监听(addConfirmListener)返回的确认,对成功确认、失败确认两种情况分别进行处理。非常高效且安全

开启确认模式

  • 声明确认成功的callback
  • 声明确认失败的callback
  • 开启确认监听 addConfirmListener() ,设置callback
  • 信道发送消息,不需要额外设置接收waitForConfirm什么的

image

思路:

  • 在发消息的时候记录下要发送消息的总和
  • 在确认消息成功的回调函数上,通过 总和 - 成功 = 未发生的消息
  • 在确认消息未成功的回调函数上,打印一下未确认的数据

两个回调函数,成功和失败的回调函数如下:

// 准备消息的监听器-----监听哪些消息成功了,哪些消息失败了。
// 消息确认成功的回调函数。
ConfirmCallback ackCallback = (deliveryTag,  multiple)->{
    // 如果是批量,那么就直接删除
    if (multiple){
        //2.删除已经确认的消息  剩下的就是未确认的消息
        // 通过 deliveryTag
        ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
        confirmed.clear(); // 删除
    }else {
        // 不是批量,那么就单个删除
        outstandingConfirms.remove(deliveryTag); // 一般情况下都是单个,因为批量有可能导致消息丢失
    }


    System.out.println("确认的消息--->编号"+deliveryTag);
};
//消息确认失败的回调函数。
//deliveryTag  消息的编号    multiple:是否是批量确认
ConfirmCallback nackCallback = (deliveryTag,  multiple)->{
    //3.打印一下未确认的消息都有哪些
    String message = outstandingConfirms.get(deliveryTag);
    System.out.println("未确认的消息--->编号"+deliveryTag+"内容是:"+message);
};

完整代码实现:

/**
 * 异步发布
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        // 声明队列名称
        String queueName = UUID.randomUUID().toString();
        // 声明队列--> 不持久化,排他队列,关闭后自动删除,不携带其他参数
        channel.queueDeclare(queueName,false,true,true,null);
        // 开启确认发布
        channel.confirmSelect();

        /**
         * 线程安全有序的一个哈希表  适用于高并发的情况下
         * 1、轻松的将序号于消息进行关联
         * 2、轻松的批量删除内容数据  只要给到序号
         * 3、支持高并发
         */
        ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();

        // 准备消息的监听器-----监听哪些消息成功了,哪些消息失败了。
        // 消息确认成功的回调函数。
        ConfirmCallback ackCallback = (deliveryTag, multiple)->{
            // 如果是批量发布
           if (multiple){
               //2.删除已经确认的消息  剩下的就是未确认的消息
               // 通过 deliveryTag
               ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
               confirmed.clear(); // 删除
           }else {
               // 单个删除
               outstandingConfirms.remove(deliveryTag);
           }
            System.out.println("确认的消息--->编号"+deliveryTag);
        };

        //消息确认失败的回调函数。
        //deliveryTag  消息的编号    multiple:是否是批量确认
        ConfirmCallback nackCallback = (deliveryTag,  multiple)->{
            //3.打印一下未确认的消息都有哪些
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息--->编号"+deliveryTag+"内容是:"+message);
        };

        // 1.监听哪些消息成功了,   2监听哪些消息失败了
        channel.addConfirmListener(ackCallback,nackCallback);
        long beginTime = System.currentTimeMillis();
        for (int i = 0 ; i < 1000 ; i ++){
            String message = "消息"+i;
            channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));
            //1.此处记录下 所有要发送的消息  ----- 消息的总和,getNextPublishSeqNo,在确认模式下返回下一个消息的序列号
            outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("发布"+1000+"单独确认消息,耗时"+(endTime-beginTime)+"ms"); // 用时 209 毫秒
    }
}

以上 3 种发布确认速度对比

单独发布消息

  • 同步等待确认,简单,但吞吐量非常有限。

批量发布消息

  • 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条 消息出现了问题。

异步处理:

  • 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

标签:持久,队列,确认,rabbitmq,发布,deliveryTag,消息,channel
From: https://www.cnblogs.com/zgf123/p/17673043.html

相关文章

  • SpringAMQP--消息转换器
        ......
  • RabbitMQ Stream类型队列
    RabbitMQ提供了三种类型的队列:ClassicQuorumStream官方文档对于流队列的描述是:高性能、可持久化、可复制、非破坏性消费、只追加写入的日志使用场景:一个队列将同一条消息分发给不同消费者可重复消费消息更高的性能存储大量消息而不影响性能更高的吞......
  • RabbitMQ快速入门--简单队列模型
             ......
  • RabbitMQ快速入门--介绍和安装
                     ......
  • 深入研究消息队列01
    一、消息队列技术趋势 早年业界消息队列演进的主要推动力在于功能(如延迟消息、事务消息、顺序消息等)、场景(实时场景、大数据场景等)、分布式集群的支持等等。近几年,随着云原生架构和Serverless的普及,业界MQ主要向实时消息和流消息的融合架构、Serverless、Event、协议兼容等方......
  • 园子的脱困努力-线上大会合作:欢迎预约直播——2023腾讯全球数字生态大会 + 腾讯云微服
    在园子脱困的关键时期,每一笔收入都很重要,一边在会员救园,一边我们要努力把握每一个商务合作机会,争取早日走出困境。之前园子维持生存的收入主要来自于与云厂商的合作,但去年由于云厂商推广策略的调整,这块收入几乎没有了。当我们对这块收入不报任何希望时,这个月开始,有些云厂商又回......
  • redis持久化
    目录一持久化1.1什么是持久化1.2持久化的实现方式二rdb方案2.1使用2.2RDB问题三AOF3.1AOF介绍3.2#AOF的三种策略3.3AOF重写实现方式AOF重写配置:自动触发时机(两个条件同时满足):重写流程aof配置7.3混合持久化一持久化1.1什么是持久化redis的所有数据保存在内存中,对数......
  • 成品直播源码平台消息传递的协议:MMS协议
    一、成品直播源码平台MMS协议是什么?    MMS协议中文全称为多媒体短信协议,是一种消息传递协议,MMS协议在成品直播源码平台中,MMS协议会利用自身的多媒体消息传输的机制,来实现成品直播源码平台的多媒体内容的高效传输,并呈现给用户。  二、MMS协议在成品直播源码平台的......
  • Kafka - 不仅是消息引擎,还是分布式流处理平台
     如果你通读全篇文字但只能记住一句话,我希望你记住的就是这句ApacheKafka是消息引擎系统,也是一个分布式流处理平台(DistributedStreamingPlatform) 作为流处理平台,Kafka与其他主流大数据流式计算框架相比,优势在哪里呢?我能想到的有两点。第一点是更容易实现端到端的正......
  • Apache RocketMQ 5.0 消息进阶:如何支撑复杂的业务消息场景?
    作者:隆基一致性首先来看RocketMQ的第一个特性-事务消息,事务消息是RocketMQ与一致性相关的特性,也是RocketMQ有别于其他消息队列的最具区分度的特性。以大规模电商系统为例,付款成功后会在交易系统中订单数据库将订单状态更新为已付款。然后交易系统再发送一条消息给Rocke......