首页 > 其他分享 >消息中间件kafka之客户端操作

消息中间件kafka之客户端操作

时间:2022-12-19 14:09:09浏览次数:50  
标签:name DEFAULT value kafka Topic source 消息中间件 CONFIG 客户端


消息中间件Kafka之客户端操作

一、客户端API类型

  • ​AdminClient API​​: 允许管理和检测Topic、broker以及其他Kafka对象
  • ​Producer API​​:发布消息到一个或者多个Topic中
  • ​Consumer API​​:订阅一个或者多个Topic,并处理产生的消息
  • ​Stream API​​:高效地将输入流转换为输出流
  • ​Connector API​​:从一些源系统或者应用程序中拉取数据到Kafka

二、kafka项目环境搭建

创建一个Springboot的web项目,引入一些基本的依赖。

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

创建一个​​Bean​​​用来初始化​​Kafka​​​的​​AdminClient​​:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
* kafka配置
* @author long
*/
@Slf4j
@Configuration
public class KafkaConfig {

@Bean
public AdminClient adminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.26:9092");
log.info("创建Kafka客户端");
return AdminClient.create(properties);
}

}

启动项目:

消息中间件kafka之客户端操作_kafka

看到如下的输出说明就项目启动成功,并且连接到了​​Kafka​​。

三、​​AdminClient API​

下面是一些最常用的API:

API名称

作用

AdminClient

AdminClient客户端对象

NewTopic

创建Topic

CreateTopicsResult

创建Topic的返回结果

ListTopicsResult

查询Topic列表

ListTopicsOptions

查询Topic列表及选项

DescribeTopicsResult

查询Topics

DescribeConfigsResult

查询Topics配置项

3.1. 创建Topic

/**
* 创建Topic
* @param name topic名字
* @param partitions 分区数量
* @param factor 副本因子
* @return
*/
@GetMapping("/create_topic")
public List<Map<String, Object>> createTopic(@RequestParam("name") String name,
@RequestParam(value = "partitions", defaultValue = "1") Integer partitions,
@RequestParam(value = "replication_factor", defaultValue = "1") Integer factor) throws ExecutionException, InterruptedException {
NewTopic topic = new NewTopic(name, partitions, factor.shortValue());
CreateTopicsResult topics = adminClient.createTopics(Collections.singletonList(topic));
KafkaFuture<Config> config = topics.config(name);
return config.get().entries().stream().map(item -> {
HashMap<String, Object> map = new HashMap<>();
map.put("name", item.name());
map.put("value", item.value());
map.put("source", item.source().name());
return map;
}).collect(Collectors.toList());
}

通过​​Postman​​ 测试,

[
{
"name": "compression.type",
"source": "DEFAULT_CONFIG",
"value": "producer"
},
{
"name": "leader.replication.throttled.replicas",
"source": "DEFAULT_CONFIG",
"value": ""
},
{
"name": "message.downconversion.enable",
"source": "DEFAULT_CONFIG",
"value": "true"
},
{
"name": "min.insync.replicas",
"source": "DEFAULT_CONFIG",
"value": "1"
},
{
"name": "segment.jitter.ms",
"source": "DEFAULT_CONFIG",
"value": "0"
},
{
"name": "cleanup.policy",
"source": "DEFAULT_CONFIG",
"value": "delete"
},
{
"name": "flush.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "follower.replication.throttled.replicas",
"source": "DEFAULT_CONFIG",
"value": ""
},
{
"name": "retention.ms",
"source": "DEFAULT_CONFIG",
"value": "604800000"
},
{
"name": "segment.bytes",
"source": "STATIC_BROKER_CONFIG",
"value": "1073741824"
},
{
"name": "flush.messages",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "message.format.version",
"source": "DEFAULT_CONFIG",
"value": "2.6-IV0"
},
{
"name": "file.delete.delay.ms",
"source": "DEFAULT_CONFIG",
"value": "60000"
},
{
"name": "max.compaction.lag.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "max.message.bytes",
"source": "DEFAULT_CONFIG",
"value": "1048588"
},
{
"name": "min.compaction.lag.ms",
"source": "DEFAULT_CONFIG",
"value": "0"
},
{
"name": "message.timestamp.type",
"source": "DEFAULT_CONFIG",
"value": "CreateTime"
},
{
"name": "preallocate",
"source": "DEFAULT_CONFIG",
"value": "false"
},
{
"name": "index.interval.bytes",
"source": "DEFAULT_CONFIG",
"value": "4096"
},
{
"name": "min.cleanable.dirty.ratio",
"source": "DEFAULT_CONFIG",
"value": "0.5"
},
{
"name": "unclean.leader.election.enable",
"source": "DEFAULT_CONFIG",
"value": "false"
},
{
"name": "delete.retention.ms",
"source": "DEFAULT_CONFIG",
"value": "86400000"
},
{
"name": "retention.bytes",
"source": "DEFAULT_CONFIG",
"value": "-1"
},
{
"name": "segment.ms",
"source": "DEFAULT_CONFIG",
"value": "604800000"
},
{
"name": "message.timestamp.difference.max.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "segment.index.bytes",
"source": "DEFAULT_CONFIG",
"value": "10485760"
}
]

