首页 > 其他分享 >kafka常用命令

kafka常用命令

时间:2023-08-16 09:00:14浏览次数:46  
标签:group -- kafka topic sh 常用命令 consumer

kafka常用命令

http://681314.com/A/h9nfEtAOIV

https://zhuanlan.zhihu.com/p/103915259

https://www.cnblogs.com/wushaoyu/p/11486551.html

https://blog.csdn.net/u010634066/article/details/119670405

一. topic相关命令

1.查看所有topic
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --list 
2.查看topic消息总数
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.253.20.6:9092 --topic alert.door --time -1
其中-1表示获取某分区最新的唯一,如果改为-2表示最早的位移(一般都是0),他们的差值就是这个分区当前的消息数
3.查看topic的属性
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic nccc_rt_station_flow_5min
4.删除某个topic
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic test --delete
5.创建topic
老版命令
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test1 --partitions 3 --replication-factor 3
说明:
--partitions 3表示三个分区
--replication-factor 3 表示3个副本
可以指定该topic的定期清理时间,加上下面的config参数:
--config retention.ms=172800000

新版命令
./kafka-topics.sh --bootstrap-server 10.255.60.118:9092 --create --topic first_topic_hong --partitions 3 --replication-factor 3
6.topic增加分区
./kafka-topics.sh  --zookeeper 127.0.0.1:2181 --topic test1 --alter --partitions 12  
分区增加到12个,会引起rebalance操作
7.指定topic创建消费者分组, 每次读取消息后会给kafka传递一个offset, 来定位读取到哪个位置
./kafka-console-consumer.sh  --bootstrap-server=10.111.30.3:9092 --topic test1 --consumer-property group.id=mytest1

其中group.id的值可以是任意值,是我们自己临时取的
然后我们使用kafka-console-producer加入新的数据后,再次执行下面命令,可以看到只输出新加入的数据
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --consumer-property group.id=my-group-1 --from beginning

8.查看kafka版本

./kafka-server-start.sh --version
2.1.1 (Commit:21234bee31165527)

二. 查看consumer group

https://www.jb51.net/article/267273.htm

1.查看消费者列表
新版
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
老版
./kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
2.查看Group详情
/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe

# CURRENT-OFFSET: 当前消费者群组最近提交的 offset,也就是消费者分区里读取的当前位置
# LOG-END-OFFSET: 当前最高水位偏移量,也就是最近一个读取消息的偏移量,同时也是最近一个提交到集群的偏移量
# LAG:消费者的 CURRENT-OFFSET 与 broker 的 LOG-END-OFFSET 之间的差距
3. 查询消费者成员信息
所有消费组成员信息
./kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9092
指定消费组成员信息
./kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9092
4.查询消费组状态信息
所有消费组状态信息
kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9092
指定消费组状态信息
kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9092
5. 删除group中的topic
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete
6. 删除group
删除指定消费者组
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --delete
删除所有消费者组
./kafka-consumer-groups.sh  --all-groups --bootstrap-server xxxx:9090 --delete

想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常
Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

三. 查看消费情况

1. 读取topic消息
./kafka-console-consumer.sh --bootstrap-server 10.255.60.141:9092  --topic first_topic_hong
如果想从开始位置读取消息,再后面加上参数--from-beginning
2. 指定消费者Group读取topic
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning
3. 按照时间段查询topic消息
./kafka-console-consumer.sh --bootstrap-server 10.253.20.6:9092  --topic test11 --property print.timestamp=true 
--from-beginning |awk -F 'CreateTime:|\t' '$2 >= 1686546000000 && $2 <= 1686549600000 {print $0}'
时间戳的单位用毫秒

说明:

mat_realdata_hcl_station_5min

–from-beginning //表示重头开始读
–property print.timestamp=true //表示显示入kafka时间
使用awk对消费出来的记录进行筛选过滤,比如以下表示对每条消费记录按照“CreateTime:“或者”\t“进行分割,$0表示原始的记录,$1表示分割后的第1个字符串,以下$2为分割后的第二字符串,为入kafka的时间
原文链接:https://blog.csdn.net/u011311291/article/details/114574550

