首页 > 其他分享 >Kafka-常用命令行命令

Kafka-常用命令行命令

时间:2024-02-04 10:14:18浏览次数:30  
标签:bin -- hadoop01 9092 kafka topic 命令 常用命令 Kafka

第一章 Kafka常用命令

1. Topic(主题)

1.1. 创建Topic

bin/kafka-topics.sh --create --bootstrap-server hadoop01:9092 --replication-factor 2 --partitions 1 --topic test

 

1.2. 查询Topic列表

1.2.1. 查询所有Topic列表

bin/kafka-topics.sh --list --bootstrap-server hadoop01:9092

添加--exclude-internal可以将排除kafka内部topic,比如__consumer_offsets

bin/kafka-topics.sh --list --bootstrap-server hadoop01:9092  --exclude-internal

 

1.2.2. 查询test开头的所有Topic列表

bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --list --exclude-internal --topic "test.*"

 

1.3. 查询Topic配置

1.3.1. 单个Topic配置

bin/kafka-topics.sh --describe --bootstrap-server hadoop01:9092 --topic test

 

1.3.2. 批量查询Topic配置

bin/kafka-topics.sh --topic ".*?" --bootstrap-server hadoop01:9092 --describe --exclude-internal

 

1.4. 删除Topic

bin/kafka-topics.sh  --bootstrap-server hadoop01:9092 --delete --topic test-lgb

 

1.5. Topic扩容

1.5.1. 单个Topic扩容

bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --alter --topic test --partitions 3

 

1.5.2. 批量Topic扩容

将所有正则表达式匹配到的Topic分区扩容到4个

bin/kafka-topics.sh --topic ".*?" --bootstrap-server hadoop01:9092 --alter --partitions 3​​

前提是所有的Topic都不是3个分区,否则会报错。

 

2. Producer(生产者)

2.1. 发送消息

(1)发送消息

bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test

This is a message

This is another message

 

 

(2)发送消息,指定生产者参数 acks 为 -1,同时启用 LZ4 的压缩算法:

bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test --request-required-acks -1 --producer-property compression.type=lz4

当设置acks=-1时,Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功。

 

3. Consumer(消费者)

3.1. 消费消息

 (1)从头开始消费(--from-beginning参数表示从该主题最早的位移开始消费)

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic test --from-beginning

I am  a  student.

Hello, How are you?

 

This is a message

This is another message

 

(2)指定消费者组

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic test --from-beginning  --group  group01

I am  a  student.

Hello, How are you?

 

This is a message

This is another message

如果没有持续发送消息,第二次执行这条命令,将消费不到数据。

 

4. Consumer_groups(消费者组)

4.1. 查看消费者组的消费情况

bin/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group group01

 

4.2. 查看所有消费者组提交的位移数据

对于 __consumer_offsets 而言,由于它保存了消费者组的位移数据,有时候直接查看该主题消息是很方便的事情。下面的命令可以帮助我们直接查看消费者组提交的位移数据。

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

 

除了查看位移提交数据,我们还可以直接读取该主题消息,查看消费者组的状态信息。

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

 

4.3. 重置消费组位移

重置位移可以大致从两个维度来进行。1.位移维度。2.时间维度。

 

 

4.3.1. 按照时间维度重置位移

DateTime 策略直接指定 --to-datetime

(1)把主题 test 的 消费者组group01的offset 重置到2024-02-02T00:00:00.000+0800

bin/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --group group01 --topic test --reset-offsets --to-datetime 2024-02-02T00:00:00.000+0800 --execute

最后一个参数--excute如果不加,只是打印位移调整方案,不实际执行;加上参数--excute执行真正的位移调整。

从NEW-OFFSET可以看到,OFFSET已经重置到0。

(2)查看消费者组的消费情况

bin/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group group01

可以看到CURRENT-OFFSET已经回到0。

4.3.2. 按照位移维度重置位移

Specified-Offset 策略直接指定--to-offset。

(1)把主题 test 的 消费者组group01的offset 重置到2。

bin/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --group group01 --topic test --reset-offsets --to-offset 2 --execute

从NEW-OFFSET可以看到,OFFSET已经重置到2。

 

(2)查看消费者组的消费情况

bin/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group group01

可以看到CURRENT-OFFSET已经回到2。

 

第二章 其他脚本工具

1. 产者性能测试

bin/kafka-producer-perf-test.sh --topic test --num-records 5000000 --throughput -1 --record-size 200 --producer-props bootstrap.servers=hadoop01:9092 acks=1 linger.ms=50

 

2. 消费者性能测试

bin/kafka-consumer-perf-test.sh --broker-list hadoop01:9092 --messages 5000000 --topic test

 

3. 查看topic消费进度

