首页 > 其他分享 >Doris:数据导入导出

Doris:数据导入导出

时间:2024-05-24 14:57:20浏览次数:18  
标签:hdfs 导出 指定 导入 file my Doris

数据导入

导入(Load)功能就是将用户的原始数据导入到 Doris 中。导入成功后,用户即可通过

Mysql 客户端查询数据。为适配不同的数据导入需求,Doris 系统提供了 6 种不同的导入方式(Broker、Stream、Insert、Multi、Routine、S3)。每种导入方式支持不同的数据源,存在不同的使用方式(异步,同步)。

所有导入方式都支持 csv 数据格式。其中 Broker load 还支持 parquet 和 orc 数据格式。

Broker Load

Broker Load为异步导入,官方推荐Broker的应用场景是在数据量到达几十到百GB级别。

用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给多个 BE 执行,每个 BE 执行一部分导入数据。

BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,由 FE 最终决定导入是否成功。

基本语法

这里主要看一下官方的讲解,官网已经说的很详细了。

该命令主要用于通过 Broker 服务进程读取远端存储(如S3、HDFS)上的数据导入到 Doris 表里。

LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH BROKER broker_name
[broker_properties]
[load_properties]
[COMMENT "comments"];
  • load_label

    每个导入需要指定一个唯一的 Label。后续可以通过这个 label 来查看作业进度。

    [database.]label_name

  • data_desc1

    用于描述一组需要导入的文件。

    [MERGE|APPEND|DELETE]
    DATA INFILE
    (
    "file_path1"[, file_path2, ...]
    )
    [NEGATIVE]
    INTO TABLE `table_name`
    [PARTITION (p1, p2, ...)]
    [COLUMNS TERMINATED BY "column_separator"]
    [LINES TERMINATED BY "line_delimiter"]
    [FORMAT AS "file_type"]
    [COMPRESS_TYPE AS "compress_type"]
    [(column_list)]
    [COLUMNS FROM PATH AS (c1, c2, ...)]
    [SET (column_mapping)]
    [PRECEDING FILTER predicate]
    [WHERE predicate]
    [DELETE ON expr]
    [ORDER BY source_sequence]
    [PROPERTIES ("key1"="value1", ...)]
    
    • [MERGE|APPEND|DELETE]

      数据合并类型。默认为 APPEND,表示本次导入是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。其中 MERGE 类型需要配合 [DELETE ON] 语句使用,以标注 Delete Flag 列。而 DELETE 类型则表示本次导入的所有数据皆为删除数据。

    • DATA INFILE

      指定需要导入的文件路径。可以是多个。可以使用通配符。路径最终必须匹配到文件,如果只匹配到目录则导入会失败。

    • NEGATIVE

      该关键词用于表示本次导入为一批”负“导入。这种方式仅针对具有整型 SUM 聚合类型的聚合数据表。该方式会将导入数据中,SUM 聚合列对应的整型数值取反。主要用于冲抵之前导入错误的数据。

    • PARTITION(p1, p2, ...)

      可以指定仅导入表的某些分区。不在分区范围内的数据将被忽略。

    • COLUMNS TERMINATED BY

      指定列分隔符。仅在 CSV 格式下有效。仅能指定单字节分隔符。

    • LINES TERMINATED BY

      指定行分隔符。仅在 CSV 格式下有效。仅能指定单字节分隔符。

    • FORMAT AS

      指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。

    • COMPRESS_TYPE AS 指定文件压缩类型, 支持GZ/BZ2/LZ4FRAME。

    • column list

      用于指定原始文件中的列顺序。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。

      (k1, k2, tmpk1)

    • COLUMNS FROM PATH AS

      指定从导入文件路径中抽取的列。

    • SET (column_mapping)

      指定列的转换函数。

    • PRECEDING FILTER predicate

      前置过滤条件。数据首先根据 column listCOLUMNS FROM PATH AS 按顺序拼接成原始数据行。然后按照前置过滤条件进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。

    • WHERE predicate

      根据条件对导入的数据进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。

    • DELETE ON expr

      需配合 MEREGE 导入模式一起使用,仅针对 Unique Key 模型的表。用于指定导入数据中表示 Delete Flag 的列和计算关系。

    • ORDER BY

      仅针对 Unique Key 模型的表。用于指定导入数据中表示 Sequence Col 的列。主要用于导入时保证数据顺序。

    • PROPERTIES ("key1"="value1", ...)

      指定导入的format的一些参数。如导入的文件是json格式,则可以在这里指定json_rootjsonpathsfuzzy_parse等参数。

      • enclose

        包围符。当csv数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为",",包围符为"'",数据为"a,'b,c'",则"b,c"会被解析为一个字段。 注意:当 enclose 设置为"时,trim_double_quotes 一定要设置为 true。

      • escape

        转义符。用于转义在字段中出现的与包围符相同的字符。例如数据为"a,'b,'c'",包围符为"'",希望"b,'c被作为一个字段解析,则需要指定单字节转义符,例如"",然后将数据修改为"a,'b,'c'"。

  • WITH BROKER broker_name

    指定需要使用的 Broker 服务名称。在公有云 Doris 中。Broker 服务名称为 bos

  • broker_properties

    指定 broker 所需的信息。这些信息通常被用于 Broker 能够访问远端存储系统。如 BOS 或 HDFS。关于具体信息,可参阅 Broker 文档。

    (
        "key1" = "val1",
        "key2" = "val2",
        ...
    )
    
    • load_properties

      指定导入的相关参数。目前支持以下参数:

      • timeout

        导入超时时间。默认为 4 小时。单位秒。

      • max_filter_ratio

        最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。取值范围为 0 到 1。

      • exec_mem_limit

        导入内存限制。默认为 2GB。单位为字节。

      • strict_mode

        是否对数据进行严格限制。默认为 false。

      • partial_columns

        布尔类型,为 true 表示使用部分列更新,默认值为 false,该参数只允许在表模型为 Unique 且采用 Merge on Write 时设置。

      • timezone

        指定某些受时区影响的函数的时区,如 strftime/alignment_timestamp/from_unixtime 等等,具体请查阅 时区 文档。如果不指定,则使用 "Asia/Shanghai" 时区

      • load_parallelism

        导入并发度,默认为1。调大导入并发度会启动多个执行计划同时执行导入任务,加快导入速度。

      • send_batch_parallelism

        用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 max_send_batch_parallelism_per_job,那么作为协调点的 BE 将使用 max_send_batch_parallelism_per_job 的值。

      • load_to_single_tablet

        布尔类型,为true表示支持一个任务只导入数据到对应分区的一个tablet,默认值为false,作业的任务数取决于整体并发度。该参数只允许在对带有random分桶的olap表导数的时候设置。

      • priority

        设置导入任务的优先级,可选 HIGH/NORMAL/LOW 三种优先级,默认为 NORMAL,对于处在 PENDING 状态的导入任务,更高优先级的任务将优先被执行进入 LOADING 状态。

  • comment

    指定导入任务的备注信息。可选参数。

