
Hive 3 Source Code Summary (Part 2)


Big Data Technology: Hive Source Code, Part 2

Continuing from Section 2.4: Generating the AST (Abstract Syntax Tree) from HQL

2.5 Further Analysis of the AST

The next steps are:

1) Convert the AST into a QueryBlock, and then into an OperatorTree;

2) Perform logical optimization on the OperatorTree (LogicalOptimizer);

3) Convert the OperatorTree into a TaskTree;

4) Perform physical optimization on the TaskTree (PhysicalOptimizer).

These four steps are discussed together because, in the source code, they all take place within a single method.
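Before diving into the source, here is a purely conceptual sketch (hypothetical names, not Hive APIs) of the four steps above, just to make the order of the intermediate representations explicit:

public class CompilePipelineSketch {
    // Step 1: AST -> QueryBlock -> OperatorTree
    static String toOperatorTree(String ast) { return "OperatorTree(" + ast + ")"; }
    // Step 2: logical optimization rewrites the operator tree
    static String logicalOptimize(String opTree) { return "Optimized(" + opTree + ")"; }
    // Step 3: OperatorTree -> TaskTree
    static String toTaskTree(String opTree) { return "TaskTree(" + opTree + ")"; }
    // Step 4: physical optimization rewrites the task tree
    static String physicalOptimize(String taskTree) { return "Optimized(" + taskTree + ")"; }

    public static void main(String[] args) {
        String plan = physicalOptimize(toTaskTree(logicalOptimize(toOperatorTree("AST"))));
        System.out.println(plan); // Optimized(TaskTree(Optimized(OperatorTree(AST))))
    }
}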

2.5.1 The compile Method (continuing from the compile method in Section 2.4.2)

private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse {

PerfLogger perfLogger = SessionState.getPerfLogger(true);

perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);

perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);

lDrvState.stateLock.lock();

... ...

// Generate the AST from the HQL

ASTNode tree;

try {

tree = ParseUtils.parse(command, ctx);

} catch (ParseException e) {

parseError = true;

throw e;

} finally {

hookRunner.runAfterParseHook(command, parseError);

}

// Do semantic analysis and plan generation

BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);

if (!retrial) {

openTransaction();

generateValidTxnList();

}

// Further analyze the abstract syntax tree

sem.analyze(tree, ctx); // see 2.5.2, the analyze method

}

2.5.2 The analyze Method

public void analyze(ASTNode ast, Context ctx) throws SemanticException {

initCtx(ctx);

init(true);

analyzeInternal(ast); // see 2.5.3, the analyzeInternal method

}

2.5.3 The analyzeInternal Method

public abstract void analyzeInternal(ASTNode ast) throws SemanticException;

This is an abstract method of the abstract class org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer, so we step into the analyzeInternal method of the implementing class org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.

public void analyzeInternal(ASTNode ast) throws SemanticException {

analyzeInternal(ast, new PlannerContextFactory() {

@Override

public PlannerContext create() {

return new PlannerContext();

}

});

}
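A brief aside on this wrapper: PlannerContextFactory is used here as a single-method factory, so on Java 8+ the anonymous class could presumably be replaced by a constructor reference, e.g. analyzeInternal(ast, PlannerContext::new), assuming the interface declares no other abstract method.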

2.5.4 Calling the Overloaded analyzeInternal Method

Note: the markers "1, 2, 3, ... 11" that appear in this code are step numbers defined in the source itself. Although the method is long, these official step comments make it fairly easy to follow.

void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticException {

LOG.info("Starting Semantic Analysis");

// 1. Generate Resolved Parse tree from syntax tree

boolean needsTransform = needsTransform();

//change the location of position alias process here

processPositionAlias(ast);

PlannerContext plannerCtx = pcf.create();

// Process the AST and convert it into a QueryBlock

if (!genResolvedParseTree(ast, plannerCtx)) {

return;

}

... ...

// 2. Gen OP Tree from resolved Parse Tree

Operator sinkOp = genOPTree(ast, plannerCtx);

// 3. Deduce Resultset Schema: define the schema of the output data

... ...

// 4. Generate Parse Context for Optimizer & Physical compiler

copyInfoToQueryProperties(queryProperties); // ParseContext holds the optimizer parameters; each one is checked in turn to see whether it is enabled (set to true), and enabled optimizations rewrite the logical execution plan.

ParseContext pCtx = new ParseContext(queryState, opToPartPruner, opToPartList, topOps,

new HashSet<JoinOperator>(joinContext.keySet()),

new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),

loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,

listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,

globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,

viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,

analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,

queryProperties, viewProjectToTableSchema, acidFileSinks);

... ...

// 5. Take care of view creation: handle view-related logic

... ...

// 6. Generate table access stats if required

if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) {

TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx);

setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess());

}

// 7. Perform Logical optimization:对操作树执行逻辑优化

if (LOG.isDebugEnabled()) {

LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));

}

// Create the optimizer

Optimizer optm = new Optimizer();

optm.setPctx(pCtx);

optm.initialize(conf);

// Run the optimization

pCtx = optm.optimize();

if (pCtx.getColumnAccessInfo() != null) {

// set ColumnAccessInfo for view column authorization

setColumnAccessInfo(pCtx.getColumnAccessInfo());

}

if (LOG.isDebugEnabled()) {

LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));

}

// 8. Generate column access stats if required - wait until column pruning

// takes place during optimization

boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2()

&& HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);

if (isColumnInfoNeedForAuth

|| HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {

ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx);

// view column access info is carried by this.getColumnAccessInfo().

setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess(this.getColumnAccessInfo()));

}

// 9. Optimize Physical op tree & Translate to target execution engine (MR,

// TEZ..): perform physical optimization

if (!ctx.getExplainLogical()) {

TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);

compiler.init(queryState, console, db);

// compile is an abstract method; its implementations are MapReduceCompiler, TezCompiler, and SparkCompiler

compiler.compile(pCtx, rootTasks, inputs, outputs);

fetchTask = pCtx.getFetchTask();

}

//find all Acid FileSinkOperatorS

QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks, ctx.getExecutionId());

// 10. Attach CTAS/Insert-Commit-hooks for Storage Handlers

... ...

LOG.info("Completed plan generation");

// 11. put accessed columns to readEntity

if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {

putAccessedColumnsToReadEntity(inputs, columnAccessInfo);

}

if (isCacheEnabled && lookupInfo != null) {

if (queryCanBeCached()) {

QueryResultsCache.QueryInfo queryInfo = createCacheQueryInfoForQuery(lookupInfo);

// Specify that the results of this query can be cached.

setCacheUsage(new CacheUsage(

CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo));

}

}

}
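As a side illustration of step 9: the comment above notes that compile is implemented by MapReduceCompiler, TezCompiler, and SparkCompiler, and the factory picks one of them for the current query. The following is a minimal, self-contained sketch of that kind of dispatch, assuming the real TaskCompilerFactory keys off the configured execution engine (hive.execution.engine); it is not a copy of the Hive source.

import java.util.Locale;

public class TaskCompilerFactorySketch {

    interface TaskCompiler { }
    static class MapReduceCompiler implements TaskCompiler { }
    static class TezCompiler implements TaskCompiler { }
    static class SparkCompiler implements TaskCompiler { }

    // Pick the physical compiler from the configured execution engine
    // (in Hive this value would come from HiveConf, e.g. hive.execution.engine).
    static TaskCompiler getCompiler(String executionEngine) {
        switch (executionEngine.toLowerCase(Locale.ROOT)) {
            case "tez":   return new TezCompiler();
            case "spark": return new SparkCompiler();
            default:      return new MapReduceCompiler(); // "mr" and everything else
        }
    }

    public static void main(String[] args) {
        System.out.println(getCompiler("tez").getClass().getSimpleName()); // TezCompiler
        System.out.println(getCompiler("mr").getClass().getSimpleName());  // MapReduceCompiler
    }
}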

2.5.5 Submitting and Executing the Tasks (continuing from the runInternal method in Section 2.3.8)

This picks up at the second step in Section 2.3.8:

// 2. Execute

execute();

2.5.6 The execute Method

private void execute() throws CommandProcessorResponse {

PerfLogger perfLogger = SessionState.getPerfLogger();

perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);

... ...

// 1. Build the tasks: construct MR jobs from the task tree

setQueryDisplays(plan.getRootTasks());

int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();

int jobs = mrJobs + Utilities.getTezTasks(plan.getRootTasks()).size()

+ Utilities.getSparkTasks(plan.getRootTasks()).size();

if (jobs > 0) {

logMrWarning(mrJobs);

console.printInfo("Query ID = " + queryId);

console.printInfo("Total jobs = " + jobs);

}

perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);

// Loop while you either have tasks running, or tasks queued up

while (driverCxt.isRunning()) {

// Launch upto maxthreads tasks

Task<? extends Serializable> task;

while ((task = driverCxt.getRunnable(maxthreads)) != null) {

// 2. Launch the task

TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt); // see 2.5.7, the launchTask method

if (!runner.isRunning()) {

break;

}

}

... ...

// Print the final "OK" shown in the output

if (console != null) {

console.printInfo("OK");

}

}

2.5.7 The launchTask Method

private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,

String jobname, int jobs, DriverContext cxt) throws HiveException {

if (SessionState.get() != null) {

SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());

}

if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {

if (noName) {

conf.set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")");

}

conf.set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, tsk.getId());

Utilities.setWorkflowAdjacencies(conf, plan);

cxt.incCurJobNo(1);

console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);

}

tsk.initialize(queryState, plan, cxt, ctx.getOpContext());

TaskRunner tskRun = new TaskRunner(tsk);

// Register the task being launched

cxt.launching(tskRun);

// Launch Task: decide whether to launch the task in parallel based on whether it can run in parallel

if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) {

// Launch it in the parallel mode, as a separate thread only for MR tasks

if (LOG.isInfoEnabled()){

LOG.info("Starting task [" + tsk + "] in parallel");

}

// Parallel launch; under the hood the new thread still ends up calling tskRun.runSequential()

tskRun.start();

} else {

if (LOG.isInfoEnabled()){

LOG.info("Starting task [" + tsk + "] in serial mode");

}

// Tasks that cannot run in parallel are executed sequentially

tskRun.runSequential(); // see 2.5.8, the runSequential method

}

return tskRun;

}
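To illustrate the comment on the parallel branch above, here is a minimal sketch, assuming TaskRunner behaves like a Thread whose run() simply delegates to runSequential(); calling start() therefore only changes which thread does the work. This is not the actual Hive class; the names mirror the excerpt but the body is a simplified stand-in.

public class TaskRunnerSketch extends Thread {

    private final Runnable tsk; // stand-in for Task<? extends Serializable>

    public TaskRunnerSketch(Runnable tsk) {
        this.tsk = tsk;
    }

    @Override
    public void run() {
        runSequential(); // start() -> run() -> runSequential()
    }

    public void runSequential() {
        tsk.run(); // the real method wraps tsk.executeTask(...) and records the exit value
    }

    public static void main(String[] args) throws InterruptedException {
        TaskRunnerSketch runner = new TaskRunnerSketch(
                () -> System.out.println("task ran on " + Thread.currentThread().getName()));
        runner.start();         // "parallel" branch: a new thread runs the same code path
        runner.join();
        runner.runSequential(); // serial branch: same call, executed on the current thread
    }
}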

2.5.8 The runSequential Method

public void runSequential() {

int exitVal = -101;

try {

exitVal = tsk.executeTask(ss == null ? null : ss.getHiveHistory()); // see 2.5.9, the executeTask method

} catch (Throwable t) {

if (tsk.getException() == null) {

tsk.setException(t);

}

LOG.error("Error in executeTask", t);

}

result.setExitVal(exitVal);

if (tsk.getException() != null) {

result.setTaskError(tsk.getException());

}

}

2.5.9 The executeTask Method

public int executeTask(HiveHistory hiveHistory) {

try {

this.setStarted();

if (hiveHistory != null) {

hiveHistory.logPlanProgress(queryPlan);

}

int retval = execute(driverContext); // see 2.5.10, the execute method

this.setDone();

if (hiveHistory != null) {

hiveHistory.logPlanProgress(queryPlan);

}

return retval;

} catch (IOException e) {

throw new RuntimeException("Unexpected error: " + e.getMessage(), e);

}

}

2.5.10 The execute Method

protected abstract int execute(DriverContext driverContext);

This is the abstract execute method of the abstract class org.apache.hadoop.hive.ql.exec.Task, so we need to look at the execute method of one of its implementing classes; here we pick org.apache.hadoop.hive.ql.exec.mr.MapRedTask.

public int execute(DriverContext driverContext) {

Context ctx = driverContext.getCtx();

boolean ctxCreated = false;

try {

... ...

if (!runningViaChild) {

// since we are running the mapred task in the same jvm, we should update the job conf

// in ExecDriver as well to have proper local properties.

if (this.isLocalMode()) {

// save the original job tracker

ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(job));

// change it to local

ShimLoader.getHadoopShims().setJobLauncherRpcAddress(job, "local");

}

// we are not running this mapred task via child jvm

// so directly invoke ExecDriver

// Set up the MR job's execution classes, such as its InputFormat and OutputFormat

int ret = super.execute(driverContext);

// restore the previous properties for framework name, RM address etc.

if (this.isLocalMode()) {

// restore the local job tracker back to original

ctx.restoreOriginalTracker();

}

return ret;

}

... ...

// Build the command line that runs the MR task

String isSilent = "true".equalsIgnoreCase(System

.getProperty("test.silent")) ? "-nolog" : "";

String jarCmd = hiveJar + " " + ExecDriver.class.getName() + libJarsOption;

String cmdLine = hadoopExec + " jar " + jarCmd + " -plan "

+ planPath.toString() + " " + isSilent + " " + hiveConfArgs;

... ...

// Run ExecDriver in another JVM

executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));

}
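The last part of this excerpt launches ExecDriver in a separate JVM via Runtime.exec. The following is a minimal, self-contained sketch of that idea using ProcessBuilder; the jar path and plan path are placeholders, not values taken from Hive, and the real command line is assembled from hadoopExec, hiveJar, planPath, and hiveConfArgs as shown above.

import java.io.File;
import java.io.IOException;

public class ChildJvmLaunchSketch {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Illustrative command only: run ExecDriver through "hadoop jar" with a serialized plan.
        String[] cmd = { "hadoop", "jar", "/path/to/hive-exec.jar",
                "org.apache.hadoop.hive.ql.exec.mr.ExecDriver",
                "-plan", "/tmp/plan.xml" };
        Process child = new ProcessBuilder(cmd)
                .directory(new File("/tmp"))   // stand-in for workDir
                .inheritIO()                   // stream the child's output to this console
                .start();
        int exitCode = child.waitFor();        // the driver uses the exit code to decide success or failure
        System.out.println("Child JVM exited with " + exitCode);
    }
}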

Chapter 3: Introduction to Debugging the Hive Source Code

3.1 Preparing the Debug Environment

3.1.1 Download the Source Package

Download the Hive source package and compile it yourself; building on Linux is recommended (for a Maven-based Hive checkout the build command is typically something like mvn clean package -Pdist -DskipTests). Then copy the entire compiled package into your IDEA working directory and open it with IDEA. This document is based on Hive 3.1.2. A pre-compiled Hive source package is provided in the accompanying materials.

3.1.2 Open the Project Settings

3.1.3 Add a Remote Connection Configuration

3.1.4 Fill in the Configuration Details

3.2 Testing

3.2.1 Set a Breakpoint Anywhere in the run Method of the CliDriver Class

3.2.2 Start the Hive Client in Debug Mode

$HIVE_HOME/bin/hive --debug
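The --debug option starts the client JVM with a standard JDWP agent that waits for a debugger to attach. Assuming the default debug settings (port 8000, with the main process suspended until the debugger connects), this is roughly equivalent to starting the client with:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000

The remote configuration in IDEA then only needs to attach to the host running the Hive client on that port.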

3.2.3 Start the Local Project in Debug Mode

3.2.4 Run HQL in the Hive Client, Then Switch to IDEA to Observe

1) Check the breakpoint in IDEA

2) Check the output in the Hive debug-mode client

Note: some of the images in this document come from a Meituan engineering blog post:

链接:https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html