必须参数为--group, 不指定--topic,默认为所有topic。

bin/kafka-consumer-groups.sh --bootstrap-server  hadoop01:9092 --describe --group group01

 

4. 获取 topic 当前消息数

# 获取当前最大位移

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092 --topic test --time -1

 

# 当前获取最早位移

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092 --topic test --time -2

# 以上两个数相减,即可得出 topic 当前在集群的消息总数

标签:bin,--,hadoop01,9092,kafka,topic,命令,常用命令,Kafka
From: https://www.cnblogs.com/yeyuzhuanjia/p/18005645

相关文章

  • Kafka-如何重设消费者位移(OFFSET)
    1.为什么要重设消费者组位移?我们知道,Kafka和传统的消息引擎在设计上是有很大区别的,其中一个比较显著的区别就是,Kafka的消费者读取消息是可以重演的(replayable)。像RabbitMQ或ActiveMQ这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功......
  • Windows Server 20xx 命令行配置系统策略
    :WindowsServer命令行配置系统策略:如果感觉使用图形界面进行系统策略配置比较繁琐,可以通过命令行方式批量配置系统策略。:先编制如下内容的SetSysPolicies.cmd文件(其中“@echo”引导的为文字回显行),然后以管理员方式打开CMD,:直接运行SetSysPolicies.cmd便可以快速完成系统策略配置......
  • kafka系列(一)【消息队列、Kafka的基本概念、Kafka的工作机制、Kafka可满足的需求、Kafk
    (kafka系列一)转自《Kafka并不难学!入门、进阶、商业实战》一、消息队列1.消息队列的来源在高并发的应用场景中,由于来不及同步处理请求,接收到的请求往往会发生阻塞。例如,大量的插入、更新请求同时到达数据库,这会导致行或表被锁住,最后会因为请求堆积过多而触发“连接数过多的......
  • 关于go install安装cli找不到命令,zsh: command not found
    关于goinstall的安装位置goinstall命令构建并安装由命令行上的路径命名的软件包。可执行文件(主包)安装在GOBIN环境变量命名的目录中,如果没有设置GOPATH环境变量,则默认为$GOPATH/bin或$HOME/go/bin。$GOROOT中的可执行文件安装在$GOROOT/bin或$GOTOOLDIR中,而不是$GOBIN中。不可......
  • nginx默认的启动停止重启命令是什么?
    在Linux系统中,Nginx服务的启动、停止和重启命令通常如下:启动Nginx:对于基于systemd的系统(如Ubuntu15.04+、CentOS7+):sudosystemctlstartnginx对于不使用systemd管理的传统init系统(如Ubuntu14.04及以前版本):sudoservicenginxstart或者直接执行nginx可执行文件(如......
  • linux 查看端口并关闭端口命令
    1.查看服务器端口情况:lsof-i:端口号(lsof-i需要root用户的权限)相关命令:lsof-i:8080--查看8080端口占用lsofabc.txt--显示开启文件abc.txt的进程lsof-cabc--显示abc进程现在打开的文件lsof-c-p1234--列出进程号为1234的进程所打开的文件lsof-ggid--显示归......
  • tar命令 --null -T 参数详解
    tar命令的--null和-T参数可以一起使用,以从null设备读取文件名,并将这些文件名传递给tar命令来处理。--null参数的作用是将文件名作为null字符分隔的字符串传递给tar命令。这通常用于处理包含空格或特殊字符的文件名。-T参数的作用是从指定的文件中读取文件名,并将......
  • Linux基础47 Ansible之ad-hoc, 命令模块(command, shell, script), 软件管理模块(yum,
    Ansible之ad-hoc一、什么是ad-hoc1.什么是ad-hocad-hoc简而言之就是“临时命令”,执行完即结束,并不会保存2.ad-hoc使用场景可以用作查看远程机器的进程,或者磁盘,或者拷贝文件3.ad-hoc命令使用[root@m01~]#ansibleweb01-mshell-a'free-m'web01|CHANGED|rc=......
  • 【Docker】从零开始:9.Docker命令:Push推送仓库(Docker Hub,阿里云)
    【Docker】从零开始:9.Docker命令:Push推送仓库(DockerHub,阿里云):https://blog.csdn.net/sinat_36528886/article/details/134575054?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-1-134575054-blog-132139578.235^v43^pc_blog_bo......
  • scp命令的问题
    rsaUnabletonegotiatewith192.168.20.10port22:nomatchinghostkeytypefound.Theiroffer:ssh-rsascp:Connectionclosed手动指定加密方式为rsascp-oHostKeyAlgorithms=ssh-rsasftp不支持sh:/usr/libexec/sftp-server:Nosuchfileordirectoryscp:C......