首页 > 其他分享 >kafka 常见命令以及增加topic的分区数

kafka 常见命令以及增加topic的分区数

时间:2023-02-13 16:47:54浏览次数:35  
标签:-- 分区 bootstrap kafka topic sh server

基础命令
1.创建topic
kafka-topics.sh --bootstrap-server ${kafkaAddress} --create --topic ${topicName} --partitions ${partipartions} --replication-factor ${replication}

2.查看topic详情
kafka-topics.sh --bootstrap-server ${kafkaAddress} --topic ${topicName} --describe

3.删除topic
kafka-topics.sh --bootstrap-server ${kafkaAddress} --delete--topic ${topicName} --partitions ${partitions} --replication-factor ${replication}

4.查看topic list
kafka-topics.sh --bootstrap-server ${kafkaAddress} --list

5.消费topic
kafka-console-consumer.sh --bootstrap-server ${kafkaAddress} --topic ${topicName} --from-beginning

6.查看kafka consumer group 消费情况
kafka-consumer-groups.sh --describe --bootstrap-server ${kafkaAddress} --group ${groupName}

7.已经创建的topic修改partipartions 和 replication-factor 数量
step:
a.修改partitions数量
kafka-topics.sh --bootstrap-server ${kafkaAddress} --topic ${topicName} --alter --partitions 4
b.创建increase-replication-factor.json in config,配置各分区replication-factor位置

{
  "version": 1,
  "partitions": [
    {
      "topic": "${topicName}",
      "partition": 0,
      "replicas": [
        0,
        1
      ]
    },
    {
      "topic": "${topicName}",
      "partition": 1,
      "replicas": [
        1,
        2
      ]
    },
    {
      "topic": "${topicName}",
      "partition": 2,
      "replicas": [
        2,
        3
      ]
    },
    {
      "topic": "${topicName}",
      "partition": 3,
      "replicas": [
        3,
        0
      ]
    }
  ]
}
c.更新replication-factor
kafka-reassign-partitions.sh --bootstrap-server ${kafkaAddress} --reassignment-json-file config/increase-replication-factor.json --execute
3.创建topic话题
bin/kafka-topics.sh --create --topic test --bootstrap-server 192.168.218.128:9092 -partitions 3 -replication-factor 1

查看所有topic话题
bin/kafka-topics.sh --list --bootstrap-server 192.168.218.128:9092

查看指定话题的详情
bin/kafka-topics.sh --bootstrap-server 192.168.230.128:9092 --describe --topic test

3.创建生产者
bin/kafka-console-producer.sh --broker-list 192.168.218.128:9092 --topic test

4.创建消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.218.128:9092 --topic test --from-beginning

## 创建topic(4个分区,2个副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test

### kafka版本 >= 2.2
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

## 分区扩容
### kafka版本 < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

### kafka版本 >= 2.2
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2

## 删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

 

标签:--,分区,bootstrap,kafka,topic,sh,server
From: https://www.cnblogs.com/banger/p/17116863.html

相关文章

  • kafka删除topic清空数据
    一般情况下,是不会删除数据的。到达一定时间后,kafka会自动删除。如果一定要删除可以删除topic在重建topic了No.1:如果需要被删除topic此时正在被程序produce和consum......
  • kafka笔记
    1、概念:kafka是一个用scala语言编写的,分布式、支持分区(partition)、多副本(replica),基于zookeeper协调分布式消息系统,它最大的特性就是可以实时处理大量数据以满足各种需......
  • go连接kafka
    Part1前言本文主要介绍如何通过go语言连接kafka。这里采用的是sarama库。​​https://github.com/Shopify/sarama​​Part2库的安装goget-ugithub.com/Shopify/saramago......
  • kafka如何开启kerberos认证
    参考:     https://www.cnblogs.com/wuyongyin/p/15624452.html kerberos基本原理       https://www.cnblogs.com/wuyongyin/p/15634397.html kerb......
  • 05-KafkaConsumer
    1.工作流程1.1消费者组概述ConsumerGroup(CG):由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupId相同。消费者与消费组这种模型可以让整体的消费......
  • 04-KafkaBroker
    1.工作流程1.1Zk存储的信息1.2总体工作流程step1~6:step7~11:模拟broker上下线,Zk中的数据变化:(1)查看/kafka/brokers/ids路径上的节点[zk:localhost:2181(......
  • 02-KafkaProducer
    1.发送消息流程1.1整体架构整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的......
  • 搭建kafka
    1、安装jdkjava-version查看jdk版本yum-ylistjava*查看已安装和可安装的软件包yum-yinstalljava安装新版本 2、安装zookeeper(单机模式)a) 下载地址http......
  • rocketMq和kafka对比
    为什么在RocketMQ和kafka中选型在单机同步发送的场景下,Kafka>RocketMQ,Kafka的吞吐量高达17.3w/s,RocketMQ吞吐量在11.6w/s。kafka高性能原因生产者Kafka会把收到的消息......
  • Kafka
    1、为什么有消息系统1.解耦合2.异步处理例如电商平台,秒杀活动。一般流程会分为:1:风险控制、2:库存锁定、3:生成订单、4:短信通知、5:更新数据3.通过消息系统将......