首页 > 其他分享 >Kafka如何解决消息丢失的问题

Kafka如何解决消息丢失的问题

时间:2023-10-15 12:31:54浏览次数:32  
标签:Broker Kafka 消息 Offset 解决 数据 Leader 丢失


在 Kafka 的整个架构中可以总结出消息有三次传递的过程:

  1. Producer 端发送消息给 Broker 端
  2. Broker 将消息进行并持久化数据
  3. Consumer 端从 Broker 将消息拉取并进行消费

在以上这三步中每一步都可能会出现丢失数据的情况, 那么 Kafka 到底在什么情况下才能保证消息不丢失呢?

Producer 端丢失

Producer 端为了提升发送效率,减少 IO 操作,发送消息的时候是将多个请求异步发送出去,所以 Producer 端消息丢失更多是因为消息根本就没有发送到 Broker 端。

导致 Producer 端没有发送消息成功的有以下原因:

  • 网络原因:由于网络抖动导致数据没发到 Broker 端
  • 数据原因:消息体太大超出 Broker 承受范围导致 Broker 拒收消息

解决方案

Producer 端数据丢失是因为通过异步的方式进行发送的,所以如果此时使用发后即焚的方式发送,即调用 Producer.send(msg) 会立即返回,由于没有回调,可能因网络原因导致 Broker 并没有收到消息,此时就丢失了。

因此可以从以下几方面进行解决 Producer 端消息丢失问题:

  • 使用带回调通知函数的方法进行发送消息
  • ACK 确认机制
  • 重试次数

Producer 端通过 ACK 配置来确认消息是否生产成功,配置参数如下:

  • 0:由于发送后就自认为发送成功,这时如果发生网络抖动,会造成数据丢失
  • 1:消息发送 Leader 分区并接收成功就表示发送成功,只要 Leader 分区不挂掉,就可以保证数据不丢数据,但是如果 Leader 分区挂掉了,Follower 分区还未同步完数据且没有 ACK,这时就会丢数据
  • -1 或者 all: 消息发送需要等待 ISR 中 Leader 分区和所有的 Follower 分区都确认收到消息才算发送成功, 可靠性最高,但也不能保证不丢数据,比如:当 ISR 中只有 Leader 分区, 这样就变成 acks = 1 的情况了

Broker 端丢失

Broker 接收到数据后会将消息进行持久化到磁盘存储,为了提高吞吐量和性能,采用的是异步批量刷盘的策略,也就是说按照一定的消息量和间隔时间进行刷盘。

首先会将数据存储到 PageCache 中,至于什么时候将 Cache 中的数据刷盘是由操作系统根据自己的策略决定或者调用 fsync 命令进行强制刷盘。如果在同步到 Follower 分区前 Broker 宕机掉,且选举了一个新的 Leader 分区,那么落后的消息数据就会丢失。

既然 Broker 端消息存储是通过异步批量刷盘的,那么就有可能会丢数据。由于 Kafka 中并没有提供同步刷盘的方式,所以单个 Broker 还是很有可能丢失数据的。

kafka 通过多分区多副本机制已经可以最大限度的保证数据不丢失,如果数据已经写入 PageCache 中但是还没来得及刷写到磁盘,此时如果所在 Broker 突然宕机挂掉或者停电,极端情况还是会造成数据丢失。

解决方案

Broker 端丢失消息是因为通过异步批量刷盘的策略,先将数据存储到 PageCache,再进行异步刷盘。

因此 Kafka 是通过多分区多副本的方式来最大限度的保证数据不丢失。可以通过以下参数配合来保证:

  • unclean.leader.election.enable:该参数表示有哪些 Follower 可以有资格被选举为 Leader , 如果一个 Follower 的数据落后 Leader 太多,那么一旦它被选举为新的 Leader, 数据就会丢失,因此我们要将其设置为false,防止此类情况发生。
  • replication.factor:该参数表示分区副本的个数。建议设置 replication.factor >=3, 这样如果 Leader 副本挂掉,Follower 副本会被选举为新的 Leader 副本继续提供服务。
  • min.insync.replicas:该参数表示消息至少要被写入成功到 ISR 多少个副本才算”已提交”,建议设置min.insync.replicas > 1, 这样才可以提升消息持久性,保证数据不丢失。

另外还需要确保一下 replication.factor > min.insync.replicas,如果相等,只要有一个副本挂掉,整个分区就无法正常工作了,因此推荐设置成: replication.factor = min.insync.replicas +1, 最大限度保证系统可用性。

Consumer 端丢失

消息消费流程主要分为两个阶段:

  • 从 Broker 上拉取数据
  • 处理消息,并提交 Offset 记录

Consumer 拉取后消息后需要提交 Offset, 那么这里就可能会丢数据的。丢失原因如下:

  • 可能使用的自动提交 Offset 方式
  • 拉取消息后先提交 Offset,后处理消息,如果此时处理消息的时候异常宕机,由于 Offset 已经提交了, 待 Consumer 重启后,会从之前已提交的 Offset 下一个位置重新开始消费, 之前未处理完成的消息不会被再次处理,对于该 Consumer 来说消息就丢失了。
  • 拉取消息后先处理消息,在进行提交 Offset, 如果此时在提交之前发生异常宕机,由于没有提交成功 Offset, 待下次 Consumer 重启后还会从上次的 Offset 重新拉取消息,不会出现消息丢失的情况, 但是会出现重复消费的情况,这里只能业务自己保证幂等性。

