首页 > 其他分享 >消息队列:如何确保消息不会丢失?

消息队列:如何确保消息不会丢失?

时间:2024-09-21 19:19:28浏览次数:3  
标签:Producer 队列 Broker 发送 消息 丢失

引言

对业务系统来说,丢消息意味着数据丢失,这是无法接受的。

主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。

绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的。

虽然不同的消息队列提供的 API 不一样,相关的配置项也不同,但是在保证消息可靠传递这块儿,它们的实现原理是一样的。

检测消息丢失

消息队列的有序性

在这里插入图片描述
大概流程如下:

  1. 发送端在拦截器中,给每条消息附加一个连续递增的序号;
  2. 消费端在拦截器中检测消息的连续性,如果消息没有丢失,消息序号必然是库中保存的上一次消费到的消息序号+1;

对于 kafka 和 rocketmq,不保证在 Topic 上的严格顺序,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。

如果 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

除了在拦截器生成消息序号外,我们也可以利用消息位移来实现。

位移(Offset)是消息队列中用于标识消息位置的指标,它指向了消息在队列中的确切位置。在如Kafka这样的消息队列中,位移是消费者读取消息的起始点。消费者在读取消息后,会更新位移,表明已经读取了这些消息。如果位移连续,那么可以认为没有消息丢失。如果位移不连续,比如位移从100直接跳到了102,那么101的消息就可能丢失了。

例如:

 public void onMessage(List<ConsumerRecord<String, EventRecord>> data, Acknowledgment acknowledgment) {

        try {
             
            /**
             * 
             * 业务处理
             * 
             * */


            long initStartOffset = redisCluster.exists(String.format(Constants.INIT_OFFSET, bizName))
                    ? Long.valueOf(redisCluster.get(String.format(Constants.INIT_OFFSET, bizName)))
                    : 0L;
            if (initStartOffset == 0) {
                redisCluster.set(String.format(Constants.INIT_OFFSET, bizName), String.valueOf(
                        data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).max().getAsLong()));
                acknowledgment.acknowledge();
                return;
            }
            long minOffsetRecord = data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).min()
                    .getAsLong();
            long maxOffsetRecord = data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).max()
                    .getAsLong();



            log.info("initStartOffset={}, minOffsetRecord={}, maxOffsetRecord={}", initStartOffset, minOffsetRecord, maxOffsetRecord);

            if (minOffsetRecord - initStartOffset > 1) {
                log.info(String.format(Constants.INIT_OFFSET, bizName) + "存在数据丢失。。。。。。。。。。。。。。。。。。minOffsetRecord = " + minOffsetRecord
                        + ",initStartOffset = " + initStartOffset);
                consumeUtils.sendDingWarn("同步消费异常", bizName + "数据丢失!!!!!!!");
            }
            redisCluster.set(String.format(Constants.INIT_OFFSET, bizName), String.valueOf(maxOffsetRecord));
            acknowledgment.acknowledge();

            
        } catch (Exception e) {
            throw new RuntimeException("kafka message process error!", e);
        }

}

在检测到消息丢失时,我们可以钉钉报警,也可以直接抛出异常,停止消费。

确保消息可靠传递

在这里插入图片描述

  • 生产阶段: 消息在 Producer 创建出来,经过网络传输发送到 Broker 端;
  • 存储阶段: 消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上;
  • 消费阶段: Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

在上面三个阶段中,哪些会发生消息丢失呢?

生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。

有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。

以 Kafka 为例,我们看一下如何可靠地发送消息:同步发送时,只要注意捕获异常即可。

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("消息发送成功。");
} catch (Throwable e) {
    System.out.println("消息发送失败!");
    System.out.println(e);
}

异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。

producer.send(record, (metadata, exception) -> {
    if (metadata != null) {
        System.out.println("消息发送成功。");
    } else {
        System.out.println("消息发送失败!");
        System.out.println(exception);
    }
});

存储阶段

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

比如,上面的那段代码,只有业务逻辑处理完成后,才会发送消费确认。

acknowledgment.acknowledge();

参考资料
《消息队列高手课》

标签:Producer,队列,Broker,发送,消息,丢失
From: https://blog.csdn.net/yqq962464/article/details/142363880