四. 生产数据

1. 向topic写入数据
./kafka-console-producer.sh --broker-list 10.255.60.141:9092 --topic first_topic_hong
> hi
> hello

标签:group,--,kafka,topic,sh,常用命令,consumer
From: https://www.cnblogs.com/regit/p/17632981.html

相关文章

  • vagrant常用命令
    vagrant--version#查看vagrant版本vagrantboxlist#查看box列表vagrantboxadd[boxname][url]#添加boxvagrantboxremove[boxname]#移除boxvagrantboxupdate#更新boxvagrantboxrepackage[......
  • 为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量
    (目录)一、为什么需要带有subscribe的group.id消费概念:Kafka使用消费者组的概念来实现主题的并行消费-每条消息都将在每个消费者组中传递一次,无论该组中实际有多少个消费者。所以group参数是强制性的,如果没有组,Kafka将不知道如何对待订阅同一主题的其他消费者。偏移......
  • 《高级程序员 面试攻略 》rabitmq rcoketmq kafka的区别 和应用场景
    RabbitMQ、RocketMQ和Kafka都是流行的消息中间件系统,用于实现分布式应用程序之间的异步通信。虽然它们都有类似的目标,但在设计和应用场景上存在一些区别。1.RabbitMQ(兔子消息队列):-描述:RabbitMQ是一个开源的消息代理系统,实现了高性能、可靠的消息传递机制。它使用AMQP(高......
  • 《高级程序员 面试攻略 》Kafka如何实现高吞吐量和持久性。
    Kafka是一个分布式流处理平台,它通过一些关键特性来实现高吞吐量和持久性。下面是Kafka实现这些特性的主要方法:1.分布式架构:Kafka是一个分布式系统,它通过将数据分布在多个节点上来实现高吞吐量。每个节点(称为KafkaBroker)负责处理一部分数据和请求。生产者和消费者可以同时......
  • Linux常用命令
    Linux常用命令一、日期时间date[OPTION]...[+FORMAT]-u:printUTC+FORMAT:like+%Y-%m-%d-s:settimehwclock:显示硬件时间cal:查看日历uptime:查看系统运行时间二、输出&查看echo:显示输出的内容cat:显示文件内容more:向下翻页,查看文件内容;空格向下翻页......
  • 《面试1v1》Kafka的性能好在那里
    我是javapub,一名Markdown程序员从......
  • Kafka从入门到精通零基础进阶学习路线?
    Kafka从入门到精通零基础进阶学习路线?1.学习基础概念和架构:-了解Kafka的基础概念,如生产者、消费者、主题、分区等。-理解Kafka的架构,包括Kafkabroker、Zookeeper、消费者群组等。2.安装和配置Kafka:-下载和安装Kafka。-配置Kafkabroker和Zookeeper。3.发送......
  • 深入解析 Kafka 消息传递机制及其在 Spring Boot 中的应用
    Kafka作为一款高性能的分布式消息中间件,被广泛用于构建实时数据流处理和事件驱动的架构。在本篇博客中,我们将深入探讨Kafka的消息传递机制,并结合SpringBoot框架,演示如何在应用中使用Kafka进行消息传递。1.Kafka消息传递机制Kafka使用发布-订阅模型来实现消息传递。核心......
  • Kafka 消息传递机制与 Spring Boot 集成实践
    Kafka作为一款强大的分布式消息中间件,被广泛应用于实时数据流处理和事件驱动的架构。在本篇博客中,我们将深入探讨Kafka的消息传递机制,并结合SpringBoot框架,演示如何在应用中集成和使用Kafka进行消息传递。1.Kafka消息传递机制概述Kafka使用发布-订阅模型来实现高效的消......
  • Kafka 消息传递原理及在 Spring Boot 中的应用实践
    Kafka作为一款强大的分布式消息中间件,在实时数据流处理和事件驱动架构中扮演着重要角色。在本篇博客中,我们将深入探讨Kafka的消息传递原理,并结合SpringBoot框架,演示如何在应用中使用Kafka进行消息传递。1.Kafka消息传递原理Kafka采用发布-订阅模型实现消息传递,核心概念......