首页 > 其他分享 >kafka重置消费位点

kafka重置消费位点

时间:2023-03-16 18:36:32浏览次数:43  
标签:TopicPartition 位点 重置 kafka offset put new partitionInfos consumer

kafka重置消费位点一般分几种情况

  • 重置到最新的消费位点
  • 重置到最早的消费位点
  • 根据时间戳重置消费位点
  • 根据指定偏移量重置消费位点

基于kafka 2.0.0

package com.realtime.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.realtime.db.KafkaDataSource;
import org.apache.dolphinscheduler.realtime.dto.ConsumerRecordDtoNew;
import org.apache.dolphinscheduler.realtime.dto.KafkaConsumerDtoNew;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;


@Slf4j
public class KafkaConsumer extends KafkaBase {

    private final KafkaConsumerDtoNew kafkaConsumerDto;

    private final Consumer<String, String> consumer;

    private final String consumerGroup;

    /**
     * Builds a manually-committed (enable.auto.commit=false) consumer for the
     * given data source. If the DTO does not carry a group id, a unique
     * per-user, timestamped group name is generated so the reset only affects
     * a throwaway group.
     *
     * @param kafkaDataSource  broker/serializer connection settings
     * @param kafkaConsumerDto topic, group id and offset-reset policy
     */
    public KafkaConsumer(KafkaDataSource kafkaDataSource, KafkaConsumerDtoNew kafkaConsumerDto) {
        super(kafkaDataSource);
        this.kafkaConsumerDto = kafkaConsumerDto;
        if (StringUtils.isNotBlank(kafkaConsumerDto.getGroupId())) {
            this.consumerGroup = kafkaConsumerDto.getGroupId();
        } else {
            this.consumerGroup = "consumer-" + kafkaConsumerDto.getLoginUserName() + "-" + ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSS"));
        }

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaDataSource.getBootstrapServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaDataSource.getKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaDataSource.getValueDeserializer());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.consumerGroup);
        // Expected values: "earliest" or "latest".
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerDto.getOffset());
        // Offsets are only moved via the explicit commitSync calls below.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
    }

    /**
     * Resets the group's committed position to the explicit offsets supplied
     * per partition. A {@code null} or empty map commits nothing (no-op).
     *
     * @param partitionLongMap target offset for each partition to reset
     */
    public void resetOffsetByTopicPartitionOffset(Map<TopicPartition, Long> partitionLongMap) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        if (null != partitionLongMap) {
            for (Map.Entry<TopicPartition, Long> entry : partitionLongMap.entrySet()) {
                offsets.put(entry.getKey(), new OffsetAndMetadata(entry.getValue()));
            }
        }
        consumer.commitSync(offsets);
    }

    /**
     * Resets the group's committed position to the log-end offset of every
     * partition of the configured topic.
     */
    public void resetOffsetToEnd() {
        // endOffsets() fetches all log-end offsets in one round-trip; no need
        // to assign()/seekToEnd()/position() each partition one at a time.
        commitOffsets(consumer.endOffsets(topicPartitions()));
    }

    /**
     * Resets the group's committed position to the earliest available offset
     * of every partition of the configured topic.
     */
    public void resetOffsetToBeginning() {
        commitOffsets(consumer.beginningOffsets(topicPartitions()));
    }

    /**
     * Resets the group's committed position to the earliest offset whose
     * record timestamp is {@code >= timestampMs}. Partitions whose newest
     * record is older than the timestamp (offsetsForTimes returns null for
     * them) fall back to the log-end offset.
     *
     * @param timestampMs epoch-millis timestamp to seek to
     */
    public void resetOffsetByTimestamps(long timestampMs) {
        List<TopicPartition> partitions = topicPartitions();
        Map<TopicPartition, Long> timestamps = new HashMap<>();
        for (TopicPartition tp : partitions) {
            timestamps.put(tp, timestampMs);
        }
        Map<TopicPartition, Long> resolved = new HashMap<>();
        Map<TopicPartition, Long> endOffsets = null; // fetched lazily, only if some lookup misses
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timestamps).entrySet()) {
            OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
            if (offsetAndTimestamp != null) {
                resolved.put(entry.getKey(), offsetAndTimestamp.offset());
            } else {
                // Timestamp is newer than every record in this partition.
                if (endOffsets == null) {
                    endOffsets = consumer.endOffsets(partitions);
                }
                resolved.put(entry.getKey(), endOffsets.get(entry.getKey()));
            }
        }
        commitOffsets(resolved);
    }

    /** Lists all partitions of the configured topic; empty list if the topic has no metadata. */
    private List<TopicPartition> topicPartitions() {
        List<TopicPartition> partitions = new ArrayList<>();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(kafkaConsumerDto.getTopic());
        if (null != partitionInfos) {
            for (PartitionInfo p : partitionInfos) {
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
        }
        return partitions;
    }

    /** Synchronously commits the given raw offsets as the group's new positions. */
    private void commitOffsets(Map<TopicPartition, Long> rawOffsets) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : rawOffsets.entrySet()) {
            offsets.put(entry.getKey(), new OffsetAndMetadata(entry.getValue()));
        }
        consumer.commitSync(offsets);
    }

    @Override
    public void close() {
        consumer.close();
    }

}

 

