Reposted from: https://blog.csdn.net/qq_28603127/article/details/109556177
The Kafka engine is used to read data from Kafka. The table creation statement:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]

or

ENGINE = Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format)
SETTINGS
    [kafka_row_delimiter = xxx,]
    [kafka_schema = '',]
    ...
Required settings:
kafka_broker_list: the Kafka broker address(es), host:port
kafka_topic_list: the topic(s) to subscribe to; multiple topics may be listed
kafka_group_name: the consumer group
kafka_format: the message format; common values are JSONEachRow, CSV, TSV
Optional settings (a combined example follows this list):
kafka_row_delimiter: the delimiter that ends a message, default '\0'
kafka_schema: a schema definition for formats that require one (for example Cap'n Proto); most setups do not need this parameter
kafka_num_consumers: the number of consumers; should not exceed the number of partitions of the topic
kafka_max_block_size: the maximum batch size pulled in a single poll, default 64k
kafka_skip_broken_messages: tolerance for messages that fail to parse, default 0; with 0, a single broken message raises an exception and consumption stops
kafka_commit_every_batch: offset commit frequency, default 0, meaning the offset is committed after a whole block has been written; set to 1 to commit after every batch
kafka_thread_per_consumer: default 0, meaning the data read by all consumer threads is merged and then written to the table as one stream; otherwise each consumer gets its own thread and writes to the table independently
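Putting the required and optional settings together, here is a minimal sketch of the SETTINGS form (the broker, topic, group, and column names are illustrative, not taken from the example later in this post):

CREATE TABLE queue_example
(
    `id` Int16,
    `name` String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'node01:9092',
    kafka_topic_list = 'topic1',
    kafka_group_name = 'ch_group',
    kafka_format = 'CSV',
    kafka_num_consumers = 3,          -- no more than the topic's partition count
    kafka_skip_broken_messages = 10,  -- tolerate up to 10 unparsable messages
    kafka_thread_per_consumer = 1;    -- each consumer writes to the table independently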
Start ZooKeeper and Kafka
Create the Kafka topics
A topic for the CSV format:
./kafka-topics.sh --create --topic clickhouseCSV --partitions 3 --zookeeper node01:2181 --replication-factor 1
A topic for the JSONEachRow format:
./kafka-topics.sh --create --topic clickhouseJSON --partitions 3 --zookeeper node01:2181 --replication-factor 1
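To quickly confirm both topics exist (an optional check, using the same ZooKeeper-based CLI as the commands above):

./kafka-topics.sh --list --zookeeper node01:2181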
Start the console producer and type in some messages
./kafka-console-producer.sh --topic clickhouseCSV --broker-list node01:9092
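At the > prompt each line is one CSV message, with fields in the column order of the Kafka engine table created next (id, name, createDate); for example, the rows queried below were produced like this:

>1,zhangsan,2020-01-02
>2,lisi,2020-01-01
>3,wangwu,2020-01-02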
Create the ClickHouse Kafka engine table
CREATE TABLE kafkatest.kafkatest
(
    `id` Int16,
    `name` String,
    `createDate` Date
)
ENGINE = Kafka('node01:9092', 'clickhouseCSV', 'group', 'CSV');

select * from kafkatest;

┌─id─┬─name──────┬─createDate─┐
│  1 │ zhangsan  │ 2020-01-02 │
│  4 │ zhaoliu   │ 2020-01-01 │
│  7 │ gaojiu    │ 2020-01-02 │
│ 10 │ zhoushier │ 2020-01-01 │
│  3 │ wangwu    │ 2020-01-02 │
│  6 │ luba      │ 2020-01-01 │
│  9 │ xiaoshiyi │ 2020-01-02 │
│  2 │ lisi      │ 2020-01-01 │
│  5 │ tianqi    │ 2020-01-02 │
│  8 │ kanshi    │ 2020-01-01 │
└────┴───────────┴────────────┘
The data was read successfully, and note that consumption started from the earliest offset rather than the latest.
Running the query again, the data is gone.
node01.hadoop.com :) select * from kafkatest;

SELECT *
FROM kafkatest

Ok.

0 rows in set. Elapsed: 5.017 sec.
This is because a Kafka engine table is just a consumer: once a SELECT has read the messages, the offsets are committed and the same data can no longer be read from the table. Used on its own the engine is of little value, since each piece of data can only be queried once.
In practice the Kafka engine is meant to be used together with a materialized view, which continuously takes the messages received from Kafka and writes them into a table backed by another engine.
CREATE TABLE kafkaMergeTree
(
    `id` Int16,
    `name` String,
    `createDate` Date
)
ENGINE = MergeTree
ORDER BY id;

CREATE MATERIALIZED VIEW kafkaview TO kafkaMergeTree AS
SELECT *
FROM kafkatest;
Once these are created, produce a few more records to Kafka and then query the MergeTree table:
node01.hadoop.com :) select * from kafkaMergeTree;

SELECT *
FROM kafkaMergeTree

┌─id─┬─name─────┬─createDate─┐
│  1 │ zhangsan │ 2020-01-02 │
│  2 │ lisi     │ 2020-01-01 │
│  3 │ wangwu   │ 2020-01-02 │
│  4 │ zhaoliu  │ 2020-01-01 │
└────┴──────────┴────────────┘
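A practical aside: to pause consumption (for example during maintenance on the target table), the materialized view can be detached and later re-attached; while it is detached, no new Kafka messages flow into kafkaMergeTree. A sketch:

DETACH TABLE kafkaview;
-- ... nothing is consumed into kafkaMergeTree while the view is detached ...
ATTACH TABLE kafkaview;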
Next, a demonstration of the JSONEachRow format and of the Kafka engine's hidden (virtual) columns.
CREATE TABLE kafkatest.kafkatest2
(
    `id` Int16,
    `name` String,
    `createDate` Date
)
ENGINE = Kafka('node01:9092', 'clickhouseJSON', 'group', 'JSONEachRow')
Start the console producer and produce some data
sh kafka-console-producer.sh --topic clickhouseJSON --broker-list node01:9092
>{"id":1,"name":"zhangsan","createDate":"2020-01-01"}
>{"id":2,"name":"lisi","createDate":"2020-01-02"}
Query the data
node01.hadoop.com :) select *,_topic,_key,_offset,_timestamp,_partition from kafkatest2;

SELECT
    *,
    _topic,
    _key,
    _offset,
    _timestamp,
    _partition
FROM kafkatest2

┌─id─┬─name─────┬─createDate─┬─_topic─────────┬─_key─┬─_offset─┬──────────_timestamp─┬─_partition─┐
│  1 │ zhangsan │ 2020-01-01 │ clickhouseJSON │      │       1 │ 2020-11-08 10:54:30 │          1 │
│  2 │ lisi     │ 2020-01-02 │ clickhouseJSON │      │       1 │ 2020-11-08 10:54:31 │          0 │
└────┴──────────┴────────────┴────────────────┴──────┴─────────┴─────────────────────┴────────────┘
The columns above that start with _ are hidden (virtual) columns; they are not included in * (an example of persisting them follows this list):
_topic: the Kafka topic
_key: the message key
_offset: the message offset
_timestamp: the message timestamp
_partition: the partition
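Because the virtual columns are not part of *, they must be selected explicitly if you want to persist them. A sketch of a target table plus materialized view that also stores the Kafka metadata (the names kafkaJsonMergeTree and kafkaJsonView are illustrative, not from the original post):

CREATE TABLE kafkaJsonMergeTree
(
    `id` Int16,
    `name` String,
    `createDate` Date,
    `kafka_topic` String,
    `kafka_offset` UInt64,
    `kafka_partition` UInt64
)
ENGINE = MergeTree
ORDER BY id;

CREATE MATERIALIZED VIEW kafkaJsonView TO kafkaJsonMergeTree AS
SELECT
    id,
    name,
    createDate,
    _topic AS kafka_topic,          -- virtual columns read from the Kafka engine table
    _offset AS kafka_offset,
    _partition AS kafka_partition
FROM kafkatest2;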
From: https://www.cnblogs.com/qsds/p/17685542.html