首页 > 其他分享 >kafka重置偏移量

kafka重置偏移量

时间:2024-03-22 18:22:05浏览次数:19  
标签:TopicPartition -- 重置 偏移量 kafka topic offset 位移

背景

某些时候,kafka上游生产者生产的消息有错误,或者下游消费者并不需要消费某部分的数据,这时候,通常有两个解决方案,一种是对数据做不解析处理,直接略过。另一种就是暂时关掉kafka的消费者组,等到生产者正常后再进行消费,但由于kafka本身是默认断点续传的,此时就需要我们先重置kafka中当前kafka组的offset。

解决方案

更改消费者组

由于kafka对某topic中offset的管理是以组的形式来进行的,因此,在新建或更改消费者组后,对于offset的管理也会重新开始,策略取决于配置的auto.offset.reset参数

在重启动时指定起始offset

在再次启动时,通过配置指定要消费topic中分区的offset
@KafkaListener(groupId = "topic_group_test",topicPartitions = { @TopicPartition(topic = "topic_test",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "9830")) })
java springboot版本

通过kafka服务端脚本指定重置

kafka-consumer-groups.sh --bootstrap-server 10.202.13.27:9092 \ --group cjw --reset-offsets --topic cjw-test --to-earliest --execute
具体支持8种操作
--to-earliest
--to-latest
--to-current
--to-offset
--shift-by N: 把位移调整到当前位移+N 处,N可以为负数
--to-datetime : 把位移调整到大于给定时间的最早位移处,datetime格式yyyy-MM-ddTHH:mm:ss.xxx
--by-duration:把位移调整到距离当前时间指定间隔的位移处,格式为PnDTnHnMnS
--from-file:从CSV文件中读取调整策略

通过API代码来指定

consumer.seekToEnd( consumer.partitionsFor(topic).stream().map(partitionInfo-> new TopicPartition(topic,partitionInfo.partition())) .collect(Collectors.toList()) );
void seek(TopicPartition partition, long offset);
Void seek(TopicPartition partition,OffsetAndMetadata offsetAndMetadata);
Void seekToBeginning(Collection partitions)
Void seekToEnd(Collection partitions)

注意

以上所有操作都需要在消费者组处于未激活的情况下进行
使用代码方式时,需要指定所有分区的消费策略

标签:TopicPartition,--,重置,偏移量,kafka,topic,offset,位移
From: https://www.cnblogs.com/BlackAndBrown/p/18090233

相关文章

  • Debezium vs OGG vs Tapdata:如何实时同步 Oracle 数据到 Kafka 消息队列?
    随着信息时代的蓬勃发展,企业对实时数据处理的需求逐渐成为推动业务创新和发展的重要驱动力。在这个快速变化的环境中,许多企业选择将Oracle数据库同步到Kafka,以满足日益增长的实时数据处理需求。本文将深入探讨这一趋势的背后原因,并通过一个真实的客户案例来强调实时性在业务场......
  • Go操作Kafka
    目录一、Go操作之kafka二、sarama1.下载及安装2.注意事项三、连接使用kafka1.发送消息2.消费消息一、Go操作之kafkaKafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展等特点。本文介绍了......
  • python操作kafka
    目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细节2.1生产者demo2.2消费者demo2.3消费者(消费群组)2.4消费者(读取目前最早可读的消息)2.5消费者(手动设置偏移量)2.6消费者(订阅多个主题)......
  • Kafka集群部署
    目录Kafka集群部署1.1服务器资源1.1.1安装JDK(所有设备)1.1.2配置ip和主机名映射(所有服务器)(可不做)1.1.3配置主机名(所有设备)(可不做)1.2在node1上安装、配置kafka1.2.1安装kafka1.2.2修改配置文件1.2.2.1修改zookeeper.properties1.2.2.2配置Zookeeper的id1.2.2.3......
  • 从零开始学Spring Boot系列-集成Kafka
    Kafka简介ApacheKafka是一个开源的分布式流处理平台,由LinkedIn公司开发和维护,后来捐赠给了Apache软件基金会。Kafka主要用于构建实时数据管道和流应用。它类似于一个分布式、高吞吐量的发布-订阅消息系统,可以处理消费者网站的所有动作流数据。这种动作流数据包括页面浏览、搜索......
  • Zookeeper+Kafka单节点部署
    一.Zookeeper部署启动1.下载zookeeperApacheZooKeeper(http://zookeeper.apache.org/releases.html) 2.文件上传通过Xftp软件上传到Linux系统中若在外部解压可直接传入相应的文件地址,若未解压则传入压缩包在Linux系统中进行解压操作tar-zxvfzookeeper-3.9.2.tar.gz......
  • Windows密码重置工具 有几款
    Windows密码重置工具:PCUnlocker:PCUnlocker是一款功能强大的Windows密码重置工具,可以帮助用户重置本地Windows密码或Microsoft账户密码。它支持Windows11/10/8/7/Vista/XP/2019/2016/2012/2008/2003/2000等操作系统。PCUnlocker是一款专业的Windows密码重置工具,它允许用户......
  • kafka-按键分区
    配置#自定义分区#partitioner:#class:com.dfree.data.config.PartitionerByKey代码//publicclassPartitionerByKeyimplementsPartitioner{//@Override//publicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalu......
  • kafka面试题 1
    kafka面试题1简介kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使你能够将消息从一个端点传递到另一个端点,kafka适合离线和在线消息消费,kafka消息保留在磁盘上,并在集群内复制以防止数据丢失,kafka构建在zookeeper同步服务上,他与ApacheStorm......
  • 面试官:Kafka和ES选主有什么区别?
    Kafka和ES都是用来处理大数据的中间件,一个是消息中间件的代表(Kafka),另一个是大数据搜索引擎的代表(ES)。它们在Java领域的使用非常广泛,在大数据方面就更不用说了,但它们的选主(选择主节点)有什么关联与区别呢?接下来,我们一起来看。1.基础概念1.1什么是Kafka?Kafka是一个分布式......