首页 > 其他分享 >kafka 常用命令

kafka 常用命令

时间:2023-03-03 18:55:05浏览次数:37  
标签:bin -- kafka topic host sh 常用命令

kafka介绍

kafka官网:http://kafka.apache.org/

各个名词解释:
*producer:发布消息的对象称之为消息生产者(Kafka topic producer)
*topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
*consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
*broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

消息中间件 对比

选择建议
消息中间件 建议
Kafka 追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务
RocketMQ 可靠性要求很高的金融互联网领域,稳定性搞,尽力了多次阿里双11考验
RabbitMQ 性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较晚上的RabbitMQ

常用命令介绍

启动kafka服务
bin/kafka-server-start.sh config/server.properties &
停止kafka服务
bin/kafka-server-stop.sh
创建一个叫demo-topic的主题(topic),有两个分区,每个分区3个副本,同时指定该主题的消息保留时长(72小时)
bin/kafka-topics.sh --zookeeper(host:port) --create --topic demo-topic --replication-factor 3 --partitions 2 --config retention.ms=259200000
列出指定主题(topic)的详细信息
bin/kafka-topics.sh  --zookeeper(host:port) --describe  --topic demo-topic
查看所有的主题
bin/kafka-topics.sh --list --zookeeper(host:port) kafka-host(host:port)
查看所有主题的详细信息
bin/kafka-topics.sh --zookeeper(host:port) --describe
删除一个主题
bin/kafka-topics.sh --zookeeper(host:port) --topic demo-topic --delete
向kafka指定topi写入数据
bin/kafka-console-producer.sh --broker-list kafka-host(host:port)--topic demo-topic
命令行消费某个topic消息
#加了--from-beginning 从头消费所有消息
bin/kafka-console-consumer.sh --bootstrap-server kafka-host(host:port) --topic demo-topic --from-beginning   

#不加--from-beginning 从最新的一条消息开始消费
bin/kafka-console-consumer.sh --bootstrap-server kafka-host(host:port) --topic demo-topic
查看某个topic对应的消息数量
# time为-1时表示最大值,time为-2时表示最小值 --partitions num 指定分区
bin/kafka-run-class.sh  kafka.tools.GetOffsetShell --broker-list kafka-host(host:port) --topic demo-topic --time -1
kafka重置分组已经消费的偏移量offest
bin/kafka-consumer-groups.sh --bootstrap-server=kafka-host(host:port) --execute --reset-offsets --topic=demo-topic --group=testPlatform --to-earliest
topic增加分区
bin/kafka-topics.sh --alter --zookeeper(host:port) --topic demo-topic --partitions 12
指定topic创建消费者分组
bin/kafka-console-consumer.sh  --bootstrap-server=kafka-host(host:port) --topic demo-topic --consumer-property group.id=testPlatform
查看消费组组所属topic的消费情况
bin/kafka-consumer-groups.sh --bootstrap-server=kafka-host(host:port)  --group=demo-group --describ
显示所有消费者
bin/kafka-consumer-groups.sh --bootstrap-serverkafka-host(host:port) --list
获取正在消费的topic的group的offset
bin/kafka-consumer-groups.sh --describe --group demo-group --bootstrap-serverkafka-host(host:port)

##### 重设 consumer group的offset

确定topic作用域:
--all-topics   为consumer group下所有topic的所有分区调整位移
--topic t1 --topic t2  为指定的若干个topic的所有分区调整位移
--topic t1:0,1,2     为指定的topic分区调整位移


确定位移重设策略
--to-current                            把位移调整到分区当前位移.       
--to-datetime <String: datetime>       把位移调整到大于给定时间的最早位移处.datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'  2020-07-01T12:00:00.000
--to-earliest                           把位移调整到分区当前最小位移
--to-latest                             把位移调整到分区当前最新位移        
--to-offset <Long: offset>              把位移调整到指定位移处
--shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
--by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
--from-file <file>:从CSV文件中读取调整策略
例:(--dry-run 不运行只查看结果,类似k8s里面;--execute执行)
按时间点重置(--to-datetime)
bin/kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --topic=test --group=testPlatform  --execute --reset-offsets --to-datetime 2022-01-18T12:00:00.000