Example

  1. 从 HDFS 导入一批数据

    LOAD LABEL example_db.label1
    (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file.txt")
        INTO TABLE `my_table`
        COLUMNS TERMINATED BY ","
    )
    WITH BROKER hdfs
    (
        "username"="hdfs_user",
        "password"="hdfs_password"
    );
    

    导入文件 file.txt,按逗号分隔,导入到表 my_table

  2. 从 HDFS 导入数据,使用通配符匹配两批文件。分别导入到两个表中。

    LOAD LABEL example_db.label2
    (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*")
        INTO TABLE `my_table1`
        PARTITION (p1)
        COLUMNS TERMINATED BY ","
        (k1, tmp_k2, tmp_k3)
        SET (
            k2 = tmp_k2 + 1,
            k3 = tmp_k3 + 1
        )
        DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-20*")
        INTO TABLE `my_table2`
        COLUMNS TERMINATED BY ","
        (k1, k2, k3)
    )
    WITH BROKER hdfs
    (
        "username"="hdfs_user",
        "password"="hdfs_password"
    );
    

    使用通配符匹配导入两批文件 file-10*file-20*。分别导入到 my_table1my_table2 两张表中。其中 my_table1 指定导入到分区 p1 中,并且将导入源文件中第二列和第三列的值 +1 后导入。

  3. 从 HDFS 导入一批数据。

    LOAD LABEL example_db.label3
    (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/doris/data/*/*")
        INTO TABLE `my_table`
        COLUMNS TERMINATED BY "\\x01"
    )
    WITH BROKER my_hdfs_broker
    (
        "username" = "",
        "password" = "",
        "fs.defaultFS" = "hdfs://my_ha",
        "dfs.nameservices" = "my_ha",
        "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
        "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
        "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
        "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
    

    指定分隔符为 Hive 的默认分隔符 \\x01,并使用通配符 * 指定 data 目录下所有目录的所有文件。使用简单认证,同时配置 namenode HA。

  4. 导入 Parquet 格式数据,指定 FORMAT 为 parquet。默认是通过文件后缀判断

    LOAD LABEL example_db.label4
    (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file")
        INTO TABLE `my_table`
        FORMAT AS "parquet"
        (k1, k2, k3)
    )
    WITH BROKER hdfs
    (
        "username"="hdfs_user",
        "password"="hdfs_password"
    );
    
  5. 导入数据,并提取文件路径中的分区字段

    LOAD LABEL example_db.label10
    (
        DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*")
        INTO TABLE `my_table`
        FORMAT AS "csv"
        (k1, k2, k3)
        COLUMNS FROM PATH AS (city, utc_date)
    )
    WITH BROKER hdfs
    (
        "username"="hdfs_user",
        "password"="hdfs_password"
    );
    

    my_table 表中的列为 k1, k2, k3, city, utc_date

    其中 hdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city=beijing 目录下包括如下文件:

    hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv
    hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv
    hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv
    hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv
    

    文件中只包含 k1, k2, k3 三列数据,city, utc_date 这两列数据会从文件路径中提取。

  6. 对待导入数据进行过滤。

    LOAD LABEL example_db.label6
    (
        DATA INFILE("hdfs://host:port/input/file")
        INTO TABLE `my_table`
        (k1, k2, k3)
        SET (
            k2 = k2 + 1
        )
        PRECEDING FILTER k1 = 1
        WHERE k1 > k2
    )
    WITH BROKER hdfs
    (
        "username"="user",
        "password"="pass"
    );
    

    只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。

  7. 导入数据,提取文件路径中的时间分区字段,并且时间包含 %3A (在 hdfs 路径中,不允许有 ':',所有 ':' 会由 %3A 替换)

    LOAD LABEL example_db.label7
    (
        DATA INFILE("hdfs://host:port/user/data/*/test.txt") 
        INTO TABLE `tbl12`
        COLUMNS TERMINATED BY ","
        (k2,k3)
        COLUMNS FROM PATH AS (data_time)
        SET (
            data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
        )
    )
    WITH BROKER hdfs
    (
        "username"="user",
        "password"="pass"
    );
    

    路径下有如下文件:

    /user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
    /user/data/data_time=2020-02-18 00%3A00%3A00/test.txt
    

    表结构为:

    data_time DATETIME,
    k2        INT,
    k3        INT
    
  8. 从 HDFS 导入一批数据,指定超时时间和过滤比例。使用明文 my_hdfs_broker 的 broker。简单认证。并且将原有数据中与 导入数据中v2 大于100 的列相匹配的列删除,其他列正常导入

    LOAD LABEL example_db.label8
    (
        MERGE DATA INFILE("HDFS://test:802/input/file")
        INTO TABLE `my_table`
        (k1, k2, k3, v2, v1)
        DELETE ON v2 > 100
    )
    WITH HDFS
    (
        "hadoop.username"="user",
        "password"="pass"
    )
    PROPERTIES
    (
        "timeout" = "3600",
        "max_filter_ratio" = "0.1"
    );
    

    使用 MERGE 方式导入。my_table 必须是一张 Unique Key 的表。当导入数据中的 v2 列的值大于 100 时,该行会被认为是一个删除行。

    导入任务的超时时间是 3600 秒,并且允许错误率在 10% 以内。

  9. 导入时指定source_sequence列,保证UNIQUE_KEYS表中的替换顺序:

    LOAD LABEL example_db.label9
    (
        DATA INFILE("HDFS://test:802/input/file")
        INTO TABLE `my_table`
        COLUMNS TERMINATED BY ","
        (k1,k2,source_sequence,v1,v2)
        ORDER BY source_sequence
    ) 
    WITH HDFS
    (
        "hadoop.username"="user",
        "password"="pass"
    )
    

    my_table 必须是 Unique Key 模型表,并且指定了 Sequcence Col。数据会按照源数据中 source_sequence 列的值来保证顺序性。

  10. 从 HDFS 导入一批数据,指定文件格式为 json 并指定 json_rootjsonpaths

    LOAD LABEL example_db.label10
    (
        DATA INFILE("HDFS://test:port/input/file.json")
        INTO TABLE `my_table`
        FORMAT AS "json"
        PROPERTIES(
          "json_root" = "$.item",
          "jsonpaths" = "[$.id, $.city, $.code]"
        )       
    )
    with HDFS (
    "hadoop.username" = "user"
    "password" = ""
    )
    PROPERTIES
    (
    "timeout"="1200",
    "max_filter_ratio"="0.1"
    );
    

    jsonpaths 可与 column listSET (column_mapping)配合:

    LOAD LABEL example_db.label10
    (
        DATA INFILE("HDFS://test:port/input/file.json")
        INTO TABLE `my_table`
        FORMAT AS "json"
        (id, code, city)
        SET (id = id * 10)
        PROPERTIES(
          "json_root" = "$.item",
          "jsonpaths" = "[$.id, $.code, $.city]"
        )       
    )
    with HDFS (
    "hadoop.username" = "user"
    "password" = ""
    )
    PROPERTIES
    (
    "timeout"="1200",
    "max_filter_ratio"="0.1"
    );
    
  11. 从腾讯云cos中以csv格式导入数据。

    LOAD LABEL example_db.label10
    (
    DATA INFILE("cosn://my_bucket/input/file.csv")
    INTO TABLE `my_table`
    (k1, k2, k3)
    )
    WITH BROKER "broker_name"
    (
        "fs.cosn.userinfo.secretId" = "xxx",
        "fs.cosn.userinfo.secretKey" = "xxxx",
        "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
    )
    
  12. 导入CSV数据时去掉双引号, 并跳过前5行。

    LOAD LABEL example_db.label12
    (
    DATA INFILE("cosn://my_bucket/input/file.csv")
    INTO TABLE `my_table`
    (k1, k2, k3)
    PROPERTIES("trim_double_quotes" = "true", "skip_lines" = "5")
    )
    WITH BROKER "broker_name"
    (
        "fs.cosn.userinfo.secretId" = "xxx",
        "fs.cosn.userinfo.secretKey" = "xxxx",
        "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
    )
    

