首页 > 其他分享 >kafka的漏消费和重复消费问题

kafka的漏消费和重复消费问题

时间:2022-10-25 21:00:13浏览次数:68  
标签:副本 重复 ISR kafka 重试 消费 消息 提交 leader

三种消息语义及场景

image.png

kafka如何做到消息不丢失?

具体需要Producer端,Broker端,Consumer都做一些工作才能保证消息一定被消费,即,

  1. 生产者不少生产消息;
  2. 服务端不丢失消息;
  3. 消费者也不能少消费消息。

生产者不少生产消息

使用带回调的发送消息的方法。

如果消息没有发送成功,那么Producer会按照配置的重试规则进行重试,如果重试次数用光后,还是消息发送失败,那么kafka会将异常信息通过回调的形式带给我们,这时,我们可以将没有发送成功的消息进行持久化,做后续的补偿处理。

  kafkaProducer.send(new ProducerRecord<>("foo", "bar"), new Callback() { @Override

public void onCompletion(RecordMetadata metadata, Exception exception) {

if (exception != null) {

// todo 处理发送失败的消息

}

}

});

  1. 复制代码
  1. 配置可靠性参数

    2.1 配置 acks = -1

    • acks=0,表示生产者不等待任何服务器节点的响应,只要发送消息就认为成功。
    • acks=1,表示生产者收到 leader 分区的响应就认为发送成功。
    • acks=-1,表示只有当 ISR 中的副本全部收到消息时,生产者才会认为消息生产成功了。这种配置是最安全的,因为如果 leader 副本挂了,当 follower 副本被选为 leader 副本时,消息也不会丢失。但是系统吞吐量会降低,因为生产者要等待所有副本都收到消息后才能再次发送消息。

    2.2 配置 retries = 3

    参数 retries 表示生产者生产消息的重试次数,这里的3属于一个建议值,如果重试次数超过3次后,消息还是没有发送成功,可以根据自己的业务场景对发送失败的消息进行额外处理,比如持久化到磁盘,等待服务正常后进行补偿。 2.3 配置 retry.backoff.ms=300

    参数retry.backoff.ms 表示重试的时间间隔,单位是毫秒,300ms是一个建议值,如果配置的时间间隔太短,服务可能仍然处于不可用状态。

服务端不丢失消息

  1. 配置 replication.factor > 1

    参数replication.factor表示在服务端的分区副本数,配置 > 1后,即使分区的leader挂掉,其他follower被选中为leader也会正常处理消息。

  2. 配置 min.insync.replicas > 1

    min.insync.replicas 指的是 ISR 最少的副本数量,原理同上,也需要大于 1 的副本数量来保证消息不丢失。

简单介绍下 ISR。ISR 是一个分区副本的集合,每个分区都有自己的一个 ISR 集合。但不是所有的副本都会在这个集合里,首先 leader 副本是在 ISR 集合里的,如果一个 follower 副本的消息没落后 leader 副本太长时间,这个 follower 副本也在 ISR 集合里;可是如果有一个 follower 副本落后 leader 副本太长时间,就会从 ISR 集合里被淘汰出去。也就是说,ISR 里的副本数量是小于或等于分区的副本数量的.

  1. 确保 replication.factor > min.insync.replicas。

    如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。

  2. 配置 unclean.leader.election.enable = false

    unclean.leader.election.enable 指是否能把非 ISR 集合中的副本选举为 leader 副本。unclean.leader.election.enable = true,也就是说允许非 ISR 集合中的 follower 副本成为 leader 副本。因为非ISR集合中的副本消息可能已经落后leader消息很长时间,数据不完整,如果被选中作为leader副本,可能导致消息丢失。

消费者不少消费消息

  1. 手动提交消息

    1.1 配置 enable.auto.commit=false

    enable.auto.commit 这个参数表示是否自动提交,设置成false后,将消息提交的权利交给开发人员。因为设置自动提交后,消费端可能由于消息消费失败,但是却自动提交,导致消息丢失问题。 1.2 手动提交消息的正确方式 先处理消息,后提交offset,代码如下:

 

kafkaConsumer.subscribe(Collections.singleton("foo"));

