第一章 Kafka常用命令
1. Topic(主题)
1.1. 创建Topic
bin/kafka-topics.sh --create --bootstrap-server hadoop01:9092 --replication-factor 2 --partitions 1 --topic test
1.2. 查询Topic列表
1.2.1. 查询所有Topic列表
bin/kafka-topics.sh --list --bootstrap-server hadoop01:9092
添加--exclude-internal可以将排除kafka内部topic,比如__consumer_offsets
bin/kafka-topics.sh --list --bootstrap-server hadoop01:9092 --exclude-internal
1.2.2. 查询test开头的所有Topic列表
bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --list --exclude-internal --topic "test.*"
1.3. 查询Topic配置
1.3.1. 单个Topic配置
bin/kafka-topics.sh --describe --bootstrap-server hadoop01:9092 --topic test
1.3.2. 批量查询Topic配置
bin/kafka-topics.sh --topic ".*?" --bootstrap-server hadoop01:9092 --describe --exclude-internal
1.4. 删除Topic
bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --delete --topic test-lgb
1.5. Topic扩容
1.5.1. 单个Topic扩容
bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --alter --topic test --partitions 3
1.5.2. 批量Topic扩容
将所有正则表达式匹配到的Topic分区扩容到4个
bin/kafka-topics.sh --topic ".*?" --bootstrap-server hadoop01:9092 --alter --partitions 3
前提是所有的Topic都不是3个分区,否则会报错。
2. Producer(生产者)
2.1. 发送消息
(1)发送消息
bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test
This is a message
This is another message
(2)发送消息,指定生产者参数 acks 为 -1,同时启用 LZ4 的压缩算法:
bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test --request-required-acks -1 --producer-property compression.type=lz4
当设置acks=-1时,Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功。
3. Consumer(消费者)
3.1. 消费消息
(1)从头开始消费(--from-beginning参数表示从该主题最早的位移开始消费)
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic test --from-beginning
I am a student.
Hello, How are you?
This is a message
This is another message
(2)指定消费者组
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic test --from-beginning --group group01
I am a student.
Hello, How are you?
This is a message
This is another message
如果没有持续发送消息,第二次执行这条命令,将消费不到数据。
4. Consumer_groups(消费者组)
4.1. 查看消费者组的消费情况
bin/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group group01
4.2. 查看所有消费者组提交的位移数据
对于 __consumer_offsets 而言,由于它保存了消费者组的位移数据,有时候直接查看该主题消息是很方便的事情。下面的命令可以帮助我们直接查看消费者组提交的位移数据。
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
除了查看位移提交数据,我们还可以直接读取该主题消息,查看消费者组的状态信息。
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
4.3. 重置消费组位移
重置位移可以大致从两个维度来进行。1.位移维度。2.时间维度。
4.3.1. 按照时间维度重置位移
DateTime 策略直接指定 --to-datetime
(1)把主题 test 的 消费者组group01的offset 重置到2024-02-02T00:00:00.000+0800
bin/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --group group01 --topic test --reset-offsets --to-datetime 2024-02-02T00:00:00.000+0800 --execute
最后一个参数--excute如果不加,只是打印位移调整方案,不实际执行;加上参数--excute执行真正的位移调整。
从NEW-OFFSET可以看到,OFFSET已经重置到0。
(2)查看消费者组的消费情况
bin/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group group01
可以看到CURRENT-OFFSET已经回到0。
4.3.2. 按照位移维度重置位移
Specified-Offset 策略直接指定--to-offset。
(1)把主题 test 的 消费者组group01的offset 重置到2。
bin/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --group group01 --topic test --reset-offsets --to-offset 2 --execute
从NEW-OFFSET可以看到,OFFSET已经重置到2。
(2)查看消费者组的消费情况
bin/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group group01
可以看到CURRENT-OFFSET已经回到2。
第二章 其他脚本工具
1. 生产者性能测试
bin/kafka-producer-perf-test.sh --topic test --num-records 5000000 --throughput -1 --record-size 200 --producer-props bootstrap.servers=hadoop01:9092 acks=1 linger.ms=50
2. 消费者性能测试
bin/kafka-consumer-perf-test.sh --broker-list hadoop01:9092 --messages 5000000 --topic test
3. 查看topic消费进度
必须参数为--group, 不指定--topic,默认为所有topic。
bin/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group group01
4. 获取 topic 当前消息数
# 获取当前最大位移
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092 --topic test --time -1
# 当前获取最早位移
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092 --topic test --time -2
# 以上两个数相减,即可得出 topic 当前在集群的消息总数
标签:bin,--,hadoop01,9092,kafka,topic,命令,常用命令,Kafka From: https://www.cnblogs.com/yeyuzhuanjia/p/18005645