首页 > 其他分享 >kafka 如何保证不重复消费又不丢失数据?

kafka 如何保证不重复消费又不丢失数据?

时间:2024-06-20 17:09:39浏览次数:43  
标签:候选者 重复 msgId Redis broker kafka 处理 消息 丢失

作者:Java3y
链接:https://www.zhihu.com/question/483747691/answer/2392949203
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

面试官:今天我想问下,你觉得Kafka会丢数据吗?

候选者:嗯,使用Kafka时,有可能会有以下场景会丢消息

候选者:比如说,我们用Producer发消息至Broker的时候,就有可能会丢消息

候选者:如果你不想丢消息,那在发送消息的时候,需要选择带有 callBack的api进行发送

候选者:其实就意味着,如果你发送成功了,会回调告诉你已经发送成功了。如果失败了,那收到回调之后自己在业务上做重试就好了。

候选者:等到把消息发送到Broker以后,也有可能丢消息

候选者:一般我们的线上环境都是集群环境下嘛,但可能你发送的消息后broker就挂了,这时挂掉的broker还没来得及把数据同步给别的broker,数据就自然就丢了

候选者:发送到Broker之后,也不能保证数据就一定不丢了,毕竟Broker会把数据存储到磁盘之前,走的是操作系统缓存

候选者:也就是异步刷盘这个过程还有可能导致数据会丢

 

 

候选者:嗯,到这里其实我已经说了三个场景了,分别是:producer -> broker ,broker->broker之间同步,以及broker->磁盘

候选者:要解决上面所讲的问题也比较简单,这块也没什么好说的…

候选者:不想丢数据,那就使用带有callback的api,设置 acks、retries、factor等等些参数来保证Producer发送的消息不会丢就好啦。

面试官:嗯…

候选者:一般来说,还是client 消费 broker 丢消息的场景比较多

面试官那你们在消费数据的时候是怎么保证数据的可靠性的呢?

候选者:首先,要想client端消费数据不能丢,肯定是不能使用autoCommit的,所以必须是手动提交的。

 

 

候选者:我们这边是这样实现的:

候选者:一、从Kafka拉取消息(一次批量拉取500条,这里主要看配置)时

候选者:二、为每条拉取的消息分配一个msgId(递增)

候选者:三、将msgId存入内存队列(sortSet)中

候选者:四、使用Map存储msgId与msg(有offset相关的信息)的映射关系

候选者:五、当业务处理完消息后,ack时,获取当前处理的消息msgId,然后从sortSet删除该msgId(此时代表已经处理过了)

候选者:六、接着与sortSet队列的首部第一个Id比较(其实就是最小的msgId),如果当前msgId<=sort Set第一个ID,则提交当前offset

候选者:七、系统即便挂了,在下次重启时就会从sortSet队首的消息开始拉取,实现至少处理一次语义

候选者:八、会有少量的消息重复,但只要下游做好幂等就OK了。

 

 

面试官:嗯,你也提到了幂等,你们这业务怎么实现幂等性的呢?

候选者:嗯,还是以处理订单消息为例好了。

候选者:幂等Key我们由订单编号+订单状态所组成(一笔订单的状态只会处理一次)

候选者:在处理之前,我们首先会去查Redis是否存在该Key,如果存在,则说明我们已经处理过了,直接丢掉

候选者:如果Redis没处理过,则继续往下处理,最终的逻辑是将处理过的数据插入到业务DB上,再到最后把幂等Key插入到Redis上

