首页 > 其他分享 >Doris(五) -- 数据的导入导出

Doris(五) -- 数据的导入导出

时间:2023-05-30 20:11:07浏览次数:43  
标签:canal hdfs name -- 导入 id Doris

数据导入

使用 Insert 方式同步数据

用户可以通过 MySQL 协议,使用 INSERT 语句进行数据导入
INSERT 语句的使用方式和 MySQL 等数据库中 INSERT 语句的使用方式类似。 INSERT 语句支持以下两种语法:

INSERT INTO table SELECT ...
INSERT INTO table VALUES(...)

对于 Doris 来说,一个 INSERT 命令就是一个完整的导入事务。

因此不论是导入一条数据,还是多条数据,我们都不建议在生产环境使用这种方式进行数据导入。高频次的 INSERT 操作会导致在存储层产生大量的小文件,会严重影响系统性能。

该方式仅用于线下简单测试或低频少量的操作。
或者可以使用以下方式进行批量的插入操作:

INSERT INTO example_tbl VALUES
(1000, "baidu1", 3.25)
(2000, "baidu2", 4.25)
(3000, "baidu3", 5.25);

Stream Load

用于将本地文件导入到doris中。Stream Load 是通过 HTTP 协议与 Doris 进行连接交互的。
该方式中涉及 HOST:PORT 都是对应的HTTP 协议端口。
• BE 的 HTTP 协议端口,默认为 8040。
• FE 的 HTTP 协议端口,默认为 8030。
但须保证客户端所在机器网络能够联通FE, BE 所在机器。