Keywords

BROKER, LOAD

Best Practice

  1. 查看导入任务状态

    Broker Load 是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过 SHOW LOAD 命令查看。

  2. 取消导入任务

    已提交切尚未结束的导入任务可以通过 CANCEL LOAD 命令取消。取消后,已写入的数据也会回滚,不会生效。

  3. Label、导入事务、多表原子性

    Doris 中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,Doris 还可以通过 Label 的机制来保证数据导入的不丢不重。具体说明可以参阅 导入事务和原子性 文档。

  4. 列映射、衍生列和过滤

    Doris 可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和 UDF。关于如何正确的使用这个功能,可参阅 列的映射,转换与过滤 文档。

  5. 错误数据过滤

    Doris 的导入任务可以容忍一部分格式错误的数据。容忍了通过 max_filter_ratio 设置。默认为0,即表示当有一条错误数据时,整个导入任务将会失败。如果用户希望忽略部分有问题的数据行,可以将次参数设置为 0~1 之间的数值,Doris 会自动跳过哪些数据格式不正确的行。

    关于容忍率的一些计算方式,可以参阅 列的映射,转换与过滤 文档。

  6. 严格模式

    strict_mode 属性用于设置导入任务是否运行在严格模式下。该格式会对列映射、转换和过滤的结果产生影响。关于严格模式的具体说明,可参阅 严格模式 文档。

  7. 超时时间

    Broker Load 的默认超时时间为 4 小时。从任务提交开始算起。如果在超时时间内没有完成,则任务会失败。

  8. 数据量和任务数限制

    Broker Load 适合在一个导入任务中导入100GB以内的数据。虽然理论上在一个导入任务中导入的数据量没有上限。但是提交过大的导入会导致运行时间较长,并且失败后重试的代价也会增加。

    同时受限于集群规模,我们限制了导入的最大数据量为 ComputeNode 节点数 * 3GB。以保证系统资源的合理利用。如果有大数据量需要导入,建议分成多个导入任务提交。

    Doris 同时会限制集群内同时运行的导入任务数量,通常在 3-10 个不等。之后提交的导入作业会排队等待。队列最大长度为 100。之后的提交会直接拒绝。注意排队时间也被计算到了作业总时间中。如果超时,则作业会被取消。所以建议通过监控作业运行状态来合理控制作业提交频率。

