首页 > 其他分享 >kafka入门(二): 位移提交

kafka入门(二): 位移提交

时间:2023-11-25 21:22:06浏览次数:33  
标签:ConsumerConfig 入门 kafka 提交 props put public 位移

位移提交:

Kafka的每条消息都有唯一的 offset, 用来表示消息在分区中对应的位置。有的也称之为 "偏移量"。

消费者每次在 poll() 拉取消息,它要返回的是还没有消费过的消息集,

因此,需要记录上一次消费时的消费位移,并且持久化。

消费者在消费完消息之后,需要执行消费位移的提交。

自动位移提交:

Kafka默认的消费位移的提交方式是 自动提交。

自动提交,由消费者客户端参数 enable.auto.commit 配置,默认值是 true。

默认的自动提交,是定期提交,提交的周期由 auto.commit.interval.ms 配置,默认是 5s。

自动位移提交,有可能会重复消费和消息丢失。

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那又得从上一次位移提交的地方重新开始消费,这样就会重复消费。

手动位移提交:

手动位移提交,由消费者客户端参数 enable.auto.commit 配置, 设置为 false 就是手动位移提交。

手动位移提交,可以分为 同步提交、异步提交。

commitSync() 同步提交

同步提交,会阻塞消费者线程直到位移提交完成。

示例代码:

public class OffsetCommitSync {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";
    public static final String GROUP_ID = "group.demo";

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消费者订阅主题
        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                //do something
            }
            //手动提交位移
            consumer.commitSync();
            System.out.println("手动提交位移成功.");
        }

    }


    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //不自动提交,采用手动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }


}

commitAsync() 异步提交 :

异步提交,在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交,可以使消费者的性能得到一定的增强。

异步提交,将 consumer.commitSync(); 换成 commitAsync。

如果还需要回调,就用 OffsetCommitCallback对象作为参数。

示例如下:

public class OffsetCommitAsyncCallback {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";
    public static final String GROUP_ID = "group.demo";


    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));


        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                //do something
            }
            //异步回调,如果不需要回调,就采用无参的方法
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Exception exception) {
                    if (exception == null) {
                        System.out.println(offsets);
                    } else {
                        log.error("fail to commit offsets {}", offsets, exception);
                    }
                }
            });
        }


    }


    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

}

参考资料:

《深入理解kafka:核心设计与实践原理》

标签:ConsumerConfig,入门,kafka,提交,props,put,public,位移
From: https://www.cnblogs.com/expiator/p/17856119.html

相关文章

  • Kafka生产和消费
    生产,私有topic:[root@gbxhfbin]#exportPATH=$PATH:/data/kafka_2.12-2.8.2/bin;cd/data/kafka_2.12-2.8.2/bin[root@gbxhfbin]#kafka-console-producer.sh--bootstrap-server192.168.1.200:8423,192.168.1.201:8423--topic"RDSP-IPCM-NOTICE"--producer.c......
  • Introducing the kafka producer
    IntroductionAswesawintheprevioussections,Ithinkwehavesomebasicinformationaboutthekafkaproducers,today,letmediginthisconcept. Kafkaproducersaregoingtowritedatatotopicsaremadeofpartitions,nowtheproducersinkafkawil......
  • 工具 | Vshell使用入门
    写在前面   "Vshell是一款go编写的主机群管理工具(RAT)"。    发现Vshell作者团队非常低调,项目Github上Readme介绍非常简短,网上也很少有使用介绍。写个基础入门,记录从配置到主机管理、搭建隧道。本文仅作为工具介绍,请勿用于任何违法场景。    未经授权请勿利用文章......
  • Java零基础入门-数组
    Java零基础入门-数组前言Java是一门面向对象的编程语言,被广泛应用于各个领域。数组是Java编程中最基本也是最重要的数据结构之一,它可以用来存储一组数据,并且方便进行操作和处理。本文将为大家介绍Java数组的基本概念、语法和常见应用场景,帮助初学者快速入门。摘要本文将从以下......
  • Linux下利用Docker快速部署Kafka
    1.摘要Kafka是由Apache软件基金会开发一个开源流处理平台,使用Scala和Java编写,该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个按照分布式事务日志架构的大规模发布/订阅消息队列。这种工作方式使它为企业级基础设施来处理流失数据非常......
  • Jaeger Client Go 链路追踪|入门详解
    目录从何说起Jaeger部署Jaeger从示例了解JaegerClientGo了解trace、spantracer配置Sampler配置Reporter配置分布式系统与span怎么调、怎么传HTTP,跨进程追踪客户端Web服务端Tag、Log和Ref 从何说起之前参加柠檬大佬的训练营(免费白嫖),在大......
  • casbin快速入门:grom适配器管理策略
    一、概述上一讲我们已经讲了基础的策略管理。用配置文件来管理,在大型项目里肯定是不够的。所以这一讲我们讲一下用数据库来进行策略管理,我们选择Mysql。官方网站也给我们推荐了对应的适配器:我们选择Grom这个适配器来进行管理,主要是我们后期使用gin框架,这个比较习惯:二、添加适配器代......
  • Java泛型Generics​入门详解
    Java泛型Generics泛型基础知识泛型:是JDK5中引入的特性,可以在编译阶段约束操作的数据类型,并进行检查。泛型的格式:<数据类型>注意:泛型只能支持引用数据类型。如果我们没有给集合指定类型,默认认为所有的数据类型都是Object类型,此时可以往集合中添加任意的数据类型。带来一个坏处是由于......
  • scrapy的入门
    0,scapy的安装pipinstallscrapy注意安装的过程可能会有一些错误,需要尝试多次解决1,创建工程项目scrapystartprojectdemodemo是项目的名称2,创建爬虫cdbqb项目根目录下执行如下命令scrapygenspiderbqbwww.itcast.combqb表示爬虫的名称www.itcast.com表示爬去的......
  • quickjs入门学习
    由于最近在学习quickjs,把学习过程中遇到的问题和功能验证的过程都记录下来,这篇是quickjs入门学习的目录导航。  本文地址:https://www.cnblogs.com/wunaozai/p/17853962.html......