解决方案

Consumer 端丢失消息是因为在拉取完消息后提交 Offset 造成的,因此为了不丢数据,正确的做法是:拉取数据、业务逻辑处理、提交消费 Offset 位移信息。

同时还需要设置参数 enable.auto.commit = false,采用手动提交位移的方式。另外对于消费消息重复的情况,业务自己保证幂等性, 保证只成功消费一次即可。

标签:Broker,Kafka,消息,Offset,解决,数据,Leader,丢失
From: https://blog.51cto.com/u_15856116/7871097

相关文章

  • 记录Orcad中报错和解决方法
    本章目的:使用Orcad画原理图总会遇到奇怪的报错,故整理总结 1、根本原因:有元器件没有编号;更新一下位号解决。提示➤ERROR(ORNET-1048):Designisnotannotated.ERROR(ORNET-1006): Netlist failed or may be unusable. 2、根本原因:DesignCache右键CleanupCache,和......
  • 完美解决XDG_RUNTIME_DIR not set, defaulting to ‘/tmp/runtime-root‘
    完美解决XDG_RUNTIME_DIRnotset,defaultingto‘/tmp/runtime-root‘源代码杀手已于2023-01-1112:53:46修改阅读量4.1w收藏49点赞数13分类专栏:报错记录文章标签:linux版权报错记录专栏收录该内容19篇文章0订阅订阅专栏警告:对Linux不熟悉的人慎重使用,为了保险起......
  • Kafka 入门教程
     Kafka是分布式发布-订阅消息系统,它最初由LinkedIn公司开发,使用Scala语言编写,之后成为Apache项目的一部分。在Kafka集群中,没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,可以在不做任何配置的更改的情况下实现服务器的的添加与删除,同样的消息的生产者和消费者......
  • 解决VS Code/Code insiders右键python代码无法“转到定义”问题
    最近怀疑自己用了个假的VSCode,同门的能丝滑跳转定义、跳转引用,自己的偏偏不行(合着这么爽的功能我从来没享受到(。﹏。*)),网上各种教程试了个遍都不行,最后自己摸索出了解决方案。记录在此备忘:按以下顺序依次Check:确保安装这些插件:Python、Pylance、IntelliCode(用远程服......
  • 关于Cortex-M3报错解决方法总结:Flash Download failed错误
    事情原因:在一次使用ST-LINKv2下载程序时,突然出现Error:FlashDownloadFailed-"Cortex-M3”这个错误,显示没有错误,没有警告。芯片型号接线都没有问题。当时就很摸不着头脑,然后上网查看了一下。原来是因为STM32F103C8T6有64kFlash和20kRAM,tm他们不属于高容量的Flash。所以我改了......
  • 测试springboot项目苍穹外卖,解决websocket“服务器错误,无法接收实时报警信息”问题
    使用IDEA启动springboot项目苍穹外卖后,http://localhost:8071/能够正常访问登录,但是网页右上角始终显示“服务器错误,无法接收实时报警信息”: 在网上搜索找到:https://blog.csdn.net/qq_65032048/article/details/132077097,发现可能是修改了nginx端口号为8071导致。解决办法:在n......
  • MySQL解决查询语句1111 - Invalid use of group function错误
    是因为mysql查询语句的字段当中有聚合函数,where条件中不能用聚合函数的计算值作为查询条件,否则会出现:>1111-Invaliduseofgroupfunction错误。可以使用having解决。补充:这里主要要清楚where和having的作用以及区别:“WHERE” 是一个约束声明,在查询数据库的结果返回之前对......
  • Kafka:用于日志处理的分布式消息系统
    周末躺不平,摆不烂,卷不动,随便读一篇paper吧原文:Kafka:aDistributedMessagingSystemforLogProcessing作者:JayKreps/NehaNarkhede/JunRao这三尊神就是当初在LinkedIn开发Kafka的大佬摘要日志处理已经成为了当下互联网公司数据管道(datapipeline)的重要组成部分。......
  • 21计算机解决问题的过程
    第二单元编程计算同学们进入高中阶段,高中生活丰富多彩,需要学习好科学文化知识的同时,我们还需要丰富课外生活,好的课外生活能够促进科学文化知识的学习,如游戏娱乐、运动、运用智慧增值财富等,这个单元我们将介绍一个娱乐游戏,同学们可以在游戏的开发中学习知识,找到编程的乐趣,理解计算......
  • pycharm连接远程服务器,代码成功运行,但一些基本python属性和函数会报红线(例如print)解决
    状况:pycharm连接远程服务器,代码成功运行,但一些常见python属性和方法报红线,例如print。当你在程序中输入print这种基本方法时,pycharm是不会有输入提示的,输入后也会报红线解决方法:将远程服务器中的环境变量添加至pycharm中查出服务器中环境变量:在xshell中输入vim~/.bashrc执......