首页 > 其他分享 >ClickHouse Kafka引擎

ClickHouse Kafka引擎

时间:2023-09-07 17:23:43浏览次数:31  
标签:01 name -- kafka 引擎 2020 Kafka ClickHouse

转:https://blog.csdn.net/qq_28603127/article/details/109556177

Kafka引擎用力读取kafka中的数据,创建表语句

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]
或者
 ENGINE = Kafka(kafka_broker_list, kafka_topic_list ,kafka_group_name , kafka_format)
 settings [kafka_row_delimiter = xxx,]  [kafka_schema = '',]....

必选项:
kafka_broker_list kafka地址
kafka_topic_list 可以订阅多个主题
kafka_group_name 消费者组
kafka_format 消息的格式,常见 JSONEachRow,CSV,TSV
可选项:
kafka_row_delimiter 消息的结束符,默认’\0’
kafka_schema 不知道怎么解释这个,大部分不会用这个参数
kafka_num_consumers 消费者的个数,小于等于分区数
kafka_max_block_size 一次性拉取最大的消息大小,默认64k
kafka_skip_broken_messages 默认为0对于消息解析错误的容忍度,设置为0出现异常就不会再接收消息
kafka_commit_every_batch 偏移量提交频率,默认0,写完一个数据块后提交,设置为1每批次都会提交
kafka_thread_per_consumer 默认为0,既所有消费者线程消费的数据会归总之后写入数据表,不然就会启动独立线程,独立线程自己写入数据表互不干扰.

启动zk与kafka
kafka创建主题
formate为CSV的主题

./kafka-topics.sh --create --topic clickhouseCSV --partitions 3 
--zookeeper node01:2181 --replication-factor 1
formate为JSONEachRow的主题

./kafka-topics.sh --create --topic clickhouseJSON --partitions 3
--zookeeper node01:2181 --replication-factor 1

启动命令行并输入消息

./kafka-console-producer.sh --topic clickhouseCSV --broker-list node01:9092

 创建clickhouse 的Kafka引擎表

CREATE TABLE kafkatest.kafkatest(     `id` Int16,     `name` String,     `createDate` Date )
 ENGINE = Kafka('node01:9092', 'clickhouseCSV', 'group', 'CSV');

select * from kafkatest;
┌─id─┬─name──────┬─createDate─┐
│  1 │ zhangsan  │ 2020-01-02 │
│  4 │ zhaoliu   │ 2020-01-01 │
│  7 │ gaojiu    │ 2020-01-02 │
│ 10 │ zhoushier │ 2020-01-01 │
│  3 │ wangwu    │ 2020-01-02 │
│  6 │ luba      │ 2020-01-01 │
│  9 │ xiaoshiyi │ 2020-01-02 │
│  2 │ lisi      │ 2020-01-01 │
│  5 │ tianqi    │ 2020-01-02 │
│  8 │ kanshi    │ 2020-01-01 │
└────┴───────────┴────────────┘

成功读取到了数据,并且是从earlast读取,而不是lartest.
再执行一次查询,发现数据消失了.

node01.hadoop.com :) select * from kafkatest;
SELECT *
FROM kafkatest
Ok.
0 rows in set. Elapsed: 5.017 sec. 

这是因为Kafka引擎只能读取消费的数据,读取完了以后就会删除数据.那么用这个引擎就没有意义了,数据只能查询一次.
其实Kafka引擎是需要结合物化视图一起使用的,物化视图不断将从kafka接收的消息数据写入到其他表引擎.

CREATE TABLE kafkaMergeTree
(
    `id` Int16,
    `name` String,
    `createDate` Date
)
ENGINE = MergeTree
ORDER BY id;

CREATE MATERIALIZED VIEW kafkaview TO kafkaMergeTree AS
SELECT *
FROM kafkatest;

创建好后,继续向kafka生产数据几条数据,之后查询

node01.hadoop.com :) select * from kafkaMergeTree;

SELECT *
FROM kafkaMergeTree

┌─id─┬─name─────┬─createDate─┐
│  1 │ zhangsan │ 2020-01-02 │
│  2 │ lisi     │ 2020-01-01 │
│  3 │ wangwu   │ 2020-01-02 │
│  4 │ zhaoliu  │ 2020-01-01 │
└────┴──────────┴────────────┘

下面再演示一下Json格式并查询Kafka引擎隐藏列

CREATE TABLE kafkatest.kafkatest2
(
    `id` Int16,
    `name` String,
    `createDate` Date
)
ENGINE = Kafka('node01:9092', 'clickhouseJSON', 'group', 'JSONEachRow')

启动命令行生产者并生产数据