数据导出

Export

Description

EXPORT 命令用于将指定表的数据导出为文件到指定位置。目前支持通过 Broker 进程, S3 协议或HDFS 协议,导出到远端存储,如 HDFS,S3,BOS,COS(腾讯云)上。

EXPORT是一个异步操作,该命令会提交一个EXPORT JOB到Doris,任务提交成功立即返回。执行后可使用 SHOW EXPORT 命令查看进度。

语法:

EXPORT TABLE table_name
[PARTITION (p1[,p2])]
[WHERE]
TO export_path
[opt_properties]
WITH BROKER/S3/HDFS
[broker_properties];

说明

  • table_name

    当前要导出的表的表名。支持 Doris 本地表、视图View、Catalog外表数据的导出。

  • partition

    可以只导出指定表的某些指定分区,只对Doris本地表有效。

  • export_path

    导出的文件路径。可以是目录,也可以是文件目录加文件前缀,如hdfs://path/to/my_file_

  • opt_properties

    用于指定一些导出参数。

    [PROPERTIES ("key"="value", ...)]
    

    可以指定如下参数:

    • label: 可选参数,指定此次Export任务的label,当不指定时系统会随机生成一个label。
    • column_separator:指定导出的列分隔符,默认为\t,支持多字节。该参数只用于csv文件格式。
    • line_delimiter:指定导出的行分隔符,默认为\n,支持多字节。该参数只用于csv文件格式。
    • columns:指定导出表的某些列。
    • format:指定导出作业的文件格式,支持:parquet, orc, csv, csv_with_names、csv_with_names_and_types。默认为csv格式。
    • max_file_size:导出作业单个文件大小限制,如果结果超过这个值,将切割成多个文件。max_file_size取值范围是[5MB, 2GB], 默认为1GB。(当指定导出为orc文件格式时,实际切分文件的大小将是64MB的倍数,如:指定max_file_size = 5MB, 实际将以64MB为切分;指定max_file_size = 65MB, 实际将以128MB为切分)
    • parallelism:导出作业的并发度,默认为1,导出作业会开启parallelism个数的线程去执行select into outfile语句。(如果parallelism个数大于表的tablets个数,系统将自动把parallelism设置为tablets个数大小,即每一个select into outfile语句负责一个tablets)
    • delete_existing_files: 默认为false,若指定为true,则会先删除export_path所指定目录下的所有文件,然后导出数据到该目录下。例如:"export_path" = "/user/tmp", 则会删除"/user/"下所有文件及目录;"file_path" = "/user/tmp/", 则会删除"/user/tmp/"下所有文件及目录。
    • with_bom: 默认为false,若指定为true,则导出的文件编码为带有BOM的UTF8编码(只对csv相关的文件格式生效)。
    • data_consistency: 可以设置为 none / partition ,默认为 partition 。指示以何种粒度切分导出表,none 代表 Tablets 级别,partition代表 Partition 级别。
    • timeout:导出作业的超时时间,默认为2小时,单位是秒。

    注意:要使用delete_existing_files参数,还需要在fe.conf中添加配置enable_delete_existing_files = true并重启fe,此时delete_existing_files才会生效。delete_existing_files = true 是一个危险的操作,建议只在测试环境中使用。

  • WITH BROKER

    可以通过 Broker 进程写数据到远端存储上。这里需要定义相关的连接信息供 Broker 使用。

    语法:
    WITH BROKER "broker_name"
    ("key"="value"[,...])
    
    Broker相关属性:
      username: 用户名
      password: 密码
      hadoop.security.authentication: 指定认证方式为 kerberos
      kerberos_principal: 指定 kerberos 的 principal
      kerberos_keytab: 指定 kerberos 的 keytab 文件路径。该文件必须为 Broker 进程所在服务器上的文件的绝对路径。并且可以被 Broker 进程访问
    
  • WITH HDFS

    可以直接将数据写到远端HDFS上。

    语法:
    WITH HDFS ("key"="value"[,...])
    
    HDFS 相关属性:
      fs.defaultFS: namenode 地址和端口
      hadoop.username: hdfs 用户名
      dfs.nameservices: name service名称,与hdfs-site.xml保持一致
      dfs.ha.namenodes.[nameservice ID]: namenode的id列表,与hdfs-site.xml保持一致
      dfs.namenode.rpc-address.[nameservice ID].[name node ID]: Name node的rpc地址,数量与namenode数量相同,与hdfs-site.xml保
    
      对于开启kerberos认证的Hadoop 集群,还需要额外设置如下 PROPERTIES 属性:
      dfs.namenode.kerberos.principal: HDFS namenode 服务的 principal 名称
      hadoop.security.authentication: 认证方式设置为 kerberos
      hadoop.kerberos.principal: 设置 Doris 连接 HDFS 时使用的 Kerberos 主体
      hadoop.kerberos.keytab: 设置 keytab 本地文件路径
    
  • WITH S3

    可以直接将数据写到远端S3对象存储上。

    语法:
    WITH S3 ("key"="value"[,...])
    
    S3 相关属性:
      AWS_ENDPOINT
      AWS_ACCESS_KEY
      AWS_SECRET_KEY
      AWS_REGION
      use_path_style: (选填) 默认为false 。S3 SDK 默认使用 virtual-hosted style 方式。但某些对象存储系统可能没开启或不支持virtual-hosted style 方式的访问,此时可以添加 use_path_style 参数来强制使用 path style 访问方式。
    

