首页 > 其他分享 >分布式进阶(四)——分布式框架之高性能:消息丢失

分布式进阶(四)——分布式框架之高性能:消息丢失

时间:2024-04-05 09:02:29浏览次数:17  
标签:进阶 生产者 RabbitMQ 高性能 消息 消费者 分布式 源码 丢失

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

学习必须往深处挖,挖的越深,基础越扎实!

阶段1、深入多线程

阶段2、深入多线程设计模式

阶段3、深入juc源码解析


阶段4、深入jdk其余源码解析


阶段5、深入jvm源码解析

码哥源码部分

 

码哥讲源码-原理源码篇【2024年最新大厂关于线程池使用的场景题】

码哥讲源码【炸雷啦!炸雷啦!黄光头他终于跑路啦!】

码哥讲源码-【jvm课程前置知识及c/c++调试环境搭建】

​​​​​​码哥讲源码-原理源码篇【揭秘join方法的唤醒本质上决定于jvm的底层析构函数】

码哥源码-原理源码篇【Doug Lea为什么要将成员变量赋值给局部变量后再操作?】

码哥讲源码【你水不是你的错,但是你胡说八道就是你不对了!】

 

码哥讲源码【谁再说Spring不支持多线程事务,你给我抽他!】

终结B站没人能讲清楚红黑树的历史,不服等你来踢馆!

打脸系列【020-3小时讲解MESI协议和volatile之间的关系,那些将x86下的验证结果当作最终结果的水货们请闭嘴】

在使用消息队列中的过程中,很可能会出现消息丢失的情况。本章我们就来分析哪些场景下可能出现消息丢失,然后继续以RabbitMQ和Kafka为例,介绍这两种消息队列是如何保证消息不丢失的。

一、丢失场景

一般来说,消息丢失无非就是发生在三个阶段: 生产者投递消息 、 消息队列存储消息 、 消费者消费消息 。

生产者投递消息:
生产者在写消息的过程中,由于网络等原因,导致消息队列没接收到消息,从而出现消息丢失的现象。

消息队列存储消息:
消息队列接受到消息后,一般先暂存到内存中,然后再持久化,如果在持久化前消息队列自身挂掉了,就可能导致消息丢失。

消费者消费消息:
消费者消费到了消息,但是还没来得及处理完成,然后就自己挂掉了,而消息队列则认为消费者已经处理完了。

二、RabbitMQ

我们先来看下RabbitMQ是如何应对上述三种消息丢失的场景的。

2.1 生产者投递消息丢失

RabbitMQ有两种方式可以保证生产者对消息的100%成功投递:事务机制Confirm机制

事务机制
  1. 生产者发送数据之前开启Rabbitmq事务channel.txSelect,然后发送消息;
  2. 如果消息没有被接收到,生产者会收到异常报错,此时可以回滚事务channel.txRollback,然后重新发送;
  3. 如果消息被接受,则可以提交事务channel.txCommit
    channel.txSelect();    //开启事务
    try{
        // 发送消息
    }catch(Exception ex){
        channel.txRollback();    //回滚事务
        // 重新发送消息
    }
    channel.txCommit();    //提交事务

缺点:
RabbitMQ的事务机制是同步的,会导致吞吐量大幅下降。

Confirm机制

生产者可以开启confitm模式:

  1. 每次写消息时会给消息分配一个唯一id;
  2. 如果RabbitMQ收到了该消息,会回调生产者的ack接口,表示接受成功;
  3. 如果RabbitMQ接受消息失败,会回调生产者的nack接口,表示接受或处理失败,生产者在nack方法内进行重试发送;
    //开启confirm模式
    
    channel.confirm();
    
    // 发送消息,然后就不管了
    send();
    
    /**
     * 消息成功被接受后回调
     */
    public void ack(String messageId){
    
    }
    
    /**
     * 消息接受失败时回调
     */
    public void nack(String messageId){
        send();
    }

如果因为网络原因,这两个方法都没有被回调,生产者可以自己维护消息id的状态,对一些超时的消息,根据状态进行重发。

事务机制和confirm机制最大的不同在于:事务机制是同步的,会导致生产者线程阻塞。而Confirm机制是异步的,采用回调来确认消息是否发送成功,所以生产环境一般都用Confirm机制保证生产者对消息的100%可靠投递。