-- 创建表
drop table if exists load_local_file_test;
CREATE TABLE IF NOT EXISTS load_local_file_test
(
    id INT,
    name VARCHAR(50),
    age TINYINT
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;
# 创建文件
1,zss,28
2,lss,28
3,ww,88

# 导入数据
## 语法示例
 curl \
 -u user:passwd \  # 账号密码
 -H "label:load_local_file_test" \  # 本次任务的唯一标识
 -T 文件地址 \
 http://主机名:端口号/api/库名/表名/_stream_load
# user:passwd 为在 Doris 中创建的用户。初始用户为 admin / root,密码初始状态下为空。
# host:port 为 BE 的 HTTP 协议端口,默认是 8040,可以在 Doris 集群 WEB UI页面查看。
# label: 可以在 Header 中指定 Label 唯一标识这个导入任务。


curl \
 -u root:123 \
 -H "label:load_local_file" \
 -H "column_separator:," \
 -T /root/data/loadfile.txt \
http://doitedu01:8040/api/test/load_local_file_test/_stream_load

curl的一些可配置的参数

  1. label: 导入任务的标签,相同标签的数据无法多次导入。(标签默认保留30分钟)
  2. column_separator:用于指定导入文件中的列分隔符,默认为\t。
  3. line_delimiter:用于指定导入文件中的换行符,默认为\n。
  4. columns:用于指定文件中的列和table中列的对应关系,默认一一对应
  5. where: 用来过滤导入文件中的数据
  6. max_filter_ratio:最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。数据不规范不包括通过 where 条件过滤掉的行。
  7. partitions: 用于指定这次导入所设计的partition。如果用户能够确定数据对应的partition,推荐指定该项。不满足这些分区的数据将被过滤掉。
  8. timeout: 指定导入的超时时间。单位秒。默认是 600 秒。可设置范围为 1 秒 ~ 259200 秒。
  9. timezone: 指定本次导入所使用的时区。默认为东八区。该参数会影响所有导入涉及的和时区有关的函数结果。
  10. exec_mem_limit: 导入内存限制。默认为 2GB。单位为字节。
  11. format: 指定导入数据格式,默认是csv,支持json格式。
  12. read_json_by_line: 布尔类型,为true表示支持每行读取一个json对象,默认值为false。
  13. merge_type: 数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理, 示例:-H "merge_type: MERGE" -H "delete: flag=1"
  14. delete: 仅在 MERGE下有意义, 表示数据的删除条件 function_column.sequence_col: 只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。

建议一个导入请求的数据量控制在 1 - 2 GB 以内。如果有大量本地文件,可以分批并发提交。

导入json数据

# 准备数据
{"id":1,"name":"liuyan","age":18}
{"id":2,"name":"tangyan","age":18}
{"id":3,"name":"jinlian","age":18}
{"id":4,"name":"dalang","age":18}
{"id":5,"name":"qingqing","age":18}

curl \
 -u root: \
 -H "label:load_local_file_json_20221126" \
 -H "columns:id,name,age" \
 -H "max_filter_ratio:0.1" \
 -H "timeout:1000" \
 -H "exec_mem_limit:1G" \
 -H "where:id>1" \
 -H "format:json" \
 -H "read_json_by_line:true" \
 -H "merge_type:delete" \
 -T /root/data/json.txt \
http://doitedu01:8040/api/test/load_local_file_test/_stream_load
  -H "merge_type:append" \
  # 会把id = 3 的这条数据删除
  -H "merge_type:MERGE" \
  -H "delete:id=3"

外部存储数据导入(hdfs)

适用场景

• 源数据在 Broker 可以访问的存储系统中,如 HDFS。
• 数据量在几十到百 GB 级别。

基本原理

  1. 创建提交导入的任务
  2. FE生成执行计划并将执行计划分发到多个BE节点上(每个BE节点都导入一部分数据)
  3. BE收到执行计划后开始执行,从broker上拉取数据到自己的节点上
  4. 所有BE都完成后,FE决定是否导入成功,返回结果给客户端
-- 新建一张表
drop table if exists load_hdfs_file_test1;
CREATE TABLE IF NOT EXISTS load_hdfs_file_test1
(
    id INT,
    name VARCHAR(50),
    age TINYINT
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;
将本地的数据导入到hdfs上面
hdfs dfs -put ./loadfile.txt  hdfs://linux01:8020/
hdfs dfs -ls  hdfs://linux01:8020/
-- 导入语法
LOAD LABEL test.label_202204(
[MERGE|APPEND|DELETE]  -- 不写就是append
DATA INFILE
(
"file_path1"[, file_path2, ...]  -- 描述数据的路径   这边可以写多个 ,以逗号分割
)
[NEGATIVE]               -- 负增长
INTO TABLE `table_name`  -- 导入的表名字
[PARTITION (p1, p2, ...)] -- 导入到哪些分区,不符合这些分区的就会被过滤掉
[COLUMNS TERMINATED BY "column_separator"]  -- 指定分隔符
[FORMAT AS "file_type"] -- 指定存储的文件类型
[(column_list)] -- 指定导入哪些列 
[COLUMNS FROM PATH AS (c1, c2, ...)]  -- 从路劲中抽取的部分列
[SET (column_mapping)] -- 对于列可以做一些映射,写一些函数
-- 这个参数要写在要写在set的后面
[PRECEDING FILTER predicate]  -- 在mapping前做过滤做一些过滤
[WHERE predicate]  -- 在mapping后做一些过滤  比如id>10 
[DELETE ON expr] --根据字段去做一些抵消消除的策略  需要配合MERGE
[ORDER BY source_sequence] -- 导入数据的时候保证数据顺序
[PROPERTIES ("key1"="value1", ...)]  -- 一些配置参数
-- 将hdfs上的数据load到表中
LOAD LABEL test.label_20221125
(
DATA INFILE("hdfs://linux01:8020/test.txt")
INTO TABLE `load_hdfs_file_test`
COLUMNS TERMINATED BY ","
(id,name,age)
)
with HDFS (
"fs.defaultFS"="hdfs://linux01:8020",
"hadoop.username"="root"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);


-- 这是一个异步的操作,所以需要去查看下执行的状态
show load order by createtime desc limit 1\G;

从 HDFS 导入数据,使用通配符匹配两批两批文件。分别导入到两个表中

LOAD LABEL example_db.label2
(
    DATA INFILE("hdfs://linux01:8020/input/file-10*")
    INTO TABLE `my_table1`
    PARTITION (p1)
    COLUMNS TERMINATED BY ","
    FORMAT AS "parquet"  
    (id, tmp_salary, tmp_score) 
    SET (
        salary= tmp_salary + 1000,
        score = tmp_score + 10
    ),
    DATA INFILE("hdfs://linux01:8020/input/file-20*")
    INTO TABLE `my_table2`
    COLUMNS TERMINATED BY ","
    (k1, k2, k3)
)
with HDFS (
"fs.defaultFS"="hdfs://linux01:8020",
"hadoop.username"="root"
)


-- 导入数据,并提取文件路径中的分区字段
LOAD LABEL example_db.label10
(
    DATA INFILE("hdfs://linux01:8020/user/hive/warehouse/table_name/dt=20221125/*")
    INTO TABLE `my_table`
    FORMAT AS "csv"
    (k1, k2, k3)
    COLUMNS FROM PATH AS (dt)
)
WITH BROKER hdfs
(
    "username"="root",
    "password"="123"
);

-- 对待导入数据进行过滤。
LOAD LABEL example_db.label6
(
    DATA INFILE("hdfs://linux01:8020/input/file")
    INTO TABLE `my_table`
    (k1, k2, k3)
    SET (
        k2 = k2 + 1
    )
        PRECEDING FILTER k1 = 1  ==》前置过滤
    WHERE k1 > k2   ==》 后置过滤
)
WITH BROKER hdfs
(
    "username"="root",
    "password"="123"
);

-- 只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。

取消导入任务

当 Broker load 作业状态不为 CANCELLED 或 FINISHED 时,可以被用户手动取消。
取消时需要指定待取消导入任务的 Label 。取消导入命令语法可执行 HELP CANCEL LOAD 查看。

CANCEL LOAD [FROM db_name] WHERE LABEL="load_label"; 

通过外部表同步数据

Doris 可以创建外部表。创建完成后,可以通过 SELECT 语句直接查询外部表的数据,也可以通过 INSERT INTO SELECT 的方式导入外部表的数据。

Doris 外部表目前支持的数据源包括:MySQL,Oracle,Hive,PostgreSQL,SQLServer,Iceberg,ElasticSearch

-- 整体语法
CREATE [EXTERNAL] TABLE table_name ( 
 col_name col_type [NULL | NOT NULL] [COMMENT "comment"] 
) ENGINE=HIVE
[COMMENT "comment"]
PROPERTIES (
-- 我要映射的hive表在哪个库里面
-- 映射的表名是哪一张
-- hive的元数据服务地址
 'property_name'='property_value',
 ...
);

-- 参数说明:
-- 1.外表列 
-- 	列名要与 Hive 表一一对应
-- 	列的顺序需要与 Hive 表一致
-- 	必须包含 Hive 表中的全部列
-- 	Hive 表分区列无需指定,与普通列一样定义即可。 
-- 2.ENGINE 需要指定为 HIVE 
-- 3.PROPERTIES 属性: 
-- 	hive.metastore.uris:Hive Metastore 服务地址 
-- 	database:挂载 Hive 对应的数据库名 
-- 	table:挂载 Hive 对应的表名 

完成在 Doris 中建立 Hive 外表后,除了无法使用 Doris 中的数据模型(rollup、预聚合、物化视图等)外,与普通的 Doris OLAP 表并无区别

-- 在Hive 中创建一个测试用表:
CREATE TABLE `user_info` ( 
 `id` int, 
 `name` string, 
 `age` int
) stored as orc;

insert into user_info values (1,'zss',18);
insert into user_info values (2,'lss',20);
insert into user_info values (3,'ww',25);

-- Doris 中创建外部表
CREATE EXTERNAL TABLE `hive_user_info` (
 `id` int, 
 `name` varchar(10), 
 `age` int 
) ENGINE=HIVE 
PROPERTIES ( 
'hive.metastore.uris' = 'thrift://linux01:9083', 
'database' = 'db1', 
'table' = 'user_info' 
);

外部表创建好后,就可以直接在doris中对这个外部表进行查询了
直接查询外部表,无法利用到doris自身的各种查询优化机制!

select * from hive_user_info;

-- 将数据从外部表导入内部表
-- 数据从外部表导入内部表后,就可以利用doris自身的查询优势了!
-- 假设要导入的目标内部表为: doris_user_info  (需要提前创建)

CREATE TABLE IF NOT EXISTS doris_user_info
(
    id INT,
    name VARCHAR(50),
    age TINYINT
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;

-- 就是用sql查询,从外部表中select出数据后,insert到内部表即可
insert into doris_user_info
select
 *
from hive_user_info;

注意:
Hive 表 Schema 变更不会自动同步,需要在 Doris 中重建 Hive 外表。
当前 Hive 的存储格式仅支持 Text,Parquet 和 ORC 类型

Binlog Load

Binlog Load提供了一种使Doris增量同步用户在Mysql数据库中对数据更新操作的CDC(Change Data Capture)功能。

基本原理
当前版本设计中,Binlog Load需要依赖canal作为中间媒介,让canal伪造成一个从节点去获取Mysql主节点上的Binlog并解析,再由Doris去获取Canal上解析好的数据,主要涉及Mysql端、Canal端以及Doris端

  1. FE会为每个数据同步作业启动一个canal client,来向canal server端订阅并获取数据。
  2. client中的receiver将负责通过Get命令接收数据,每获取到一个数据batch,都会由consumer根据对应表分发到不同的channel,每个channel都会为此数据batch产生一个发送数据的子任务Task。
  3. 在FE上,一个Task是channel向BE发送数据的子任务,里面包含分发到当前channel的同一个batch的数据。
  4. channel控制着单个表事务的开始、提交、终止。一个事务周期内,一般会从consumer获取到多个batch的数据,因此会产生多个向BE发送数据的子任务Task,在提交事务成功前,这些Task不会实际生效。
  5. 满足一定条件时(比如超过一定时间、达到提交最大数据大小),consumer将会阻塞并通知各个channel提交事务。
  6. 当且仅当所有channel都提交成功,才会通过Ack命令通知canal并继续获取并消费数据。
  7. 如果有任意channel提交失败,将会重新从上一次消费成功的位置获取数据并再次提交(已提交成功的channel不会再次提交以保证幂等性)。
  8. 整个数据同步作业中,FE通过以上流程不断的从canal获取数据并提交到BE,来完成数据同步。

Mysql端

在Mysql Cluster模式的主从同步中,二进制日志文件(Binlog)记录了主节点上的所有数据变化,数据在Cluster的多个节点间同步、备份都要通过Binlog日志进行,从而提高集群的可用性。架构通常由一个主节点(负责写)和一个或多个从节点(负责读)构成,所有在主节点上发生的数据变更将会复制给从节点。
注意:目前必须要使用Mysql 5.7及以上的版本才能支持Binlog Load功能。

# 打开mysql的二进制binlog日志功能,则需要编辑my.cnf配置文件设置一下。
find / -name my.cnf
/etc/my.cnf
# 修改mysqld中的一些配置文件
[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-format = ROW

#binlog-format 的三种模式
#ROW   记录每一行数据的信息
#Statement  记录sql语句
#Mixed   上面两种的混合

# 重启 MySQL 使配置生效
systemctl restart mysqld 
-- 创建用户并授权
-- 设置这些参数可以使得mysql的密码简单化
set global validate_password_length=4; 
set global validate_password_policy=0; 
-- 新增一个canal的用户,让他监听所有库中的所有表,并且设置密码为canal
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;
-- 刷新一下权限
FLUSH PRIVILEGES;

-- 准备测试表
CREATE TABLE `user_doris2` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `gender` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

配置 Canal 端

Canal 是属于阿里巴巴 otter 项目下的一个子项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,用于解决跨机房同步的业务场景,建议使用 canal 1.1.5及以上版本。

下载地址:https://github.com/alibaba/canal/releases

# 上传并解压 canal deployer压缩包
mkdir /opt/apps/canal
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/apps/canal

# 在 conf 文件夹下新建目录并重命名 
# 一个 canal 服务中可以有多个 instance,conf/下的每一个目录即是一个实例,每个实例下面都有独立的配置文件
mkdir /opt/apps/canel/conf/doris

# 拷贝配置文件模板 
cp /opt/apps/canal/conf/example/instance.properties /opt/apps/canal/conf/doris/
# 修改 conf/canal.properties 的配置
vi canal.properties

# 进入找到canal.destinations = example
# 将其修改为 我们自己配置的目录
canal.destinations = doris
# 修改 instance 配置文件
vi instance.properties 
# 修改:
canal.instance.master.address=doitedu01:3306
# 启动
sh bin/startup.sh

注意:canal client 和 canal instance 是一一对应的,Binlog Load 已限制多个数据同步作 业不能连接到同一个 destination。

配置目标表

基本语法:
CREATE SYNC [db.]job_name
(
 channel_desc,
 channel_desc
 ...
)
binlog_desc
-- 参数说明:
-- job_name:是数据同步作业在当前数据库内的唯一标识
-- channel_desc :用来定义任务下的数据通道,可表示 MySQL 源表到 doris 目标表的映射关系。在设置此项时,如果存在多个映射关系,必须满足 MySQL 源表应该与 doris 目标表是一一对应关系,其他的任何映射关系(如一对多关系),检查语法时都被视为不合法。
-- column_mapping:主要指MySQL源表和doris目标表的列之间的映射关系,如果不指定,FE 会默认源表和目标表的列按顺序一一对应。但是我们依然建议显式的指定列的映射关系,这样当目标表的结构发生变化(比如增加一个 nullable 的列),数据同步作业依然可以进行。否则,当发生上述变动后,因为列映射关系不再一一对应,导入将报错。 
-- binlog_desc:定义了对接远端 Binlog 地址的一些必要信息,目前可支持的对接类型只有 canal 方式,所有的配置项前都需要加上 canal 前缀。
-- 	canal.server.ip: canal server 的地址 
-- 	canal.server.port: canal server 的端口 
-- 	canal.destination: 前文提到的 instance 的字符串标识 
-- 	canal.batchSize: 每批从 canal server 处获取的 batch 大小的最大值,默认 8192 
-- 	canal.username: instance 的用户名 
-- 	canal.password: instance 的密码 
-- 	canal.debug: 设置为 true 时,会将 batch 和每一行数据的详细信息都打印出来,会影响性能。



-- Doris 创建与 Mysql 对应的目标表
CREATE TABLE `binlog_mysql` ( 
 `id` int(11) NOT NULL COMMENT "", 
 `name` VARCHAR(50) NOT NULL COMMENT "", 
 `age` int(11) NOT NULL COMMENT "" ,
 `gender` VARCHAR(50) NOT NULL COMMENT ""
) ENGINE=OLAP 
UNIQUE KEY(`id`) 
DISTRIBUTED BY HASH(`id`) BUCKETS 1; 


CREATE SYNC test.job20221228
(
 FROM test.binlog_test INTO binlog_test
)
FROM BINLOG 
(
 "type" = "canal",
 "canal.server.ip" = "linux01",
 "canal.server.port" = "11111",
 "canal.destination" = "doris",
 "canal.username" = "canal",
 "canal.password" = "canal"
);

-- 查看作业状态
-- 展示当前数据库的所有数据同步作业状态。 
SHOW SYNC JOB; 
-- 展示数据库 `test_db` 下的所有数据同步作业状态。 
SHOW SYNC JOB FROM `test`; 

-- 停止名称为 `job_name` 的数据同步作业 
STOP SYNC JOB [db.]job_name 

-- 暂停名称为 `job_name` 的数据同步作业 
PAUSE SYNC JOB [db.]job_name 

-- 恢复名称为 `job_name` 的数据同步作业 
RESUME SYNC JOB `job_name` 

数据导出

数据导出(Export)是 Doris 提供的一种将数据导出的功能。该功能可以将用户指定的表或分区的数据,以文本的格式,通过 Broker 进程导出到远端存储上,如 HDFS / 对象存储(支持S3协议) 等。
原理

  1. 用户提交一个 Export 作业到 FE。
  2. FE 的 Export 调度器会通过两阶段来执行一个 Export 作业:
  3. PENDING:FE 生成 ExportPendingTask,向 BE 发送 snapshot 命令,对所有涉及到的 Tablet 做一个快照。并生成多个查询计划。
  4. EXPORTING:FE 生成 ExportExportingTask,开始执行查询计划。

查询计划拆分

Export 作业会生成多个查询计划,每个查询计划负责扫描一部分 Tablet。每个查询计划扫描的 Tablet 个数由 FE 配置参数 export_tablet_num_per_task 指定,默认为 5。即假设一共 100 个 Tablet,则会生成 20 个查询计划。用户也可以在提交作业时,通过作业属性 tablet_num_per_task 指定这个数值。
一个作业的多个查询计划顺序执行

查询计划执行

一个查询计划扫描多个分片,将读取的数据以行的形式组织,每 1024 行为一个 batch,调用 Broker 写入到远端存储上。
查询计划遇到错误会整体自动重试 3 次。如果一个查询计划重试 3 次依然失败,则整个作业失败。
Doris 会首先在指定的远端存储的路径中,建立一个名为 __doris_export_tmp_12345 的临时目录(其中 12345 为作业 id)。导出的数据首先会写入这个临时目录。每个查询计划会生成一个文件,文件名示例:

export-data-c69fcf2b6db5420f-a96b94c1ff8bccef-1561453713822

其中 c69fcf2b6db5420f-a96b94c1ff8bccef 为查询计划的 query id。1561453713822 为文件生成的时间戳。当所有数据都导出后,Doris 会将这些文件 rename 到用户指定的路径中

示例:导出到hdfs

EXPORT TABLE test.event_info_log1 -- 库名.表名
to "hdfs://linux01:8020/event_info_log1"  -- 导出到那里去
PROPERTIES
(
    "label" = "event_info_log1",
    "column_separator"=",",
    "exec_mem_limit"="2147483648",
    "timeout" = "3600"
)
WITH BROKER "broker_name"
(
    "username" = "root",
    "password" = ""
);

-- 1.label:本次导出作业的标识。后续可以使用这个标识查看作业状态。
-- 2.column_separator:列分隔符。默认为 \t。支持不可见字符,比如 '\x07'。
-- 3.columns:要导出的列,使用英文状态逗号隔开,如果不填这个参数默认是导出表的所有列。
-- 4.line_delimiter:行分隔符。默认为 \n。支持不可见字符,比如 '\x07'。
-- 5.exec_mem_limit: 表示 Export 作业中,一个查询计划在单个 BE 上的内存使用限制。默认 2GB。单位字节。
-- 6.timeout:作业超时时间。默认 2小时。单位秒。
-- 7.tablet_num_per_task:每个查询计划分配的最大分片数。默认为 5。


-- 查看导出状态
show EXPORT \G;

注意事项

  1. 不建议一次性导出大量数据。一个 Export 作业建议的导出数据量最大在几十 GB。过大的导出会导致更多的垃圾文件和更高的重试成本。
  2. 如果表数据量过大,建议按照分区导出。
  3. 在 Export 作业运行过程中,如果 FE 发生重启或切主,则 Export 作业会失败,需要用户重新提交。
  4. 如果 Export 作业运行失败,在远端存储中产生的 __doris_export_tmp_xxx 临时目录,以及已经生成的文件不会被删除,需要用户手动删除。
  5. 如果 Export 作业运行成功,在远端存储中产生的 __doris_export_tmp_xxx 目录,根据远端存储的文件系统语义,可能会保留,也可能会被清除。比如在百度对象存储(BOS)中,通过 rename 操作将一个目录中的最后一个文件移走后,该目录也会被删除。如果该目录没有被清除,用户可以手动清除
  6. 当 Export 运行完成后(成功或失败),FE 发生重启或切主,则 SHOW EXPORT展示的作业的部分信息会丢失,无法查看。
  7. Export 作业只会导出 Base 表的数据,不会导出 Rollup Index 的数据。
  8. Export 作业会扫描数据,占用 IO 资源,可能会影响系统的查询延迟

查询结果导出

SELECT INTO OUTFILE 语句可以将查询结果导出到文件中。目前支持通过 Broker进程, 通过 S3 协议, 或直接通过 HDFS 协议,导出到远端存储,如 HDFS,S3,BOS,COS (腾讯云)上。

-- 语法
query_stmt  -- 查询语句
INTO OUTFILE "file_path"  --导出文件的路劲
[format_as]  -- 指定文件存储的格式
[properties]  -- 一些配置文件

file_path:指向文件存储的路径以及文件前缀。如 hdfs://path/to/my_file_.最终的文件名将由 my_file_,文件序号以及文件格式后缀组成。其中文件序号由 0 开始,数量为文件被分割的数量

-- 如
my_file_abcdefg_0.csv 
my_file_abcdefg_1.csv 
my_file_abcdegf_2.csv 

-- [format_as]:指定导出格式。默认为 CSV
-- [properties]:指定相关属性。目前支持通过 Broker 进程,hdfs协议等
-- Broker 相关属性需加前缀 broker.
-- HDFS 相关属性需加前缀 hdfs. 其中hdfs.fs.defaultFS 用于填写 namenode地址和端口,属于必填项。
-- 如:
("broker.prop_key" = "broker.prop_val", ...)
("hdfs.fs.defaultFS" = "xxx", "hdfs.hdfs_user" = "xxx")

-- 其他属性:
-- column_separator:列分隔符,仅对 CSV 格式适用。默认为 \t。 
-- line_delimiter:行分隔符,仅对 CSV 格式适用。默认为 \n。 
-- max_file_size:单个文件的最大大小。默认为 1GB。取值范围在 5MB 到 2GB 之间。超过这个大小的文件将会被切分。
-- schema:PARQUET 文件 schema 信息。仅对 PARQUET 格式适用。导出文件格式为 PARQUET 时,必须指定 schema。

使用 broker 方式,将简单查询结果导出

select * from log_detail where id >2
INTO OUTFILE "hdfs://doitedu01:8020/doris-out/broker_a_" 
FORMAT AS CSV 
PROPERTIES 
( 
"broker.name" = "broker_name", 
"column_separator" = ",", 
"line_delimiter" = "\n", 
"max_file_size" = "100MB" 
); 

使用 HDFS 方式导出

EXPLAIN SELECT * FROM log_detail
INTO OUTFILE "hdfs://doris-out/hdfs_" 
FORMAT AS CSV 
PROPERTIES 
( 
"fs.defaultFS" = "hdfs://doitedu01:8020", 
"hadoop.username" = "root",
"column_separator" = ","
);

标签:canal,hdfs,name,--,导入,id,Doris
From: https://www.cnblogs.com/paopaoT/p/17444248.html

相关文章

  • 中兴新支点系统离线安装ceph 16.2.10
    微信公众号:运维开发故事,作者:wanger关于中兴新支点系统中兴新支点操作系统基于Linux稳定内核,分为嵌入式操作系统(NewStartCGEL)、服务器操作系统(NewStartCGSL)、桌面操作系统(NewStartNSDL),经过近10年专业研发团队的积累和发展,产品形成安全加固、自主可控、易用管理的突出优势。目前,......
  • java中什么是锁消除和锁粗化?
    锁消除和锁粗化都是Java中针对锁的优化技术。锁消除:在Java编译时,JIT编译器可以通过静态分析发现不必要的同步,然后将其消除。这样会使得代码执行更快,因为它减少了线程的上下文切换和锁处理的开销。锁粗化:在Java中,每次加锁和解锁都需要进行系统调用。如果加锁和解锁的代码很近,那么......
  • 消防安全知识答题活动小程序v4.2.0
    消防安全知识答题活动小程序v4.2.0v4.2.01)JavaScript-数组乱序数组乱序在实际开发过程中是可能碰到的,比如消防安全知识答题活动小程序中实现乱序抽题或者选项乱序。我们一开始可能会想到利用数组的sort方法,判断随机出来的0-1的值与0.5的大小,实现排序。该方法实现如下:vararr=[1,......
  • 随机森林模型 的数学原理
    随机森林是一种基于决策树的集成学习方法,其基本思想是通过构建多个决策树来进行分类和回归。随机森林中的每一棵决策树都是在随机样本和随机特征的条件下构建出来的,整个建模过程相当于将多个弱分类器组合成一个强分类器。其主要数学原理如下:1.决策树:随机森林是由多个决策树构成......
  • MacBook 配置远程 jupyter lab
    在从事数据分析/处理过程中,JupyterLab是一个常见且便捷的工具,它属于JupyterNotebook的升级版本。除了可以在本机搭建Jupyter环境,它还支持通过网络远程访问,从而实现在本地编写查看代码,在远程服务器上运行代码的功能。需求背景我日常使用的是一台M1版本的MacBookAir,它......
  • php页面加密码
    以下是一个简单的示例代码,用于在PHP页面中添加密码:<?php$password="mypassword";//设置密码if($_POST['password']!=$password){//如果表单提交的密码不正确if(isset($_POST['submit'])){//如果表单已经提交过echo"InvalidPassword";//显示错......
  • 小灰灰深度学习day5——数据预处理
    内容简介:1.将数据写入.csv文件中    2.将数据从.csv文件中读出  3.利用插值法处理缺失的数据  4.将数据类型转化为torch张量类型代码如下:importosos.makedirs(os.path.join('..','data'),exist_ok=True)data_file=os.path.join('..','data','house_......
  • Feign使用实践
    Feign是一个声明式的HTTP客户端,用于简化微服务架构中的服务调用。它基于注解和接口定义,可以与服务发现组件(例如Eureka)和负载均衡组件(例如Ribbon)集成,提供了更简洁、可读性更高的代码来实现服务间的通信。下面是使用Java代码实现Feign入门示例的详细步骤:添加依赖项:在您的Java......
  • Ribbon使用实践
    Ribbon是Netflix开源的一个负载均衡客户端库,用于在微服务架构中实现客户端的负载均衡。它可以与服务发现组件(例如Eureka)集成,自动地根据可用的服务实例来分发请求。下面是使用Java代码实现Ribbon入门示例的详细步骤:添加依赖项:在您的Java项目中,添加以下依赖项以使用Ribbon客户......
  • Nacos使用实践
    Nacos(全称为"阿里巴巴服务注册中心和配置中心")是一个开源的分布式服务发现和配置管理系统,由阿里巴巴集团开发。它提供了服务注册与发现、动态配置管理、服务健康监测等功能,旨在帮助构建和管理云原生应用。下面是使用Java代码实现Nacos入门示例的详细步骤:准备工作:下载Nacos:从......