首页 > 其他分享 >Kafka客户端操作

Kafka客户端操作

时间:2023-07-23 10:45:13浏览次数:37  
标签:adminClient void Kafka Topic AdminClient static 操作 public 客户端

五类API

 Kafka客户端API类型

  • AdminClient API:允许管理和检测Topic、broker以及其它Kafka对象(类似于命令行的create topic)
  • Producer API:发送消息到1个或多个Topic
  • Consumer API:订阅一个或多个Topic,并处理产生的消息
  • Streams API:高效地将输入流转换到输出流
  • Connector API:从一些源系统或应用程序中拉取数据到kafka

初始化工程

创建一个SpringBoot项目

引入依赖

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>

AdminClient客户端API的使用

 客户端建立

public class AdminSample {

    public static void main(String[] args) {
        AdminClient adminClient = AdminSample.adminClient();
        System.out.println("adminClient: " + adminClient);
    }

    /**
     * 设置AdminClient
     */
    public static AdminClient adminClient(){
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.75.136:9092");
        AdminClient adminClient = AdminClient.create(properties);
        return adminClient;
    }
}

连接测试成功

 

创建Topic

public final static String TOPIC_NAME = "topic_test";

    /**
     * 创建Topic实例
     */
    public static void createTopic(){
        AdminClient adminClient = adminClient();
        // 副本因子
        Short rs = 1;
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, rs);
        CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
        System.out.println("CreateTopicsResult: " + topics);
    }

    public static void main(String[] args) {
//        AdminClient adminClient = AdminSample.adminClient();
//        System.out.println("adminClient: " + adminClient);
        createTopic();
    }

创建成功

 

查看Topic列表及Internal

 查看是否成功创建topic

    /**
     * 获取Topic列表
     */
    public static void topicList() throws Exception {
        AdminClient adminClient = adminClient();
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Set<String> names = listTopicsResult.names().get();
        //打印
        names.stream().forEach(System.out::println);
    }

    public static void topicList() throws Exception {
        AdminClient adminClient = adminClient();
        //是否查看Internal选项
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);
        //ListTopicsResult listTopicsResult = adminClient.listTopics();
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        Set<String> names = listTopicsResult.names().get();
        Collection<TopicListing> topicListings = listTopicsResult.listings().get();
        //打印
        names.stream().forEach(System.out::println);
        //打印topicListing
        topicListings.stream().forEach((topicList)->{
            System.out.println(topicList);
        });
    }

 删除Topic

    /**
     * 删除Topic
     */
    public static void delTopics() throws Exception {
        AdminClient adminClient = adminClient();
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
        deleteTopicsResult.all().get();
    }

成功删除

 

Topic描述信息查看

    /**
     * 描述Topic
     */
    public static void describeTopics() throws Exception {
        AdminClient adminClient = adminClient();
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
        Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
        entries.stream().forEach((entry)->{
            System.out.println("name: " + entry.getKey() + ", desc: " + entry.getValue());
        });
    }

输出以下内容

 

Topic配置信息查看

    /**
     * 查看配置
     */
    public static void describeConfig() throws Exception {
        AdminClient adminClient = adminClient();
        //TODO 预留,用来做集群
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
        Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
        configResourceConfigMap.entrySet().stream().forEach((entry)->{
            System.out.println("configResource: " + entry.getKey() + ", Config: " + entry.getValue());
        });
    }

输出信息