2.2 MQ故障丢失

因MQ故障而导致消息丢失的解决方案就是 持久化 。在RabbitMQ中开启持久化的方式如下:

  1. 创建queue时,将其设置为持久化,这样RabbitMQ会持久化queue的元数据;
  2. 生产者发送消息时,消息的deliveryMode设置为2,此时RabbitMQ就会将消息持久化到磁盘上去。

经过这样的设置,当RabbitMQ接受到消息然后持久化完成后,即使MQ挂了,重启后也可以从磁盘恢复queue和消息。但是还要注意一种情况:RabbitMQ接收到了消息,但是还没来得及持久化到磁盘,自己就挂了。此时,会导致MQ内存里的一点点数据丢失,但是这个概率是很小的。

我们可以配合生产者的confirm机制来解决上述问题。由于开启持久化后,只有消息被持久化到磁盘,才会回调生产者的ack接口,所以生产者在收不到ack的情况下,可以进行重发,这样哪怕持久化到磁盘前MQ自身挂了,也可以保证恢复后收到重发的消息。

2.3 消息投递后丢失

这种情况是因为消费者自身问题或网络问题造成的。RabbitMQ有一个消费者的autoAck机制,当消费者消费成功后,会自动通知RabbitMQ已经消费成功,此时如果消费者自身出现异常,就会导致消息丢失。

解决方案如下:

  1. 关闭RabbitMQ的消费者autoAck机制;
  2. 消费者消费完消息,自身逻辑处理成功后,再进行手动ack确认。(这种情况下,消费者必须要保证自身接口的幂等性)

三、Kafka

3.1 生产者投递消息丢失

Kafka可以通过一些配置来保证生产者对消息的100%可靠投递。

我们之前在《分布式框架之高性能:消息队列的可用性》一章中提到过,Kafka只有Leader节点会接受消息的读/写。Leader接受到生产发送来的消息后,只要当其它所有follower都同步成功,才会响应给生产者,否则生产者会不断重试,直到收到成功响应。

可以通过配置ack=all,来确保所有follower都进行消息同步。

3.2 MQ故障丢失

这种情况通常出现在Leader选举时:

  1. 某个partiton的Leader正在和其它Follower同步消息;
  2. 这个partiton所在的Broker突然挂了,部分Follower还没同步完成;
  3. 新选举的Leader刚好是之前没同步完成的,此时它就缺少了一些数据。

Kafka可以通过配置来解决这个问题,具体配置如下:

  1. topic设置replication.factor参数:值必须大于1,该参数用于配置每个topic的partition的副本数,即每份数据一共存在replication.factor+1份拷贝;
  2. kafka服务端设置min.insync.replicas参数:值必须大于1,这个是要求一个leader至少感知到有一个follower还跟自己保持联系,这样才能确保leader挂了还有一个follower候补;
  3. producer端设置acks=all:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了;
  4. producer端设置retries=MAX:这个是要求生成者确保消息发送成功,一旦失败,就无限重试。

3.3 消息投递后丢失

我们之前在《分布式框架之高性能:消息队列的可用性》一章中提到过,消费者消费完消息后,会将消息的offset告知Kafka,表示这条消息已经被成功消费。

默认情况下,消费者会自动提交offset,如果此时消费者挂了,那么Kafka依然会认为消费者已经成功消费了该消息,从而出现消息丢失。

所以,Kafka对“消息投递后丢失”这一场景的问题处理方式和RabbitMQ类似,一般 消费者需要关闭自动提交offset ,等处理完消息后自己手动提交offset,就可以保证消息不会丢。但依然一样可能会出现重复消费问题,比如消费者刚处理完,还没提交offset结果自己挂了或因为网络原因Kafka没收到通知。所以消费者端仍然需要保证接口的幂等性。

四、总结

本章,我们介绍了在使用消息队列过程中出现消息丢失的几种场景,并以RabbitMQ和Kafka为例介绍了解决方案。总体来说,要保证消息不丢失,就是以下几种思路:

  1. 要保证消息投递的100%成功,基本思路就是消息队列的ack机制,以及生产者的最大努力投递;
  2. 要保证MQ故障消息不丢失,基本思路就是MQ自身做好持久化,或数据同步机制;
  3. 要保证消费者的100%消费成功,基本思路就是消费者的手动确认,以及消费者自身接口的幂等性保证。

