消息中间件Kafka之客户端操作
一、客户端API类型
- AdminClient API:允许管理和检查Topic、broker以及其他Kafka对象
- Producer API:发布消息到一个或者多个Topic中
- Consumer API:订阅一个或者多个Topic,并处理产生的消息
- Stream API:高效地将输入流转换为输出流
- Connector API:从一些源系统或者应用程序中拉取数据到Kafka
二、kafka项目环境搭建
创建一个Springboot的web项目,引入一些基本的依赖。
<!-- Dependencies of the demo project. No <version> elements are given here,
     so versions are presumably managed by a parent POM / BOM (TODO confirm). -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Kafka client library; the configuration class in this article imports
     org.apache.kafka.clients.admin.AdminClient from it. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Lombok; the configuration class in this article uses its @Slf4j annotation. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-scoped dependency only. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
创建一个Bean,用来初始化Kafka的AdminClient:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
 * Spring configuration that exposes the Kafka {@link AdminClient} as a bean.
 *
 * @author long
 */
@Slf4j
@Configuration
public class KafkaConfig {

    /**
     * Builds an admin client pointed at the fixed bootstrap address
     * {@code 192.168.31.26:9092}, logging a message just before it is created.
     *
     * @return the newly created {@link AdminClient}
     */
    @Bean
    public AdminClient adminClient() {
        Properties adminProps = new Properties();
        adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.26:9092");
        log.info("创建Kafka客户端");
        AdminClient client = AdminClient.create(adminProps);
        return client;
    }
}
启动项目:
看到如下的输出就说明项目启动成功,并且连接到了Kafka。
三、AdminClient API
下面是一些最常用的API:
API名称 | 作用 |
AdminClient | AdminClient客户端对象 |
NewTopic | 创建Topic |
CreateTopicsResult | 创建Topic的返回结果 |
ListTopicsResult | 查询Topic列表 |
ListTopicsOptions | 查询Topic列表及选项 |
DescribeTopicsResult | 查询Topics |
DescribeConfigsResult | 查询Topics配置项 |
3.1. 创建Topic
/**
 * Creates a topic and returns the configuration entries reported for it.
 *
 * @param name       topic name
 * @param partitions number of partitions (request parameter {@code partitions}, default 1)
 * @param factor     replication factor (request parameter {@code replication_factor}, default 1);
 *                   it is narrowed to {@code short} before being handed to {@link NewTopic}
 * @return one map per configuration entry, with the keys {@code name}, {@code value} and {@code source}
 * @throws ExecutionException   if waiting on the topic's configuration future fails
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@GetMapping("/create_topic")
public List<Map<String, Object>> createTopic(@RequestParam("name") String name,
                                             @RequestParam(value = "partitions", defaultValue = "1") Integer partitions,
                                             @RequestParam(value = "replication_factor", defaultValue = "1") Integer factor) throws ExecutionException, InterruptedException {
    NewTopic requested = new NewTopic(name, partitions, factor.shortValue());
    CreateTopicsResult creation = adminClient.createTopics(Collections.singletonList(requested));

    // Blocks here until the configuration of the new topic is available.
    KafkaFuture<Config> pendingConfig = creation.config(name);
    Config topicConfig = pendingConfig.get();

    return topicConfig.entries()
            .stream()
            .map(entry -> {
                Map<String, Object> row = new HashMap<>();
                row.put("name", entry.name());
                row.put("value", entry.value());
                row.put("source", entry.source().name());
                return row;
            })
            .collect(Collectors.toList());
}
通过Postman测试,返回结果如下:
[
{
"name": "compression.type",
"source": "DEFAULT_CONFIG",
"value": "producer"
},
{
"name": "leader.replication.throttled.replicas",
"source": "DEFAULT_CONFIG",
"value": ""
},
{
"name": "message.downconversion.enable",
"source": "DEFAULT_CONFIG",
"value": "true"
},
{
"name": "min.insync.replicas",
"source": "DEFAULT_CONFIG",
"value": "1"
},
{
"name": "segment.jitter.ms",
"source": "DEFAULT_CONFIG",
"value": "0"
},
{
"name": "cleanup.policy",
"source": "DEFAULT_CONFIG",
"value": "delete"
},
{
"name": "flush.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "follower.replication.throttled.replicas",
"source": "DEFAULT_CONFIG",
"value": ""
},
{
"name": "retention.ms",
"source": "DEFAULT_CONFIG",
"value": "604800000"
},
{
"name": "segment.bytes",
"source": "STATIC_BROKER_CONFIG",
"value": "1073741824"
},
{
"name": "flush.messages",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "message.format.version",
"source": "DEFAULT_CONFIG",
"value": "2.6-IV0"
},
{
"name": "file.delete.delay.ms",
"source": "DEFAULT_CONFIG",
"value": "60000"
},
{
"name": "max.compaction.lag.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "max.message.bytes",
"source": "DEFAULT_CONFIG",
"value": "1048588"
},
{
"name": "min.compaction.lag.ms",
"source": "DEFAULT_CONFIG",
"value": "0"
},
{
"name": "message.timestamp.type",
"source": "DEFAULT_CONFIG",
"value": "CreateTime"
},
{
"name": "preallocate",
"source": "DEFAULT_CONFIG",
"value": "false"
},
{
"name": "index.interval.bytes",
"source": "DEFAULT_CONFIG",
"value": "4096"
},
{
"name": "min.cleanable.dirty.ratio",
"source": "DEFAULT_CONFIG",
"value": "0.5"
},
{
"name": "unclean.leader.election.enable",
"source": "DEFAULT_CONFIG",
"value": "false"
},
{
"name": "delete.retention.ms",
"source": "DEFAULT_CONFIG",
"value": "86400000"
},
{
"name": "retention.bytes",
"source": "DEFAULT_CONFIG",
"value": "-1"
},
{
"name": "segment.ms",
"source": "DEFAULT_CONFIG",
"value": "604800000"
},
{
"name": "message.timestamp.difference.max.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "segment.index.bytes",
"source": "DEFAULT_CONFIG",
"value": "10485760"
}
]
创建Topic之前
创建Topic之后:
3.2. 删除Topic
/**
 * Deletes a topic and waits for the deletion request to complete.
 *
 * <p>{@code deleteTopics} is asynchronous: the future it hands back is usually
 * not complete yet when the call returns, and {@code isDone()} is also true for
 * a future that completed with an error. The method therefore blocks on
 * {@code get()} and reports success only when that call returns normally.
 *
 * @param name topic name
 * @return a success message once the deletion future has completed normally
 * @throws ExecutionException   if the deletion future completed with an error
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@GetMapping("/delete_topic")
public String deleteTopic(@RequestParam("name") String name) throws ExecutionException, InterruptedException {
    DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList(name));
    KafkaFuture<Void> deletion = result.values().get(name);
    // Wait for the outcome; a failed deletion surfaces as ExecutionException.
    deletion.get();
    return "删除kafka的Topic:" + name + "成功";
}
3.3. 查看Topic的列表
/**
 * Lists the names of the topics known to the cluster.
 *
 * @param showHide when {@code true}, the listing is requested with
 *                 {@code listInternal(true)}; otherwise the default listing is used
 *                 (request parameter {@code show_hide}, default {@code false})
 * @return the set of topic names
 * @throws ExecutionException   if waiting on the names future fails
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@GetMapping("/list_topic")
public Set<String> listTopic(@RequestParam(value = "show_hide", defaultValue = "false") Boolean showHide) throws ExecutionException, InterruptedException {
    if (!showHide) {
        return adminClient.listTopics().names().get();
    }
    ListTopicsOptions withInternal = new ListTopicsOptions().listInternal(true);
    return adminClient.listTopics(withInternal).names().get();
}
显示隐藏的Topic
3.4. 获取Topic的描述
/**
 * Describes a single topic.
 *
 * @param name topic name
 * @return a map with {@code topic_name} (the topic's name) and {@code topic_desc}
 *         (the string form of the topic's partition list)
 * @throws ExecutionException   if waiting on the description future fails
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@GetMapping("/describe_topics")
public Map<String, Object> describeTopics(@RequestParam("name") String name) throws ExecutionException, InterruptedException {
    DescribeTopicsResult lookup = adminClient.describeTopics(Collections.singletonList(name));
    TopicDescription topic = lookup.all().get().get(name);

    Map<String, Object> response = new HashMap<>(2);
    response.put("topic_name", topic.name());
    response.put("topic_desc", topic.partitions().toString());
    return response;
}
这里面的partition就是Topic中存放消息的地方。
3.5. 获取Topic配置信息
/**
 * Reads the configuration entries of a topic.
 *
 * @param name topic name
 * @return one map per configuration entry, with the keys {@code name}, {@code value} and {@code source}
 * @throws ExecutionException   if waiting on the configuration future fails
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@GetMapping("/config_topics")
public List<Map<String, Object>> configTopics(@RequestParam("name") String name) throws ExecutionException, InterruptedException {
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, name);
    DescribeConfigsResult described = adminClient.describeConfigs(Collections.singletonList(topicResource));

    // The result is keyed by resource; pick this topic's future and wait for it.
    return described.values().get(topicResource).get().entries().stream()
            .map(entry -> {
                Map<String, Object> row = new HashMap<>();
                row.put("name", entry.name());
                row.put("value", entry.value());
                row.put("source", entry.source().name());
                return row;
            })
            .collect(Collectors.toList());
}
Postman测试结果:
[
{
"name": "compression.type",
"source": "DEFAULT_CONFIG",
"value": "producer"
},
{
"name": "leader.replication.throttled.replicas",
"source": "DEFAULT_CONFIG",
"value": ""
},
{
"name": "min.insync.replicas",
"source": "DEFAULT_CONFIG",
"value": "1"
},
{
"name": "message.downconversion.enable",
"source": "DEFAULT_CONFIG",
"value": "true"
},
{
"name": "segment.jitter.ms",
"source": "DEFAULT_CONFIG",
"value": "0"
},
{
"name": "cleanup.policy",
"source": "DEFAULT_CONFIG",
"value": "delete"
},
{
"name": "flush.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "follower.replication.throttled.replicas",
"source": "DEFAULT_CONFIG",
"value": ""
},
{
"name": "segment.bytes",
"source": "STATIC_BROKER_CONFIG",
"value": "1073741824"
},
{
"name": "retention.ms",
"source": "DEFAULT_CONFIG",
"value": "604800000"
},
{
"name": "flush.messages",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "message.format.version",
"source": "DEFAULT_CONFIG",
"value": "2.6-IV0"
},
{
"name": "max.compaction.lag.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "file.delete.delay.ms",
"source": "DEFAULT_CONFIG",
"value": "60000"
},
{
"name": "max.message.bytes",
"source": "DEFAULT_CONFIG",
"value": "1048588"
},
{
"name": "min.compaction.lag.ms",
"source": "DEFAULT_CONFIG",
"value": "0"
},
{
"name": "message.timestamp.type",
"source": "DEFAULT_CONFIG",
"value": "CreateTime"
},
{
"name": "preallocate",
"source": "DEFAULT_CONFIG",
"value": "false"
},
{
"name": "index.interval.bytes",
"source": "DEFAULT_CONFIG",
"value": "4096"
},
{
"name": "min.cleanable.dirty.ratio",
"source": "DEFAULT_CONFIG",
"value": "0.5"
},
{
"name": "unclean.leader.election.enable",
"source": "DEFAULT_CONFIG",
"value": "false"
},
{
"name": "retention.bytes",
"source": "DEFAULT_CONFIG",
"value": "-1"
},
{
"name": "delete.retention.ms",
"source": "DEFAULT_CONFIG",
"value": "86400000"
},
{
"name": "segment.ms",
"source": "DEFAULT_CONFIG",
"value": "604800000"
},
{
"name": "message.timestamp.difference.max.ms",
"source": "DEFAULT_CONFIG",
"value": "9223372036854775807"
},
{
"name": "segment.index.bytes",
"source": "DEFAULT_CONFIG",
"value": "10485760"
}
]
3.6. 修改Topic的config
/**
 * Applies configuration changes to a topic.
 *
 * <p>Every key/value pair in {@code topics} becomes a {@code SET} operation on the
 * topic's configuration. The method blocks until the request has completed; once
 * {@code get()} returns normally the change was accepted, so no separate
 * {@code isDone()} check is needed.
 *
 * @param name   topic name (path variable)
 * @param topics configuration keys and the values to set them to (request body)
 * @return a success message once the alteration future has completed normally
 * @throws ExecutionException   if the alteration future completed with an error
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@PostMapping("/alter_topics/{name}")
public String alterTopic(@PathVariable String name, @RequestBody Map<String, String> topics) throws ExecutionException, InterruptedException {
    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, name);
    List<AlterConfigOp> setOps = topics.entrySet()
            .stream()
            .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET))
            .collect(Collectors.toList());

    // Only one resource is altered, so the map holds a single entry regardless of topics.size().
    Map<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>();
    configMaps.put(configResource, setOps);

    AlterConfigsResult result = adminClient.incrementalAlterConfigs(configMaps);
    // Wait for the outcome; a rejected change surfaces as ExecutionException.
    result.values().get(configResource).get();
    return "修改Topic:" + name + "修改成功";
}
3.7. 增加Partition
/**
 * Raises the partition count of a topic.
 *
 * <p>The method blocks until the request has completed; once {@code get()} returns
 * normally the request succeeded, so no separate {@code isDone()} check is needed.
 *
 * @param name topic name
 * @param size value passed to {@code NewPartitions.increaseTo}, i.e. the partition
 *             count the topic should have afterwards, not the number of partitions
 *             to add (request parameter {@code size}, default 2)
 * @return a success message once the request has completed normally
 * @throws ExecutionException   if the request completed with an error
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@GetMapping("/add_partition")
public String addPartition(@RequestParam("name") String name, @RequestParam(value = "size", defaultValue = "2") Integer size) throws ExecutionException, InterruptedException {
    Map<String, NewPartitions> partitionsMap = new HashMap<>();
    partitionsMap.put(name, NewPartitions.increaseTo(size));
    // Wait for the outcome; a rejected request surfaces as ExecutionException.
    adminClient.createPartitions(partitionsMap).all().get();
    return "增加Partition数量成功";
}