Example

export数据到本地

export数据到本地文件系统,需要在fe.conf中添加enable_outfile_to_local=true并且重启FE。

  1. 将test表中的所有数据导出到本地存储, 默认导出csv格式文件
EXPORT TABLE test TO "file:///home/user/tmp/";
  1. 将test表中的k1,k2列导出到本地存储, 默认导出csv文件格式,并设置label
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "label" = "label1",
  "columns" = "k1,k2"
);
  1. 将test表中的 k1 < 50 的行导出到本地存储, 默认导出csv格式文件,并以,作为列分割符
EXPORT TABLE test WHERE k1 < 50 TO "file:///home/user/tmp/"
PROPERTIES (
  "columns" = "k1,k2",
  "column_separator"=","
);
  1. 将 test 表中的分区p1,p2导出到本地存储, 默认导出csv格式文件
EXPORT TABLE test PARTITION (p1,p2) TO "file:///home/user/tmp/" 
PROPERTIES ("columns" = "k1,k2");
  1. 将test表中的所有数据导出到本地存储,导出其他格式的文件
// parquet格式
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "columns" = "k1,k2",
  "format" = "parquet"
);

// orc格式
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "columns" = "k1,k2",
  "format" = "orc"
);

// csv_with_names格式, 以’AA‘为列分割符,‘zz’为行分割符
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "format" = "csv_with_names",
  "column_separator"="AA",
  "line_delimiter" = "zz"
);