按offset重置 (--to-offset )
重置消费组下指定topic
bin/kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --topic=test --group=testPlatform --execute --reset-offsets --to-offset 359905139  

重置消费组下面所以topic
bin/kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --all-topics --group testPlatform --reset-offsets  --to-latest  --execute
指定offset与partition导出消息
bin/kafka-console-consumer.sh --bootstrap-server=kafka-host(host:port) --topic --topic=demo-topic  --offset 825000 --partition 0 >> messages.log

##### 修改topic的参数

kafka-configs.sh --zookeeper(host:port) --entity-type topics --entity-name demo-topic --alter --add-config max.message.bytes=1048576

##### 测试生产者性能脚本

bin/kafka-producer-perf-test.sh --topic demo-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host(host:port) acks=-1 linger.ms=2000 compression.type=lz4

##### 测试消费者性能脚本

bin/kafka-consumer-perf-test.sh --broker-list kafka-host(host:port) --messages 10000000 --topic demo-topic

标签:bin,--,kafka,topic,host,sh,常用命令
From: https://www.cnblogs.com/qingtianyu2015/p/17176663.html

相关文章

  • zookeeper 常用命令
    ###Zookeeper服务常用命令```shell启动ZooKeeper服务:./zkServer.shstart查看ZooKeeper服务状态:./zkServer.shstatus停止ZooKeeper服务:./zkServer.shstop重启Z......
  • scrcpy安装与adb常用命令
    一、资源下载scrcpy安装包:https://download.csdn.net/download/qq_28807911/87527008github:https://github.com/Genymobile/scrcpy二、scrcpy安装1.右击此电脑-属性......
  • mpffinance常用命令参数
    主题相关查看可用预设主题mpf.available_styles()默认的主题包括:'binance','blueskies','brasil','charles','checkers','classic','default','ibd','kenan','mike','n......
  • Linux运用一些常用命令,优秀的PHPer都需掌握
    作为一名优秀的phper,Linux是必备的一项技能,工作3-5年的基本能明白我讲的道理!今天搜集整理了一些Linux服务器运维常用命令,希望对大家有帮助:1.删除0字节文件 find -type......
  • kubectl 常用命令
    1、显示一个或多个资源的详细状态kubectldescribe-nrbd-systemporainbond-operator-6d5bb9c7cb-jz84b2、使用默认编辑器编辑和更新服务器上一个或多个资源的定义。......
  • Kafka3.4:基于kraft集群搭建
    环境准备环境:jdk8,三台centos机器,基于kraft模式(不使用zk)kafka安装包:https://mirrors.aliyun.com/apache/kafka/3.4.0/kafka_2.13-3.4.0.tgz机器iphostname配置......
  • git常用命令教程
    补充requirements.txt的生成方式第一种方法是,在终端窗口输入pipfreeze>requirements.txtpipinstall-rrequirements.txt#一键安装所有依赖git常用指令指......
  • 【ZooKeeper基础-数据结构、服务端/客户端常用命令】
    一、ZooKeeper简介二、ZooKeeper数据结构&命令**1、数据结构**2、服务端常用命令①单机启动命令:./zkServer.shstart②状态查询命令:./zkServer.shstatus③停止服务......
  • linux常用命令
    1.man[命令或配置文件](功能描述:获得帮助信息)获取一个信息文档,查看具体信息,按q退出2.help命令(功能描述:获得shell内置命令的帮助信息)仅仅能获取bash内置命令的信息,不......
  • Linux中的常用命令
    Linux中的命令严格区分大小写pwd printworkingdirectory:输出当前工作目录,光标所有位置的目录(绝对路径)cd changedirectory:’切换目录 ‘.’表示当前目当‘........