首页 > 其他分享 >查看kafka指定位置offset消息

查看kafka指定位置offset消息

时间:2023-05-26 11:46:57浏览次数:40  
标签:查看 props kafka apache offset put import consumer

package com.infinitus.cdc.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * .
 *
 * @author levi
 * @version 1.0
 * @date 2023/5/26 10:50
 **/
public class KafkaOffsetTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //调用返回的记录数
        props.put("max.poll.records", 100);
//        props.put("enable.auto.commit", "true");
//        props.put("auto.commit.interval.ms", "1000");
        //可以取值为latest(从最新的消息开始消费)或者earliest(从最老的消息开始消费)
        props.put("auto.offset.reset", "earliest");
        // Kafka服务端的主机名和端口号
        props.put("bootstrap.servers", "127.0.0.1:9093");
        //设置消费者组名称
        props.put("group.id", KafkaOffsetTest.class.getSimpleName());

        //从哪个offset和partition开始
        String offsetn = "10";
        int partiton = 1;
        String topicName = "topic_name_1";
        KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
        consumer.assign(Arrays.asList(new TopicPartition(topicName, partiton)));
//        consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, partiton)));//不改变当前offset
       consumer.seek(new TopicPartition(topicName, partiton), Long.parseLong(offsetn));//不改变当前offset

        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(100);
            for (ConsumerRecord<String, Object> record : records){
                System.out.println(record.toString());
                // 在这里打断点
                System.out.println(record.toString());
            }
        }

    }

}

标签:查看,props,kafka,apache,offset,put,import,consumer
From: https://www.cnblogs.com/levi125/p/17434319.html

相关文章

  • Kafka实时数据即席查询应用与实践
    作者:vivo互联网搜索团队-DengJieKafka中的实时数据是以Topic的概念进行分类存储,而Topic的数据是有一定时效性的,比如保存24小时、36小时、48小时等。而在定位一些实时数据的Case时,如果没有对实时数据进行历史归档,在排查问题时,没有日志追述,会很难定位是哪个环节的问题。一、背景Ka......
  • 使用linux安装kafka
    以下是在Linux上安装Kafka的详细步骤:下载Kafka二进制文件在Kafka的官方网站(http://kafka.apache.org/downloads)上下载最新版本的Kafka二进制文件。解压缩下载的文件在终端中进入下载目录,使用以下命令解压缩下载的文件:tar-xzfkafka_<version>.tgz其中,<version>应替换为下载的Kaf......
  • Kafka实时数据即席查询应用与实践
    作者:vivo互联网搜索团队-DengJie Kafka中的实时数据是以Topic的概念进行分类存储,而Topic的数据是有一定时效性的,比如保存24小时、36小时、48小时等。而在定位一些实时数据的Case时,如果没有对实时数据进行历史归档,在排查问题时,没有日志追述,会很难定位是哪个环节的问题。一......
  • kafka
    broker一般指服务器资源概念理解topic:逻辑概念,用于联系Producer和Consumer的message生产和消费。Producer生产的消息放入一个topic中,由Consumer通过对同一个topic的订阅进行消费 broker:物理资源,一般一个broker指底层的一台物理服务器。 partition:逻辑分区存储,用于将......
  • Ubuntu下查看IP地址
    1.默认安装不能使用ifconfig命令进行查看IP地址,使用命令: sudoapt-getinstallnet-tools进行安装;2.安装完毕后即可使用ifconfig命令,如下图,红色矩形框中的即为IP地址:......
  • Linux查看端口被那个进程占用
    netstat-antunlp|grep1521......
  • 查看Linux磁盘文件占用大小
    1、查看磁盘占用(KB)df-lk2、查看磁盘占用(GB)df-hl3、查看每个目录占用大小:du--max-depth=1-h4、查看文件的大小ls-lht......
  • Kafka常用命令之kafka-console-consumer.sh
    ./kafka-console-consumer.sh--bootstrap-serverlocalhost:2181--topictestTopic--from-beginning注意:Kafka从2.2版本开始将kafka-topic.sh脚本中的−−zookeeper参数标注为“过时”,推荐使用−−bootstrap-server参数。若读者依旧使用的是2.1及以下版本,请将下述......
  • IDEA下查看Java字节码(插件ByteCode Viewer)
    安装jclasslibbytecodeviewer插件 使用结果......
  • 如何查看windows某个目录下所有文件/文件夹的大小
    下载 (TreeSizeFree) 软件链接:https://pan.baidu.com/s/1tgJTvmQWoZ2qvC7kzSO9MQ提取码:gewv ......