标签:进阶,生产者,RabbitMQ,高性能,消息,消费者,分布式,源码,丢失
From: https://blog.csdn.net/smart_an/article/details/137391665

相关文章

  • 分布式进阶(五)——分布式框架之高性能:消息有序性
    作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬学习必须往深处挖,挖的越深,基础越扎实!阶段1、深入多线程阶段2、深入多线程设计模式阶段3、深入juc源码解析阶段4、深入jdk其余源码解析......
  • Python爬虫之分布式爬虫
    分布式爬虫1.详情介绍        分布式爬虫是指将一个爬虫任务分解成多个子任务,在多个机器上同时执行,从而加快数据的抓取速度和提高系统的可靠性和容错性的技术。        传统的爬虫是在单台机器上运行,一次只能处理一个URL,而分布式爬虫通过将任务分解成多个子......
  • 太强了!分布式Elasticsearch集群数据迁移企业案例
    太强了!分布式Elasticsearch集群数据迁移企业案例原创 林致远 Linux运维之旅 2024-04-0408:31 广东 1人听过Linux运维之旅专注分享运维实用技术,内容不限于Linux系统运维、自动化工具、监控工具、日志采集、容器技术、测试工具、python、GO等技术分享20篇原......
  • 第六章 面向对象进阶——6-3 继承
    一,构造方法:this(…)-访问本类构造方法super(…)-访问父类构造方法 二,继承中构造方法的访问特点(理解)注意:子类中所有的构造方法默认都会访问父类中无参的构造方法子类会继承父类中的数据,可能还会使用父类的数据。所以,子类初始化之前,一定要先完成父类数据的初始化,原因......
  • MyDumper/MyLoader的进阶玩法
    一、前言从mydumperv0.11.5版本开始,mydumper提供了--load-data参数,使用此参数导出的sql文件将不再是insert语句,而是loaddata语句。在MySQL官方文档中关于loaddata是这么描述的:Whenloadingatablefromatextfile,useLOADDATA.Thisisusually20timesfasterthanus......
  • Caddy进阶:因为Nginx占用了80和443端口导致Caddy无法启动
    日志里面有个红色的error,表示安装后没有启动caddy的服务。看了很多文章,感觉都没有把我当小白。自己折腾了几个小时,总算搞明白了。1、如果80端口和443端口被其它程序(比如Nginx)占用了,得先卸载Nginx。查看443端口被谁占用了:root@web005:~#sudolsof-i:443COMMANDPIDUSE......
  • Spark进阶(四)Spark性能优化和调优
    一、Spark的性能优化工具和技术Spark的性能优化工具和技术主要包括以下几个方面:数据分区和缓存:合理地将数据进行划分和缓存,可以提高数据的访问效率。可以使用repartition或coalesce进行数据分区,使用persist或cache进行数据缓存。并行度设置:通过调整并行度,可以提高Spark......
  • 加入云原生实战营(星球),带你进阶 Go + 云原生高级开发工程师
    过去1年,趁着闲暇时间,我创建了一个Go+云原生技术学习社群,旨在帮助你快速进阶为Go+云原生高级开发工程师,提高你的职场竞争力、扩展职业宽度,最终谋得一份好差事(进入大厂、升职加薪)。本篇文章,我来详细介绍下云原生实战营知识星球,让你对本知识星球有一个充分的了解,通过这些了......
  • 每日面经分享(python进阶 part2)
    Python中的装饰器和上下文管理器区别是什么?它们分别适用于哪些场景?a.装饰器用于在函数或类的外部添加额外功能,而上下文管理器用于管理资源的获取和释放。b.装饰器是一种用于修改函数或类行为的技术。适用于需要在函数或类的外部添加额外功能的场景,比如日志记录、性能监......
  • 中间件 ZK分布式专题与Dubbo微服务入门 6-13 acl - ip权限
    0课程地址https://coding.imooc.com/lesson/201.html#mid=12729 1重点关注1.1本节内容通过schema为ip的方式设置权限,只有指定ip才能操作 1.2关键代码//ip方式的aclList<ACL>aclsIP=newArrayList<ACL>();......