相关文章

  • 分布式事务一致性:本地消息表设计与实践
    概念本地消息表是一种常见的解决分布式事务问题的方法。其核心思想是将分布式事务拆分成本地事务来处理,通过消息队列来保证各个本地事务的最终一致性。实现步骤创建本地消息表:在数据库中创建一个本地消息表,用于存储待发送的消息以及消息的发送状态和相关信息。表结构通......
  • 61.《数据结构-栈 队列 串》
    栈栈是受限的线性表(只允许在一端进行插入删除操作)LIFO特点后进先出当n个不同元素进栈时,出栈元素的不同排列个数1/(n+1)Cn2n顺序栈:(S.top=-1)进栈:if(S.top==MaxSize-1)栈满S.data[++S.top]=x入栈:if(S.top==-1)栈空x=S.data[S.top--]大致理解图:链栈:不......
  • kafka 消息位移提交几种方式:消息重复消息、消息丢失的关键
    消费位移Kafka中的位移(offset)是用来记录消息在分区中的位置的标志,简单说就是记录消费者的消费进度,每次消息消费后需要更新消费进度,也就是位移提交由此可见一旦位移提交发生异常,会导致消费进度不正确,就必然发生消息丢失或者重复消费消息位移存储内部主题__consumer_off......
  • 观察者模式:如何发送消息变化的通知?
    观察者模式是一种非常流行的设计模式,也常被叫作订阅-发布模式。观察者模式在现代的软件开发中应用非常广泛,比如,商品系统、物流系统、监控系统、运营数据分析系统等。现在我们常说的基于事件驱动的架构,其实也是观察者模式的一种最佳实践。当我们观察某一个对象时,对象传递出的每一个......
  • 备忘录模式:如何在聊天会话中记录历史消息?
    相较于其他的设计模式,备忘录模式不算太常用,但好在这个模式理解、掌握起来并不难,代码实现也比较简单,应用场景就更是比较明确和有限,一般应用于编辑器或会话上下文中防丢失、撤销、恢复等场景中。下面就一起来了解一下吧。一、模式原理分析备忘录模式的原始定义是:捕获并外部化对象的......
  • 1.JDK自带的线程池有哪些?2.线程池中核心线程数与最大线程数与缓冲任务队列的关系?3.为
    1.JDK自带的线程池有哪些?2.线程池中核心线程数与最大线程数与缓冲任务队列的关系?在Java中的线程池(如ThreadPoolExecutor)中,核心线程数(corePoolSize)、最大线程数(maximumPoolSize)以及缓冲队列(workQueue)之间存在着密切的关系,它们共同决定了线程池如何管理和调度任务。以下是......
  • Leetcode 406. 根据身高重建队列
    1.题目基本信息1.1.题目描述假设有打乱顺序的一群人站成一个队列,数组people表示队列中一些人的属性(不一定按顺序)。每个people[i]=[h_i,k_i]表示第i个人的身高为h_i,前面正好有k_i个身高大于或等于h_i的人。请你重新构造并返回输入数组people所表示的队列。返......
  • 解决QFC810.exe运行时错误:soundplayer.dll文件丢失,恢复音频播放的实用指南
    当遇到QFC810.exe运行时错误,提示soundplayer.dll文件丢失时,这通常意味着你的系统或应用程序目录中缺少了必要的动态链接库文件(DLL),导致音频播放功能无法正常工作。以下是一份恢复音频播放的实用指南:一、确认问题首先,确认错误消息确实是由于soundplayer.dll文件丢失引起的。这......
  • 丧尸围城bink2w64.dll文件丢失?专家级修复丧尸围城因bink2w64.dll缺失导致的启动错误
    针对《丧尸围城》游戏中因bink2w64.dll文件丢失导致的启动错误,我们可以采取一系列专家级的修复方法。以下是一些详细步骤和建议:一、确认问题首先,确认游戏启动错误确实是由于bink2w64.dll文件丢失所致。这通常会在游戏启动时出现错误提示,明确指出该文件缺失。二、重新安装游......
  • 帝国cms系统错误,不能发布文章很多东西不能生成丢失
    当帝国CMS系统出现错误,导致不能发布文章或生成内容时,可能涉及多个方面的故障。以下是一些常见的解决方法,帮助你诊断并解决这类问题:1.检查服务器环境确保服务器环境满足帝国CMS的要求。解决方法:PHP版本:检查服务器的PHP版本是否符合帝国CMS的要求。通常帝国CMS需要PHP5.6及以......