首页 > 编程语言 >【源码阅读】5. Broker Load 导入任务的执行流程

【源码阅读】5. Broker Load 导入任务的执行流程

时间:2023-06-25 19:14:34浏览次数:40  
标签:Load opt list Broker 源码 KW RESULT properties desc

load_stmt ::=
    KW_LOAD KW_LABEL job_label:label
    LPAREN data_desc_list:dataDescList RPAREN
    opt_broker:broker
    opt_properties:properties
    {:
        RESULT = new LoadStmt(label, dataDescList, broker, properties);
    :}
    | KW_LOAD KW_LABEL job_label:label
    LPAREN data_desc_list:dataDescList RPAREN
    resource_desc:resource
    opt_properties:properties
    {:
        RESULT = new LoadStmt(label, dataDescList, resource, properties);
  :};
    
  
data_desc_list ::=
    data_desc:desc
    {:
        RESULT = Lists.newArrayList(desc);
    :}
    | data_desc_list:list COMMA data_desc:desc
    {:
        list.add(desc);
        RESULT = list;
    :}
    ;      
data_desc ::=
    opt_merge_type:mergeType
    KW_DATA KW_INFILE LPAREN string_list:files RPAREN
    opt_negative:isNeg
    KW_INTO KW_TABLE ident:tableName
    opt_partition_names:partitionNames
    opt_field_term:colSep                          # COLUMNS TERMINATED BY
    opt_file_format:fileFormat                     # FORMAT AS
    opt_col_list:colList                           # 列顺序 
    opt_columns_from_path:columnsFromPath          # COLUMNS FROM PATH AS  
    opt_col_mapping_list:colMappingList            # SET (column_mapping)
    pre_filter_clause:preFilterExpr                # PRECEDING FILTER
    where_clause:whereExpr                         # WHERE predicate
    delete_on_clause:deleteExpr                    # DELETE ON
    sequence_col_clause:sequenceColName            # ORDER BY
    opt_properties:properties
    {:
        RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat,
        columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName, properties);
    :}
    | opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName
    opt_negative:isNeg
    KW_INTO KW_TABLE ident:tableName
    opt_partition_names:partitionNames
    opt_col_mapping_list:colMappingList
    where_clause:whereExpr
    delete_on_clause:deleteExpr
    opt_properties:properties
    {:
        RESULT = new DataDescription(tableName, partitionNames, srcTableName, isNeg, colMappingList, whereExpr,
        mergeType, deleteExpr, properties);
    :}
    ;    
    
// Broker或者Resource对应Broker系和SparkLoad系    
        opt_broker ::=
            {:
                RESULT = null;
            :}
            | KW_WITH KW_S3 LPAREN key_value_map:properties RPAREN
            {:
                RESULT = new BrokerDesc("S3", StorageBackend.StorageType.S3, properties);
            :}
            | KW_WITH KW_HDFS LPAREN key_value_map:properties RPAREN
            {:
                RESULT = new BrokerDesc("HDFS", StorageBackend.StorageType.HDFS, properties);
            :}
            | KW_WITH KW_LOCAL LPAREN key_value_map:properties RPAREN
            {:
                RESULT = new BrokerDesc("LOCAL", StorageBackend.StorageType.LOCAL, properties);
            :}
            | KW_WITH KW_BROKER ident_or_text:name
            {:
                RESULT = new BrokerDesc(name, null);
            :}
            | KW_WITH KW_BROKER ident_or_text:name LPAREN key_value_map:properties RPAREN
            {:
                RESULT = new BrokerDesc(name, properties);
            :}
        ;
            
    resource_desc ::=                                                               
    KW_WITH KW_RESOURCE ident_or_text:resourceName                                 
    {:                                                                               
        RESULT = new ResourceDesc(resourceName, null);                               
    :}                                                                               
    | KW_WITH KW_RESOURCE ident_or_text:resourceName LPAREN key_value_map:properties RPAREN
    {:                                                                               
        RESULT = new ResourceDesc(resourceName, properties);                         
    :}                                                                               
            ; 

 

 

LOAD语法分为四段:

● KW_LOAD KW_LABEL job_label:label

● data_desc_list - 语法核心映射,可定义多个,以逗号分隔

● opt_broker / resource_desc - 同时定义了BrokerLoad和SparkLoad的设置

○ opt_broker - WITH xxx可以是Broker或者是HDFS,S3,最终都转成BrokerDesc (BrokerLoad专用)

○ resource_desc - Resource,也可额外加一些属性(SparkLoad专用)

● opt_properties - PROPERTIES (xxx)

 

Load系通用流程

BrokerLoadJob定义与提交

Load作业的执行是异步的,首先会生成LoadJob(BrokerLoadJob或SparkLoadJob),然后提交到LoadJobScheduler中,具体:

● BulkLoadJob.fromLoadStmt生成BrokerLoadJob

○ 生成BrokerLoadJob,并设置Broker信息

○ 设置opt_properties信息

○ 设置data_desc_list信息

● 加入到内存映射中

● 提交到LoadJobScheduler中

 

