首页 > 其他分享 >kafka复习:(8)消费某个主题指定分区的消息

kafka复习:(8)消费某个主题指定分区的消息

时间:2023-10-31 11:32:26浏览次数:36  
标签:ConsumerConfig 复习 分区 kafka org apache import properties


package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/*
消费某个topic的指定分区的消息
 */

public class KafkaTest05 {

    private static Properties getProperties(){
        Properties properties=new Properties();

        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");
        return properties;
    }
    public static void main(String[] args) {

        KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());
        myConsumer.assign(Arrays.asList(new TopicPartition("student", 3)));

        while(true){
            ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));
            for(ConsumerRecord record: consumerRecords){
                System.out.println(record.value());
                System.out.println("record offset is: "+record.offset());
            }

        }



    }
}


标签:ConsumerConfig,复习,分区,kafka,org,apache,import,properties
From: https://blog.51cto.com/amadeusliu/8102885

相关文章

  • 【面试题】前端面试复习6---性能优化
    性能优化一、性能指标要在Chrome中查看性能指标,可以按照以下步骤操作:打开Chrome浏览器,并访问你想要测试的网页。使用快捷键F12或右键点击页面并选择“检查”,打开开发者工具。在开发者工具中,切换到“Performance”(性能)选项卡。点击开始录制按钮,即红色的圆点按钮。开始加载页......
  • Kafka-生产者、broker、消费者的调优参数总结
     生产环境下,为了尽可能提升Kafka的整体吞吐量,可以对Kafka的相关配置参数进行调整,以达到提升整体性能的目的。本文主要从Kafka的不同组件出发,讲解各组件涉及的配置参数和参数含义。一、生产者(producer.properties或者代码中)1、acks:Producer需要Leader确认的Producer请求的应答......
  • python爬虫知识体系80页md笔记,0基础到scrapy项目高手,第(2)篇:http协议复习精讲
    本文主要学习一下关于爬虫的相关前置知识和一些理论性的知识,通过本文我们能够知道什么是爬虫,都有那些分类,爬虫能干什么等,同时还会站在爬虫的角度复习一下http协议。完整体系笔记直接地址:请移步这里共8章,37子模块,总计5.6w+字今天这一篇主讲:爬虫基础本阶段本文主要学......
  • 数据库数据恢复—误操作导致SqlServer数据库所在NTFS分区损坏的数据恢复案例
    SqlServer数据库数据恢复环境:一台服务器,windows操作系统+NTFS文件系统,运行了12个sqlserver数据库。SqlServer数据库故障:根据用户描述,故障情况是工作人员误操作导致服务器硬盘上sqlserver数据库所在分区损坏。经过北亚企安数据恢复工程师对故障服务器硬盘的初步检测,确认sqlserver......
  • SpringCloud复习:(2)@LoadBalanced注解的工作原理
    @LoadBalanced注解标记了一个RestTemplate或WebClientbean使用LoadBalancerClient来进行负载均衡。LoadBalancerAutoConfiguration类给带注解的@RestTemplate添加了拦截器:LoadBalancerInterceptor.具体流程如下:首先定义一个LoadBalancerInterceptor然后定义了一个RestTemplateC......
  • SpringCloud复习:(3)LoadBalancerInterceptor
    使用Ribbon时,execute方法会由RibbonLoadBalancerClient类来实现它会调用重载的execute方法getLoadBalancer默认会返回ZoneAwareLoadBalancer(基类是BaseLoadBalancer).此处调用的getServer方法就会根据负载均衡策略选择适当的服务器来为下一步的http请求做准备。这个execute方法......
  • SpringCloud复习:(1)netflix包里的DiscoveryClient类
    DiscoveryClient类实现了EurekaClient接口它的主要作用:服务注册,服务续约,服务下线,获取服务列表。initScheduledTasks方法用来开启定时任务来完成上述功能。上图中的代码用来从服务器定期(默认30秒)拉取服务列表(ScheduledExecutorService的应用场景)其中TimedSupervisorTask这个Run......
  • Kafka基础学习笔记
    一、Kafka:1、简介:Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。最大的特性就是可以实时并高速的处理大量数据来满足需求,同时对消息数据进行持久化存储。2、优点:Kafka与其他消息队列MQ(如ActiveMQ、Rabb......
  • linux基本文件命令复习笔记
    1,放大缩小终端窗口字体  放大 ctrlshift+=   缩小  ctrl-2,6个常见终端命令 (1)ls  查看当前文件夹下的内容 (2)pwd 查看当前所在文件夹  (3)cd目录名 切换文件夹 (4)touch文件名 如果文件不存在,新建文件。和mkdir不同的是,mkdir创......
  • kafka代码实践
    安装kafka:Windows安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851Linux安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353添加依赖包:<dependency><groupId>org.springframework.k......