首页 > 其他分享 >kafka从指定位置消费

kafka从指定位置消费

时间:2022-10-09 12:03:18浏览次数:58  
标签:消费 group auto 位置 指定 kafka offset consumer id


消费者消费方式

1、KafkaConsumer.subscribe():为consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给同group下的不同consumer。

2、KafkaConsumer.assign():为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制,相当与指定的group无效(this method does not use the consumer’s group management)。

注意:consumer.assign()是不会被消费者的组管理功能管理的,他相对于是一个临时的,不会改变当前group.id的offset,比如:在使用consumer.subscribe(Arrays.asList(topicName));时offset为20,如果再通过assign方式已经获取了消息后,在下次通过consumer.subscribe(Arrays.asList(topicName));来获取消息时offset还是20,还是会获取20以后的消息。

3、我们还可以配置如下属性auto.offset.reset来,设置消费者从分区的开头或者末尾进行消费数据。当然这也是有条件的。

 //一般配置earliest 或者latest 值
props.put("auto.offset.reset", "latest");

我把上述三种情况的消费者不同使用方式下,消费者提交offset的情况进行了归总和说明:

kafka从指定位置消费_大数据

注意:

只要不更改group.id,每次重新消费kafka,都是从上次消费结束的地方继续开始,不论"auto.offset.reset”属性设置的是什么

场景一:Kafka上在实时被灌入数据,但kafka上已经积累了两天的数据,如何从最新的offset开始消费?

(最新指相对于当前系统时间最新)

1.将group.id换成新的名字(相当于加入新的消费组)

2.网上文章写还要设置 properties.setProperty("auto.offset.reset", "latest”)

实验发现即使不设置这个,只要group.id是全新的,就会从最新的的offset开始消费

场景二:kafka在实时在灌入数据,kafka上已经积累了两天的数据,如何从两天前最开始的位置消费?

1.将group.id换成新的名字

2.properties.setProperty("auto.offset.reset", "earliest”)

场景三:不更改group.id,只是添加了properties.setProperty("auto.offset.reset", "earliest”),consumer会从两天前最开始的位置消费吗?

不会,只要不更改消费组,只会从上次消费结束的地方继续消费

场景四:不更改group.id,只是添加了properties.setProperty("auto.offset.reset", "latest”),consumer会从距离现在最近的位置消费吗?

不会,只要不更改消费组,只会从上次消费结束的地方继续消费

应用:

正式打包上线前应该使用新的group.id,以便于从kafka最新的位置开始消费

只要将group.id换成全新的,不论"auto.offset.reset”是否设置,设置成什么,都会从最新的位置开始消费

标签:消费,group,auto,位置,指定,kafka,offset,consumer,id
From: https://blog.51cto.com/u_11334685/5740082

相关文章

  • kafka消费者学习
    转自:https://www.cnblogs.com/cxuanBlog/p/11949238.html1.介绍  Kafka消费者从属于消费者群组。一个群组中的消费者订阅的都是相同的主题,每个消费者接收主题一部分分......
  • kafka的缺陷
    转自:https://mp.weixin.qq.com/s/_RIvZwK1sJJP8xnUDyAk1Q1.broker和partition的问题全量复制的问题。  2.缓存页写回可能丢失 3.进阶 ......
  • kafka的生产者学习
    转自:https://www.cnblogs.com/cxuanBlog/p/11949238.html1.流程介绍图1大概流程:创建一个ProducerRecord 对象,它是Kafka中的一个核心类,由记录要发送到的主题名称(T......
  • msf-地理位置获取
    相关内容一、GPS简介物理位置定位:根据IP的定位不准确,容易被欺骗,网上有很多IP伪造技术,所以定位肯定也不准确。GPS全球定位系统:(使用最广泛)GPS是英文GlobalPositioningSyst......
  • 动手开发一个有用的 ABAP ALV 工具 - 查看指定用户的 ABAP 传输请求,模拟 SE10 事物码
    我们知道ABAP系统里有一个有用的工具,事物码SE10,输入用户名称,可以查看该用户在本系统上创建的传输请求(TransportRequest)列表:点击Display按钮,能看到用户名WANGJER......
  • CentOS 7 离线安装指定版本docker
    这里以docker-ce-18.06版本为例第一步:下载指定版本docker安装包wget--no-check-certificatehttps://mirrors.tuna.tsinghua.edu.cn/docker-ce/linux/centos/7.9/x86_64......
  • 日志从Kafka到Loki的N种方式​
    最近群里有小伙伴有说到自己的日志存储路径先是从客户端到Kafka,再通过消费kafka到ElasticSearch。现在要将ES换成Loki面临需要同时支持Kafka和Loki插件的工具。小白查了下当......
  • Linux CMake 指定gcc编译版本
    背景:无root下手动升级gcc版本为5.5之后,但是由于默认目录/usr/bin下的gcc是4.8.5,在cmake默认使用老版本的gcc,导致cmake失败。解决方案:注意!将下面的yourpath替换成新的gc......
  • 【bug】kafka:WARN Connection to node -1 could not be established. Broker may not
    bin/kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--describe--groupgroup1报错Note:ThiswillnotshowinformationaboutoldZookeeper-basedcons......
  • fast-data-dev 快速基于容器的kafka 环境
    fast-data-dev是由lensesio团队提供的一个快速部署kafka测试环境的工具包含的组件kafkazkschemaregistrykafkaconnect以及其他不少的connector参考资料​​https://git......