Kafka Server之kafka-console-consumer.bat
注意:博主使用kafka版本:kafka_2.12-3.3.1.tgz windows版
一、订阅主题全部消息
(注意:Producer
已经生产:0~4999
共5000条消息)
在kafka\bin\windows
目录下,打开cmd.exe
,
输入命令:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic data-time --from-beginning
由以上2张图片可知,消费者消费5000
条消息(由于kafka无法保证消费者订阅消息的有序性,故可能上图可能看起来无法证明有5000条消息,实际上确实是5000条消息)
二、订阅指定分区消息
在kafka\bin\windows
目录下,打开cmd.exe
,
输入命令:kafka-console-consumer.bat --bootstrap-server localhost:9092 --partition 0 --offset earliest --topic data-time
data-time.log内容:
由以上2张图片可知:消费订阅:partition=0
的全部消息共2563
条
那么partition=0
的全部消息是否就是2563
条,需要进行验证。使用offset explorer验证结果:
验证完毕,符合猜测,即可以通过命令行指定主题的分区进行消费
帮助文档
Exactly one of --include/--topic is required. ()
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED: The server(s) to connect to.
connect to>
--consumer-property <String: A mechanism to pass user-defined
consumer_prop> properties in the form key=value to
the consumer.
--consumer.config <String: config file> Consumer config properties file. Note
that [consumer-property] takes
precedence over this config.
--enable-systest-events Log lifecycle events of the consumer
in addition to logging consumed
messages. (This is specific for
system tests.)
--formatter <String: class> The name of a class to use for
formatting kafka messages for
display. (default: kafka.tools.
DefaultMessageFormatter)
--from-beginning If the consumer does not already have
an established offset to consume
from, start with the earliest
message present in the log rather
than the latest message.
--group <String: consumer group id> The consumer group id of the consumer.
--help Print usage information.
--include <String: Java regex (String)> Regular expression specifying list of
topics to include for consumption.
--isolation-level <String> Set to read_committed in order to
filter out transactional messages
which are not committed. Set to
read_uncommitted to read all
messages. (default: read_uncommitted)
--key-deserializer <String:
deserializer for key>
--max-messages <Integer: num_messages> The maximum number of messages to
consume before exiting. If not set,
consumption is continual.
--offset <String: consume offset> The offset to consume from (a non-
negative number), or 'earliest'
which means from beginning, or
'latest' which means from end
(default: latest)
--partition <Integer: partition> The partition to consume from.
Consumption starts from the end of
the partition unless '--offset' is
specified.
--property <String: prop> The properties to initialize the
message formatter. Default
properties include:
print.
timestamp=true|false
print.
key=true|false
print.
offset=true|false
print.
partition=true|false
print.
headers=true|false
print.
value=true|false
key.separator=<key.
separator>
line.separator=<line.
separator>
headers.separator=<line.
separator>
null.literal=<null.
literal>
key.deserializer=<key.
deserializer>
value.
deserializer=<value.deserializer>
header.deserializer=<header.
deserializer>
Users can also pass
in customized properties for their
formatter; more specifically, users
can pass in properties keyed with
'key.deserializer.', 'value.
deserializer.' and 'headers.
deserializer.' prefixes to configure
their deserializers.
--skip-message-on-error If there is an error when processing a
message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms> If specified, exit if no message is
available for consumption for the
specified interval.
--topic <String: topic> The topic to consume on.
--value-deserializer <String:
deserializer for values>
--version Display Kafka version.
--whitelist <String: Java regex DEPRECATED, use --include instead;
(String)> ignored if --include specified.
Regular expression specifying list
of topics to include for consumption.
标签:bat,console,--,partition,kafka,offset,deserializer,consumer
From: https://www.cnblogs.com/caojun97/p/16824379.html