标签:TopicPartition,位点,重置,kafka,offset,put,new,partitionInfos,consumer
From: https://www.cnblogs.com/ttyypjt/p/17223752.html

相关文章

  • Centos7的kafka集群搭建
    CentOS7搭建kafka集群原创 莫问 记录栈 2022-11-1619:49 发表于陕西收录于合集#centos6个#kafka1个#linux9个#kafka集群1个/***@系统:CentOSLinux......
  • kafka消费者不能消费信息
     1、在kafka查看主题__consumer_offsets的情况 $kafka_home/bin/kafka-topics.sh--describe--zookeeperhostname:2181 --topic__consumer_offsets 显示图片的......
  • nginx+lua+openresty+kafka相关问题汇总
    这里使用的是kafka插件是doujiang大佬的https://github.com/doujiang24/lua-resty-kafka,版本为v0.2.0。应用场景在nginx转发中,记录非200请求的信息,遂打算在log_by_lua*中......
  • kafka容器内的server.properties在哪里
    刚刚需要修改kafka的配置文件server.properties,网上都说在config目录下,可我进去kafka容器找了一圈都没有看到该目录; 最后运气好,终于发现了 ......
  • Kafka Rebalance-重平衡
    消费者组ConsumerGroupKafka提供的可扩展且具有容错性的消费者机制共享一个公共的ID,这个ID被称为GroupID。组内的所有消费者协调在一起来消费订阅主题(Subscribe......
  • Kafka为什么性能这么快
    1、页缓存技术pagecacheKafka是基于操作系统的页缓存(pagecache)来实现文件写入的,我们也可以称之为oscache,意思就是操作系统自己管理的缓存。Kafka在写入磁盘文件......
  • linux(wsl2 ubuntu) mariadb重置密码
    可用于不知道默认密码或忘记密码等场景操作环境是WSL2版本ubuntu22停止MariaDB服务 sudoservicemariadbstop2.在不加载授权表的情况下启动MariaDB服务......
  • docker安装kafka
    dockerrun-d--namezookeeper-p2181:2181-tzookeeper:latest  dockerrun-d--namekafka-p9092:9092-eKAFKA_BROKER_ID=0-eKAFKA_ZOOKEEPER_CONNEC......
  • Kafka、RabbitMQ、RocketMQ差异
    消息中间件消息中间件是分布式系统中重要的组件,本质就是一个具有接收消息、存储消息、分发消息的队列,应用程序通过读写队列消息来通信。在电商中,如订单系统处理完订单后,把订......
  • 3.docker 搭建kafka和kafka-manager集群
    1.搭建zookeeper集群docker-compose.yml文件version:'3.1'services:zoo1:image:zookeeperrestart:alwayshostname:zoo1ports:-2181:2181environment:ZOO_M......