首页 > 其他分享 >kafka幂等性与重复消费

kafka幂等性与重复消费

时间:2023-03-31 09:56:49浏览次数:28  
标签:事务 消费 重复 kafka 消息 offset commit

1、Kafka生产者幂等性

1)Kafka 消息交付可靠性保障:

  • Kafka 默认是:至少一次
  • 最多一次 (at most once) : 消息可能会丢失,但绝不会被重复发送
  • 至少一次 (at least once) : 消息不会丢失,但有可能被重复发送
  • 精确一次 (exactly once) : 消息不会丢失,也不会被重复发送

2)Kafka实现幂等性

  • 幂等性 (Idempotence) :

    enable.idempotence = true 开启幂等性

    ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的

    SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。

    由上面的 ProducerID、SequenceNumber、再加上分区号就可以是实现单个分区里面的消息幂等性,但是这里有两个缺点,ProducerID 在消费者重启的时候可能不一致、只实现了单分区内的幂等    

    性。

  • 事务 (Transaction) :

    为了解决上面的两个问题,就要引入kafka事务,开启事务,必须开启幂等性,事务如下图:

    

 

    设置全局唯一的事务ID --- TransactionID;事务ID与PID绑定,当producer重启后,会根据事务ID查找PID,因此能够保证全局at-exactly-once语义

    kafka的事务跟我们常见数据库事务概念差不多,也是提供经典的ACID,即原子(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)

    事务Producer保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer重启回来后,kafka依然保证它们发送消息的精确一次处理。

    这里需要提一下:事务ID是由用户指定的,而PID是生产者创建成功后,producer向kafka申请的

 

 2、kafka重复消费

1)重复消费的原因

  1. 消费者宕机、重启或者被强行kill进程,导致消费者消费的offset没有提交。
  2. 设置enable.auto.commit为true,如果在关闭消费者进程之前,取消了消费者的订阅,则有可能部分offset没提交,下次重启会重复消费。
  3. 消费后的数据,当offset还没有提交时,Partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout.ms时间,那么就会触发reblance重平衡,此时可能存在消费者offset没提交,会导致重平衡后重复消费。
  4. commit offset的保存时间offsets.retention.minutes只有1天,而消息log的保存时间log.retention.hours有7天,如果consumer是手动commit,当长时间没有新消息可以消费,也就长时间没有commit,造成commit offset被broker删除。之后一旦consumer重启,初始化时发现commit offset已经被删除,取到了0去fetch,必定会超出broker的留存消息范围,触发consumer的reset。如果reset=earliest 就会从留存的7天内的最小位消息开始消费,造成大量的重复消费。如果reset=latest 就会从最新消息开始消费,造成会丢失重启期间的消息。

2)处理方案

  1. 前面1、2、3中发生的几率发生概率比较小,但也是可能不完全的消除的,所以最好的办法是在业务端进行幂等性处理
  2. 第4点的话就是配置的问题了,也是比较容易出现,但也是排查比较隐晦的问题,数据全部被消费过, 只是正常的过期删除,所以这并没有任何问题,也不会发生reset,正常情况下,commit offset保存时间可以配置成消息log保存时间的2倍,如果log.retention.hours 仍然为 7 days,那 offsets.retention.minutes 可以配置成 14 days。习惯上把消息log配置保存 3 days,offsets配置保存 6 days

 

 

标签:事务,消费,重复,kafka,消息,offset,commit
From: https://www.cnblogs.com/vayneChen/p/17275172.html

相关文章

  • Linux 部署: canal (同步mariadb数据发往kafka)
    参考文档:https://blog.csdn.net/weixin_55549435/article/details/123309631目录1节点规划2部署mariadb3部署canal4验证附录1节点规划节点ipdeployvm1......
  • Facebook用户消费力竟不敌Pinterest
    表面上Pinterest的用户数下降了,实则是上升了,为什么这样说呢?前Ebay联合创始人创立的珠宝和饰品在线网站Boticca.com最近比较了Pinterest和Facebook各自50000个用户的消费能力......
  • 生产不会重复的随机数
    importjava.text.SimpleDateFormat;importjava.util.ArrayList;importjava.util.Date;importjava.util.List;importjava.util.Random;publicclassMyRandom{......
  • 关于 kafka 的一些经验
    1.kafka高吞吐原因PageCache+顺序写磁盘(读与写)producer请求:Server端的I/O线程统一将请求写到操作系统的PageCache后立即返回,当消息达到一定阈值后,Kafka应......
  • iOS 解决按钮防重复点击的问题
    日常使用中经常会出现按钮重复点击导致的数据重复提交问题,从而导致数据出错,常用的解决办法有1、在发起请求的时候来一个全屏的loading这样在loading期间按钮就无法被点击,......
  • 分布式学习笔记-zookeeper以及kafka
    zookeeper所谓分布式系统就是在不同的地域分布的多个服务器,共同组成一个应用系统来为用户提供服务,在分布式系统中最重要的是进程调度。多个进程的应用需要竞争资源,此时需要......
  • 防止消息重复消费
    用幂等性解决重复消息问题一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。幂等(Idempotence)本来是一个数学上的概念,它是这样定义的:如果一个函数f......
  • sql 过滤重复字段,取最早或最新记录
    --可以将重复字段,取最早的一次,同理,也可以取时间最新的一次--方法一withtmpas(selectrow_number()over(partitionby分组字段orderby时间字段)i,......
  • 力扣26.删除有序数组中的重复项【顺序表】
    ......
  • kafka-exporter
    KafkaExporterpackagedbyBitnamiWhatisKafkaExporter?KafkaexporterforPrometheus.OverviewofKafkaExporterTrademarks:Thissoftwarelistingispac......