// csv_with_names_and_types格式
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "format" = "csv_with_names_and_types"
);
  1. 设置max_file_sizes属性
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "format" = "parquet",
  "max_file_size" = "5MB"
);

当导出文件大于5MB时,将切割数据为多个文件,每个文件最大为5MB。

  1. 设置parallelism属性
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
  "format" = "parquet",
  "max_file_size" = "5MB",
  "parallelism" = "5"
);
  1. 设置delete_existing_files属性
EXPORT TABLE test TO "file:///home/user/tmp"
PROPERTIES (
  "format" = "parquet",
  "max_file_size" = "5MB",
  "delete_existing_files" = "true"
);

Export导出数据时会先将/home/user/目录下所有文件及目录删除,然后导出数据到该目录下。

export with S3
  1. 将 s3_test 表中的所有数据导出到 s3 上,以不可见字符 "\x07" 作为列或者行分隔符。如果需要将数据导出到minio,还需要指定use_path_style=true。
EXPORT TABLE s3_test TO "s3://bucket/a/b/c" 
PROPERTIES (
  "column_separator"="\\x07", 
  "line_delimiter" = "\\x07"
) WITH s3 (
  "s3.endpoint" = "xxxxx",
  "s3.region" = "xxxxx",
  "s3.secret_key"="xxxx",
  "s3.access_key" = "xxxxx"
)
export with HDFS
  1. 将 test 表中的所有数据导出到 HDFS 上,导出文件格式为parquet,导出作业单个文件大小限制为512MB,保留所指定目录下的所有文件。
EXPORT TABLE test TO "hdfs://hdfs_host:port/a/b/c/" 
PROPERTIES(
    "format" = "parquet",
    "max_file_size" = "512MB",
    "delete_existing_files" = "false"
)
with HDFS (
"fs.defaultFS"="hdfs://hdfs_host:port",
"hadoop.username" = "hadoop"
);
export with Broker

