什么是ClickHouse
Clickhouse云数据库是开源列式数据库管理系统ClickHouse在OPPO Cloud上的托管服务,用户可搭建自己的ClickHouse集群。总体架构如下:
- ClickHouse中无中心节点,所有节点是完全对等的,每一个节点都可以承载查询请求和写入请求,以及后台数据的计算和操作。
- 每个ClickHouse集群包含1个或多个分片(Shard),每个分片内部包含1个或多个副本(Replica)。
- 所有ClickHouse节点都部署在OPPO Cloud的容器主机中。
ClickHouse特性
ClickHouse是面向联机分析处理的列式数据库,支持SQL查询,且查询性能好,特别是基于大宽表的聚合分析查询性能非常优异,比其他分析型数据库速度快一个数量级。
- 数据压缩比高。
- 多核并行计算。
- 向量化计算引擎。
- 支持嵌套数据结构。
- 支持稀疏索引。
基础概念
Clickhouse集群(Cluster)
集群是由多个Clickhouse server实例组成的分布式数据库,根据配置的不同包含1个或者多个分片(Shard)以及1个或多个副本(Replica)。
分片(Shard)
单台服务器存储和计算资源有限,分布式场景下通过横向扩展服务器资源来进一步提升处理能力和效率。海量数据分散存储到多台服务器上,每台服务器只存储和处理其中一部分,这样的一台服务器被称之为一个分片(Shard)。
副本(Replica)
在每一个分片的服务器可能存在异常的情况下保证数据的安全性和服务的高可用,将每一个分片的数据冗余存储到2台或者多台服务器上,其中每一份冗余都被称为一个副本。
数据库(Database)
数据库是ClickHouse集群中的最高级别对象,内部包含表(Table)、列(Column)、视图(View)、函数、数据类型等。
表(Table)
表是数据的组织形式,由多行、多列构成。ClickHouse的表从数据分布上,可以分为本地表、分布式表两种类型。从存储引擎上,可以分为单机表、复制表两种类型。
本地表(Local Table)
本地表的数据,只会存储在当前写入的节点上,不会被分散到多台机器上。
分布式表(Distributed Table)
分布式表是本地表的集合,它将多个本地表抽象为一张统一的表,对外提供写入、查询功能。当写入分布式表时,数据会被自动分发到集合中的各个本地表中;当查询分布式表时,集合中的各个本地表都会被分别查询,并且把最终结果汇总后返回。
说明:本地表与分布式表的区别在于:本地表的写入和查询,受限于单台服务器的存储、计算资源,不具备横向拓展能力;而分布式表的写入和查询,则可以利用多台服务器的存储、计算资源,具有较好的横向拓展能力。分布式表本身不存储,实际数据存储于本地表中。
单机表(Non-Replicated Table)
单机表的数据,只会存储在当前服务器上,不会被复制到其他服务器,即只有一个副本。
复制表(Replicated Table)
复制表的数据,会被自动复制到多个服务器上,形成多个副本。
说明:单机表与复制表的区别在于:
单机表在异常情况下无法保证服务高可用。
复制表在至少有一个正常副本的情况下,仍旧能够对外提供服务。
使用限制
高可用集群必须用复制表引擎;
DDL操作需要使用ON CLUSTER default语句在所有server上执行。
只支持分布式表,也即多台ClickHouse server会自动组成分布式集群,不支持单机表;默认所有server都会自动组成名字为default的集群,DDL需要使用ON CLUSTER default语句在所有server上执行。
对象命名规范
对象 | 命名规则 | 限制 |
---|---|---|
数据库名 | 以小写字符开头,可包含字母、数字以及下划线(),但不能包含连续两个及以上的下划线(),长度不超过64个字符。 | 数据库名不能是system,system是内置数据库。 |
表名 | 以字母或下划线()开头,可包含字母、数字以及下划线(),长度为1到127个字符。 | 表名不包含引号、感叹号(!)和空格。表名不能是SQL保留关键字。 |
列名 | 以字母或下划线()开头,可包含字母、数字以及下划线(),长度为1到127个字符。 | 列名不包含引号、感叹号(!)和空格。列名不能是SQL保留关键字。 |
账号名 | 以小写字母开头,小写字母或数字结尾,可包含小字母、数字以及下划线(_),长度为3到16个字符。 | - |
密码 | 包含大写字母、小写字母、数字以及特殊字符(!)、(@)、(#)、($)、(%)、(^)、(&)、(*)(())、(_)、(+)、(-)、(=),每个密码至少包含其中三项(大写字母、小写字母、数字以及特殊字符),长度为8到32个字符。 | - |
数据类型
常用数据类型列表如下,完整数据类型请参考官方文档:数据类型
分类 | 关键字 | 数据类型 | 取值/取值范围 |
---|---|---|---|
整数类型 | Int8 | Int8 | 取值范围:-128 - 127。 |
整数类型 | Int16 | Int16 | 取值范围 :-32768 - 32767。 |
整数类型 | Int32 | Int32 | 取值范围: -2147483648 - 2147483647。 |
整数类型 | Int64 | Int64 | 取值范围 :-9223372036854775808 - 9223372036854775807。 |
浮点类型 | Float32 | 单精度浮点数 | 同C语言Float类型,单精度浮点数在机内占4个字节,用32位二进制描述。 |
浮点类型 | Float64 | 双精度浮点数 | 同C语言Double类型,双精度浮点数在机内占8个字节,用64位二进制描述。 |
Decimal类型 | Decimal | Decimal | 有符号的定点数,可在加、减和乘法运算过程中保持精度。支持几种写法:Decimal(P, S),Decimal32(S),Decimal64(S)Decimal128(S) |
字符串类型 | String | 字符串 | 字符串可以是任意长度的。它可以包含任意的字节集,包含空字节。因此,字符串类型可以代替其他 DBMSs 中的VARCHAR、BLOB、CLOB 等类型。 |
字符串类型 | FixedString | 固定字符串 | 当数据的长度恰好为N个字节时,FixedString类型是高效的。 在其他情况下,这可能会降低效率。可以有效存储在FixedString类型的列中的值的示例:二进制表示的IP地址(IPv6使用FixedString(16)),语言代码(ru_RU, en_US … ),货币代码(USD, RUB … )。二进制表示的哈希值(MD5使用FixedString(16),SHA256使用FixedString(32)) |
时间日期类型 | Date | 日期 | 用两个字节存储,表示从 1970-01-01 (无符号) 到当前的日期值。日期中没有存储时区信息。 |
时间类型 | DateTime | 时间戳 | 用四个字节(无符号的)存储 Unix 时间戳。允许存储与日期类型相同的范围内的值。最小值为 0000-00-00 00:00:00。时间戳类型值精确到秒(没有闰秒)。时区使用启动客户端或服务器时的系统时区。 |
时间类型 | Datetime64 | Datetime64 | 此类型允许以日期(date)加时间(time)的形式来存储一个时刻的时间值。 |
布尔型 | Boolean | Boolean | ClickHouse没有单独的类型来存储布尔值。可以使用UInt8 类型,取值限制为0或 1。 |
数组类型 | Array | Array | Array(T),由 T 类型元素组成的数组。T 可以是任意类型,包含数组类型。但不推荐使用多维数组,ClickHouse对多维数组的支持有限。例如,不能在MergeTree表中存储多维数组。 |
元组类型 | Tuple | Tuple | Tuple(T1, T2, …),元组,其中每个元素都有单独的类型,不能在表中存储元组(除了内存表)。它们可以用于临时列分组。在查询中,IN表达式和带特定参数的 lambda 函数可以来对临时列进行分组。 |
Domain数据类型 | Domain | Domain | Domain类型是特定实现的类型:IPv4是与UInt32类型保持二进制兼容的Domain类型,用于存储IPv4地址的值。它提供了更为紧凑的二进制存储的同时支持识别可读性更加友好的输入输出格式。 IPv6是与FixedString(16)类型保持二进制兼容的Domain类型,用于存储IPv6地址的值。它提供了更为紧凑的二进制存储的同时支持识别可读性更加友好的输入输出格式。 |
枚举类型 | Enum8 | Enum8 | 取值范围:-128 - 127。 |
枚举类型 | Enum16 | Enum16 | 取值范围 :-32768 - 32767。 |
可为空 | Nullable | Nullable | 除非在 ClickHouse 服务器配置中另有说明,否则 NULL 是任何 Nullable 类型的默认值。Nullable 类型字段不能包含在表索引中。 |
嵌套类型 | nested | nested | 嵌套的数据结构就像单元格内的表格。嵌套数据结构的参数(列名和类型)的指定方式与CREATE TABLE查询中的指定方式相同。每个表行都可以对应于嵌套数据结构中的任意数量的行。 |
ClickHouse集群有若干参数,参数通常分为Clickhouse配置参数和用户级别参数。
Clickhouse配置参数
通常配置在Config.xml配置文件中,可以通过控制台进行在线修改并重启实例使修改后的参数生效。
基于修改频率和实例稳定性考虑,通过控制台修改的参数如下表所示:
参数名称 | 默认值 | 取值范围 | 是否需要重启 | 参数说明 |
---|---|---|---|---|
max_connections | 4096 | >0 | 是 | 最大连接数 |
max_concurrent_queries | 100 | >0 | 是 | 最大同时查询数 |
keep_alive_timeout | 3 | >0 | 是 | 保持连接的超时时间秒数 |
use_uncompressed_cache | 0 | 0、1 | 是 | 是否使用未压缩缓存块:0:不使用,1:使用 |
uncompressed_cache_size | 8589934592 | >=0 | 是 | MergeTree表引擎使用的未压缩数据的缓存大小(单位:字节)。在use_uncompressed_cache为1时有效,按需分配的共享缓存,比较有利于短查询 |
mark_cache_size | 5368709120 | >=5368709120 | 是 | MergeTree表引擎使用的标记缓存的近似大小(单位:字节)。按需分配,整个实例共享 |
max_table_size_to_drop | 53687091200 (50GB) | >=0 | 是 | 用于限制可删除的MergeTree表引擎的表大小(单位:字节)。设置为0即不限制,可以任意删除;设置为大于0的数值,该实例表大小超过设置值则无法删除. |
max_partition_size_to_drop | 53687091200 (50GB) | >=0 | 是 | 用于限制可删除的MergeTree表引擎的分区大小(单位:字节)。设置为0即不限制,可以任意删除分区;设置为大于0的数值,该实例表的分区大小超过设置值则无法删除. |
timezone | Asia/Shanghai | 合法的时区格式:Asia/Singapore,Europe/Madrid,America/Chicago | 是 | 实例的时区信息 |
用户级别参数
用户级别参数通常配置在user.xml文件中,还可以通过Sql语句来设置生效,不需要重启实例。详细参数清单,请参考Clickhouse用户参数列表
Sql语句方式
Set命令
通过Set命令设置,会话级别有效,链接断开后失效。
例子:
Set load_balancing = 'random';
Setting设置
通过在SQL语句末尾的Setting设置,单条Sql语句有效。
例子:
Select col1,col2 from table SETTINGS max_threads = 16;
user.xml文件配置
修改user.xml配置后,需要当前账户重新连接才能生效,目前暂时不支持。
表引擎
ClickHouse提供了丰富的表引擎,不同的表引擎也代表着不同的表类型,本文主要对ClickHouse中常见的表引擎进行介绍。
表引擎的作用是什么
- 决定表存储在哪里以及以何种方式存储
- 支持哪些查询以及如何支持
- 并发数据访问
- 索引的使用
- 是否可以执行多线程请求
- 数据复制参数
表引擎分类
引擎分类 | 引擎名称 |
---|---|
MergeTree系列 | MergeTree 、ReplacingMergeTree 、SummingMergeTree 、 AggregatingMergeTree、CollapsingMergeTree 、 VersionedCollapsingMergeTree 、GraphiteMergeTree |
Log系列 | TinyLog 、StripeLog 、Log |
Integration Engines | Kafka 、MySQL、ODBC 、JDBC、HDFS、S3 |
Special Engines | Distributed 、MaterializedView、 Dictionary 、Merge 、File、Null 、Set 、Join 、 URL View、Memory 、 Buffer |
- MergeTree系列:适用于高负载任务的最通用和功能最强大的表引擎。这些引擎的共同特点是可以快速插入数据并进行后续的后台数据处理。
- Log系列:功能相对简单,主要用于快速写入小表(1百万行左右的表),然后全部读出的场景。
- Integration系列:主要用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。
- Special系列:大多是为了特定场景而定制的。
本文重点描述了常用的几种表引擎,如需获得更多内容,请参见ClickHouse官方文档表引擎介绍
MergeTree系列
Clickhouse 中最强大的表引擎当属MergeTree(合并树)引擎及该系列(*MergeTree)中的其他引擎。MergeTree系列的引擎被设计用于插入极大量的数据到一张表当中。数据可以以数据片段的形式一个接着一个的快速写入,数据片段在后台按照一定的规则进行合并。相比在插入时不断修改(重写)已存储的数据,这种策略会高效很多。
MergeTree表引擎
MergeTree在写入一批数据时,数据总会以数据片段的形式写入磁盘,且数据片段不可修改。为了避免片段过多,ClickHouse会通过后台线程,定期合并这些数据片段,属于相同分区的数据片段会被合成一个新的片段。这种数据片段往复合并的特点,也正是合并树名称的由来。
MergeTree作为家族系列最基础的表引擎,主要有以下特点:
-
存储的数据按照主键排序:允许创建稀疏索引,从而加快数据查询速度
-
支持分区,可以通过PARTITION BY语句指定分区字段。
-
支持数据副本
-
支持数据采样
-
主键并不用于去重
-
ReplacingMergeTree表引擎
为了解决MergeTree相同主键无法去重的问题,ClickHouse提供了ReplacingMergeTree引擎,用于删除排序键值相同的重复项。
虽然ReplacingMergeTree提供了主键去重的能力,但是仍旧有以下限制: -
在没有彻底optimize之前,可能无法达到主键去重的效果,比如部分数据已经被去重,而另外一部分数据仍旧有主键重复。
-
在分布式场景下,相同primary key的数据可能被sharding到不同节点上,不同shard间可能无法去重。
-
optimize是后台动作,无法预测具体执行时间点。
-
手动执行optimize在海量数据场景下要消耗大量时间,无法满足业务即时查询的需求。
因此ReplacingMergeTree更多被用于确保数据最终被去重,而无法保证查询过程中主键不重复。
SummingMergeTree表引擎
ClickHouse通过SummingMergeTree来支持对主键列进行预先聚合。在后台Compaction时,会将主键相同的多行进行sum求和,然后使用一行数据取而代之,从而大幅度降低存储空间占用,提升聚合计算性能。
值得注意的是:
- ClickHouse只在后台Compaction时才会进行数据的预先聚合,而compaction的执行时机无法预测,所以可能存在部分数据已经被预先聚合、部分数据尚未被聚合的情况。因此,在执行聚合计算时,SQL中仍需要使用GROUP BY子句。
- 在预先聚合时,ClickHouse会对主键列之外的其他所有列进行预聚合。如果这些列是可聚合的(比如数值类型),则直接sum;如果不可聚合(比如String类型),则随机选择一个值。
- 通常建议将SummingMergeTree与MergeTree配合使用,使用MergeTree来存储具体明细,使用SummingMergeTree来存储预先聚合的结果加速查询。
AggregatingMergeTree表引擎
AggregatingMergeTree也是预先聚合引擎的一种,用于提升聚合计算的性能。与SummingMergeTree的区别在于:SummingMergeTree对非主键列进行sum聚合,而AggregatingMergeTree则可以指定各种聚合函数。
AggregatingMergeTree的语法比较复杂,需要结合物化视图或ClickHouse的特殊数据类型AggregateFunction一起使用。在insert和select时,也有独特的写法和要求:写入时需要使用-State语法,查询时使用-Merge语法。
CollapsingMergeTree表引擎
CollapsingMergeTree用来消除ReplacingMergeTree的功能限制。该引擎要求在建表语句中指定一个标记列Sign,按照Sign的值将行分为两类:Sign=1的行称之为状态行,Sign=-1的行称之为取消行。每次需要新增状态时,写入一行状态行;需要删除状态时,则写入一行取消行。
后台Compaction时会将主键相同、Sign相反的行进行折叠(删除)。而尚未进行Compaction的数据,状态行与取消行同时存在。因此为了能够达到主键折叠(删除)的目的,需要业务层进行适当改造:
- 执行删除操作需要写入取消行,而取消行中需要包含与原始状态行主键一样的数据(Sign列除外)。所以在应用层需要记录原始状态行的值,或者在执行删除操作前先查询数据库获取原始状态行。
- 由于后台Compaction时机无法预测,在发起查询时,状态行和取消行可能尚未被折叠;另外,ClickHouse无法保证primary key相同的行落在同一个节点上,不在同一节点上的数据无法折叠。因此在进行count(*)、sum(col)等聚合计算时,可能会存在数据冗余的情况。为了获得正确结果,业务层需要改写SQL,将count()、sum(col)分别改写为sum(Sign)、sum(col * Sign)。
CollapsingMergeTree虽然解决了主键相同的数据即时删除的问题,但是状态持续变化且多线程并行写入情况下,状态行与取消行位置可能乱序,导致无法正常折叠。
VersionedCollapsingMergeTree表引擎
为了解决CollapsingMergeTree乱序写入情况下无法正常折叠问题,VersionedCollapsingMergeTree表引擎在建表语句中新增了一列Version,用于在乱序情况下记录状态行与取消行的对应关系。主键相同,且Version相同、Sign相反的行,在Compaction时会被删除。
与CollapsingMergeTree类似, 为了获得正确结果,业务层需要改写SQL,将count()、sum(col)分别改写为sum(Sign)、sum(col * Sign)。
Log系列
Log系列表引擎功能相对简单,主要用于快速写入小表(1百万行左右的表),然后全部读出的场景。
几种Log表引擎的共性是:
- 数据被顺序append写到磁盘上;
- 不支持delete、update;
- 不支持index;
- 不支持原子性写;
- insert会阻塞select操作。
它们彼此之间的区别是:
- TinyLog:不支持并发读取数据文件,查询性能较差;格式简单,适合用来暂存中间数据;
- StripLog:支持并发读取数据文件,查询性能比TinyLog好;将所有列存储在同一个大文件中,减少了文件个数;
- Log:支持并发读取数据文件,查询性能比TinyLog好;每个列会单独存储在一个独立文件中。
Integration系列
该系统表引擎主要用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。
- Kafka:将Kafka Topic中的数据直接导入到ClickHouse;
- MySQL:将Mysql作为存储引擎,直接在ClickHouse中对MySQL表进行select等操作;
- JDBC/ODBC:通过指定jdbc、odbc连接串读取数据源;
- HDFS:直接读取HDFS上的特定格式的数据文件。
- S3:直接读取S3上的特定格式的数据文件。
Special系列
Special系列的表引擎,大多是为了特定场景而定制的。
- Memory:将数据存储在内存中,重启后会导致数据丢失。查询性能极好,适合于对于数据持久性没有要求的1亿以下的小表。在ClickHouse中,通常用来做临时表;
- Buffer:为目标表设置一个内存buffer,当buffer达到了一定条件之后会flush到磁盘;
- File:直接将本地文件作为数据存储;
- Null:写入数据被丢弃、读取数据为空。
JDBC实例连接
版本
下面介绍的几种sdk,针对目前线上环境集群的版本推荐
client 版本 | clickhouse 版本 |
---|---|
0.3.2 | 22.3 |
0.2.6 | 20.8 |
客户端链接代码示例
- clickhouse-jdbc sdk
引入ClickHouse官方官方依赖包
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
编写程序demo代码如下:
import java.sql.*;
public class CHTest {
private static Connection connection= null;
static {
try {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
String url = "jdbc:clickhouse://hostname:8123/mydatabase";
String user = "your username";
String password = "your password";
connection = DriverManager.getConnection(url,user,password);
} catch (Exception e) {
e.printStackTrace( );
}
}
public static void main(String[] args) throws SQLException {
Statement statement = connection.createStatement( );
ResultSet resultSet = statement.executeQuery("select * from mydatabase.test");
ResultSetMetaData metaData = resultSet.getMetaData( );
int columnCount = metaData.getColumnCount( );
while (resultSet.next()){
for (int i = 1; i <= columnCount; i++) {
System.out.println(metaData.getColumnName(i)+":"+resultSet.getString(i));
}
}
}
}
- clickhouse4j sdk
引入依赖包
<dependency>
<groupId>cc.blynk.clickhouse</groupId>
<artifactId>clickhouse4j</artifactId>
<version>1.4.4</version>
</dependency>
编写应用程序代码
public class client {
public static final String URL = "jdbc:clickhouse://10.91.117.23:8133";
public static final String USER = "***";
public static final String PASSWORD = "***";
public static void main(String[] args) throws Exception {
//1.加载驱动程序
Class.forName("cc.blynk.clickhouse.ClickHouseDriver");
//2. 获得数据库连接
Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
//3.操作数据库,实现增删改查
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(" show databases");
Date nowTime = new Date();
System.out.println(nowTime);
while (rs.next()) {
System.out.println(rs.getString("name"));
}
}
}
- Java client
引入依赖包
<dependency>
<groupId>com.clickhouse</groupId>
<!-- or clickhouse-grpc-client if you prefer gRPC -->
<artifactId>clickhouse-http-client</artifactId>
<version>0.5.0</version>
</dependency>
编写应用程序代码
public class client {
public static final String URL = "jdbc:clickhouse://10.91.117.23:8133";
public static final String USER = "***";
public static final String PASSWORD = "***";
public static void main(String[] args) throws Exception {
/*
连接数据库
语法:protocol://host[:port][/database][?param[=value][¶m[=value]][#tag[,tag]]
*/
ClickHouseNodes servers = ClickHouseNodes.of(
"jdbc:ch:http://server1.domain,server2.domain,server3.domain/my_db"
+ "?load_balancing_policy=random&health_check_interval=5000&failover=2");
//查询
ClickHouseResponse response = client.connect(endpoint) // or client.connect(endpoints)
// you'll have to parse response manually if using a different format
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from numbers(:limit)")
.params(1000).executeAndWait()) {
ClickHouseResponseSummary summary = response.getSummary();
long totalRows = summary.getTotalRowsToRead();
//流式查询
ClickHouseResponse response = client.connect(endpoint) // or client.connect(endpoints)
// you'll have to parse response manually if using a different format
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from numbers(:limit)")
.params(1000).executeAndWait()) {
for (ClickHouseRecord r : response.records()) {
int num = r.getValue(0).asInteger();
// type conversion
String str = r.getValue(0).asString();
LocalDate date = r.getValue(0).asDate();
}
//插入
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
ClickHouseRequest<?> request = client.connect(servers).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
// load data into a table and wait until it's completed
request.write()
.query("insert into my_table select c2, c3 from input('c1 UInt8, c2 String, c3 Int32')")
.data(myInputStream).execute().thenAccept(response -> {
response.close();
});
}
//多个查询
CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.send(servers.get(),
"create database if not exists my_base",
"use my_base",
"create table if not exists test_table(s String) engine=Memory",
"insert into test_table values('1')('2')('3')",
"select * from test_table limit 1",
"truncate table test_table",
"drop table if exists test_table");
// block current thread until queries completed, and then retrieve summaries
List<ClickHouseResponseSummary> results = future.get();
}
更多信息可以查阅官方文档
常见的集群间数据迁移方法
- 先将数据导出为TSV或CSV文件,再利用clickhouse-client --query命令导入
将数据导出为csv
示例:
clickhouse-client --query="select * from database.table FORMAT CSV" > /dev/shm/data.csv
将csv数据导入,示例:
cat /dev/shm/data | clickhouse-client -h 123.456.1.789 --query="insert into database.table FORMAT CSV
- insert into … select * from remote(‘ip’,db.table,‘user’,‘password’)
- 直接拷贝原始数据文件
拷贝的数据目录需要放在"/home/service/var/clickhouse/data/user_files/"目录下
执行attach命令,示例:
ATTACH TABLE test FROM '01188_attach/test' (s String, n UInt8) ENGINE = File(TSV);
attach命令执行成功后,/home/service/var/clickhouse/data/user_files/目录下的数据目录会自动被移除。
clickhouse-copier
管控台的数据导入功能主要是基于copier来实现的,官网介绍
针对常见问题这里做一些补充介绍:
- 迁移状态查询
copier支持–status命令用于查询整个迁移任务状态,该命令的实现原理是查询zk上的任务状态znode。
由于copier的迁移是根据任务配置里的表顺序来逐个处理的,因此没有开始处理的表在这里查不到状态信息
示例:
{
"table_event_local": {
"all_partitions_count": 65,
"processed_partitions_count": 65
}
}
- 实现原理
- 2.1 copier本身也是也是一个clickhouse节点,其代码实现是额外实现了ClusterCopierApp::mainImpl类,对应的二进制也是软链接到clickhouse二进制
- 2.2 piece表
目的端集群会创建piece表主要是为了拆分大的partition来使得迁移任务能均匀分配到多个copier进程,默认值为3。copier会跟据任务配置将需要迁移的表数按shard,partition,piece分配,每处理一下部分就会去zk上创建相应znode,当发现已有了相应znode标识已经有znode节点在执行,会跳过执行其它部分的迁移。 - 2.2.1 如果某个znode节点在处理过程中挂掉,相应的znode下的active_worker下记录的是个临时节点,对于这种部分会重新处理(drop相应的partition)。示例日志:
- 2.2.2 piece表数据的划分
根据目的端表新的分片策略取余来决定每个piece表的数据
示例:
WITH 197001 AS partition_key SELECT * FROM _local.`.read_shard_0.destination_cluster.default.xieyichen9` WHERE (toYYYYMM(EventDate) = partition_key) AND ((cityHash64(id) % 3) = 2) FORMAT Native
- 2.3 copier在迁移过程中会在自身节点创建_local数据库,这个库下会有copier需要用到的辅助表。
辅助表全部是distributed表,这样也符合了copier本地不存储数据的设计思路。
拉取数据的表
示例:
a. _local.`.read_shard_0.destination_cluster.default.xieyichen9`(从源端集群的shard 0来拉去数据的distributed表,使用的是特别的cluster来保证cluster里只有源端集群shard 0的拓扑信息)
b. _local.`.split.destination_cluster.default.xieyichen9_piece_2`(向目的端集群写数据的distributed表) - 2.4 数据的写入
数据写入是通过select insert的方式写入的,示例:
WITH 197001 AS partition_key SELECT * FROM _local.`.read_shard_0.destination_cluster.default.xieyichen9` WHERE (toYYYYMM(EventDate) = partition_key) AND ((cityHash64(id) % 3) = 2) FORMAT Native
INSERT INTO _local.`.split.destination_cluster.default.xieyichen9_piece_2` FORMAT Native
- 2.5 数据从piece辅助表move到最终表
使用attach partition from命令实现,这个命令不会删除源表和目的表的partition数据,即使有重名partition也是一个追加的动作。当然如果存在两条完全相同的数据会自动去重,这也是clickhouse的基本设计思路。
示例:
ALTER TABLE default.xieyichen9 ATTACH PARTITION 197001 FROM default.xieyichen9_piece_0
- 对于replicated类表的处理
读取源端数据:copier解析的时候会区分出shard和replica,读取的时候按照shard来读取
写入目的端数据:创建目的端的piece表时,会把engine中的replicated字段去掉,避免多余的副本同步,这样也会导致目的端的piece表只会存在某个副本上,而不是每个副本都有piece表 - 迁移过程断点续传
select insert前会检查目的端节点的piece表的相应partition,如果发现存在数据则认为存在脏数据,会在zk上创建is_dirty znode,后续发现会drop掉piece表上的partition再做处理。如果当前piece任务的znode中存在active_worker则表示当前存在正在处理这部分数据的copier,会跳过这部分继续处理其他piece - task配置动态加载
官网介绍task配置是可以动态加载的,其实是每次处理完当前piece任务后会重新读取zk上的配置。但是对于max_worker配置(允许并发运行的copier进程数),其限制是每次处理piece任务内会检查当前的active_worker,从而导致max_worker配置无法动态加载。 - copier进程停止
使用kill命令无法杀死copier,因为主线程启动的子线程没有处理kill信号。需要使用kill -9命令
ClickHouse 中支持 Join 的类型
INNER JOIN
可以省略INNER关键字,默认即INNER JOIN
OUTER JOIN
- 左外连接(LEFT OUTER JOIN)的行为类似于内连接(INNER JOIN),但是它还会返回左表中的非匹配行,同时用右表列的默认值填充。
- 右外连接(RIGHT OUTER JOIN)查询类似于左外连接,它会返回右表中的非匹配行,同时用左表列的默认值填充。
- 全外连接(FULL OUTER JOIN)查询结合了左外连接和右外连接的特性,它会返回左表和右表中的非匹配行,并使用左表和右表列的默认值填充。
CROSS JOIN
在不考虑连接键的情况下产生两个表的完全笛卡尔积。左表的每一行都与右表的每一行组合在一起。
SEMI JOIN
- 左半连接(LEFT SEMI JOIN)查询会返回左表中至少在右表中有一个连接键匹配的行的列值。它只返回找到的第一个匹配项,并禁用笛卡尔积。
- 右半连接(RIGHT SEMI JOIN)查询与之类似,它返回右表中至少在左表中有一个匹配的行的值,同样只返回找到的第一个匹配项。
ANTI JOIN
- 左反连接(LEFT ANTI JOIN)返回左表中所有非匹配行的列值。
- 右反连接(RIGHT ANTI JOIN)返回右表中所有非匹配行的列值。
ANY JOIN
相比默认的all join,any join在遇到多条匹配记录时,ClickHouse仅返回第一个匹配项的组合列值。
ASOF JOIN
ASOF JOIN提供了非精确匹配的能力。如果左表中的一行在右表中没有精确匹配,那么将使用最接近的右表行作为匹配。
join底层原理
hash join
下面展示了将hash join整合到ClickHouse查询流水线中的示意图:
可以看到:
① 右侧表的所有数据被流式传输(由于max_threads = 2,以2个线程并行),然后ClickHouse将这些数据填充到内存中的哈希表中。
② 左侧表的数据被流式传输(由于max_threads = 2,以2个线程并行),并且 ③ 通过在哈希表中进行查找来与右侧表进行联接。
使用更小的右表来加快查询
join关键字右侧的表被称为右表,由于ClickHouse将右侧表格并创建一个哈希表放在RAM中,将较小的表格放在联接的右侧会更节省内存。
示例:
SELECT
query,
formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
formatReadableSize(memory_usage) AS memory_usage,
formatReadableQuantity(read_rows) AS read_rows,
formatReadableSize(read_bytes) AS read_data
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 2
FORMAT Vertical;
Row 1:
──────
query: SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'hash'
query_duration: 5 seconds
memory_usage: 8.95 GiB
read_rows: 101.00 million
read_data: 3.41 GiB
Row 2:
──────
query: SELECT *
FROM roles AS r
JOIN actors AS a ON r.actor_id = a.id
FORMAT `Null`
SETTINGS join_algorithm = 'hash'
query_duration: 0 seconds
memory_usage: 716.44 MiB
read_rows: 101.00 million
read_data: 3.41 GiB
通过上面的示例可以看到,右侧较小的 actors 表的联接查询消耗的内存明显比右侧较大的 roles 表的联接查询要少。
另外,所指示的峰值内存使用量为8.95 GiB和716.44 MiB,比两个查询运行中各自右侧表的未压缩大小2.63 GiB和21.81 MiB要大。原因是哈希表的大小是根据联接键列的类型以及特定内部哈希表缓冲区大小的倍数来选择并动态增加的。 memory_usage 指标计算了为哈希表保留的总内存,尽管它可能没有完全填充。
对于两个查询的执行,ClickHouse读取了相同数量的总行数(和数据):从roles表中读取1亿行+从actors表中读取100万行。然而,右侧较大的roles表的联接查询速度慢了五倍。这是因为默认的哈希联接对于将右侧表的行插入哈希表而言并不是线程安全的。因此,哈希表的填充阶段在单个线程中运行。我们可以通过检查实际的查询流水线来进一步确认这一点。
使用EXPLAIN语句来打印用DOT图形描述语言描述的查询流水线图,并使用Graphviz dot将图形呈现为PDF格式,语句示例:
./bin/clickhouse client --port 9001 --password bQSknrGM3FdjDbpI --user ck_super_account --database=imdb --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors a
JOIN roles r ON a.id = r.actor_id
SETTINGS max_threads = 2, join_algorithm = 'hash';" | dot -Tpdf > pipeline.pdf
这里给上面生成的图片加了一些注释,稍微简化了主要阶段的名称,并添加了两个参与联接的表,以使两个图表保持对齐:
可以看到查询流水线①从两个并行的流式传输阶段开始(因为max_threads设置为2),用于从右侧表格流式传输数据,接着是一个单个的填充阶段,用于填充哈希表。两个额外的并行流式传输阶段②和两个并行联接阶段③用于流式传输和联接左侧表格的数据。
正如前面提到的,对于将右侧表格的行插入哈希表, 默认的哈希联接算法不是线程安全的。因此,上述流水线中使用了一个调整大小阶段,将从右侧表格流式传输数据的两个线程减少为单线程的填充阶段。这可能会成为查询运行时的瓶颈。如果右侧表格很大-参见我们上面的两个查询运行,在联接的右侧有大型的 roles 表格的查询比较慢,速度是另一个查询的五倍。
代码实现
- addJoinedBlock根据join的限制条件最终决定将哪些右表数据存入到内存hash表中,hash表为key->row的结构。对于多个join连接键,多个键组成一个复合键;对于join all的场景,相同的join key如果对应多个row时,hash表的value也是个容器,可以存放多条匹配的row
- joinBlock将左表(通过 block 参数传递)与哈希表中存储的右表数据进行 JOIN 操作,并将结果插入到 block 中,同时可能添加右表的列(比如对于join all中的没有匹配的行)。
pipeline处理器和调度器
- Transform: Transform是ClickHouse查询处理过程中的一个基本操作单元,它负责对数据进行单个特定的转换或计算步骤,比如过滤(FilterTransform)、投影(ProjectionTransform)、聚合(AggregatingTransform)等。
- Pipeline: Pipeline可以理解为一系列Transform的有序组合,这些Transform按照一定的逻辑顺序排列,形成一条连续的数据处理流水线。数据从Pipeline的一端流入并经过每个Transform的处理后流向另一端。Pipeline模型使得ClickHouse能够以流式的方式高效地处理数据,并可能利用多核CPU资源实现并行计算。
- Step: 在ClickHouse的内部实现中,Step通常指的是Pipeline中的一个具体阶段或者动作,可以是一个Transform,也可以是一系列Transform的操作集合。每一个Step代表了查询执行过程中的一部分工作。
- QueryPlan: QueryPlan则是对整个SQL查询语句解析后的执行计划表示,它包含了完成整个查询所需的所有Pipeline和其他必要的信息。QueryPlan不仅包括各个Transforms的组织结构,还包括了Join策略、排序、分组、限制等高级查询特性的实现细节。构建QueryPlan的过程涉及到了优化器的选择和规划,确保最终生成的执行计划尽可能高效。
transform的具体执行是通过processor,通常是work方法,在join中,JoiningTransform本身就是继承了IProcessor方法的,从名字就可以看出JoiningTransform是用于将左表block和右表block的join操作(这里的join操作是join类指针,会根据具体的join类类型来做,具体的方法名是transformHeader),除此之外还有FillingRightJoinSideTransform等processor,这些processor的定义都在JoiningTransform.cpp中。每个transform的输入元数据由ISource接口类来提供,最终的输出由ISinker接口类来提供。processor各个状态之间的轮转控制在PipelineExecutor::executeStepImpl方法中实现。
当 QueryPipeline 进行 Transformer 编排时,我们还需要进行更加底层的 DAG 连通构建。实现代码如下示例:
connect(Source.OutPort, FilterTransform.InPort)
connect(FilterTransform.OutPort, SortingTransform.InPort)
connect(SortingTransform.OutPort, LimitByTransform.InPort)
connect(LimitByTransform.OutPort, Sinker.InPort)
这样就实现了数据的流向关系,一个 Transformer 的 OutPort 对接另外一个的 InPort,就像我们现实中的水管管道一样,接口有 3 通甚至多通。
整个pipeline的运行最终是通过PipelineExecutor类的executor方法来实现的。
block和chunk
- Block 类在 ClickHouse 中是一个核心数据结构,它代表了一组列和这些列中多行的数据。这个类可以理解为内存中的一个数据块,用于存储查询处理过程中的中间结果或最终结果。
- Chunk 类在 ClickHouse 中是一个轻量级的数据结构,用于存储一组相同长度的列数据。相比于 Block 类,它不存储列名、类型和索引信息,更注重性能与内存效率。
parallel hash join(22.7版本引入,目前未支持)
parallel hash join是hash join的一种变体,它将输入数据分割成多个部分并同时构建多个哈希表,以加快联接速度,但会增加内存开销。如下图所示:
可以看到:
① 从右侧表格的所有数据被流式传输(由于 max_threads = 2 ,以2个线程并行)到内存中。数据以块为单位进行流式传输。每个流式传输的块中的行通过将哈希函数应用于每一行的联接键而被分成2个桶( max_threads = 2 )。在上面的图表中用橙色和蓝色表示。同时每个桶使用单个线程填充一个内存中的哈希表。
② 从左侧表格中的数据被流式传输(由于 max_threads = 2 ,以2个线程并行),并且将步骤①中使用的相同“桶哈希函数”应用于每一行的联接键,以确定相应的哈希表,并 ③ 通过对相应的哈希表进行查找来进行联接。
max_threads 设置确定了并行哈希表的数量。
通过示例比较下hash join和parallel hash join两种算法的查询耗时和内存消耗情况
SELECT
query,
formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
formatReadableSize(memory_usage) AS memory_usage,
formatReadableQuantity(read_rows) AS read_rows,
formatReadableSize(read_bytes) AS read_data
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 2
FORMAT Vertical;
Row 1:
──────
query: SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'parallel_hash'
query_duration: 2 seconds
memory_usage: 18.29 GiB
read_rows: 101.00 million
read_data: 3.41 GiB
Row 2:
──────
query: SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'hash'
query_duration: 5 seconds
memory_usage: 8.86 GiB
read_rows: 101.00 million
read_data: 3.41 GiB
parallel hash join的运行时间大约比hash join快100%,但峰值内存消耗超过了两倍,尽管读取的行数、数据量以及右侧表格的大小对于两个查询来说是相同的。
上面的查询语句中没有指定max_threads参数,因此会自动取默认值为当前运行节点的cpu核数。
设置max_threads=2获取查询流水线图,查询语句示例:
./bin/clickhouse client --port 9001 --password bQSknrGM3FdjDbpI --user ck_super_account --database=imdb --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors a
JOIN roles r ON a.id = r.actor_id
SETTINGS max_threads = 2, join_algorithm = 'parallel_hash';" | dot -Tpdf > pipeline.pdf
可以看到,在填充阶段中有两个并发(并行)的填充阶段,用于使用右侧表格的数据填充两个哈希表。此外,还使用两个并发的联接阶段来联接(通过哈希表查找)左侧表格的数据。
上面的查询流水线中resize阶段,用于在所有fill阶段和所有join阶段之间建立明确的联结:所有join阶段应等待所有fill阶段完成。
grace hash join(22.3.15-custom.1版本开始支持)
上述描述的哈希和并行哈希联接算法都非常快速,但受内存限制。如果右侧表格无法适应主内存,ClickHouse将引发OOM异常。
grace hash join使用两阶段的方法来联接数据。下图显示了第一阶段的过程:
① 从右侧表格的所有数据以块为单位(由于 max_threads = 2 ,以2个线程并行)流式传输到内存中。每个流式传输的块中的行通过将哈希函数应用于每一行的联接键而被分成3个桶(因为 grace_hash_join_initial_buckets = 3 )。在上图中用橙色、蓝色和绿色表示这一过程。一个内存中的哈希表使用来自第一个(橙色)桶的行进行填充。来自右侧表格的其他两个(绿色和蓝色)桶的联接被延迟,并保存到临时存储中。
如果内存中的哈希表超过了内存限制(由 max_bytes_in_join 设置),ClickHouse会动态增加桶的数量,并重新计算每一行分配的桶。不属于当前桶的任何行都会被刷新和重新分配。
ClickHouse始终将 grace_hash_join_initial_buckets 设置的值向上舍入到最接近的2的幂。因此,将3舍入为4,并使用4个初始桶。为了可读性,在图表中使用了3个桶,与使用4个桶的内部工作没有实质性差别。
② 左侧表格的数据以2个线程并行流式传输( max_threads = 2 ),并且将步骤①中使用的相同“桶哈希函数”应用于每一行的联接键,以确定相应的桶。与第一个桶对应的行被 ③ 联接(因为相应的哈希表在内存中)。其他桶的联接被延迟,并保存到临时磁盘文件中。
步骤①和②的关键是,“桶哈希函数”将一致地将值分配给相同的桶,从而有效地将数据进行分区,并通过分解来解决问题。
在第二阶段,ClickHouse处理在磁盘上剩余的桶。剩余的桶按顺序进行处理。以下两个图表概述了这一过程。第一个图表显示了如何首先处理蓝色桶。第二个图表显示了最后一个绿色桶的处理过程。
① ClickHouse从右侧表格数据的每个桶构建哈希表。同样,如果ClickHouse内存不足,它会动态增加桶的数量。
② 一旦从右侧表格的一个桶构建了哈希表,ClickHouse会流式传输相应左侧表格桶的数据,并 ③ 完成该对的联接。
在此阶段,可能会有一些行属于当前桶以外的另一个桶,这是因为它们在桶的数量动态增加之前被保存到临时存储中。在这种情况下,ClickHouse会将它们保存到新的实际桶中,并进一步处理它们。
这个过程对于剩余的所有桶都会重复进行。
通过示例对比grace hash join和hash join:
SELECT
query,
formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
formatReadableSize(memory_usage) AS memory_usage,
formatReadableQuantity(read_rows) AS read_rows,
formatReadableSize(read_bytes) AS read_data
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 2
FORMAT Vertical;
Row 1:
──────
query: SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 3
query_duration: 13 seconds
memory_usage: 3.72 GiB
read_rows: 101.00 million
read_data: 3.41 GiB
Row 2:
──────
query: SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'hash'
query_duration: 5 seconds
memory_usage: 8.96 GiB
read_rows: 101.00 million
read_data: 3.41 GiB
正如预期的那样,哈希联接更快。然而,grace hash联接只消耗了峰值主内存的一半。
通过增加 grace_hash_join_initial_buckets 设置,可以进一步减少grace hash联接的主内存消耗。
通过示例语句获取查询pipeline:
./bin/clickhouse client --port 9001 --password bQSknrGM3FdjDbpI --user ck_super_account --database=imdb --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS max_threads = 2, join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 3';" | dot -Tpdf > pipeline.pdf
可以看到,在①处有两个并行流式传输阶段( max_threads=2 ),从右侧表格中的数据流式传输到内存中。我们还可以看到使用了两个并行填充阶段来填充内存中的哈希表。两个额外的并行流式传输阶段②和两个并行联接阶段③用于流式传输和联接左侧表格的数据。
这里无法从查询流水线中看到桶的数量,因为桶的创建是动态的,并且取决于内存压力,ClickHouse会根据需要动态增加桶的数量。所有的桶都在Delayed Joined Blocks Worker Transform阶段中进行处理。
代码实现
- pipeline图中的FillingRightJoinSide transformer对应的是addJoinedBlock方法,该方法会把输入的block打散到各个buckets(JoinCommon::scatterBlockByHash),之后将分散后的数据块flush到。buckets_snapshot指定的桶中,并将当前处理的数据块赋值给current_block。接下来,尝试将current_block添加到内存中的join结构,这部分其实是走的hash join的addJoinedBlock方法。如果使用hash join的addJoinedBlock方法时发现,超过了内存限制,则会将当前bucket数调整为两倍。
对于其他block写入磁盘临时文件,对应方法为flushBlocksToBuckets,每个buckets一个临时文件。 - JoiningTransform对应了joinBlock方法,类似上面的实现,也是对输入的block进行分桶,然后把当前处理的bucket对应的block,传给hash join的joinBlock方法。其余bucket的block通过flushBlocksToBuckets方法写到磁盘文件上。
- DelayedJoinedBlocksTransform对应的是getDelayedBlocks方法,实现了读取当前bucket对应的右表数据的临时文件,并生成DelayedBlocks。DelayedJoinedBlocksWorkerTransform则会调用DelayedBlocks的next方法,实现读取对应bucket的左表临时文件数据,并执行hash join的joinBlock方法。
为了检查已创建和处理的桶的数量,需要查看日志来查看grace hash join的实际执行情况。通过执行下述命令来获取trace日志。
set send_logs_level='trace'
对于这条查询语句:
SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
FORMAT Null
SETTINGS max_threads = 2, join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 3;
其trace日志为:
...
... GraceHashJoin: Initialize 4 buckets
... GraceHashJoin: Joining file bucket 0
...
... imdb.actors ... Reading approx. 817718 rows with 2 streams
...
... imdb.roles ... Reading approx. 3431966 rows with 2 streams
...
... GraceHashJoin: Joining file bucket 1
... GraceHashJoin: Loaded bucket 1 with 204377(/858327) rows
...
... GraceHashJoin: Joining file bucket 2
... GraceHashJoin: Loaded bucket 2 with 204426(/861800) rows
...
... GraceHashJoin: Joining file bucket 3
... GraceHashJoin: Loaded bucket 3 with 204577(/854663) rows
...
... GraceHashJoin: Finished loading all 4 buckets
...
可以看到创建了4个而不是3个初始桶。ClickHouse始终将 grace_hash_join_initial_buckets 设置的值向上舍入到最接近的2的幂。还可以看到每个表格使用了2个并行流式传输阶段来读取表格的行。两个表格的bucket 0是立即进行联接。
其他3个桶被写入磁盘,并在稍后顺序加载以进行联接。可以看到,每个桶中的行数均匀分配 - 分别为2万行和约8万行,分别对应于24万行和34万行。
clickhouse磁盘临时文件
代码中临时文件的实现主要通过TemporaryFileOnDisk类来实现,TemporaryFileOnDisk类包装了一层对第三方库Poco::TemporaryFile来实现操作临时文件的,并补充了关于临时文件的相关监控项,包括文件数,文件大小,行数,以及读写速度等。对于临时文件的使用主要集中在sort,aggreation,join三个计算过程中。同时还补充了参数max_temporary_data_on_disk_size_for_user/max_temporary_data_on_disk_size_for_query用于限制临时文件大小。
full sorting merge join(暂不支持)
① 从右表格中的所有数据以块为单位并行地通过2个流式传输阶段(因为 max_threads = 2 )流式传输到内存中。两个并行的排序阶段按联接键列的值对每个流式传输的块中的行进行排序。这些排序后的块通过两个线程并行的存储到临时存储中。
② 与①同时进行,左表格的所有数据以块为单位并行地通过2个线程( max_threads = 2 )流式传输,类似于①,每个块都会进行排序并存储到磁盘上。
③ 以每个表格一个流的方式,从磁盘上读取排序后的块,并进行合并排序,通过合并(交替扫描)两个排序流来识别join on的列。
可以通过参数max_bytes_before_external_sort来触发将数据溢出到磁盘上的过程.
不设置max_bytes_before_external_sort
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0;
将 max_bytes_before_external_sort 设置为较低的阈值来触发外部排序:
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0, max_bytes_before_external_sort = '100M';
通过上面两个示例查询语句来的query_log信息来对比查询情况。
SELECT
query,
formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
formatReadableSize(memory_usage) AS memory_usage,
formatReadableQuantity(read_rows) AS read_rows,
formatReadableSize(read_bytes) AS read_data,
formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 2
FORMAT Vertical;
Row 1:
──────
query: SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge',
max_rows_in_set_to_optimize_join = 0,
max_bytes_before_external_sort = '100M'
query_duration: 12 seconds
memory_usage: 3.49 GiB
read_rows: 132.92 million
read_data: 4.49 GiB
data_spilled_to_disk_uncompressed: 1.79 GiB
data_spilled_to_disk_compressed: 866.36 MiB
Row 2:
──────
query: SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge',
max_rows_in_set_to_optimize_join = 0
query_duration: 11 seconds
memory_usage: 4.71 GiB
read_rows: 101.00 million
read_data: 3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed: 0.00 B
可以看到,对于使用降低的 max_bytes_before_external_sort 设置运行的查询,使用的内存较少,并且数据溢出到磁盘,表明使用了外部排序。
使用下面的示例语句来查看查询流水线
./bin/clickhouse client --port 9001 --password bQSknrGM3FdjDbpI --user ck_super_account --database=imdb --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
SETTINGS max_threads = 2, join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0
;" | dot -Tpdf > pipeline.pdf
如果要排序的块数据的最大内存占用量保持在配置的外部排序阈值以下,就不需要将数据存储在磁盘上,并且排序后的块会立即进行合并排序和联接。
要排序的块数据的最大内存占用量与两个联接表中的总数据量关系不大,而更多地取决于查询管道内配置的并行级别。在ClickHouse中,数据是流式处理的:数据以并行和块方式流式传输到(内存中的)查询引擎中。流式传输的数据块按顺序和并行方式由特定的查询管道阶段进行处理,因此一旦一些表示(部分)查询结果的块可用,它们就会从内存中流式传输回查询的发送方。
将 max_bytes_before_external_sort 设置为较低的阈值来触发外部排序,并查看trace log:
./bin/clickhouse client --port 9001 --password bQSknrGM3FdjDbpI --user ck_super_account --database=imdb --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0, max_bytes_before_external_sort = '100M';"
...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams
...
... MergeSortingTransform: ... writing part of data into temporary file …
...
... MergingSortedTransform: Merge sorted … blocks, … rows in … sec., … rows/sec., … MiB/sec
...
... MergeJoinAlgorithm: Finished processing in … seconds, left: 16 blocks, 1000000 rows; right: 1529 blocks, 100000000 rows, max blocks loaded to memory: 3
...
这里使用了默认的 max_threads 设置。该设置控制查询管道内的并行级别。执行查询的节点具有30个CPU,因此默认的 max_threads 设置为30。为了使查询管道的可视化简洁和可读性好,设置 max_threads = 2 。
可以看到有6个并行流和30个并行流分别用于以块为单位将数据从两个表格流式传输到查询引擎中。使用了6个并行流,而不是30个并行流,用于包含100万行的 actors 表。这是因为设置了merge_tree_min_rows_for_concurrent_read_for_remote_filesystem。该设置配置了单个查询执行线程应该至少读取/处理的最小行数。默认值为163,840行。而1百万行/163,840行=6个线程。对于包含1亿行的 roles 表,结果将为610个线程,超过了配置的最大值30个线程。
还可以看到MergeSortingTransform(在上面的图表中被简化为’spill’)阶段的条目,指示数据(排序后的块的数据)溢出到磁盘上的临时存储中。MergingSortedTransform阶段(在上面的图表中称为’merge sort’)的条目总结了从临时存储中读取的排序块的合并排序过程。
最后的MergeJoinAlgorithm条目总结了联接处理过程:来自左表的1百万行以块为单位(通过6个并行流)以16个块的形式流式传输(每个块中约62500行-接近默认块大小)。来自右表的1亿行以块为单位(通过30个并行流)以1529个块的形式流式传输(每个块中约65400行)。在流式处理过程中,在 merge join 阶段同时最多在内存中保存3个具有相同联接键的块中的行。对于示例查询中INNER join的ALL严格性而言,这些行的笛卡尔乘积在内存中完成。
再查看一下没有降低外部排序阈值的查询运行的trace log:
./bin/clickhouse client --port 9001 --password bQSknrGM3FdjDbpI --user ck_super_account --database=imdb --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0;"
...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams
...
... MergingSortedTransform: Merge sorted … blocks, … rows in … sec., … rows/sec., … MiB/sec
...
... MergeJoinAlgorithm: Finished processing in … seconds, left: 16 blocks, 1000000 rows; right: 1529 blocks, 100000000 rows, max blocks loaded to memory: 3
...
日志条目表明查询执行未触发任何数据溢出到磁盘,因为块数据的最大内存使用量保持在默认的外部排序阈值以下。因此,溢出阶段被跳过,排序块立即进行了合并排序和联接操作,而无需进行基于磁盘的排序。
外部排序
外部排序通常应用于数据量超过内存容量的情形。在这种情况下,无法一次性将所有待排序的数据加载到内存中进行排序,所以需要借助硬盘等外部存储设备作为临时空间,将整个排序过程分解成多个阶段来实现。
外部排序通常包括以下几个步骤:
分割(Partitioning):首先将大文件分割成若干个子文件,每个子文件的大小足够小,能够被装入内存进行排序。
内部排序(In-memory Sorting):对每个子文件在内存中独立进行排序,生成有序的子序列。
归并(Merge):接着采用多路归并排序(如 k-way merge)的方法,将上述排序好的子文件逐个读入内存,并合并成更大的有序子序列。在归并过程中,会根据记录的排序关键字选取最小值加入到结果序列中,直到所有子文件都被处理完。
循环归并:如果单次归并后得到的序列仍然大于内存容量,则需要再次进行分割和归并操作,直至最终得到一个完整的有序序列。
写回(Write-back):将最终生成的全局有序序列写回硬盘或其他外部存储介质。
在联接之前,通过使用彼此的联接键值对表进行筛选
在排序合并联接之前,可以通过彼此的联接键对加入的表进行过滤,以最小化需要排序和合并的数据量。为此,ClickHouse会构建一个内存集合,其中包含右表的联接键列的(唯一)值,并使用该集合过滤掉左表中所有不可能有联接匹配的行,反之亦然。如果一张表比另一张表小得多,并且表的唯一联接键列值适合内存,那么这个方法尤其有效。
full sorting merge join以相同的方式适用于左表和右表,在两个表都大于可用内存的情况下,它将自动回退到外部排序。这种优化是为了将哈希联接的性能带到完全排序合并联接的特定用例中。max_rows_in_set_to_optimize_join设置控制着这种优化。将其设置为0会禁用它。默认值为100,000。该值指定了两个表集合的最大允许大小(按条目计算)。这意味着如果两个集合加在一起仍然低于阈值,则优化将应用于两个表。如果两个集合加在一起超过了阈值,那么仍然可能有一个集合低于阈值,并且优化将仅应用于一个表。下面的trace log展示了ClickHouse将顺序尝试为两个表构建集合,并在超过限制时进行回退和跳过构建集合。
查询键值键的基数
SELECT countDistinct(first_name)
FROM actors;
┌─uniqExact(first_name)─┐
│ 109993 │
└───────────────────────┘
SELECT countDistinct(role)
FROM roles;
┌─uniqExact(role)─┐
│ 999999 │
└─────────────────┘
使用 max_rows_in_set_to_optimize_join 设置的默认值100,000,该优化不会应用于任何表。执行示例查询语句:
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge';
执行带有 max_rows_in_set_to_optimize_join 设置为 200,000 的示例查询。请注意,这个限制对于构建两个表的集合来说仍然太低。但它允许构建较小的左表的集合,这是这种优化的主要思想,即当一个表远小于另一个表,并且表的唯一联接键列值适合内存时,它的效果尤其好:
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 200_000;
通过查看query_log查看更多细节:
SELECT
query,
formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
formatReadableSize(memory_usage) AS memory_usage,
formatReadableQuantity(read_rows) AS read_rows,
formatReadableSize(read_bytes) AS read_data,
formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 2
FORMAT Vertical;
Row 1:
──────
query: SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge',
max_rows_in_set_to_optimize_join = 200_000;
query_duration: 2 seconds
memory_usage: 793.30 MiB
read_rows: 101.00 million
read_data: 3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed: 0.00 B
Row 2:
──────
query: SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge';
query_duration: 11 seconds
memory_usage: 4.71 GiB
read_rows: 101.00 million
read_data: 3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed: 0.00 B
可以看到预过滤优化的效果:执行时间快了5倍,峰值内存消耗减少了约6倍。
查询开启优化后的查询pipeline:
./bin/clickhouse client --port 9001 --password bQSknrGM3FdjDbpI --user ck_super_account --database=imdb --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
SETTINGS max_threads = 2, join_algorithm = 'full_sorting_merge';" | dot -Tpdf > pipeline.pdf
与未应用任何优化的全排序合并联接算法的通用版本相比,可以看到额外的阶段(在上面的图表中以蓝色和绿色表示)。这些阶段负责在联接之前通过彼此的联接键值来过滤两个表:
使用两个并行的蓝色 CreatingSetsOnTheFlyTransform 阶段来构建包含右表联接键列值的内存集合。然后,这个集合由两个并行的蓝色 FilterBySetOnTheFlyTransform 阶段使用,用于过滤掉左表中所有不可能有联接匹配的行。
使用两个并行的绿色 CreatingSetsOnTheFlyTransform 阶段来构建包含左表联接键列值的内存集合。然后,这个集合由两个并行的绿色 FilterBySetOnTheFlyTransform 阶段使用,用于过滤掉右表中所有不可能有联接匹配的行。
在这些集合从联接键列中完全构建之前,通过并行块来流式传输包含所有所需列的行,绕过过滤优化以便对每个块内的行按其联接键进行排序并(可能)将其溢写到磁盘。过滤器仅在集合准备好之后开始工作。这就是为什么还有两个 ReadHeadBalancedProcessor 阶段。这些阶段确保在集合准备好之前,以两个表的总大小成比例的方式从两个表中流式传输数据,以防止在小表用于过滤之前,大表的数据大部分被处理完的情况发生。
通过查看trace log来了解细节:
./bin/clickhouse client --port 9001 --password bQSknrGM3FdjDbpI --user ck_super_account --database=imdb --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 200_000;"
...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams
...
... CreatingSetsOnTheFlyTransform: Create set and filter Right joined stream: set limit exceeded, give up building set, after reading 577468 rows and using 96.00 MiB
...
... CreatingSetsOnTheFlyTransform: Create set and filter Left joined stream: finish building set for [first_name] with 109993 rows, set size is 6.00 MiB
...
... FilterBySetOnTheFlyTransform: Finished create set and filter right joined stream by [role]: consumed 3334144 rows in total, 573440 rows bypassed, result 642125 rows, 80.74% filtered
... FilterBySetOnTheFlyTransform: Finished create set and filter right joined stream by [role]: consumed 3334144 rows in total, 573440 rows bypassed, result 642125 rows, 80.74% filtered
...
... MergingSortedTransform: Merge sorted … blocks, … rows in … sec., … rows/sec., … MiB/sec
...
... MergeJoinAlgorithm: Finished processing in 3.140038835 seconds, left: 16 blocks, 1000000 rows; right: 207 blocks, 13480274 rows, max blocks loaded to memory: 3
...
可以看到这里使用6个并行流和30个并行流来将数据从两个表流式传输到查询引擎。
另外可以看到一个 CreatingSetsOnTheFlyTransform 阶段的条目,表明无法构建包含右表联接键列的内存集,因为条目数量将超过 max_rows_in_set_to_optimize_join 设置的阈值200000。
另一个 CreatingSetsOnTheFlyTransform 阶段的条目显示成功构建了包含左表联接键列的集合。该集合用于过滤右表的行,这由 FilterBySetOnTheFlyTransform 阶段的30个条目表示(这里只显示前两个条目,省略其余的)。30个条目对应于ClickHouse使用30个并行流阶段从右表流式传输行,并使用30个并行的 FilterBySetOnTheFlyTransform 阶段来过滤这30个流。
partial merge join
ClickHouse的partial merge join是对full sorting merge join的一种优化。当联接大型表时,它通过仅对右表进行外部排序来最小化内存使用。为了减少在内存中处理的数据量,它在磁盘上创建了最小-最大索引。左表始终以块方式和内存中进行排序。如果左表的物理行顺序与联接键的排序顺序匹配,则内存中识别联接匹配更为高效。具体的代码实现对应了类MergeJoin
ClickHouse的partial merge join与hash join的流程非常相似。partial merge join重用了哈希联接的流程,因为它也具有构建和扫描阶段。hash join首先从右表构建哈希表,然后扫描左表。类似地,partial merge join首先构建右表的排序版本,然后扫描左表:
① 首先,将右表的所有数据按块通过并行的两个流(因为 max_threads = 2 )流式传输到内存中。通过填充阶段,对每个流式传输的块中的行按联接键列的值进行排序,并与每个排序块一起将它们溢出到临时存储中。每个排序块都包含一个最小-最大索引,该索引存储了该块包含的联接键的最小和最大值。在步骤 ② 中,通过这些最小-最大索引来最小化内存中处理的数据量,以便在识别联接匹配时使用。
② 然后,所有来自左表的数据按块通过两个流( max_threads = 2)同时流式传输。每个块在流式传输时根据联接键进行即时排序,然后与右表在磁盘上的排序块进行匹配(步骤 ③)。最小-最大索引用于仅加载可能包含联接匹配的右表块。
这种联接处理策略非常高效地利用了内存,无论联接表的大小和物理行顺序如何。在上述的步骤 ① 中,右表的只有少量块在被写入临时存储之前会通过内存进行流式传输。在步骤 ② 中,只有少量块从左表通过内存进行流式传输。步骤 ① 中创建的最小-最大索引有助于最小化从临时存储加载右表块的数量,以识别联接匹配。
如果左表的物理行顺序与联接键的排序顺序匹配,则基于最小-最大索引的跳过非匹配右表块的操作最为有效。然而,当左表的数据块具有一般分布的联接键值时,使用部分合并联接算法的代价最高。因为如果左表的每个数据块包含一大部分分布广泛的联接键值,那么右表的排序块的最小-最大索引就无法起到作用,实际上会在每个左表块和一大组从磁盘加载的右表排序块之间创建一个笛卡尔积。
这里有三个partial merge join相关的查询语句用来对比
使用联接键作为联接表排序键的前缀,以从上述基于最小-最大索引的性能优化中获益
SELECT *
FROM actors AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'partial_merge';
0 rows in set. Elapsed: 33.796 sec. Processed 101.00 million rows, 3.67 GB (2.99 million rows/s., 108.47 MB/s.)
运行相同的查询,但是左表在磁盘上具有不同的物理顺序。重新创建了一个按非联接键列排序的演员表副本。这意味着行按照随机的联接键顺序排列。这对于部分合并联接的执行时间来说是最糟糕的情况:
SELECT *
FROM actors_unsorted AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'partial_merge';
0 rows in set. Elapsed: 44.872 sec. Processed 101.00 million rows, 3.67 GB (2.25 million rows/s., 81.70 MB/s.)
相比上一次运行,执行时间慢了36%。
使用完全排序合并算法运行相同的查询。为了与部分合并算法进行比较,强制进行外部排序。通过禁用full sorting merge join的“按顺序流式处理优化”(不将 max_rows_in_set_to_optimize_join 设置为0)。并且降低 max_bytes_before_external_sort 的值:
SELECT *
FROM actors AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_bytes_before_external_sort = '100M';
0 rows in set. Elapsed: 7.381 sec. Processed 139.35 million rows, 5.06 GB (18.88 million rows/s., 685.11 MB/s.)
通过查询query_log获取这三次查询的详细信息
SELECT
query,
formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
formatReadableSize(memory_usage) AS memory_usage,
formatReadableQuantity(read_rows) AS read_rows,
formatReadableSize(read_bytes) AS read_data,
formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND (hasAll(tables, ['imdb_large.actors', 'imdb_large.roles']) OR hasAll(tables, ['imdb_large.actors_unsorted', 'imdb_large.roles']))
ORDER BY initial_query_start_time DESC
LIMIT 3
FORMAT Vertical;
Row 1:
──────
query: SELECT *
FROM actors AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge',
max_bytes_before_external_sort = '100M';
query_duration: 7 seconds
memory_usage: 3.54 GiB
read_rows: 139.35 million
read_data: 4.71 GiB
data_spilled_to_disk_uncompressed: 1.62 GiB
data_spilled_to_disk_compressed: 1.09 GiB
Row 2:
──────
query: SELECT *
FROM actors_unsorted AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'partial_merge';
query_duration: 44 seconds
memory_usage: 2.20 GiB
read_rows: 101.00 million
read_data: 3.41 GiB
data_spilled_to_disk_uncompressed: 5.27 GiB
data_spilled_to_disk_compressed: 3.52 GiB
Row 3:
──────
query: SELECT *
FROM actors AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'partial_merge';
query_duration: 33 seconds
memory_usage: 2.21 GiB
read_rows: 101.00 million
read_data: 3.41 GiB
data_spilled_to_disk_uncompressed: 5.27 GiB
data_spilled_to_disk_compressed: 3.52 GiB
在第2行和第3行中可以看到两次partial merge join的内存使用量和溢出到磁盘的数据量相同。然而,正如上面详细解释的那样,当左表的物理行顺序与联接键顺序匹配时,在第3行的运行中执行速度更快。
即使在联接表的(人为强制)完全外部排序的情况下,第1行的完全排序合并联接的执行速度几乎比第3行的部分合并联接的执行速度快5倍。尽管部分合并联接的设计意图是使用更少的内存。
通过下面的语句获取查询pipeline:
./bin/clickhouse client --port 9001 --password bQSknrGM3FdjDbpI --user ck_super_account --database=imdb --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
SETTINGS max_threads = 2, join_algorithm = 'partial_merge';" | dot -Tpdf > pipeline.pdf
partial merge join重用了hash join的流程,因为它像哈希联接一样有构建和扫描阶段:partial merge join首先构建右表的排序版本,然后扫描左表。
查看示例语句的trace log:
./bin/clickhouse client --port 9001 --password bQSknrGM3FdjDbpI --user ck_super_account --database=imdb --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'partial_merge';"
...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams
...
... MergingSortedTransform: Merge sorted 1528 blocks, 100000000 rows …
...
可以看到有6个和30个并行流用于以块为单位将数据从两个表流式传输到查询引擎。
一个MergingSortedTransform条目总结了联接处理过程:来自具有1亿行的右表的1528个数据块被排序,然后与左表的块进行合并联接。这是因为,100 million行的1528个数据块相当于每个块约为~65445行,这对应于默认的块大小。
direct join(暂不支持)
直接联接算法可以在右侧表的底层存储支持低延迟键值请求时应用。ClickHouse提供了三种表引擎支持这一点:Join(基本上是一个预先计算的哈希表)、EmbeddedRocksDB和Dictionary。这里基于字典描述直接联接算法,但对于这三种引擎来说,机制是相同的。
直接联接算法的简要描述:
直接联接算法要求右表由字典支持,这样来自该表的待联接数据就已经以低延迟键值数据结构的形式存在于内存中。然后,①左表的所有数据通过2个流(因为 max_threads = 2 )并行地流式传输到查询引擎中,并且通过两个联接阶段并行地进行联接,通过对右表的底层字典进行查找来实现。
字典引擎特性
为了演示直接联接,首先需要创建一个字典。为此需要确定字典内容在内存中的存储方式。字典有flat和hash两种布局,这两种布局要求键属性的数据类型与UInt64类型兼容。平坦布局在所有布局选项中提供了最佳性能,并分配了一个内存数组,其中包含与键属性的最大值相等的条目数。例如,如果最大值为10万,则数组将有10万个条目的空间。这种数据布局允许以 0(1) 时间复杂度进行极快的键值查找,因为只需要进行简单的数组偏移查找。偏移量就是提供的键的值,数组中该偏移位置的条目包含相应的值。flat结构是用于示例数据中的actor表和role表,其中在源表的键列( id 和 actor_id )中具有密集和单调递增的值,从0开始。因此,每个分配的数组条目都将被使用。散列布局将字典内容存储在哈希表中,适用性更广泛。例如,对于非从0开始的非密集键属性值,在内存中不会为其分配不必要的空间。但是,访问速度较慢,通常比平坦布局慢2-5倍。
下面的示例语句创建一个具有平坦布局的字典,将角色表的内容完全加载到内存中,以进行低延迟的键值查找。这里将使用 actor_id 作为键属性。同时使用 max_array_size 设置指定初始和最大数组大小(默认值500,000太小)。通过将 LIFETIME 设置为0来禁用字典的内容更新:
CREATE DICTIONARY imdb_large.roles_dict_flat
(
created_at DateTime,
actor_id UInt32,
movie_id UInt32,
role String
)
PRIMARY KEY actor_id
SOURCE(CLICKHOUSE(db 'imdb_large' table 'roles'))
LIFETIME(0)
LAYOUT(FLAT(INITIAL_ARRAY_SIZE 1_000_000 MAX_ARRAY_SIZE 1_000_000));
再创建一个hash布局的字典:
CREATE DICTIONARY imdb_large.roles_dict_hashed
(
created_at DateTime,
actor_id UInt32,
movie_id UInt32,
role String
)
PRIMARY KEY actor_id
SOURCE(CLICKHOUSE(db 'imdb_large' table 'roles'))
LIFETIME(0)
LAYOUT(hashed());
通过system.dictionaries可以查看到字典细节:
SELECT
name,
status,
formatReadableSize(bytes_allocated) AS memory_allocated,
formatReadableTimeDelta(loading_duration) AS loading_duration
FROM system.dictionaries
WHERE startsWith(name, 'roles_dict_')
ORDER BY name;
┌─name──────────────┬─status─┬─memory_allocated─┬─loading_duration─┐
│ roles_dict_flat │ LOADED │ 1.52 GiB │ 12 seconds │
│ roles_dict_hashed │ LOADED │ 128.00 MiB │ 6 seconds │
└───────────────────┴────────┴──────────────────┴──────────────────┘
loading_duration 列显示了将源表内容加载到字典的内存布局中所花费的时间。 status 表示加载已完成。
与普通(MergeTree引擎系列)ClickHouse表不同,字典中的键属性是(自动)唯一的。例如,roles表中包含许多具有相同 actor_id 值的行,因为一般一个演员会有多个角色。当这些行以 actor_id 作为键属性加载到字典中时,具有相同键值的行会相互覆盖。实际上,字典中只包含特定 actor_id 的最后一行插入的数据。
使用字典将 actors 表中的行与 roles 表中的信息进行关联。这里使用dictGet函数来执行低延迟的键值查找。对于 actors 表中的每一行在字典中进行查找,使用 id 列的值,并请求以元组形式返回 created_at 、 movie_id 和 role 值:
WITH T1 AS (
SELECT
id,
first_name,
last_name,
gender,
dictGet('roles_dict_flat', ('created_at', 'movie_id', 'role'), id) as t
FROM actors)
SELECT
id,
first_name,
last_name,
gender,
id AS actor_id,
t.1 AS created_at,
t.2 AS movie_id,
t.3 AS role
FROM T1
LIMIT 1
FORMAT Vertical;
如果字典中不包含特定演员 id 值的键条目,则将返回配置的默认值作为所请求的属性值。另外,正如上面提到的,字典根据 actor_id 列对加载的数据进行去重,实际上只返回找到的第一个匹配项。因此,上述查询的行为等效于LEFT ANY JOIN。
在ClickHouse中,有一种更简单和更紧凑的方式来表达上述查询。上面的示例语句展示了创建具有特定名称的字典时,ClickHouse会自动通过字典表引擎创建一个同名的表,该表由字典支持。通过使用 direct 联接算法的联接查询,可以使用这个表来表达与上述查询相同的逻辑:
SELECT *
FROM actors AS a
JOIN roles_dict_flat AS r ON a.id = r.actor_id
LIMIT 1
SETTINGS join_algorithm='direct'
FORMAT Vertical;
在内部,ClickHouse使用对右侧表支持的字典进行高效的键值查找来实现联接。这类似于使用 dictGet 函数进行查找的上述查询。可以通过使用EXPLAIN PLAN子句审查联接查询的查询计划来验证这一点:
EXPLAIN PLAN
SELECT *
FROM actors AS a
JOIN roles_dict_flat AS r ON a.id = r.actor_id
SETTINGS join_algorithm='direct';
┌─explain───────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY)) │
│ FilledJoin (JOIN) │
│ Expression ((Convert JOIN columns + Before JOIN)) │
│ ReadFromMergeTree (imdb_large.actors) │
└───────────────────────────────────────────────────────┘
对比相同查询使用hash join的查询计划:
EXPLAIN PLAN
SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
SETTINGS join_algorithm='hash';
┌─explain──────────────────────────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY)) │
│ Join (JOIN FillRightFirst) │
│ Expression (Before JOIN) │
│ ReadFromMergeTree (imdb_large.actors) │
│ Expression ((Joined actions + (Rename joined columns + (Projection + Before ORDER BY)))) │
│ ReadFromMergeTree (imdb_large.roles) │
└──────────────────────────────────────────────────────────────────────────────────────────────
通过query_log对比下面四个查询语句的细节:
使用字典支持的右侧表的直接联接实际上是LEFT ANY JOIN。为了比较,示例中使用哈希算法的查询运行中使用了这种联接类型。
hash join:
SELECT *
FROM actors AS a
LEFT ANY JOIN roles AS r ON a.id = r.actor_id
SETTINGS join_algorithm='hash'
FORMAT Null;
parallel hash join:
SELECT *
FROM actors AS a
LEFT ANY JOIN roles AS r ON a.id = r.actor_id
SETTINGS join_algorithm='parallel_hash'
FORMAT Null;
direct join,右侧表的底层字典采用散列内存布局:
SELECT *
FROM actors AS a
JOIN roles_dict_hashed AS r ON a.id = r.actor_id
SETTINGS join_algorithm='direct'
FORMAT Null;
direct join,右侧表的底层字典采用平面内存布局:
SELECT *
FROM actors AS a
JOIN roles_dict_flat AS r ON a.id = r.actor_id
SETTINGS join_algorithm='direct'
FORMAT Null;
通过query_log对比查询细节:
SELECT
query,
query_duration_ms,
(query_duration_ms / 1000)::String || ' s' AS query_duration_s,
formatReadableSize(memory_usage) AS memory_usage,
formatReadableQuantity(read_rows) AS read_rows,
formatReadableSize(read_bytes) AS read_data
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND (hasAll(tables, ['imdb_large.actors', 'imdb_large.roles']) OR arrayExists(t -> startsWith(t, 'imdb_large.roles_dict_'), tables))
ORDER BY initial_query_start_time DESC
LIMIT 4
FORMAT Vertical;
Row 1:
──────
query: SELECT *
FROM actors AS a
JOIN roles_dict_flat AS r ON a.id = r.actor_id
SETTINGS join_algorithm='direct'
FORMAT Null;
query_duration_ms: 44
query_duration_s: 0.044 s
memory_usage: 83.66 MiB
read_rows: 1.00 million
read_data: 37.07 MiB
Row 2:
──────
query: SELECT *
FROM actors AS a
JOIN roles_dict_hashed AS r ON a.id = r.actor_id
SETTINGS join_algorithm='direct'
FORMAT Null;
query_duration_ms: 113
query_duration_s: 0.113 s
memory_usage: 102.90 MiB
read_rows: 1.00 million
read_data: 37.07 MiB
Row 3:
──────
query: SELECT *
FROM actors AS a
LEFT ANY JOIN roles AS r ON a.id = r.actor_id
SETTINGS join_algorithm='parallel_hash'
FORMAT Null;
query_duration_ms: 689
query_duration_s: 0.689 s
memory_usage: 4.78 GiB
read_rows: 101.00 million
read_data: 3.41 GiB
Row 4:
──────
query: SELECT *
FROM actors AS a
LEFT ANY JOIN roles AS r ON a.id = r.actor_id
SETTINGS join_algorithm='hash'
FORMAT Null;
query_duration_ms: 1084
query_duration_s: 1.084 s
memory_usage: 4.44 GiB
read_rows: 101.00 million
read_data: 3.41 GiB
从第一行的直接联接运行结果来看,右侧表由平坦内存布局的字典支持,比第三行的parallel hash join运行结果快大约15倍,比第四行的hash join运行结果快约25倍,比第二行的右侧表由散列内存布局的字典支持的直接联接运行结果快约2.5倍。
主要原因是右侧表的数据已经存在于内存中。相反,哈希和并行哈希算法需要首先将数据加载到内存中。此外,正如前面提到的,具有平坦布局的字典的内存中数组允许以 0(1) 时间复杂度进行极快的键值查找,因为只需要进行简单的数组偏移查找。
query_log 系统表的 memory_usage 列不包括字典本身分配的内存。因此对于内存消耗比较,需要将字典系统表的 bytes_allocated 列的相应值添加进来。即使将字典的 bytes_allocated 添加到直接联接运行的 memory_usage 中,峰值内存消耗与hash join和parallel hash join相比仍然显著较低。
选择更优的join算法
迄今为止,ClickHouse 已开发出以下 6 种联接算法:Direct join, Hash join, Parallel hash join, Grace hash join, Full sorting merge join, Partial merge join
这些算法决定了联接查询的规划和执行方式。具体的join算法是由join_algorithm参数指定的,默认情况下即join_algorithm使用默认值default时,ClickHouse根据使用的联接类型、严格性和被联接表的引擎使用direct join或hash join。此外,ClickHouse可以根据资源的可用性和使用情况,在运行时自适应地选择和动态更改联接算法,当将join_algorithm设置为 auto 时,对应的代码实现为JoinSwitcher类,ClickHouse首先尝试使用hash join,如果内存超过限制,算法会即时切换到partial merge join。可以通过trace log观察到选择了哪种算法。ClickHouse还允许用户自行指定所需的联接算法。以下图表根据相对内存消耗和执行时间对ClickHouse联接算法进行了概述:
direct join是ClickHouse最快的联接算法,在右侧表的底层存储支持低延迟键值请求且LEFT ANY JOIN语义适用时使用。特别是对于较大的右侧表,直接联接比所有其他ClickHouse联接算法在执行时间上有显著的改进。
ClickHouse的另外三种联接算法是基于内存中的哈希表:
- hash join是快速但受内存限制的算法,是支持所有联接类型和严格性设置的最通用的联接算法。该算法可能受到内存使用的限制。此外,从联接的右侧表创建内存中的哈希表是单线程的,如果右侧表非常大,可能成为联接执行时间的瓶颈。
- parallel hash join可以在右侧表很大的情况下更快地执行,通过同时构建多个哈希表,但它需要更多的内存。
- grace hash join是一种非受内存限制的版本,它会将数据临时溢出到磁盘,而无需对数据进行排序。这克服了其他非受内存限制的ClickHouse联接算法的一些性能挑战,这些算法也会将数据临时溢出到磁盘,但需要对数据进行预排序。
ClickHouse提供了基于外部排序的另外两种非受内存限制的联接算法:
- full sorting merge join基于内存或外部排序,并且可以利用联接表的物理行顺序并跳过排序阶段。在这种情况下,联接性能可以与上面图表中的某些哈希联接算法竞争,同时通常需要较少的主内存。
- partial merge join针对联接大表时最小化内存使用进行了优化,并始终通过外部排序完全对右侧表进行排序。左侧表也始终在内存中以块为单位进行排序。如果左侧表的物理行顺序与联接键排序顺序匹配,则联接匹配过程运行得更高效。
查询性能
对于注重性能的使用场景可以使用下面的决策树:
① 如果右侧表的数据可以预先加载到内存中的低延迟键值数据结构(例如字典)中,并且联接键与底层键值存储的键属性匹配,并且LEFT ANY JOIN语义适用,则可以使用direct join,它提供了最快的方法。
② 如果表的物理行顺序与联接键排序顺序匹配,则情况就不同了。在这种情况下,full sorting merge join可以跳过排序阶段,从而显著减少内存使用量,并且根据数据大小和联接键值分布,比其他哈希联接算法的执行时间更快。然而,如果右侧表可以加载到内存中,parallel hash join或者hash join可能更快。这取决于数据大小、数据类型和联接键列的值分布。
③ 如果右侧表不能加载到内存,则情况又不同了。ClickHouse提供了三种非受内存限制的联接算法。这三种算法都会将数据临时溢出到磁盘。full sorting merge join和partial merge join需要对数据进行预排序,而grace hash join则是从数据中构建哈希表。根据数据的大小、数据类型和联接键列的值分布。
partial merge join在联接大表时优化了内存使用,但联接速度较慢。特别是当左侧表的物理行顺序与联接键排序顺序不匹配时,链接速度更差。
grace hash join是三种非受内存限制的联接算法中最灵活的,通过其grace_hash_join_initial_buckets设置可以很好地控制内存使用量和联接速度。在测试运行中,当将grace hash join的内存使用量配置为与full sorting merge join的内存使用量大致对齐时,full sorting merge join通常更快。
哪种非受内存限制的算法最快最终还是取决于数据的大小、数据类型和联接键列的值分布。最好根据真实数据的实际数据量运行一些基准测试,以确定哪种算法最快。
内存使用
如果想将联接优化为最低的内存使用量而不是最快的执行时间,则可以使用以下决策树:
① 如果使用的表的物理行顺序与联接键排序顺序匹配,则full sorting merge join的内存使用量将会很低。此外,由于跳过了排序阶段,它还具有良好的联接速度。
② 通过配置大量的桶来调优grace hash join可以实现非常低的内存使用量,但联接速度会受到影响。partial merge join也可以使用较少的内存。启用外部排序的full sorting merge join通常比partial merge join使用更多内存(假设行顺序与键排序顺序不匹配),但联接执行时间显著更好。
Create or Drop Database
本文介绍创建和删除数据库的基本语法。
CREATE DATABASE基本语法如下:
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster];
- 如果CREATE 语句中存在IF NOT EXISTS 关键字,则当数据库已经存在时,该语句不会创建数据库,且不会返回任何错误。
- ON CLUSTER 关键字用于指定集群名称。
示例:
CREATE DATABASE db_001 ON CLUSTER default;
DROP DATABASE基本语法如下:
DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster];
- 如果DROP 语句中存在IF EXISTS 关键字,则当数据库不存在时,该语句不会返回任何错误。
- ON CLUSTER 关键字用于指定集群名称。
DROP DATABASE db_001 ON CLUSTER default;
Create or Drop Table
本文介绍创建和删除表和视图。
创建本地表
建表语句基本语法:
CREATE TABLE [IF NOT EXISTS] [db.]table_name ON CLUSTER cluster
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
) ENGINE = engine_name()
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...];
选项描述:
- db:指定数据库名称,如果当前语句没有包含‘db’,则默认使用当前选择的数据库为‘db’。
- cluster:指定集群名称,目前固定为default。ON CLUSTER 将在每一个节点上都创建一个本地表。
- type:该列数据类型,例如 UInt32。
- DEFAULT:该列缺省值。如果INSERT中不包含指定的列,那么将通过表达式计算它的默认值并填充它。
- MATERIALIZED:物化列表达式,表示该列不能被INSERT,是被计算出来的; 在INSERT语句中,不需要写入该列;在SELECT *查询语句结果集不包含该列。
- ALIAS :别名列。这样的列不会存储在表中。 它的值不能够通过INSERT写入,同时使用SELECT查询星号时,这些列也不会被用来替换星号。 但是它们可以用于SELECT中,在这种情况下,在查询分析中别名将被替换。
- 物化列与别名列的区别: 物化列是会保存数据,查询的时候不需要计算,而别名列不会保存数据,查询的时候需要计算,查询时候返回表达式的计算结果
以下选项与表引擎相关,只有MergeTree系列表引擎支持:
- PARTITION BY:指定分区键。通常按照日期分区,也可以用其他字段或字段表达式。
- ORDER BY:指定 排序键。可以是一组列的元组或任意的表达式。
- PRIMARY KEY: 指定主键,默认情况下主键跟排序键相同。因此,大部分情况下不需要再专门指定一个 PRIMARY KEY 子句。
- SAMPLE BY :抽样表达式,如果要用抽样表达式,主键中必须包含这个表达式。
- SETTINGS:影响 性能的额外参数。
- GRANULARITY :索引粒度参数。
建表示例:
CREATE TABLE dbname.ontime_local ON CLUSTER default
(
Year UInt16,
Quarter UInt8,
Month UInt8,
DayofMonth UInt8,
DayOfWeek UInt8,
FlightDate Date,
Carrier String,
TailNum String,
FlightNum String,
SecurityDelay Int32,
TotalAddGTime String
)ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/dbname.ontime_local/{shard}',
'{replica}')
PARTITION BY toYYYYMM(FlightDate)
ORDER BY (Carrier,FlightDate)
SETTINGS index_granularity= 8192 ;
说明
高可用集群(双副本),要用ReplicatedMergeTree等Replicated系列引擎,否则副本之间不进行数据复制,导致数据查询结果不一致。
{shard},{replica} 参数不需要赋值。
创建分布式表
基于本地表创建分布式表,基本语法:
CREATE TABLE [db.]table_name ON CLUSTER default
AS db.local_table_name ENGINE = Distributed(<cluster>, <database>, <shard table> [, sharding_key])
参数说明:
- db:数据库名。
- local_table_name:对应的已经创建的本地表表名。
- shard table:同上,对应的已经创建的本地表表名。
- sharding_key:分片表达式。可以是一个字段,例如user_id(integer类型),通过对余数值进行取余分片;也可以是一个表达式,例如rand(),通过rand()函数返回值/shards总权重分片;为了分片更均匀,可以加上hash函数,如intHash64(user_id)。
示例:创建一个分布式表
--建立分布式表
CREATE TABLE db_name.ontime ON CLUSTER default
AS db_name.ontime_local
ENGINE = Distributed(default, db_name, ontime_local, rand());
复制另一个表结构
创建与另一个表相同的表,语法如下:
CREATE TABLE [IF NOT EXISTS] [db.]table_name ON CLUSTER default AS [db.]name2 [ENGINE = engine];
表引擎可以通过ENGINE=engine字句指定,默认与被复制的表“name2”相同。
示例:
create table db_name2.table2 ON CLUSTER default as db_name1.table1;
通过SELECT语句创建
使用指定的引擎创建一个与SELECT子句的结果具有相同结构的表,并使用SELECT子句的结果填充它。语法如下:
CREATE TABLE [IF NOT EXISTS] [db.]table_name ON CLUSTER default ENGINE = engine AS SELECT ...
其中ENGINE是需要明确指定的。
示例:
create table t2 ON CLUSTER default ENGINE =MergeTree() as select * from db1.t1 where id<100;
创建视图
创建视图语法如下:
CREATE [MATERIALIZED] VIEW [IF NOT EXISTS] [db.]table_name [TO[db.]name] ON CLUSTER default [ENGINE = engine] [POPULATE] AS SELECT ...
有MATERIALIZED关键字表示是物化视图,否则为普通视图。
假如用以下语句创建了一个视图。
CREATE VIEW view_1 ON CLUSTER default AS SELECT a,b,c,d FROM db1.t1;
那么下列两个语句完全等价。
SELECT a, b, c FROM view_1 ;
SELECT a, b, c FROM (SELECT a,b,c FROM db1.t1);
物化视图存储的数据是由相应的SELECT查询转换得来的。
在创建物化视图时,您还必须指定表的引擎 - 将会使用这个表引擎存储数据。
目前物化视图的工作原理:当将数据写入到物化视图中SELECT子句所指定的表时,插入的数据会通过SELECT子句查询进行转换并将最终结果插入到视图中。
如果创建物化视图时指定了POPULATE子句,则在创建时将该表的数据插入到物化视图中。就像使用CREATE TABLE … AS SELECT …一样。否则,物化视图只会包含在物化视图创建后的新写入的数据。
我们不推荐使用POPULATE,因为在视图创建期间写入的数据将不会写入其中。当一个SELECT子句包含DISTINCT, GROUP BY, ORDER BY, LIMIT时,请注意,这些仅会在插入数据时在每个单独的数据块上执行。例如,如果你在其中包含了GROUP BY,则只会在查询期间进行聚合,但聚合范围仅限于单个批的写入数据。数据不会进一步被聚合。但是当你使用一些其他数据聚合引擎时这是例外的,如:SummingMergeTree。
目前对物化视图执行ALTER是不支持的,因此这可能是不方便的。如果物化视图是使用的TO [db.]name的方式进行构建的,你可以使用DETACH语句现将视图剥离,然后使用ALTER运行在目标表上,然后使用ATTACH将之前剥离的表重新加载进来。视图看起来和普通的表相同。例如,你可以通过SHOW TABLES查看到它们。
没有单独的删除视图的语法。如果要删除视图,请使用DROP TABLE。
删除表
删除表基本语法:
DROP TABLE [IF EXISTS] [db.]name ON CLUSTER cluster;
示例:
DROP TABLE dbname.ontime_local ON CLUSTER default;
删除视图
删除视图基本语法:
DROP VIEW [IF EXISTS] [db.]name ON CLUSTER cluster;
示例:
DROP VIEW view_1 ON CLUSTER default;
Insert into
本文介绍如何用INSERT INTO语句插入数据到表中。
基本语法
INSERT INTO 语句基本格式如下:
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
对于存在于表结构中但不存在于插入列表中的列,它们将会按照如下方式填充数据:
如果存在DEFAULT表达式,根据DEFAULT表达式计算被填充的值。
如果没有定义DEFAULT表达式,则填充零或空字符串。
使用SELECT的结果写入
语法结构如下:
INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
写入的列与SELECT的列的对应关系是使用位置来进行对应的,它们在SELECT表达式与INSERT中的名称可以是不同的。需要对它们进行对应的类型转换。
除了VALUES格式之外,其他格式中的数据都不允许出现诸如now(),1 + 2等表达式。VALUES格式允许您有限度的使用这些表达式,但是不建议您这么做,因为执行这些表达式很低效。
影响性能的注意事项
在执行INSERT时将会对写入的数据进行一些处理,比如按照主键排序、按照月份对数据进行分区等。如果在您的写入数据中包含多个月份的混合数据时,将会显著地降低INSERT的性能。为了避免这种情况,通常
采用以下方式:
- 数据总是以尽量大的batch进行写入,如每次写入100,000行。
- 数据在写入ClickHouse前预先对数据进行分组。
在以下的情况下,性能不会下降:
- 数据总是被实时地写入。
- 写入的数据已经按照时间排序。
Select
本文描述如何使用SELECT语句查询数据。
基本语法
SELECT语句基本格式如下:
SELECT [DISTINCT] expr_list
[FROM [db.]table | (subquery) | table_function] [FINAL]
[SAMPLE sample_coeff]
[ARRAY JOIN ...]
[GLOBAL] ANY|ALL INNER|LEFT JOIN (subquery)|table USING columns_list
[PREWHERE expr]
[WHERE expr]
[GROUP BY expr_list] [WITH TOTALS]
[HAVING expr]
[ORDER BY expr_list]
[LIMIT [n, ]m]
[UNION ALL ...]
[INTO OUTFILE filename]
[FORMAT format]
[LIMIT n BY columns]
所有的子句都是可选的,除了SELECT之后的表达式列表(expr_list)。 下面将选择部分子句进行说明。ClickHouse官网中文文档有更详细说明,请参考查询语法。
简单查询语句示例:
SELECT
OriginCityName,
DestCityName,
count(*) AS flights,
bar(flights, 0, 20000, 40)
FROM ontime_distributed
WHERE Year = 1988
GROUP BY OriginCityName, DestCityName
ORDER BY flights DESC
LIMIT 20;
SAMPLE 子句
通过SAMPLE子句用户可以进行近似查询处理,近似查询处理仅能工作在MergeTree*类型的表中,并且在创建表时需要您指定采样表达式。
SAMPLE子句可以使用SAMPLE k来表示,其中k可以是0到1的小数值,或者是一个足够大的正整数值。
当k为0到1的小数时,查询将使用’k’作为百分比选取数据。例如,SAMPLE 0.1查询只会检索数据总量的10%。当k为一个足够大的正整数时,查询将使用’k’作为最大样本数。例如,SAMPLE 10000000查询只会检索最多10,000,000行数据。
示例:
SELECT
Title,
count() * 10 AS PageViews
FROM hits_distributed
SAMPLE 0.1
WHERE
CounterID = 34
AND toDate(EventDate) >= toDate('2013-01-29')
AND toDate(EventDate) <= toDate('2013-02-04')
AND NOT DontCountHits
AND NOT Refresh
AND Title != ''
GROUP BY Title
ORDER BY PageViews DESC LIMIT 1000;
在这个例子中,查询将检索数据总量的0.1(10%)的数据。值得注意的是,查询不会自动校正聚合函数最终的结果,所以为了得到更加精确的结果,需要将count()的结果手动乘以10。
当使用像SAMPLE 10000000这样的方式进行近似查询时,由于没有了任何关于将会处理了哪些数据或聚合函数应该被乘以几的信息,所以这种方式不适合在这种场景下使用。
使用相同的采样率得到的结果总是一致的:如果我们能够看到所有可能存在在表中的数据,那么相同的采样率总是能够得到相同的结果(在建表时使用相同的采样表达式),换句话说,系统在不同的时间,不同的服务器,不同表上总以相同的方式对数据进行采样。
例如,我们可以使用采样的方式获取到与不进行采样相同的用户ID的列表。这将表明,您可以在IN子查询中使用采样,或者使用采样的结果与其他查询进行关联。
ARRAY JOIN 子句
ARRAY JOIN子句可以帮助查询进行与数组和nested数据类型的连接。它有点类似arrayJoin函数,但它的功能更广泛。
ARRAY JOIN本质上等同于INNERT JOIN数组。例如:
CREATE TABLE arrays_test
(
s String,
arr Array(UInt8)
) ENGINE = Memory;
INSERT INTO arrays_test
VALUES ('Hello', [1,2]), ('World', [3,4,5]), ('Goodbye', []);
查询表中数据结果:
┌─s───────────┬─arr─────┐
│ Hello │ [1,2] │
│ World │ [3,4,5] │
│ Goodbye │ [] │
└─────────────┴─────────┘
使用ARRAY JOIN进行查询:
SELECT s, arr
FROM arrays_test
ARRAY JOIN arr;
最终结果数据:
┌─s─────┬─arr─┐
│ Hello │ 1 │
│ Hello │ 2 │
│ World │ 3 │
│ World │ 4 │
│ World │ 5 │
└───────┴─────┘
JOIN 语句Null的处理
JOIN语句中的Null处理,请参考join_use_nulls、Nullable、NULL。
WHERE 子句
如果存在WHERE子句,则在该子句中必须包含一个UInt8类型的表达式。这个表达式通常是一个带有比较和逻辑的表达式。这个表达式将会在所有数据转换前用来过滤数据。
如果在支持索引的数据库表引擎中,这个表达式将被评估是否使用索引。
PREWHERE 子句
这个子句与WHERE子句的意思相同。主要的不同之处在于表数据的读取。当使用PREWHERE时,首先只读取PREWHERE表达式中需要的列。然后在根据PREWHERE执行的结果读取其他需要的列。
如果在过滤条件中有少量不适合索引过滤的列,但是它们又可以提供很强的过滤能力。这时使用PREWHERE是有意义的,因为它将帮助减少数据的读取。
例如:在一个需要提取大量列的查询中为少部分列编写PREWHERE是很有作用的。
TIP
-
PREWHERE仅支持*MergeTree系列引擎。在一个查询中可以同时指定PREWHERE和WHERE,在这种情况下,PREWHERE优先于WHERE执行。
-
PREWHERE不适合用于已经存在于索引中的列,因为当列已经存在于索引中的情况下,只有满足索引的数据块才会被读取。如果将’optimize_move_to_prewhere’设置为1,并且在查询中不包含PREWHERE,则系统将自动的把适合PREWHERE表达式的部分从WHERE中抽离到PREWHERE中。
WITH TOTALS 修饰符
如果您指定了WITH TOTALS修饰符,将会在结果中得到一个被额外计算出的行。在这一行中将包含所有key的默认值(零或者空值),以及所有聚合函数对所有被选择数据行的聚合结果。
该行仅在JSON*, TabSeparated*, Pretty*输出格式中与其他行分开输出。
说明:提供HTTP和TCP两个连接端口,请按需选择。
- 根据操作系统类型,ClickHouse版本下载对应客户端。
下载方式:在生产环境下载如下4个rpm包,然后执行"rpm -ivh *.rpm"命令安装
rpm包
wget --content-disposition https://packagecloud.io/Altinity/clickhouse-altinity-stable/packages/el/7/clickhouse-server-20.8.12.2-1.el7.x86_64.rpm/download.rpm
wget --content-disposition https://packagecloud.io/Altinity/clickhouse-altinity-stable/packages/el/7/clickhouse-server-common-20.8.12.2-1.el7.x86_64.rpm/download.rpm
wget --content-disposition https://packagecloud.io/Altinity/clickhouse-altinity-stable/packages/el/7/clickhouse-common-static-20.8.12.2-1.el7.x86_64.rpm/download.rpm
wget --content-disposition https://packagecloud.io/Altinity/clickhouse-altinity-stable/packages/el/7/clickhouse-client-20.8.12.2-1.el7.x86_64.rpm/download.rpm
- 使用下列命令连接集群:
./clickhouse-client --host=<host> --port=<port> --user=<user> --password=<password>
创建表
分成单副本集群和双副本集群创建ClickHouse数据表语句。
单副本集群创建ClickHouse数据表
- 创建本地表:
CREATE TABLE default.sls_test_single_local on cluster default(
`v1` Int8,
`v2` Int16,
`v3` Int32,
`v4` Int64,
`v5` UInt8,
`v6` UInt16,
`v7` UInt32,
`v8` UInt64,
`v9` Decimal(10, 2),
`v10` Float32,
`v11` Float64,
`v12` String,
`v13` FixedString(10),
`v14` UUID, `v15` Date,
`v16` DateTime,
`v17` Enum8('hello' = 1, 'world' = 2),
`v18` Enum16('hello' = 1, 'world' = 2),
`v19` IPv4,
`v20` IPv6)
ENGINE = MergeTree() PARTITION BY toYYYYMMDD(v16) ORDER BY v4 SETTINGS index_granularity = 8192;
- 创建分布式表,方便数据的写入和查询。
CREATE TABLE sls_test_single_d ON CLUSTER default as sls_test_single_local ENGINE = Distributed(default, default, sls_test_single_local, rand());
双副本集群创建ClickHouse数据表
- 创建本地表
CREATE TABLE default.sls_test_local ON CLUSTER default (
`v1` Int8,
`v2` Int16,
`v3` Int32,
`v4` Int64,
`v5` UInt8,
`v6` UInt16,
`v7` UInt32,
`v8` UInt64,
`v9` Decimal(10, 2),
`v10` Float32,
`v11` Float64,
`v12` String,
`v13` FixedString(10),
`v14` UUID, `v15` Date,
`v16` DateTime,
`v17` Enum8('hello' = 1, 'world' = 2),
`v18` Enum16('hello' = 1, 'world' = 2),
`v19` IPv4,
`v20` IPv6)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/sls_test_local/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(v16) ORDER BY v4 SETTINGS index_granularity = 8192
- 创建分布式表,方便数据的写入和查询。
CREATE TABLE sls_test_d ON CLUSTER default as sls_test_local ENGINE = Distributed(default, default, sls_test_local, rand());
关于建表的细节说明
- 分区键需要采用低基数的字段,一般采用日期,月份来分区。类型最好为Date/DateTime类型,例如:
CREATE TABLE test.test ON CLUSTER default (
`id` Int8,
`dayno` Date)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/sls_test_local/{shard}', '{replica}')
PARTITION BY dayno ORDER BY v4 SETTINGS index_granularity = 8192
没有Date/DateTime的分区 可以用其他类型的低基数作为分区。例如
CREATE TABLE test.test ON CLUSTER default (
`id` Int8,
`type` int)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/sls_test_local/{shard}', '{replica}')
PARTITION BY type ORDER BY v4 SETTINGS index_granularity = 8192
如果没有低基数的分区 可以采用取模的方式,控制分区键在10个左右,例如:
CREATE TABLE test.test ON CLUSTER default (
`id` Int8,
`imei` String)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/sls_test_local/{shard}', '{replica}')
PARTITION BY (id % 10) ORDER BY v4 SETTINGS index_granularity = 8192
或者直接不分区,例如:
CREATE TABLE test.test ON CLUSTER default (
`id` Int8,
`imei` String)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/sls_test_local/{shard}', '{replica}')
ORDER BY v4 SETTINGS index_granularity = 8192
- 注意:切不可使用tuple()或者高基数的字段直接作为分区键。除此之外,分区键字段个数不要过多,建议不超过2个,否则会给系统和ZK带来很大的压力。
- 分布式的shard key,一般建议使用高基数的字段做hash,这样可以让数据分布更均匀。 例如
CREATE TABLE test.test_distribution ON CLUSTER default (
`id` Int8,
`imei` String)
ENGINE = Distribute('default', 'test', 'test_local', CityHash(imei));
或者采用rand(),防止数据分布不均造成clickhouse单机热点
CREATE TABLE test.test_distribution ON CLUSTER default ( `id` Int8, `imei` String) ENGINE = Distribute('default', 'test', 'test_local', rand());
标签:rows,join,简介,查询,内存,query,联接,clickhouse From: https://www.cnblogs.com/quyf/p/18244089