候选者:显然,单纯通过Redis是无法保证幂等的(:

候选者:所以,Redis其实只是一个「前置」处理,最终的幂等性是依赖数据库的唯一Key来保证的(唯一Key实际上也是订单编号+状态)

候选者:总的来说,就是通过Redis做前置处理,DB唯一索引做最终保证来实现幂等性的

 

 

面试官你们那边遇到过顺序消费的问题吗?

候选者:嗯,也是有的,我举个例子

候选者:订单的状态比如有 支付、确认收货、完成等等,而订单下还有计费、退款的消息报

候选者:理论上来说,支付的消息报肯定要比退款消息报先到嘛,但程序处理的过程中可不一定的嘛

候选者:所以在这边也是有消费顺序的问题

候选者:但在广告场景下不是「强顺序」的,只要保证最终一致性就好了。

候选者:所以我们这边处理「乱序」消息的实现是这样的:

候选者:一、宽表:将每一个订单状态,单独分出一个或多个独立的字段。消息来时只更新对应的字段就好,消息只会存在短暂的状态不一致问题,但是状态最终是一致的

候选者:二、消息补偿机制:另一个进行消费相同topic的数据,消息落盘,延迟处理。将消息与DB进行对比,如果发现数据不一致,再重新发送消息至主进程处理

候选者:还有部分场景,可能我们只需要把相同userId/orderId发送到相同的partition(因为一个partition由一个Consumer消费),又能解决大部分消费顺序的问题了呢。

标签:候选者,重复,msgId,Redis,broker,kafka,处理,消息,丢失
From: https://www.cnblogs.com/paimianbaobao/p/18259046

相关文章

  • 剖析 Kafka 消息丢失的原因
    目录前言一、生产者导致消息丢失的场景场景1:消息体太大解决方案:1、减少生产者发送消息体体积2、调整参数max.request.size场景2:异步发送机制解决方案:1、使用带回调函数的发送方法场景3:网络问题和配置不当解决方案:1、设置acks参数设置为"all"2、设置重试参数3、设置min.insync.......
  • KAFKA配置 SASL_SSL双重认证
    1.背景kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。SASL:是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。SSL:是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并......
  • 服务器数据恢复-重建MDisk导致VDisk丢失的数据恢复案例
    服务器数据恢复环境:IBM某型号存储;Solaris操作系统,部署Oracle数据库。服务器故障:重建MDisk导致对应的存储池中的VDisk丢失,导致Solaris操作系统中的Oracle数据库无法使用。服务器数据恢复过程:1、将所有涉及到Oracle数据库的VDisk以只读模式连接到备份服务器上,在只读模式下对......
  • 剖析 Kafka 消息丢失的原因
    文章目录前言一、生产者导致的消息丢失的场景场景1:消息太大解决方案:1、减少生产者发送消息体体积2、调整参数max.request.size场景2:异步发送机制解决方案:1、使用带回调函数的发送方法场景3:网络问题和配置不当解决方案:1、设置`acks`参数设置为"all"2、设置重试参数......
  • LeetCode80. 删除有序数组中的重复项 II题解
    LeetCode80.删除有序数组中的重复项II题解题目链接:https://leetcode.cn/problems/remove-duplicates-from-sorted-array-ii/题目描述:给你一个有序数组nums,请你原地删除重复出现的元素,使得出现次数超过两次的元素只出现两次,返回删除后数组的新长度。不要使用额外的数......
  • LeetCode26. 删除有序数组中的重复项题解
    LeetCode26.删除有序数组中的重复项题解题目链接:https://leetcode.cn/problems/remove-duplicates-from-sorted-array题目描述:给你一个非严格递增排列的数组nums,请你原地删除重复出现的元素,使每个元素只出现一次,返回删除后数组的新长度。元素的相对顺序应该保持一......
  • 分区丢失数据恢复
    分区丢失了数据如何恢复呢?分区丢失是常常会出现的一种数据丢失情况,比如,硬盘上的某个盘突然不见了,丢失的分区及数据该怎么恢复呢?分区是存储设备在逻辑层面上划分的一块连续的磁盘区域。硬盘、移动硬盘等设备必须先进行分区才可以存储数据,在电脑中,操作系统还会给每个正常的分区分......
  • Kafka集群保姆级部署教程
    目录资源列表基础环境修改主机名关闭防火墙关闭selinux安装JAVA安装Kafka下载Kafka解压修改配置文件kafka01kafka02kafka03启动服务启动ZK启动Kafka验证测试创建topic查看topic        今天给大家分享的是Kafka分布式集群部署,上次分享的单机版的k......
  • 【2024最新精简版】Kafka面试篇
    文章目录Kafka和RabbitMQ什么区别讲一讲Kafka架构你们项目中哪里用到了Kafka?为什么会选择使用Kafka?有什么好处?使用Kafka如何保证消息不丢失?消息的重复消费问题如何解决的?Kafka如何保证消费的顺序性?Kafka的高可用机制有了解过嘛?Kafka实现高性能的设计有了解......
  • 批量生产千万级数据 推送到kafka代码
    1、批量规则生成代码1、随机IP生成代码2、指定时间范围内随机日期生成代码3、随机中文名生成代码。packagecom.wfg.flink.connector.utils;importjava.time.LocalDate;importjava.time.LocalDateTime;importjava.time.LocalTime;importjava.util.ArrayList;i......