首页 > 其他分享 >为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?

为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?

时间:2023-08-15 10:02:37浏览次数:35  
标签:group 消费者 分区 commitSync 偏移量 record 提交

(目录)

一、为什么需要带有 subscribe 的 group.id

  • 消费概念: Kafka 使用消费者组的概念来实现主题的并行消费 - 每条消息都将在每个消费者组中传递一次,无论该组中实际有多少个消费者。所以 group 参数是强制性的,如果没有组,Kafka 将不知道如何对待订阅同一主题的其他消费者。
  • 偏移量: 每当我们启动一个消费者时,它都会加入一个消费者组,然后根据该消费者组中的其他消费者数量,为其分配要读取的分区。对于这些分区,它会检查列表读取偏移量是否已知,如果找到,它将从这一点开始读取消息。如果没有找到偏移量,则参数 auto.offset.reset 控制是从分区中最早的消息还是从最新的消息开始读取。

二、我们需要使用commitSync手动提交偏移量吗?

  • 是否需要手动提交偏移? 是否需要提交偏移量取决于作为参数 enable.auto.commit 选择的值。默认情况下,此设置为 true,这意味着消费者将定期自动提交其偏移量(由auto.commit.interval.ms 决定提交的频率)。如果将其设置为 false,那么将需要自己提交偏移量。这种默认行为可能也是导致很多发现 kafka 总是从最新的开始消费的原因,由于偏移量是自动提交的,因此它将使用该偏移量。

  • 有没有办法从头开始重播消息? 如果想每次都从头开始读取,可以调用seekToBeginning,如果不带参数调用,它将重置为所有订阅分区中的第一条消息,或者仅重置您传入的那些分区。

  • seekToBeginning 查找每个给定分区的第一个偏移量。poll(long) 该函数延迟计算,仅在调用或时才查找所有分区中的第一个偏移量position(TopicPartition)。如果未提供分区,则查找所有当前分配的分区的第一个偏移量。

    public class MyListener implements ConsumerSeekAware {
    
    ...
    
      @Override
      public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
          callback.seekToBeginning(assignments.keySet());
      }
    
    }
    
  • 有没有办法从最后开始重播消息? 有的,可以使用 seekToEnd() 查找所有分配的分区到最后。或者使用 seekToTimestamp(long time)- 查找所有分配的分区到该时间戳表示的偏移量。

    public class MyListener extends AbstractConsumerSeekAware {
    
      @KafkaListener(...)
      void listn(...) {
          ...
      }
    }
    
    public class SomeOtherBean {
    
      MyListener listener;
    
      ...
    
      void someMethod() {
          this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
      }
    
    }
    

三、如果我想手动提交偏移量,该怎么做?

  • 1、禁用自动提交

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
  • 提交方法 对于手动提交,KafkaConsumers提供了两种方法,即 commitSync() 和 commitAsync()。commitSync()是一个阻塞调用,在偏移量成功提交后返回,commitAsync()则立即返回。如果想知道提交是否成功,可以为回调处理程序 ( OffsetCommitCallback) 提供一个方法参数。请注意,在两次提交调用中,消费者都会提交最新poll()调用的偏移量。 举个例子:假设一个分区主题有一个消费者并且最后一次调用poll()返回偏移量为 4、5、6 的消息。提交时,偏移量 6 将被提交,因为这是消费者客户端跟踪的最新偏移量。 同时,commitSync() 和 commitAsync() 都允许更多地控制我们想要提交的偏移量:如果你使用允许你指定的相应重载,那么Map<TopicPartition, OffsetAndMetadata>消费者将仅提交指定的偏移量(即,映射可以包含分配的分区的任何子集) ,并且指定的偏移量可以为任意值)。

  • 同步提交: 阻塞线程,直到提交成功或遇到不可恢复的错误(在这种情况下,它被抛出给调用者)

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
          System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
          consumer.commitSync();
      }
    }
    

    对于 for 循环中的每次迭代,只有在consumer.commitSync()成功返回或因抛出异常而中断后,代码才会移至下一次迭代。

  • 异步提交: 是一种非阻塞方法。调用它不会阻塞线程。相反,它将继续处理以下指令,无论最终是成功还是失败。

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
          System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
          consumer.commitAsync(callback);
      }
    }
    

    对于 for 循环中的每次迭代,无论consumer.commitAsync()最终会发生什么,代码都会移至下一次迭代。并且,提交的结果将由定义的回调函数处理。

  • 权衡:延迟与数据一致性 1、如果必须确保数据一致性,请选择commitSync(),因为它将确保在执行任何进一步操作之前,你将知道偏移量提交是成功还是失败。但由于它是同步和阻塞的,你将花费更多的时间来等待提交完成,这会导致高延迟。 2、如果可以接受某些数据不一致并希望具有低延迟,请选择commitAsync(),因为它不会等待完成。相反,它只会发出提交请求并稍后处理来自 Kafka 的响应(成功或失败),同时代码将继续执行。

