```
load_stmt ::=
    KW_LOAD KW_LABEL job_label:label
    LPAREN data_desc_list:dataDescList RPAREN
    opt_broker:broker
    opt_properties:properties
    {: RESULT = new LoadStmt(label, dataDescList, broker, properties); :}
    | KW_LOAD KW_LABEL job_label:label
    LPAREN data_desc_list:dataDescList RPAREN
    resource_desc:resource
    opt_properties:properties
    {: RESULT = new LoadStmt(label, dataDescList, resource, properties); :}
    ;

data_desc_list ::=
    data_desc:desc
    {: RESULT = Lists.newArrayList(desc); :}
    | data_desc_list:list COMMA data_desc:desc
    {: list.add(desc); RESULT = list; :}
    ;

data_desc ::=
    opt_merge_type:mergeType
    KW_DATA KW_INFILE LPAREN string_list:files RPAREN
    opt_negative:isNeg
    KW_INTO KW_TABLE ident:tableName
    opt_partition_names:partitionNames
    opt_field_term:colSep                    # COLUMNS TERMINATED BY
    opt_file_format:fileFormat               # FORMAT AS
    opt_col_list:colList                     # column order
    opt_columns_from_path:columnsFromPath    # COLUMNS FROM PATH AS
    opt_col_mapping_list:colMappingList      # SET (column_mapping)
    pre_filter_clause:preFilterExpr          # PRECEDING FILTER
    where_clause:whereExpr                   # WHERE predicate
    delete_on_clause:deleteExpr              # DELETE ON
    sequence_col_clause:sequenceColName      # ORDER BY
    opt_properties:properties
    {: RESULT = new DataDescription(tableName, partitionNames, files, colList,
           colSep, fileFormat, columnsFromPath, isNeg, colMappingList,
           preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName,
           properties); :}
    | opt_merge_type:mergeType
    KW_DATA KW_FROM KW_TABLE ident:srcTableName
    opt_negative:isNeg
    KW_INTO KW_TABLE ident:tableName
    opt_partition_names:partitionNames
    opt_col_mapping_list:colMappingList
    where_clause:whereExpr
    delete_on_clause:deleteExpr
    opt_properties:properties
    {: RESULT = new DataDescription(tableName, partitionNames, srcTableName,
           isNeg, colMappingList, whereExpr, mergeType, deleteExpr, properties); :}
    ;

// opt_broker and resource_desc correspond to the Broker-based loads and SparkLoad
opt_broker ::=
    {: RESULT = null; :}
    | KW_WITH KW_S3 LPAREN key_value_map:properties RPAREN
    {: RESULT = new BrokerDesc("S3", StorageBackend.StorageType.S3, properties); :}
    | KW_WITH KW_HDFS LPAREN key_value_map:properties RPAREN
    {: RESULT = new BrokerDesc("HDFS", StorageBackend.StorageType.HDFS, properties); :}
    | KW_WITH KW_LOCAL LPAREN key_value_map:properties RPAREN
    {: RESULT = new BrokerDesc("LOCAL", StorageBackend.StorageType.LOCAL, properties); :}
    | KW_WITH KW_BROKER ident_or_text:name
    {: RESULT = new BrokerDesc(name, null); :}
    | KW_WITH KW_BROKER ident_or_text:name LPAREN key_value_map:properties RPAREN
    {: RESULT = new BrokerDesc(name, properties); :}
    ;

resource_desc ::=
    KW_WITH KW_RESOURCE ident_or_text:resourceName
    {: RESULT = new ResourceDesc(resourceName, null); :}
    | KW_WITH KW_RESOURCE ident_or_text:resourceName LPAREN key_value_map:properties RPAREN
    {: RESULT = new ResourceDesc(resourceName, properties); :}
    ;
```
The LOAD grammar has four parts:
● KW_LOAD KW_LABEL job_label:label — the job label
● data_desc_list — the core of the mapping; multiple entries may be given, separated by commas
● opt_broker / resource_desc — defines the settings for BrokerLoad and SparkLoad respectively
○ opt_broker — WITH xxx can name a broker, or HDFS, S3, or LOCAL; all variants are turned into a BrokerDesc (BrokerLoad only)
○ resource_desc — a Resource, optionally with extra properties (SparkLoad only)
● opt_properties — PROPERTIES (xxx)
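To make the mapping from grammar to objects concrete, here is a minimal Java sketch of what the first production builds. The record types are simplified stand-ins (the real LoadStmt, DataDescription, and BrokerDesc carry many more fields), and the sample statement, label, and path in the comments are hypothetical:

```java
import java.util.List;
import java.util.Map;

public class LoadStmtSketch {
    // Simplified stand-ins for the FE classes the grammar actions construct.
    record BrokerDesc(String name, Map<String, String> properties) {}
    record DataDescription(String tableName, List<String> files) {}
    record LoadStmt(String label, List<DataDescription> descs, BrokerDesc broker,
                    Map<String, String> properties) {}

    // Roughly what the parser actions produce for a (hypothetical) statement like:
    //   LOAD LABEL job1 (DATA INFILE("hdfs://path/file") INTO TABLE t1)
    //   WITH BROKER "b1" PROPERTIES ("timeout" = "3600")
    static LoadStmt parseExample() {
        DataDescription desc = new DataDescription("t1", List.of("hdfs://path/file"));
        BrokerDesc broker = new BrokerDesc("b1", Map.of());
        return new LoadStmt("job1", List.of(desc), broker, Map.of("timeout", "3600"));
    }

    public static void main(String[] args) {
        LoadStmt stmt = parseExample();
        System.out.println(stmt.label() + " -> " + stmt.descs().size() + " data_desc");
    }
}
```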
Common flow of the Load family
Defining and submitting a BrokerLoadJob
Load jobs execute asynchronously: a LoadJob (BrokerLoadJob or SparkLoadJob) is created first and then submitted to the LoadJobScheduler. Concretely:
● BulkLoadJob.fromLoadStmt creates the BrokerLoadJob
○ create the BrokerLoadJob and set its broker information
○ set the opt_properties information
○ set the data_desc_list information
● register the job in the in-memory map
● submit it to the LoadJobScheduler
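The submission steps above can be sketched as follows. The names and structures are deliberately simplified assumptions for illustration; in the FE the registration and scheduling involve the load manager's job maps and the LoadJobScheduler:

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class LoadSubmitSketch {
    static class LoadJob {
        final long id;
        final String label;
        LoadJob(long id, String label) { this.id = id; this.label = label; }
    }

    // In-memory map of jobs, keyed by job id (a stand-in for the real job index).
    static final Map<Long, LoadJob> idToLoadJob = new ConcurrentHashMap<>();
    // Stand-in for the scheduler's queue of jobs waiting to be run.
    static final BlockingQueue<LoadJob> needScheduleJobs = new LinkedBlockingQueue<>();

    // Create the job from the parsed statement, register it in memory, then
    // hand it to the scheduler; the client call returns without waiting for
    // the load to finish -- this is the asynchronous part.
    static long submit(String label) {
        LoadJob job = new LoadJob(idToLoadJob.size() + 1, label);
        idToLoadJob.put(job.id, job);
        needScheduleJobs.offer(job);
        return job.id;
    }

    public static void main(String[] args) {
        long id = submit("job1");
        System.out.println("submitted job " + id);
    }
}
```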
BrokerLoadJob execution: creating the BrokerLoadPendingTask
A new BrokerLoadPendingTask is created and placed into the pendingLoadTaskScheduler.
Env holds two related fields, MasterTaskExecutor pendingLoadTaskScheduler and MasterTaskExecutor loadingLoadTaskScheduler; both are thread pools.
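The two-pool split can be modeled with plain thread pools. This is only a sketch of the pending-to-loading handoff; MasterTaskExecutor in the FE wraps a pool with extra bookkeeping, and the file-listing string here is a placeholder:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TwoStageExecutorSketch {
    // Daemon threads so the sketch exits cleanly without explicit shutdown.
    static ExecutorService pool(int n) {
        return Executors.newFixedThreadPool(n, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
    }

    // Stage 1: pending tasks (e.g. enumerating source files).
    static final ExecutorService pendingPool = pool(2);
    // Stage 2: loading tasks (the actual data import).
    static final ExecutorService loadingPool = pool(2);

    static String runJob(String label) {
        try {
            // The pending task, once finished, hands off to the second pool,
            // mirroring the pending -> loading two-stage flow.
            return pendingPool.submit(() -> {
                String files = "files-for-" + label;   // pretend file listing
                return loadingPool.submit(() -> files + ":loaded").get();
            }).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runJob("job1"));
    }
}
```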
BrokerLoadPendingTask execution: creating the LoadLoadingTask
Two steps:
● the BrokerLoadPendingTask runs, querying the list of files to import
● on completion, it calls back into BrokerLoadJob, which then:
○ creates the LoadLoadingTask
○ initializes the LoadLoadingTask
■ creates a LoadingTaskPlanner and runs its plan
○ sends the LoadLoadingTask to the executor
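The callback relationship between the task and its job can be sketched like this. The method and field names here are simplified assumptions, not the FE's actual signatures; the point is only that the job is the callback target and the pending task's result triggers the loading stage:

```java
public class CallbackSketch {
    // The job acts as the callback target for its tasks, mirroring how the
    // pending task reports back to BrokerLoadJob when it finishes.
    static class Job {
        String state = "PENDING";
        void onPendingTaskFinished(int fileCount) {
            // In the FE this is where the LoadLoadingTask would be created,
            // planned, and sent to the loading-task pool.
            state = "LOADING(" + fileCount + " files)";
        }
    }

    static class PendingTask implements Runnable {
        final Job callback;
        PendingTask(Job callback) { this.callback = callback; }
        @Override public void run() {
            int fileCount = 3;               // pretend result of file listing
            callback.onPendingTaskFinished(fileCount);
        }
    }

    static String demo() {
        Job job = new Job();
        new PendingTask(job).run();
        return job.state;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```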
LoadingTaskPlanner.plan
LoadLoadingTask execution
Two steps:
● the LoadLoadingTask runs, performing the actual file import
● on completion, it calls back into BrokerLoadJob to update the job state
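Putting the two callbacks together, the job's state progression can be sketched as a tiny state machine. These states are a deliberate simplification (the real job lifecycle has more states than shown here):

```java
public class LoadStateSketch {
    enum JobState { PENDING, LOADING, FINISHED, CANCELLED }

    // Each finished task advances the job one state; any failed task cancels it.
    static JobState next(JobState s, boolean taskOk) {
        if (!taskOk) return JobState.CANCELLED;
        switch (s) {
            case PENDING: return JobState.LOADING;   // pending task listed the files
            case LOADING: return JobState.FINISHED;  // loading task imported the data
            default:      return s;
        }
    }

    public static void main(String[] args) {
        JobState s = JobState.PENDING;
        s = next(s, true);
        s = next(s, true);
        System.out.println(s);  // FINISHED
    }
}
```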
Load management
The key objects are Load, LoadJob, and LoadManager.
Asynchronous tasks
Viewed more broadly, Doris's asynchronous tasks are called jobs, structured as follows:
From: https://www.cnblogs.com/xutaoustc/p/17503719.html