try {
    1. new Thread(() -> {
    2. while (true) {
    3. ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(100));
    4. handlerRecord(records);
    5. kafkaConsumer.commitSync();
    6. }
    7. }).start();
    8. } catch (Exception e) {
    9. errHandler(e);
    10. }复制代码

    但是这种情况可能会导致消息已经消费成功,但是提交offset的时候,consumer突然宕机,导致消息提交失败,等到consumer重启后,可能还会收到已经成功处理过的消息,消费了重复的消息,所以手动提交消息需要做一些幂等性的措施。


  • 消息不重复

  • 生产端不重复生产消息

由于网络原因,Producer端对消息进行了重试,但是,Broker端可能之前已经收到了消息,这样就导致broker端收到了重复的消息。

kafka在0.11.0 版本后,给每个Producer端分配了一个唯一的ID,每条消息中也会携带一个序列号,这样服务端便可以对消息进行去重,但是如果是两个Producer生产了两条相同的消息,那么kafka无法对消息进行去重,所以我们可以在消息头中自定义一个唯一的消息ID然后在consumer端对消息进行手动去重。

  • 消费端不重复消费消息

由于为了保证不少消费消息,配置了手动提交,由于处理消息期间,其他consumer的加入,进行了重平衡,或者consumer提交消息失败,进而导致接收到了重复的消息。

我们可以通过自定义唯一消息ID对消息进行过滤去重重复的消息。

标签:副本,重复,ISR,kafka,重试,消费,消息,提交,leader
From: https://www.cnblogs.com/lsp520/p/16826303.html

相关文章

  • #yyds干货盘点# LeetCode 腾讯精选练习 50 题:删除有序数组中的重复项
    题目:给你一个升序排列的数组nums,请你原地删除重复出现的元素,使每个元素只出现一次,返回删除后数组的新长度。元素的相对顺序应该保持一致。由于在某些语言中不能......
  • SQL中如何删除重复数据,只保留其中一行
    SQL专栏SQL数据库基础知识汇总SQL数据库高级知识汇总需求分析数据库中存在重复记录,删除保留其中一条(是否重复判断基准为多个字段)解决方案碰到这样的问题我们先分解步......
  • C# BlockingCollection(生产者/消费者)练习
    最近接手同事C#代码,(话说我一个C++程序员干嘛做这种事情,可能我是韭菜吧),为了能够承接,特地去拜读了《C#图解教程》,入门还是不错的。写代码嘛,异步编程肯定少不了,下面就是利用C......
  • 用SQL语句,删除掉重复项只保留一条
    在几千条记录里,存在着些相同的记录,如何能用SQL语句,删除掉重复的呢1、查找表中多余的重复记录,重复记录是根据单个字段(peopleId)来判断select*frompeoplewherepeopl......
  • Kafka Server之kafka-console-consumer.bat
    KafkaServer之kafka-console-consumer.bat注意:博主使用kafka版本:kafka_2.12-3.3.1.tgzwindows版一、订阅主题全部消息(注意:Producer已经生产:0~4999共5000条消息)在k......
  • kafka问题总结
    这篇文章主要记录自己遇到和在网上看到的一些关于kafka的相关问题。问题1:客户端和服务端版本不一致造成的消息发送延迟高现象kafka客户端支持多语言api,这里只关......
  • kafka与eventing
    项目地址https://strimzi.io/quickstarts/https://github.com/strimzi/strimzi-kafka-operator/tree/0.31.1/examples/kafka部署ClusterRole和CRDkubectlcreate-f'h......
  • 1655. 分配重复整数
    题目描述给了一个数组nums,数组中最多有50个不同的值再给了m个顾客的订单数组,数组元素ei的含义是第i个顾客需要有ei个相同的整数问给的nums能不能满足以上所有顾客的要求......
  • Kafka工具:Offset Explorer
    Kafka工具:OffsetExplorer一、OffsetExplorer安装offsetexplorer_64bit.exeWindowsx64版本下载链接安装方式:选择默认安装(即全部默认下一步)官网链接:https://www.kafk......
  • CF 254A(重复的数)
    A.数字卡片timelimitpertestmemorylimitpertestinputoutput2n张卡片编......