需要先启动broker进程,并在FE中添加该broker。

  1. 将 test 表中的所有数据导出到 hdfs 上
EXPORT TABLE test TO "hdfs://hdfs_host:port/a/b/c" 
WITH BROKER "broker_name" 
(
  "username"="xxx",
  "password"="yyy"
);
  1. 将 testTbl 表中的分区p1,p2导出到 hdfs 上,以","作为列分隔符,并指定label
EXPORT TABLE testTbl PARTITION (p1,p2) TO "hdfs://hdfs_host:port/a/b/c" 
PROPERTIES (
  "label" = "mylabel",
  "column_separator"=","
) 
WITH BROKER "broker_name" 
(
  "username"="xxx",
  "password"="yyy"
);
  1. 将 testTbl 表中的所有数据导出到 hdfs 上,以不可见字符 "\x07" 作为列或者行分隔符。
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" 
PROPERTIES (
  "column_separator"="\\x07", 
  "line_delimiter" = "\\x07"
) 
WITH BROKER "broker_name" 
(
  "username"="xxx", 
  "password"="yyy"
)

Keywords

EXPORT

Best Practice

并发执行

一个 Export 作业可以设置parallelism参数来并发导出数据。parallelism参数实际就是指定执行 EXPORT 作业的线程数量。当设置"data_consistency" = "none"时,每一个线程会负责导出表的部分Tablets。

一个 Export 作业的底层执行逻辑实际上是SELECT INTO OUTFILE语句,parallelism参数设置的每一个线程都会去执行独立的SELECT INTO OUTFILE语句。

Export 作业拆分成多个SELECT INTO OUTFILE的具体逻辑是:将该表的所有tablets平均的分给所有parallel线程,如:

  • num(tablets) = 40, parallelism = 3,则这3个线程各自负责的tablets数量分别为 14,13,13个。
  • num(tablets) = 2, parallelism = 3,则Doris会自动将parallelism设置为2,每一个线程负责一个tablets。

当一个线程负责的tablest超过 maximum_tablets_of_outfile_in_export 数值(默认为10,可在fe.conf中添加maximum_tablets_of_outfile_in_export参数来修改该值)时,该线程就会拆分为多个SELECT INTO OUTFILE语句,如:

  • 一个线程负责的tablets数量分别为 14,maximum_tablets_of_outfile_in_export = 10,则该线程负责两个SELECT INTO OUTFILE语句,第一个SELECT INTO OUTFILE语句导出10个tablets,第二个SELECT INTO OUTFILE语句导出4个tablets,两个SELECT INTO OUTFILE语句由该线程串行执行。

当所要导出的数据量很大时,可以考虑适当调大parallelism参数来增加并发导出。若机器核数紧张,无法再增加parallelism 而导出表的Tablets又较多 时,可以考虑调大maximum_tablets_of_outfile_in_export来增加一个SELECT INTO OUTFILE语句负责的tablets数量,也可以加快导出速度。

若希望以 Parition 粒度导出 Table ,可以设置 Export 属性 "data_consistency" = "partition" ,此时 Export 任务并发的线程会以 Parition 粒度来划分为多个 Outfile 语句,不同的 Outfile 语句导出的 Parition 不同,而同一个 Outfile 语句导出的数据一定属于同一个 Partition。如:设置 "data_consistency" = "partition"

  • num(partition) = 40, parallelism = 3,则这3个线程各自负责的 Partition 数量分别为 14,13,13个。
  • num(partition) = 2, parallelism = 3,则 Doris 会自动将 Parallelism 设置为2,每一个线程负责一个 Partition 。
内存限制

通常一个 Export 作业的查询计划只有 扫描-导出 两部分,不涉及需要太多内存的计算逻辑。所以通常 2GB 的默认内存限制可以满足需求。

但在某些场景下,比如一个查询计划,在同一个 BE 上需要扫描的 Tablet 过多,或者 Tablet 的数据版本过多时,可能会导致内存不足。可以调整session变量exec_mem_limit来调大内存使用限制。