创建Topic之前

消息中间件kafka之客户端操作_API_02

创建Topic之后:

消息中间件kafka之客户端操作_kafka_03

3.2. 删除Topic

/**
* 删除Topic
* @param name topic名字
* @return
*/
@GetMapping("/delete_topic")
public String deleteTopic(@RequestParam("name") String name) throws ExecutionException, InterruptedException {
DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList(name));
KafkaFuture<Void> kafkaFuture = result.values().get(name);
return kafkaFuture.isDone() ? "删除kafka的Topic:" + name + "成功" : "删除kafka的Topic:" + name + "失败";
}

消息中间件kafka之客户端操作_spring_04

3.3. 查看Topic的列表

/**
* 获取所有的Topic
* @param showHide 是否显示隐藏Topic
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
@GetMapping("/list_topic")
public Set<String> listTopic(@RequestParam(value = "show_hide", defaultValue = "false") Boolean showHide) throws ExecutionException, InterruptedException {
ListTopicsResult listTopics = null;
if (showHide) {
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
listTopics = adminClient.listTopics(options);
} else {
listTopics = adminClient.listTopics();
}
return listTopics.names().get();
}

消息中间件kafka之客户端操作_API_05

显示隐藏的​​Topic​

消息中间件kafka之客户端操作_spring_06

3.4. 获取Topic的描述

/**
* 查看Topic描述
* @param name
*/
@GetMapping("/describe_topics")
public Map<String, Object> describeTopics(@RequestParam("name") String name) throws ExecutionException, InterruptedException {
DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(name));
Map<String, Object> resMap = new HashMap<>(2);
TopicDescription description = result.all().get().get(name);
resMap.put("topic_name", description.name());
resMap.put("topic_desc", description.partitions().toString());
return resMap;
}

消息中间件kafka之客户端操作_API_07

这里面的​​partition​​​就是我们在​​Topic​​中存放的消息地方。

3.6. 获取​​Topic​​配置信息

/**
* 获取Topic的配置
* @param name
* @return
*/
@GetMapping("/config_topics")
public List<Map<String, Object>> configTopics(@RequestParam("name") String name) throws ExecutionException, InterruptedException {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, name);
DescribeConfigsResult result = adminClient.describeConfigs(Collections.singletonList(resource));
return result.values()
.get(resource)
.get()
.entries()
.stream()
.map(item -> {
Map<String, Object> map = new HashMap<>();
map.put("name", item.name());
map.put("value", item.value());
map.put("source", item.source().name());
return map;
}).collect(Collectors.toList());
}

​postman​​ 测试结果:

[
{
"name": "compression.type",
"source": "DEFAULT_CONFIG",
"value": "producer"
},
{
"name": "leader.replication.throttled.replicas",
"source": "DEFAULT_CONFIG",
"value": ""
},
{
"name": "min.insync.replicas",
"source": "DEFAULT_CONFIG",
"value": "1"
},
{
"name": "message.downconversion.enable",
"source": "DEFAULT_CONFIG",
"value": "true"
},
{
"name": "segment.jitter.ms",
"source": "DEFAULT_CONFIG",
"value": "0"
},
{
"name": "cleanup.policy",
"source": "DEFAULT_CONFIG",
"value": "delete"
},
{
"name": "flush.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "follower.replication.throttled.replicas",
"source": "DEFAULT_CONFIG",
"value": ""
},
{
"name": "segment.bytes",
"source": "STATIC_BROKER_CONFIG",
"value": "1073741824"
},
{
"name": "retention.ms",
"source": "DEFAULT_CONFIG",
"value": "604800000"
},
{
"name": "flush.messages",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "message.format.version",
"source": "DEFAULT_CONFIG",
"value": "2.6-IV0"
},
{
"name": "max.compaction.lag.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "file.delete.delay.ms",
"source": "DEFAULT_CONFIG",
"value": "60000"
},
{
"name": "max.message.bytes",
"source": "DEFAULT_CONFIG",
"value": "1048588"
},
{
"name": "min.compaction.lag.ms",
"source": "DEFAULT_CONFIG",
"value": "0"
},
{
"name": "message.timestamp.type",
"source": "DEFAULT_CONFIG",
"value": "CreateTime"
},
{
"name": "preallocate",
"source": "DEFAULT_CONFIG",
"value": "false"
},
{
"name": "index.interval.bytes",
"source": "DEFAULT_CONFIG",
"value": "4096"
},
{
"name": "min.cleanable.dirty.ratio",
"source": "DEFAULT_CONFIG",
"value": "0.5"
},
{
"name": "unclean.leader.election.enable",
"source": "DEFAULT_CONFIG",
"value": "false"
},
{
"name": "retention.bytes",
"source": "DEFAULT_CONFIG",
"value": "-1"
},
{
"name": "delete.retention.ms",
"source": "DEFAULT_CONFIG",
"value": "86400000"
},
{
"name": "segment.ms",
"source": "DEFAULT_CONFIG",
"value": "604800000"
},
{
"name": "message.timestamp.difference.max.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "segment.index.bytes",
"source": "DEFAULT_CONFIG",
"value": "10485760"
}
]