des:configResource: ConfigResource(type=TOPIC, name='topic_test'), 
                           Config: Config(entries=[ConfigEntry(name=compression.type, 
                                                               value=producer, 
                                                               source=DEFAULT_CONFIG, 
                                                               isSensitive=false, 
                                                               isReadOnly=false, 
                                                               synonyms=[]), 
                                   ConfigEntry(name=leader.replication.throttled.replicas, 
                                               value=, 
                                               source=DEFAULT_CONFIG, 
                                               isSensitive=false, 
                                               isReadOnly=false, 
                                               synonyms=[]), 
                                    。。。。。。

  

Topic修改描述信息

    /**
     * 修改配置
     */
    public static void alterConfig() throws Exception {
        AdminClient adminClient = adminClient();
        Map<ConfigResource, Config> configMaps = new HashMap<>();

        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        Config config = new Config(Arrays.asList(new ConfigEntry("preallocate", "true")));
        configMaps.put(configResource, config);
        AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMaps);
        alterConfigsResult.all().get();
    }

 

Patition增加

    /**
     * 增加Partition数量
     */
    public static void incrPartitions(int partitions) throws Exception {
        AdminClient adminClient = adminClient();
        Map<String, NewPartitions> partitionsMap = new HashMap<>();
        NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
        partitionsMap.put(TOPIC_NAME, newPartitions);
        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
        createPartitionsResult.all().get();
    }

 

标签:adminClient,void,Kafka,Topic,AdminClient,static,操作,public,客户端
From: https://www.cnblogs.com/szhNJUPT/p/17574421.html

相关文章

  • 数据结构练习笔记——顺序栈的基本操作
    顺序栈的基本操作【问题描述】按照要求完成顺序栈的设计【输入形式】无【输出形式】2019181716151413121110987654321#include<iostream>usingnamespacestd;#include<stdlib.h>#include<stdio.h>#defineSTACKSIZE10#defineINCRE2......
  • debezium同步postgresql数据至kafka
    0实验环境全部部署于本地虚拟机debeziumdocker部署postgresql、kafka本机部署1postgresql1.1配置设置postgres密码为123仿照example,创建databasepostgres,schemeinventory,tablecustomers因为postgres用户有replication权限,所以可以直接使用修改postgresql.conf文......
  • 数据库之Sharding分库分表操作详解
    目录1分库分表1.1简介1.2实操准备1.2.1Sharding与SpringBoot公共依赖pom1.3Sharding-Jdbc与SpringBoot1.3.1pom.xml1.3.2配置文件1.3.2.1application.yml1.3.2.2application-sharding_4.yml1.4ShardingSphere与SpringBoot1.4.1pom.xml1.4.2配置文件1.4.2.1applicati......
  • kafka基础操作
    什么是kafkakafka本身并不是消息队列,而是一份分布式流平台(高并发,低延迟。高吞吐量)。kafka是基于zookeeper的分布式消息系统。kafka具有高吞吐率、高性能、实时及高可靠等特点。kafka基本概念Topic:一个虚拟的概念,由一个到多个Partitions组成Partition:实际消息存储单位P......
  • WEB自动化-复选框操作
    importtimefromseleniumimportwebdriverfromselenium.webdriver.common.byimportByfromselenium.webdriver.support.selectimportSelectdriver=webdriver.Edge()driver.get("file:///D:\本地练习网页.html")#创建一个select对象select_element=driver......
  • redis数据类型及操作命令
    数据类型Redis存储的是key-value结构的数据,其中key是字符串类型,value有5种常用的数据类型:字符串string哈希hash列表list集合set有序集合sortedset/zset解释说明:字符串(string):普通字符串,常用哈希(hash):适合存储对象列表(list):按照插入顺序排序,可以有重复元素......
  • SQL日期操作函数(CONCAT、DATE_FORMAT、LAST_DAY)
    获取某月底日期:SELECTLAST_DAY('2021-07-01')ASmonth_end_date;拼接年月格式:CONCAT(DATE_FORMAT(hp.planned_payment_date,'%Y-%m'),'-01')如果数据库内存的是2023-07-19经过处理后会变成:2023-07-01SELECTbp.UNIT_ID......
  • windows redis 客户端
    WindowsRedis客户端Redis是一个开源的内存数据结构存储系统,用于存储和检索数据。它可以通过通信协议在客户端和服务器之间进行交互,从而实现高效的数据操作。在Windows操作系统上,我们可以使用各种Redis客户端来与Redis服务器进行通信。本文将介绍一些常见的WindowsRedis客户端以......
  • [爬虫]2.2.1 使用Selenium库模拟浏览器操作
    Selenium是一个非常强大的工具,用于自动化Web浏览器的操作。它可以模拟真实用户的行为,如点击按钮,填写表单,滚动页面等。由于Selenium可以直接与浏览器交互,所以它可以处理那些需要JavaScript运行的动态网页。安装Selenium首先,我们需要安装Selenium库。你可以使用pip命令来安装:pip......
  • 什么是分布式操作系统?我们为什么需要分布式操作系统?
    分布式操作系统是一种特殊的操作系统,本质上属于多机操作系统,是传统单机操作系统的发展和延伸。它是将一个计算机系统划分为多个独立的计算单元(或者也可称为节点),这些节点被部署到每台计算机上,然后被网络连接起来,并保持着持续的通信状态。在分布式操作系统中,每个节点即可以独立地......