首页 > 其他分享 >kafka数据检索

kafka数据检索

时间:2023-05-31 23:13:36浏览次数:35  
标签:数据检索 props kafka offset test import consumer

Kafka可以通过消费者组来查找数据。消费者组是一组消费者的集合,它们共同读取一个或多个主题。消费者组可以使用Kafka提供的命令行工具或Kafka API来实现。

使用命令行工具kafka-console-consumer可以查找数据。例如,以下命令可以从名为test的主题中读取消息:

kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

这将从test主题的开头开始读取消息,并将它们输出到控制台。

使用Kafka API,可以编写Java或Scala代码来创建消费者并读取数据。以下是一个简单的Java代码示例:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            records.forEach(record -> System.out.println(record.value()));
        }
    }
}

此代码将创建一个消费者,订阅名为test的主题,并在无限循环中读取消息,将它们输出到控制台。

 

总结:查找数据的过程:

第一步:通过offset确定数据保存在哪一个segment里面了,

第二部:查找对应的segment里面的index文件 。index文件都是key/value对的。key表示数据在log文件里面的顺序是第几条。value记录了这一条数据在全局的标号。如果能够直接找到对应的offset直接去获取对应的数据即可

如果index文件里面没有存储offset,就会查找offset最近的那一个offset,例如查找offset为7的数据找不到,那么就会去查找offset为6对应的数据,找到之后,再取下一条数据就是offset为7的数据

 

index文件名加偏移量可以找到log文件里面的偏移量,通过log文件里面的偏移量可以找到这条信息的位置。

 

标签:数据检索,props,kafka,offset,test,import,consumer
From: https://www.cnblogs.com/zqlmianshi/p/17447601.html

相关文章

  • kafka单独集群搭建
     查看kafka配置下面配置是由ambari配置生成。catconf/server.properties#GeneratedbyApacheAmbari.TueOct2510:40:072022auto.create.topics.enable=trueauto.leader.rebalance.enable=truecompression.type=producercontrolled.shutdown.enable=truecontro......
  • SpringBoot集成kafka全面实战
    一、前戏1、在项目中连接kafka,因为是外网,首先要开放kafka配置文件中的如下配置(其中IP为公网IP),advertised.listeners=PLAINTEXT://112.126.74.249:90922、在开始前我们先创建两个topic:topic1、topic2,其分区和副本数都设置为2,用来测试,[root@iZ2zegzlkedbo3e64vkbefZ~]#cd/......
  • kafka消费端可以用预提交的方式进行精确消费
      为了避免重复消费:在服务器的数据库端记录一个状态,这个状态标志着这条消息被正确消费了。如果在向kafka提交commit之前服务器崩掉了,再次启动时服务器读取这个状态,如果是这个消息被正确消费过的,就把指针指向下一条数据。 ......
  • kafka的leader,follow,offset
     1.partition分为主从。2.当需要严格顺序时(比如秒杀场景),每个topic里面只能有一个partition,这样可以严格保证顺序。虽然多个partition时也可以保证partition内部是顺序执行的,但是不能保证整体是顺序执行的。3.同一个partition只能由一个消费者。就像一个椅子里只能坐一个人,咋......
  • dp-runtime去Kafka依赖方案
    背景现有原生kafkaconnectruntime,在客户环境运行遇到诸多问题,问题列表如下:强依赖Kafka集群做任务分配、connector配置信息、connector状态管理、source进度维护等等当遇到数据量大、并行数多,topic数量较多时,可能引发kakfa集群的不稳定包括(节点宕机,controller切换等)从而引......
  • 去kafka依赖runtime版本梳理
    背景xxx数据同步产品,在客户环境长期运行过程中,发现runtime主要存在以下问题•当前架构下,worker集群管理依赖kafka,kafka同时承担任务分配协调和数据缓冲二项职责,当Kafka作为数据缓存不稳定,这二项工作相互干扰导致worker集群不稳定解决方案•为了解决上述的问题,需要重构现有的......
  • kafka集群是如何选择leader,你知道吗?
    前言kafka集群是由多个broker节点组成,这里面包含了许多的知识点,以下的这些问题你都知道吗?你知道topic的分区leader是怎么选举的吗?你知道zookeeper中存储了kafka的什么信息吗?起到什么做呢?你知道kafka消息文件是怎么存储的吗?如果kafka中leader节点或者follower节点发生故障,消......
  • 详解大数据中必不可少的消息中间件 kafka(3.x 新版本)
    楔子本次来聊一聊kafka,相信大家都知道它是一个应用于大数据实时领域、基于发布/订阅模式的分布式消息中间件(或者说消息队列),能够和不同的进程进行通信,从而实现上下游之间的消息传递。有了消息队列之后,上游服务和下游服务就无需直接通信了,上游服务将消息发送到队列中,下游从队列中......
  • kafka安装
    下载下载kafka二进制文件源码不可用于启动,会报错,无法找到kafka.kafka类配置server配置文件config/server.properties#每一个broker的唯一标识broker.id=0#kafka数据目录#Acommaseparatedlistofdirectoriesunderwhichtostorelogfileslog.dirs=/usr/loca......
  • 查看kafka指定位置offset消息
    packagecom.infinitus.cdc.test;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.clients.consumer.OffsetAndM......