3.6. 修改Topic的config

/**
* 修改Topic
* @param name Topic名字
* @param topics
* @return
*/
@PostMapping ("/alter_topics/{name}")
public String alterTopic(@PathVariable String name, @RequestBody Map<String, String> topics) throws ExecutionException, InterruptedException {
Map<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>(topics.size());
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, name);
List<AlterConfigOp> collect = topics.entrySet()
.stream()
.map(item -> {
return new AlterConfigOp(new ConfigEntry(item.getKey(), item.getValue()), AlterConfigOp.OpType.SET);
}).collect(Collectors.toList());
configMaps.put(configResource, collect);
AlterConfigsResult result = adminClient.incrementalAlterConfigs(configMaps);
KafkaFuture<Void> future = result.values().get(configResource);
future.get();
return future.isDone() ?
"修改Topic:" + name + "修改成功" : "修改Topic:" + name + "修改失败";
}

3.7. 增加​​Partition​

/**
* 增加partition数量
* @param size
* @param name
* @return String
*/
@GetMapping("/add_partition")
public String addPartition(@RequestParam("name") String name, @RequestParam(value = "size", defaultValue = "2") Integer size) throws ExecutionException, InterruptedException {
Map<String, NewPartitions> partitionsMap = new HashMap<>();
NewPartitions newPartitions = NewPartitions.increaseTo(size);
partitionsMap.put(name, newPartitions);
CreatePartitionsResult result = adminClient.createPartitions(partitionsMap);
result.all().get();
return result.values().get(name).isDone() ? "增加Partition数量成功" : "增加Partition数量失败";
}

消息中间件kafka之客户端操作_API_08


标签:name,DEFAULT,value,kafka,Topic,source,消息中间件,CONFIG,客户端
From: https://blog.51cto.com/luckyqilin/5952236

相关文章

  • Kafka知识总结之集群环境搭建
    简述Kafka是一个分布式流平台,本质是一个消息队列。消息队列的三个作用:异步、消峰和解耦。一.安装zookeeper1.1.下载并解压#下载wgethttps://mirror.bit.edu.cn/apache/z......
  • Kafka知识总结之Broker原理总结
    简介这篇文章介绍Kafka的Broker工作流程,包括其中控制器的选举过程;kafka副本的leader选举以及leader和follower故障流程;简单讲述了生产环境中如何调整分区副本;kafka的文件存......
  • Kafka知识总结之生产者简单使用
    一.测试环境搭建引入依赖:<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency>创......
  • .NET MQTT客户端SimpleMQTT的使用
    一、说明MQTT客户端组件.NETCore版本,基于新生命团队NewLife.MQTT的单例模式封装,支持.netcore3,.net6/7Github地址:https://github.com/zxzyjs/SimpleMQTT.gitGitee地......
  • 4. 使用mc客户端
    使用mc客户端minio可使用客户端mc操作服务端。一、安装客户端https://dl.min.io/client/mc/release/选择对应操作系统,下载后设置权限即可使用。例如,在CentOS系统下:w......
  • Qt音视频开发06-海康sdk内核linux客户端
    一、前言海康sdk的示例在官方是提供了的,但是无论UI还是交互简直是宇宙无敌的垃圾,猜测应该是初学者编写的,估计练手用的,所以老早就想把这个linux支持集成到自己的示例中,既然......
  • Kafka数据可靠性探究
    概述Kafka作为商业级消息中间件,消息的可靠性保障是非常重要。那Kafka是怎么保障消息的可靠性的呢?上图是Kafka的消息发送基础架构,一条消息的完整生命周期是:生产者发送消息至K......
  • 使用WPF或AspNetCore创建简易版ChatGPT客户端,让ChatGPT成为你的私人助理
    前言:前一天写的一个ChatGPT服务端,貌似大家用起来还不是那么方便,所以我顺便用WPF和AspNetCore的webapi程序做个客户端吧,通过客户端来快速访问chatgpt模型生成对话。 1、......
  • 微软跨平台maui开发chatgpt客户端
    image什么是maui.NET多平台应用UI(.NETMAUI)是一个跨平台框架,用于使用C#和XAML创建本机移动(ios,andriod)和桌面(windows,mac)应用。imagechagpt最近......
  • K8S的Kafka监控(Prometheus+Grafana)
    欢迎访问我的GitHub这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos对于部署在K8S上的Kafka来说,Prometheus+Grafana是常用的监控......