首页 > 其他分享 >Kafka Server之kafka-console-consumer.bat

Kafka Server之kafka-console-consumer.bat

时间:2022-10-25 14:34:35浏览次数:59  
标签:bat console -- partition kafka offset deserializer consumer

Kafka Server之kafka-console-consumer.bat

注意:博主使用kafka版本:kafka_2.12-3.3.1.tgz windows版

一、订阅主题全部消息

(注意:Producer 已经生产:0~4999共5000条消息)

kafka\bin\windows目录下,打开cmd.exe

输入命令:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic data-time --from-beginning

由以上2张图片可知,消费者消费5000条消息(由于kafka无法保证消费者订阅消息的有序性,故可能上图可能看起来无法证明有5000条消息,实际上确实是5000条消息)

二、订阅指定分区消息

kafka\bin\windows目录下,打开cmd.exe

输入命令:kafka-console-consumer.bat --bootstrap-server localhost:9092 --partition 0 --offset earliest --topic data-time

data-time.log内容:

由以上2张图片可知:消费订阅:partition=0的全部消息共2563

那么partition=0的全部消息是否就是2563条,需要进行验证。使用offset explorer验证结果:

验证完毕,符合猜测,即可以通过命令行指定主题的分区进行消费

帮助文档

Exactly one of --include/--topic is required. ()
Option                                   Description
------                                   -----------
--bootstrap-server <String: server to    REQUIRED: The server(s) to connect to.
  connect to>
--consumer-property <String:             A mechanism to pass user-defined
  consumer_prop>                           properties in the form key=value to
                                           the consumer.
--consumer.config <String: config file>  Consumer config properties file. Note
                                           that [consumer-property] takes
                                           precedence over this config.
--enable-systest-events                  Log lifecycle events of the consumer
                                           in addition to logging consumed
                                           messages. (This is specific for
                                           system tests.)
--formatter <String: class>              The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--from-beginning                         If the consumer does not already have
                                           an established offset to consume
                                           from, start with the earliest
                                           message present in the log rather
                                           than the latest message.
--group <String: consumer group id>      The consumer group id of the consumer.
--help                                   Print usage information.
--include <String: Java regex (String)>  Regular expression specifying list of
                                           topics to include for consumption.
--isolation-level <String>               Set to read_committed in order to
                                           filter out transactional messages
                                           which are not committed. Set to
                                           read_uncommitted to read all
                                           messages. (default: read_uncommitted)
--key-deserializer <String:
  deserializer for key>
--max-messages <Integer: num_messages>   The maximum number of messages to
                                           consume before exiting. If not set,
                                           consumption is continual.
--offset <String: consume offset>        The offset to consume from (a non-
                                           negative number), or 'earliest'
                                           which means from beginning, or
                                           'latest' which means from end
                                           (default: latest)
--partition <Integer: partition>         The partition to consume from.
                                           Consumption starts from the end of
                                           the partition unless '--offset' is
                                           specified.
--property <String: prop>                The properties to initialize the
                                           message formatter. Default
                                           properties include:
 print.
                                           timestamp=true|false
 print.
                                           key=true|false
 print.
                                           offset=true|false
 print.
                                           partition=true|false
 print.
                                           headers=true|false
 print.
                                           value=true|false
 key.separator=<key.
                                           separator>
 line.separator=<line.
                                           separator>
 headers.separator=<line.
                                           separator>
 null.literal=<null.
                                           literal>
 key.deserializer=<key.
                                           deserializer>
 value.
                                           deserializer=<value.deserializer>
                                           header.deserializer=<header.
                                           deserializer>

Users can also pass
                                           in customized properties for their
                                           formatter; more specifically, users
                                           can pass in properties keyed with
                                           'key.deserializer.', 'value.
                                           deserializer.' and 'headers.
                                           deserializer.' prefixes to configure
                                           their deserializers.
--skip-message-on-error                  If there is an error when processing a
                                           message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>       If specified, exit if no message is
                                           available for consumption for the
                                           specified interval.
--topic <String: topic>                  The topic to consume on.
--value-deserializer <String:
  deserializer for values>
--version                                Display Kafka version.
--whitelist <String: Java regex          DEPRECATED, use --include instead;
  (String)>                                ignored if --include specified.
                                           Regular expression specifying list
                                           of topics to include for consumption.

标签:bat,console,--,partition,kafka,offset,deserializer,consumer
From: https://www.cnblogs.com/caojun97/p/16824379.html

相关文章

  • kafka问题总结
    这篇文章主要记录自己遇到和在网上看到的一些关于kafka的相关问题。问题1:客户端和服务端版本不一致造成的消息发送延迟高现象kafka客户端支持多语言api,这里只关......
  • kafka与eventing
    项目地址https://strimzi.io/quickstarts/https://github.com/strimzi/strimzi-kafka-operator/tree/0.31.1/examples/kafka部署ClusterRole和CRDkubectlcreate-f'h......
  • mybatis 判断字符是否相等
    mybatis判断字符是否相等1.外面单引号里面双引号表示字符串<iftest='showColumn==nullorshowColumn==""'>srr.id......
  • Kafka工具:Offset Explorer
    Kafka工具:OffsetExplorer一、OffsetExplorer安装offsetexplorer_64bit.exeWindowsx64版本下载链接安装方式:选择默认安装(即全部默认下一步)官网链接:https://www.kafk......
  • Mybatis原理分析-核心组件
    Mybatis四大对象指的是:Executor,StatementHandler,ParameterHandler和ResultSetHandler对象。四个对象在SqlSession内部共同协作完成sql语句的执行,同时也是我们自定义插件拦......
  • Mybatis事务控制
    https://blog.csdn.net/weixin_34392906/article/details/91425640一.1. 概述 对数据库的事务而言,应该具有以下几点:创建(create)、提交(commit)、回滚(rollback)、关闭(close......
  • MybatisPlus分页查询插件
    MybatisPlus分页查询插件公共配置类@ConfigurationpublicclassMybatisPlusPageConfig{/* 旧版本配置 @Bean publicPaginationInterceptorpaginationInterc......
  • 14. MyBatis注解式开发
    一、什么是MyBatis注解式开发  MyBatis中也提供了注解式开发方式,采用注解可以减少SQL映射文件的配置。如果使用注解式开发的话,SQL语句是写在Java程序中的,这种方式......
  • MyBatis
    MyBatis什么是MybatisMybatis是一款优秀的持久层框架,用于简化JDBC开发Mybatis本是Apache的一个开源项目iBatis,2010年这个项目由apachesoftwarefoundation迁移到了goog......
  • Mybatis
    ResultMap结果集映射多对一按照查询嵌套处理:需要两个查询,被嵌套的<association>需要column以及select属性按照结果嵌套处理:不需要column以及select属性对于数据......