BrokerLoadJob执行,生成BrokerLoadPendingTask

新建BrokerLoadPendingTask,放到pendingLoadTaskScheduler中。

Env中有两个相关属性MasterTaskExecutor pendingLoadTaskScheduler和MasterTaskExecutor loadingLoadTaskScheduler,都是线程池。

 

BrokerLoadPendingTask执行,生成LoadLoadingTask

分两步:

● BrokerLoadPendingTask执行 - 查询要导入的文件列表

● 执行完成,回调给BrokerLoadJob

○ 创建LoadLoadingTask

○ 初始化LoadLoadingTask

■ 创建LoadingTaskPlanner并执行plan

○ 发送LoadLoadingTask给执行器

LoadingTaskPlanner.plan

 

 

 

LoadLoadingTask执行

分两步:

● LoadLoadingTask执行 - 真正导入文件

● 执行完成后回调给BrokerLoadJob,更新状态

 

 

 

Load管理

几个关键对象Load, LoadJob, LoadManager

 

 

异步任务

从大了看,Doris的异步任务称为作业,结构如下:

标签:Load,opt,list,Broker,源码,KW,RESULT,properties,desc
From: https://www.cnblogs.com/xutaoustc/p/17503719.html

相关文章

  • 【源码阅读】查询
     总体流程StmtExecutor.execute的过程总体分为三步:● 分析hint● analyze-可能会遇到需要forward到master执行的情况;ShowStmt也可能转成SelectStmt○ Query-analyzeAndGenerateQueryPlan○ 其他Stmt直接调用对应的Stmt的analyze● 执行-handleQueryStmt或其......
  • 【源码阅读】节点管理
     最早的节点管理是在BE节点的配置文件中写入fe节点的地址。BE节点在启动时,将知道fe节点的地址并加入集群。但是这样的机制会有一些问题,有时候一个测试节点接入到了线上集群,这种随意的操作测试会导致集群的拓扑结构不可控。节点管理的目的是对节点进行认证,实现一个节点发现和......
  • GRUB(GNU GRand Unified Bootloader)是一个常用的引导加载程序,用于在计算机启动时加载操
    GRUB(GNUGRandUnifiedBootloader)是一个常用的引导加载程序,用于在计算机启动时加载操作系统。它是开源软件,由GNU项目开发并得到广泛应用。GRUB主要有两个版本:GRUBLegacy和GRUB2。GRUB2是较新的版本,也是目前更常用和推荐的版本。下面主要介绍GRUB2的特点和功能:多操作系统支......
  • 【源码阅读】5. 元数据
     通常操作元数据时,会首先更新一条内存数据,然后写入一条元数据更新日志。这样在重启时,通过顺序回放元数据更新日志,即可在内存中重构完整的元数据。Doris一般使用BDBJE存放元数据的更新日志。在记录到达一定数量会在BDBJE中生成新的DB(本质是checkpoint分割点)............
  • 【源码阅读】90. 插件
     系统相关类PluginLoader:插件的加载类,封装了插件信息、配置加载、安装过程。包含如下组件:● PluginInfo:含有插件的基本信息● Plugin接口:插件初始化接口● AuditPlugin接口:包含审计类型插件关联的操作 初始化PluginMgr.init初始化时将构建内置插件AuditLogBuilde......
  • 【源码阅读】3. 建表
    |KW_CREATEopt_external:isExternalKW_TABLEopt_if_not_exists:ifNotExiststable_name:nameLPARENcolumn_definition_list:columnsCOMMAindex_definition_list:indexesRPARENopt_engine:engineNameopt_keys:keysopt_comment......
  • 【源码阅读】4. Stream Load 导入任务的执行流程
     FE起手路由在访问curl--location-trusted-uroot:-Ttest.csv-H"column_separator:,"http://127.0.0.1:8030/api/demo/example_tbl/_stream_load时,FE如下操作:● 检查用户名密码● 检查权限● 随机选择一个BE,Redirect到这个BE上 BE_stream_load的handler......
  • Bert Pytorch 源码分析:二、注意力层
    #注意力机制的具体模块#兼容单头和多头classAttention(nn.Module):"""Compute'ScaledDotProductAttention""" #QKV尺寸都是BS*ML*ES #(或者多头情况下是BS*HC*ML*HS,最后两维之外的维度不重要) #从输入计算QKV的过程可以统一处理,不必......
  • 【源码阅读】2. Catalog和Database
     Catalog创建|KW_CREATEKW_CATALOGopt_if_not_exists:ifNotExistsident:catalogNameopt_properties:properties{:RESULT=newCreateCatalogStmt(ifNotExists,catalogName,null,properties);:}|KW_CREATEKW_CATALOGopt_if_not_......
  • 【源码阅读】1. 配置、VARIABLE与用户PROPERTY
     配置初始化在FE启动时:● Config类ConfField注解标记的静态属性反射出Field存储到内存confFields,作为一个可读取和修改的属性列表(真正的值存储在Config类的静态属性中,反射出Field并存储到confFields只是一个读取和修改指针而已)● 读取配置文件,根据配置文件内容,设置Confi......