标签:group,消费者,分区,commitSync,偏移量,record,提交
From: https://blog.51cto.com/TiMi/7085378

相关文章

  • Netty源码学习1——NioEventLoopGroup的初始化
    系列文章目录和关于我零丶引入netty源码学习中,大家maybe都接触到如下的helloworld——netty客户端启动的demo:映入眼帘的第一个类就是NioEventLoopGroup,很多文章上来就是是Netty中的核心类,啥Channel,Pipeline,Context,Boostrap一通劈里啪啦,我看起来比较费劲。so本文不会上来就给......
  • mysql在开启group_replication后,状态显示为RECOVERING,告警日志报错MY-013117、MY-0115
    问题描述:mysql在开启group_replication后,状态显示为RECOVERING,告警日志报错MY-013117、MY-011582、MY-011583,如下所示:数据库:MySQL8.0.27系统:rhel7.364位1、问题重现Slave02[(none)]>select*fromperformance_schema.replication_group_members;+-----------------------......
  • mysql在开启group_replication后,报错ERROR 3092,This member has more executed transa
    问题描述:mysql在开启group_replication后,报错ERROR3092,Thismemberhasmoreexecutedtransactionsthanthosepresentinthegroup,如下所示:数据库:MySQL8.0.27系统:rhel7.31、异常重现Slave01[(none)]>startgroup_replication;ERROR3092(HY000):Theserverisnotc......
  • mysql在安装group_replication插件时,报错"version libcrypto.so.10 not defined in fi
    问题描述:mysql在安装group_replication插件时,报错"versionlibcrypto.so.10notdefinedinfilelibcrypto.so",如下所示:数据库:mysql8.0.27系统:rhel7.364位1、异常重现mysql>installplugingroup_replicationsoname'group_replication.so';ERROR1126(HY000):......
  • TheOpenGroup APAC 2023年度大奖来袭,14个奖项申报正式开启!
    作为亚太地区最突出、最负盛名的奖项之一,TheOpenGroupAPAC年度大奖始终秉承“探索无边界信息流™”的愿景,旨在基于开放架构标准和开源软件在应用和进步方面的影响力、领导力、卓越性与创新性,表彰与奖励那些为推动开放架构标准的发展而努力的组织和个人。TheOpenGroupAPAC年......
  • Cypher中的group by功能实现
    Cypher语言并没有原生的 GROUPBY 关键字,但聚合函数(例如 COUNT)隐含地引入了分组。 https://neo4j.com/docs/cypher-manual/current/functions/aggregating/#grouping-key-examples聚合函数采用一组值并计算它们的聚合值。可以对所有匹配路径进行聚合计算,也可以通过引入分......
  • SQL 中 select 和 group by 中数据的相互约束关系
    前提本文的前提是mysql的sql_mode中含有ONLY_FULL_GROUP_BY。如果不含有ONLY_FULL_GROUP_BY,那么就没有本文后续说的限制。可以使用下面这条sql查看。SHOWVARIABLESLIKE'sql_mode';--输出sql_mode|ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZER......
  • Sql去重查询数据,存在部分字段相同 max(id)  和 group by配合使用
    Sql去重查询数据原文链接:https://blog.51cto.com/u_15910936/5932613最近在工作过程中,面试过程中, 部分求职者或者同事,对sql怎么去重查询,不是太熟练今天下午忙里偷闲,整理了一下 其实sql基本的查询,还是蛮有意思,  下面是我大致整理的几种去重查询 1.存在2条一样的数据, 使......
  • sql group by 加条件
    在SQL中,可以在GROUPBY子句中加入条件,以进一步过滤结果。你可以使用HAVING子句来添加条件。HAVING子句的使用方式类似于WHERE子句,但不同的是,它用于对GROUPBY子句生成的分组进行过滤。以下是一个示例,演示如何在GROUPBY子句中加入条件:SELECTcolumn1,column2,aggregate_function(......
  • SQL Server实现mysql中的group_concat功能
    mysql中的group_concat函数的功能将groupby产生的同一个分组中的值连接起来,返回一个字符串结果。group_concat函数首先根据groupby指定的列进行分组,将同一组的列显示出来,并且用分隔符分隔。由函数参数(字段名)selectgroup_concat(emp_name)fromemp;语法:group_concat([distin......