首页 > 其他分享 >kafka复习:(10)按分区获取ConsumerRecord

kafka复习:(10)按分区获取ConsumerRecord

时间:2023-10-31 11:32:43浏览次数:33  
标签:ConsumerConfig 10 properties kafka org apache import ConsumerRecord


package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
/*
消费某个topic的消息,使用records方法按分区获取消息,然后处理
 */

public class KafkaTest07 {

    private static Properties getProperties(){
        Properties properties=new Properties();

        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup7");
        return properties;
    }
    public static void main(String[] args) {

        KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());
        myConsumer.subscribe(Arrays.asList("student"));

        while(true){
            ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));
            Set<TopicPartition> partitions = consumerRecords.partitions();


            for(TopicPartition tp: partitions){
                for(ConsumerRecord record: consumerRecords.records(tp)){
                    System.out.println(record.value());
                    System.out.println("record offset is: "+record.offset());
                }
            }

        }



    }
}


标签:ConsumerConfig,10,properties,kafka,org,apache,import,ConsumerRecord
From: https://blog.51cto.com/amadeusliu/8102883

相关文章

  • kafka复习:(8)消费某个主题指定分区的消息
    packagecom.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka......
  • 10月31日线程基础
    目录线程基础线程的概念如果把操作系统当成一个工厂进程之间是竞争关系,线程之间是什么关系纠正概念进程与线程的区别是什么?线程基础线程的概念在操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程线程顾名思义,就是一条流水线工作的过程,可以这么想一条流水线必须属于......
  • 【NO.98】LeetCode HOT 100—621. 任务调度器
    题目:621.任务调度器给你一个用字符数组tasks表示的CPU需要执行的任务列表。其中每个字母表示一种不同种类的任务。任务可以以任意顺序执行,并且每个任务都可以在1个单位时间内执行完。在任何一个单位时间,CPU可以完成一个任务,或者处于待命状态。然而,两个相同种类的任务之间......
  • 【NO.99】LeetCode HOT 100—647. 回文子串
    题目:647.回文子串给你一个字符串s,请你统计并返回这个字符串中回文子串的数目。回文字符串是正着读和倒过来读一样的字符串。子字符串是字符串中的由连续字符组成的一个序列。具有不同开始位置或结束位置的子串,即使是由相同的字符组成,也会被视作不同的子串。示例1:输入:s=“......
  • 【NO.100】LeetCode HOT 100—739. 每日温度
    题目:739.每日温度给定一个整数数组temperatures,表示每天的温度,返回一个数组answer,其中answer[i]是指对于第i天,下一个更高温度出现在几天后。如果气温在这之后都不会升高,请在该位置用0来代替。示例1:输入:temperatures=[73,74,75,71,69,72,76,73]输出:[1,1,4,2,1,1......
  • Kafka-生产者、broker、消费者的调优参数总结
     生产环境下,为了尽可能提升Kafka的整体吞吐量,可以对Kafka的相关配置参数进行调整,以达到提升整体性能的目的。本文主要从Kafka的不同组件出发,讲解各组件涉及的配置参数和参数含义。一、生产者(producer.properties或者代码中)1、acks:Producer需要Leader确认的Producer请求的应答......
  • 2023.10.30
    运行超市抹零结账行为代码如下:1print("3107")2money=39.87+24.47+78.07#计算总金额3money_str=str(money)4print("商品总金额:"+money_str)5print("实收金额:{:.0f}".format(money))#进行抹零行为结果如下:计算学生成绩的分差和平均分代码如下:......
  • 文心一言 VS 讯飞星火 VS chatgpt (124)-- 算法导论10.5 5题
    五、用go语言,给定一个n结点的二叉树,写出一个O(n)时间的非递归过程,将该树每个结点的关键字输出。要求除该树本身的存储空间外只能使用固定量的额外存储空间,且在过程中不得修改该树,即使是暂时的修改也不允许。文心一言:您可以使用一个栈来存储节点,以便在遍历二叉树时进行回溯。由于您......
  • 102102151黄靖数据采集实践三
    作业一(1)要求:使用单线程和多线程的方法爬取中国气象网的限定数量的图片(2)下面给出代码实现:weather.pyimportscrapyfrom..itemsimportWeatherItemclassweatherSpider(scrapy.Spider):page=0number=0user_agent="Mozilla/5.0(WindowsNT10.0......
  • 10.30
    今天Java考试部分代码如下packagebean;publicclassBean{privateintid;privateStringname;privateStringgaishu;privateStringfangshi;privateStringstarttime;privateStringfianltime;privateStringgongyi;publicvoidsetId......