sh kafka-console-producer.sh --topic clickhouseJSON --broker-list node01:9092
>{"id":1,"name":"zhangsan","createDate":"2020-01-01"}
>{"id":2,"name":"lisi","createDate":"2020-01-02"}
查询数据
node01.hadoop.com :) select *,_topic,_key,_offset,_timestamp,_partition from kafkatest2; SELECT *, _topic, _key, _offset, _timestamp, _partition FROM kafkatest2 ┌─id─┬─name─────┬─createDate─┬─_topic─────────┬─_key─┬─_offset─┬──────────_timestamp─┬─_partition─┐ │ 1 │ zhangsan │ 2020-01-01 │ clickhouseJSON │ │ 1 │ 2020-11-08 10:54:30 │ 1 │ │ 2 │ lisi │ 2020-01-02 │ clickhouseJSON │ │ 1 │ 2020-11-08 10:54:31 │ 0 │ └────┴──────────┴────────────┴────────────────┴──────┴─────────┴─────────────────────┴────────────┘

上方以_开头的是隐藏列,不包含在*之中
_topic kafka主题。
_key 消息的key。
_offset 消息的偏移量。
_timestamp 消息的时间戳。
_partition 分区。

翻译

搜索

复制

标签:01,name,--,kafka,引擎,2020,Kafka,ClickHouse
From: https://www.cnblogs.com/qsds/p/17685542.html

相关文章

  • kafka复习:(22)一个分区只能被消费者组中的一个消费者消费吗?
    默认情况下,一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。一、自定义PartitionAssignor.packagecom.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;importorg.apache.kafka.clients.consumer.internals.AbstractPartitionAssign......
  • kafka复习:(24)consume-transform-produce模式
    packagecom.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.produc......
  • 火山引擎 DataTester 首推A/B实验经验库,帮助企业高效优化实验设计能力
    更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群 近日,火山引擎DataTester推出了重要功能——A/B实验经验库。基于在字节跳动已完成150万余次A/B实验的经验,DataTester首创了A/B实验经验库功能。该功能可帮助业务人员将历史的A/B实验经......
  • 火山引擎 DataTester 首推A/B实验经验库,帮助企业高效优化实验设计能力
    更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群近日,火山引擎DataTester推出了重要功能——A/B实验经验库。基于在字节跳动已完成150万余次A/B实验的经验,DataTester首创了A/B实验经验库功能。该功能可帮助业务人员将历史的A/B实验经验沉淀,并......
  • Java低代码开发:jvs-list(列表引擎)功能(二)字段及样式配置
    字段的增减进入列表页设计器-页表设计界面,点击新增一行、或者删除按钮,可以对字段进行增减操作,如果对于权限的列表页,可以使用批量创建字段的按钮:字段的批量设置,点击批量添加如下图所示字段为中文名称,每一行为一个字段,默认去除空格默认字段系统提供创建人、创建时间、修改人、修改时......
  • 独享IP对网站引擎优化的影响
    首先,我们来了解一下什么是独享IP。独享IP是指一个IP地址只分配给一个网站,而不是和其他网站共享。那么,独享IP到底对网站SEO有什么影响呢?1、提高网站权威性独享IP可以提高网站的权威性。因为搜索引擎会认为独享IP的网站更加专业、稳定,因此在排名中会给予更高的权重。这对于网站的SEO......
  • 《自己动手建搜索引擎》日志分析类代码解析与修正为兼容lucene3.0.2
    搜索日志是用来分析用户搜索行为和信息需求的重要依据。一般记录如下信息:搜索关键字用户来源IP本次搜索返回结果数量搜索时间其他需要记录的应用相关信息  例如:搜索时间|日志类型|搜索类型|搜索关键字|IP地址|本次搜索返回结果数量, 存放日志文件内容如下:2008-04......
  • clickhouse的简单介绍及使用
    转:https://blog.csdn.net/qq_44275894/article/details/123973699一、介绍cliskhouse官方地址ClickHouse是一个真正的面向列的数据库管理系统(DBMS),用于查询的在线分析处理(OLAP)。数据按列存储,并且在执行数组(向量或列块)期间存储。只要有可能,操作就会被发送到数组上,而不是单......
  • 安防监控/视频汇聚/云存储/AI视频智能算法引擎:遛狗AI检测算法详解
    根据最新修订发布的《中华人民共和国动物防疫法》规定:遛狗不栓绳,养狗不办证、未定期接种疫苗等行为都是违法行为。作为一个合格的“铲屎官"出门遛狗一定要牵好狗绳,保护他人和爱犬的安全。但就算法律明文规定,还是有很多人无视法律法规,在外遛狗不牵绳,任其自由活动。在日常管理中,遛狗......
  • 安防监控/视频汇聚/云存储/AI视频智能算法引擎系统:遛狗检测算法详解
    根据最新修订发布的《中华人民共和国动物防疫法》规定:遛狗不栓绳,养狗不办证、未定期接种疫苗等行为都是违法行为。作为一个合格的“铲屎官"出门遛狗一定要牵好狗绳,保护他人和爱犬的安全。但就算法律明文规定,还是有很多人无视法律法规,在外遛狗不牵绳,任其自由活动。在日常管理中,......