注意事项
  • 不建议一次性导出大量数据。一个 Export 作业建议的导出数据量最大在几十 GB。过大的导出会导致更多的垃圾文件和更高的重试成本。如果表数据量过大,建议按照分区导出。
  • 如果 Export 作业运行失败,已经生成的文件不会被删除,需要用户手动删除。
  • Export 作业会扫描数据,占用 IO 资源,可能会影响系统的查询延迟。
  • 目前在export时只是简单检查tablets版本是否一致,建议在执行export过程中不要对该表进行导入数据操作。
  • 一个Export Job允许导出的分区数量最大为2000,可以在fe.conf中添加参数maximum_number_of_export_partitions并重启FE来修改该设置。

标签:hdfs,导出,指定,导入,file,my,Doris
From: https://www.cnblogs.com/Mr-Sponge/p/18210983

相关文章

  • 前端实现导入,导出/纯前端实现将数据导出为excel,将excel数据导入传给后端
    一、下载插件(xlsx如果报错utils未定义就用下面这个版本)[email protected]@2.0.5二、页面中引入import*asXLSXfrom'xlsx';import{saveAs}from'file-saver'; 三、导入/导出表格内容1.导出exportExcel(){  ......
  • 探究-百万级别excel数据的导入导出
    思路:1.从数据库中读,分批次读取2.多线程去写到一个excel中,3.网页如何不卡界面的方式,下载文件界面显示进度条,不影响主界面,下载完成,网站弹出小框口提示百万级别数据导入导出如何优化?excel导入导出;痛点:1.导入百万级别数据(excel)到DB,有哪些痛点?1)一次加载百万级别数据到内存,发生OOM!2)导......
  • 2024版Pycharm导入conda环境
    旧版与新版的区别大致就是旧版借用python.exe文件来导入虚拟环境,而现在的新版本需要借用Anaconda3文件中的condabin文件夹中的conda.bat文件来导入已创建的虚拟环境。(1)进入设置(2)选择interpreter  (3)选择conda环境 首先浏览到condabin的位置,选择conda;然后点击加载环境,而......
  • 从立创导出元件原理图、封装、3D模型到AD的方法
    1.导出原理图搜索器件并点击数据手册再点击立即打开原理图里面点击导出AD,即可下载原理图文件将下载文件用AD打开,并生成原理图库将生成的原理图库文件复制到自己的原理图库即可2.导出PCB步骤同上,不再赘述3.导出3D模型这里就不能用网页版立创,在软件立创EDA里面先随便......
  • 从实验室的数据单向导出,如何防止员工数据夹带?
    实验室在进行各种实验和研究活动时,会产生多种类型的数据。这些数据是实验过程和结果的重要记录,对于科研工作的进展和成果具有重要意义。以下是一些常见的实验室产生的数据类型:1、原始实验数据:这是实验过程中直接观测或测量得到的数据,如电压、电流、温度、压力、光谱数据、质谱数......
  • Android导出所有多语言字串到表格
    这种事情当然要写脚本完成啦,不然一个一个添加嘛?!!!python脚本:##注意:##1.有些语言的符号可能会乱码,比如es--->¡---->?##2.base_strings.xml必须包含所有key值,作为基准,否则可能会楼填。##3.使用:依次将不同语言的xml文件内容复制到target_strings.xml中,运行MergeLa......
  • Doris:概念与基础操作
    Doris一款现代化的MPP分析性数据库产品支持亚秒级响应支持10PB以上数据集兼容MySQL协议基础概念doris有3种基础表:明细表(Duplicate):一张普普通通的表,doris默认表模式,支持数据预排序主键表(Unique):一种特殊的聚合表,如果主键重复,会自动更新其他值聚合表(Aggregate):聚合模......
  • JS之打印导出PDF
    我们可以直接调用浏览器的打印功能来实现。1.直接打印直接调用浏览器的打印功能,打印整个页面functionpreview(){window.print();}2.打印指定区域通过开始标记、结束标记来打印,打印局部页面<!--startprint--><div>打印的内容</div><!--end......
  • Ement-Plus框架的列表table导出excel数据表
    1.页面预览2.搜索条件区域code <!--查询--><divclass="table-container"><el-form:inline="true":model="queryForm"class="query-form"ref="queryFormRef"><el-form-itemlabe......
  • clickhouse表结构导出为
     查看代码#!/bin/bashOUTDIR=/opt/backup/#获取所有数据库并保存到文件中clickhouse-client--userdefault--password123456-q"SHOWDATABASES">/opt/backup/db.txt#读取数据库文件whileread-rdb;do#排除system和INFORMATION_SCHEMA数据库de......