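Big Data Technology: Hive Source Code, Part 2
Continued from Section 2.4, Generating the AST (Abstract Syntax Tree) from HQL
2.5 Further Analysis of the AST
The next steps are:
1) Convert the AST into a QueryBlock, then convert the QueryBlock into an OperatorTree;
2) Apply logical optimization to the OperatorTree (LogicalOptimizer);
3) Convert the OperatorTree into a TaskTree;
4) Apply physical optimization to the TaskTree (PhysicalOptimizer).
These four steps are covered together because they all take place inside a single method in the source code.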
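The order of the four steps above can be made concrete with a small, self-contained sketch. Nothing in it is a Hive class: Plan, the stage methods, and the stage names are hypothetical stand-ins, and each stage only records that it ran. The point is the shape of the pipeline, where every stage accepts exactly the representation produced by the previous one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these types or methods are Hive classes.
final class CompilePipelineSketch {
    // A plan is reduced to the name of its current representation plus a log of stages.
    static final class Plan {
        final String form;          // which representation the plan is currently in
        final List<String> history; // stages applied so far, in order

        Plan(String form, List<String> history) {
            this.form = form;
            this.history = history;
        }

        Plan next(String newForm, String stage) {
            List<String> h = new ArrayList<>(history);
            h.add(stage);
            return new Plan(newForm, h);
        }
    }

    // Rejects a plan that arrives in the wrong representation.
    static void requireForm(Plan p, String expected) {
        if (!p.form.equals(expected)) {
            throw new IllegalStateException("expected " + expected + " but got " + p.form);
        }
    }

    static Plan toQueryBlock(Plan ast) {            // step 1, first half
        requireForm(ast, "AST");
        return ast.next("QueryBlock", "AST -> QueryBlock");
    }

    static Plan toOperatorTree(Plan qb) {           // step 1, second half
        requireForm(qb, "QueryBlock");
        return qb.next("OperatorTree", "QueryBlock -> OperatorTree");
    }

    static Plan logicalOptimize(Plan ops) {         // step 2
        requireForm(ops, "OperatorTree");
        return ops.next("OperatorTree", "logical optimization");
    }

    static Plan toTaskTree(Plan ops) {              // step 3
        requireForm(ops, "OperatorTree");
        return ops.next("TaskTree", "OperatorTree -> TaskTree");
    }

    static Plan physicalOptimize(Plan tasks) {      // step 4
        requireForm(tasks, "TaskTree");
        return tasks.next("TaskTree", "physical optimization");
    }

    // All stages chained inside one method, as the text describes for the real code.
    static Plan analyze(Plan ast) {
        Plan ops = logicalOptimize(toOperatorTree(toQueryBlock(ast)));
        return physicalOptimize(toTaskTree(ops));
    }

    public static void main(String[] args) {
        Plan result = analyze(new Plan("AST", new ArrayList<>()));
        result.history.forEach(System.out::println);
    }
}
```

Passing a plan to a stage out of order (for example, calling toTaskTree on an AST) fails fast in this sketch, which is the property the fixed step order is meant to illustrate.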
2.5.1 The compile method (continuing from the compile method in Section 2.4.2)
private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse {
PerfLogger perfLogger = SessionState.getPerfLogger(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
lDrvState.stateLock.lock();
... ...
// Generate the AST from the HQL
ASTNode tree;
try {
tree = ParseUtils.parse(command, ctx);
} catch (ParseException e) {
parseError = true;
throw e;
} finally {
hookRunner.runAfterParseHook(command, parseError);
}
// Do semantic analysis and plan generation
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
if (!retrial) {
openTransaction();
generateValidTxnList();
}
// Analyze the abstract syntax tree further
sem.analyze(tree, ctx); // see 2.5.2, the analyze method
}
2.5.2 The analyze method
public void analyze(ASTNode ast, Context ctx) throws SemanticException {
initCtx(ctx);
init(true);
analyzeInternal(ast); // see 2.5.3, the analyzeInternal method
}
2.5.3 The analyzeInternal method
public abstract void analyzeInternal(ASTNode ast) throws SemanticException;
This is an abstract method of the abstract class "org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer", so we step into the analyzeInternal method of the implementing class "org.apache.hadoop.hive.ql.parse.SemanticAnalyzer".
public void analyzeInternal(ASTNode ast) throws SemanticException {
analyzeInternal(ast, new PlannerContextFactory() {
@Override
public PlannerContext create() {
return new PlannerContext();
}
});
}
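The call chain in 2.5.2 and 2.5.3 follows the template-method shape: the base class fixes the order of the setup calls and leaves one step abstract, and the subclass fills that step in by delegating to an overload with a default factory. The sketch below reproduces only that shape. All class names ending in Sketch are hypothetical, the AST and planner context are reduced to strings, and a lambda stands in for the anonymous PlannerContextFactory.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo driver; not part of Hive.
final class TemplateMethodDemo {
    public static void main(String[] args) {
        BaseAnalyzerSketch analyzer = new QueryAnalyzerSketch();
        analyzer.analyze("TOK_QUERY");
        analyzer.trace().forEach(System.out::println);
    }
}

// Stand-in for the abstract base analyzer: analyze() is the fixed skeleton.
abstract class BaseAnalyzerSketch {
    protected final List<String> trace = new ArrayList<>();

    public final void analyze(String ast) {
        trace.add("initCtx");   // mirrors initCtx(ctx)
        trace.add("init");      // mirrors init(true)
        analyzeInternal(ast);   // the step each subclass must provide
    }

    protected abstract void analyzeInternal(String ast);

    List<String> trace() {
        return trace;
    }
}

// Stand-in for the concrete analyzer that implements the abstract step.
final class QueryAnalyzerSketch extends BaseAnalyzerSketch {
    // Hypothetical factory interface; the context is just a label here.
    interface PlannerContextFactory {
        String create();
    }

    @Override
    protected void analyzeInternal(String ast) {
        // The one-argument version only supplies a default factory and delegates.
        analyzeInternal(ast, () -> "defaultPlannerContext");
    }

    void analyzeInternal(String ast, PlannerContextFactory pcf) {
        trace.add("analyzeInternal(" + ast + ", " + pcf.create() + ")");
    }
}
```

Keeping the factory as a parameter of the overload lets a caller swap in a different planner context without touching the skeleton in the base class.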
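2.5.4 Continuing into the overloaded analyzeInternal method
Note: the numbers 1, 2, 3, 4 ... 11 that appear in this code are the steps defined in the source itself. The method is long, but the step comments provided in the source make it fairly easy to follow.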
void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticException {
LOG.info("Starting Semantic Analysis");
// 1. Generate Resolved Parse tree from syntax tree
boolean needsTransform = needsTransform();
//change the location of position alias process here
processPositionAlias(ast);
PlannerContext plannerCtx = pcf.create();
// Process the AST and convert it into a QueryBlock
if (!genResolvedParseTree(ast, plannerCtx)) {
return;
}
... ...
// 2. Gen OP Tree from resolved Parse Tree
Operator sinkOp = genOPTree(ast, plannerCtx);
// 3. Deduce Resultset Schema: define the schema of the output data
… …
// 4. Generate Parse Context for Optimizer & Physical compiler
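copyInfoToQueryProperties(queryProperties);
// ParseContext carries the parameters the optimizer works with. Each optimization is checked in turn to see whether it is enabled (set to true), and the enabled ones rewrite the logical execution plan.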
ParseContext pCtx = new ParseContext(queryState, opToPartPruner, opToPartList, topOps,
new HashSet<JoinOperator>(joinContext.keySet()),
new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,
queryProperties, viewProjectToTableSchema, acidFileSinks);
... ...
// 5. Take care of view creation: handle view-related work
… …
// 6. Generate table access stats if required
if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) {
TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx);
setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess());
}
// 7. Perform Logical optimization: run logical optimization on the operator tree
if (LOG.isDebugEnabled()) {
LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
}
// Create the optimizer
Optimizer optm = new Optimizer();
optm.setPctx(pCtx);
optm.initialize(conf);
// Run the optimization
pCtx = optm.optimize();
if (pCtx.getColumnAccessInfo() != null) {
// set ColumnAccessInfo for view column authorization
setColumnAccessInfo(pCtx.getColumnAccessInfo());
}
if (LOG.isDebugEnabled()) {
LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
}
// 8. Generate column access stats if required - wait until column pruning
// takes place during optimization
boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2()
&& HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);
if (isColumnInfoNeedForAuth
|| HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx);
// view column access info is carried by this.getColumnAccessInfo().
setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess(this.getColumnAccessInfo()));
}
// 9. Optimize Physical op tree & Translate to target execution engine (MR,
// TEZ..): run physical optimization
if (!ctx.getExplainLogical()) {
TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
compiler.init(queryState, console, db);
// TaskCompiler is abstract; the concrete compilers are MapReduceCompiler, TezCompiler, and SparkCompiler
compiler.compile(pCtx, rootTasks, inputs, outputs);
fetchTask = pCtx.getFetchTask();
}
//find all Acid FileSinkOperatorS
QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks, ctx.getExecutionId());
// 10. Attach CTAS/Insert-Commit-hooks for Storage Handlers
... ...
LOG.info("Completed plan generation");
// 11. put accessed columns to readEntity
if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
putAccessedColumnsToReadEntity(inputs, columnAccessInfo);
}
if (isCacheEnabled && lookupInfo != null) {
if (queryCanBeCached()) {
QueryResultsCache.QueryInfo queryInfo = createCacheQueryInfoForQuery(lookupInfo);
// Specify that the results of this query can be cached.
setCacheUsage(new CacheUsage(
CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo));
}
}
}
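Step 7 above calls optm.initialize(conf) and then optm.optimize(), and the comment in step 4 says the optimizations are checked one by one for whether they are enabled. The listing does not show what happens inside those two calls, so the following is only a sketch of one way such an optimizer could be organized, not Hive's implementation. The configuration keys, the operator names, and both rewrites are invented for illustration, and a plan is flattened to a list of operator names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch: keys, operators, and rewrites are all made up.
final class OptimizerSketch {
    private final List<UnaryOperator<List<String>>> transforms = new ArrayList<>();

    // Register a rewrite only when its (hypothetical) flag is set to true.
    void initialize(Map<String, Boolean> conf) {
        if (conf.getOrDefault("sketch.optimize.filterPushDown", false)) {
            transforms.add(OptimizerSketch::pushDownFilter);
        }
        if (conf.getOrDefault("sketch.optimize.collapseSelects", false)) {
            transforms.add(OptimizerSketch::collapseSelects);
        }
    }

    // Apply every registered rewrite in registration order; each one sees the previous result.
    List<String> optimize(List<String> plan) {
        List<String> current = plan;
        for (UnaryOperator<List<String>> transform : transforms) {
            current = transform.apply(current);
        }
        return current;
    }

    // Toy rewrite: move the first FILTER so that it directly follows SCAN.
    static List<String> pushDownFilter(List<String> plan) {
        List<String> out = new ArrayList<>(plan);
        if (out.remove("FILTER")) {
            out.add(out.indexOf("SCAN") + 1, "FILTER");
        }
        return out;
    }

    // Toy rewrite: drop a SELECT that immediately follows another SELECT.
    static List<String> collapseSelects(List<String> plan) {
        List<String> out = new ArrayList<>();
        for (String op : plan) {
            boolean duplicate = op.equals("SELECT") && !out.isEmpty()
                && out.get(out.size() - 1).equals("SELECT");
            if (!duplicate) {
                out.add(op);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Boolean> conf = new HashMap<>();
        conf.put("sketch.optimize.filterPushDown", true);
        conf.put("sketch.optimize.collapseSelects", true);
        OptimizerSketch optimizer = new OptimizerSketch();
        optimizer.initialize(conf);
        System.out.println(optimizer.optimize(Arrays.asList("SCAN", "SELECT", "SELECT", "FILTER", "SINK")));
    }
}
```

With no flags set, the sketch returns the plan unchanged, which matches the idea that a disabled optimization leaves the logical plan alone.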
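2.5.5 Submitting and executing the tasks (continuing from the runInternal method in Section 2.3.8)
This picks up at the second step in Section 2.3.8:
// 2. Execute
execute();
2.5.6 The execute method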
private void execute() throws CommandProcessorResponse {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
... ...
// 1. Build the jobs: construct the MR jobs from the task tree
setQueryDisplays(plan.getRootTasks());
int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
int jobs = mrJobs + Utilities.getTezTasks(plan.getRootTasks()).size()
+ Utilities.getSparkTasks(plan.getRootTasks()).size();
if (jobs > 0) {
logMrWarning(mrJobs);
console.printInfo("Query ID = " + queryId);
console.printInfo("Total jobs = " + jobs);
}
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
// Loop while you either have tasks running, or tasks queued up
while (driverCxt.isRunning()) {
// Launch upto maxthreads tasks
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
// 2. Launch the task
TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt); // see 2.5.7, the launchTask method
if (!runner.isRunning()) {
break;
}
}
... ...
// Print the final OK of the result output
if (console != null) {
console.printInfo("OK");
}
}
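The loop in execute keeps going while tasks are running or queued, and launches at most maxthreads runnable tasks at a time. The sketch below imitates that scheduling idea on a small task graph. It is not Hive's DriverContext: it swaps in a java.util.concurrent thread pool and completion service in place of TaskRunner threads, TaskNode is a hypothetical type, and the tasks do no real work. A task becomes runnable once all of its parents have finished.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of a "launch up to N, then wait for one to finish" loop.
final class DriverLoopSketch {
    static final class TaskNode {
        final String id;
        final List<TaskNode> children = new ArrayList<>();
        int pendingParents = 0; // only touched by the scheduling thread

        TaskNode(String id) {
            this.id = id;
        }

        void addChild(TaskNode child) {
            children.add(child);
            child.pendingParents++;
        }
    }

    // Runs the graph once and returns task ids in the order they finished.
    static List<String> runAll(List<TaskNode> roots, int maxThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        CompletionService<TaskNode> finishedTasks = new ExecutorCompletionService<>(pool);
        Deque<TaskNode> runnable = new ArrayDeque<>(roots);
        List<String> finishedOrder = new ArrayList<>();
        int running = 0;
        try {
            // Loop while tasks are either running or queued up.
            while (!runnable.isEmpty() || running > 0) {
                // Launch up to maxThreads tasks.
                while (running < maxThreads && !runnable.isEmpty()) {
                    TaskNode task = runnable.poll();
                    finishedTasks.submit(() -> task); // a real task would do its work here
                    running++;
                }
                // Block until one running task finishes, then unlock its children.
                TaskNode finished = finishedTasks.take().get();
                running--;
                finishedOrder.add(finished.id);
                for (TaskNode child : finished.children) {
                    if (--child.pendingParents == 0) {
                        runnable.add(child);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return finishedOrder;
    }

    // Builds a fresh diamond: a -> b, a -> c, b -> d, c -> d.
    static List<TaskNode> diamond() {
        TaskNode a = new TaskNode("a");
        TaskNode b = new TaskNode("b");
        TaskNode c = new TaskNode("c");
        TaskNode d = new TaskNode("d");
        a.addChild(b);
        a.addChild(c);
        b.addChild(d);
        c.addChild(d);
        return Arrays.asList(a);
    }

    public static void main(String[] args) {
        System.out.println(runAll(diamond(), 2));
    }
}
```

In the diamond example, b and c may finish in either order when two threads are available, but a always finishes first and d always finishes last.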
2.5.7 The launchTask method
private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
String jobname, int jobs, DriverContext cxt) throws HiveException {
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());
}
if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
if (noName) {
conf.set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")");
}
conf.set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, tsk.getId());
Utilities.setWorkflowAdjacencies(conf, plan);
cxt.incCurJobNo(1);
console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
}
tsk.initialize(queryState, plan, cxt, ctx.getOpContext());
TaskRunner tskRun = new TaskRunner(tsk);
// Register the task as launching
cxt.launching(tskRun);
// Launch Task: start the task in parallel or serially, depending on whether it can run in parallel
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) {
// Launch it in the parallel mode, as a separate thread only for MR tasks
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in parallel");
}
// Start a task that can run in parallel; this still ends up executing tskRun.runSequential()
tskRun.start();
} else {
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in serial mode");
}
// A task that cannot run in parallel is executed sequentially
tskRun.runSequential(); // see 2.5.8, the runSequential method
}
return tskRun;
}
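The comment in launchTask notes that the parallel branch still ends up in runSequential. The difference between the two branches is only which thread does the work: start() hands it to a new thread, while a direct call runs it on the caller's thread. The sketch below shows that distinction with a hypothetical TaskRunnerSketch class, where an IntSupplier stands in for the task and the -101 default mirrors the initial exitVal in runSequential in 2.5.8.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch: a runner whose thread body simply calls runSequential().
final class TaskRunnerSketch extends Thread {
    private final IntSupplier work;      // stands in for the task to execute
    private volatile int exitVal = -101; // stays at -101 if the work throws
    private volatile Thread ranOn;       // thread that actually executed the work

    TaskRunnerSketch(IntSupplier work) {
        this.work = work;
    }

    @Override
    public void run() {
        runSequential(); // parallel mode: same code path, different thread
    }

    void runSequential() {
        ranOn = Thread.currentThread();
        try {
            exitVal = work.getAsInt();
        } catch (RuntimeException e) {
            // Keep the sentinel exit value; a real runner would also record the error.
        }
    }

    // Mirrors the two branches of launchTask: start a thread, or run inline.
    static TaskRunnerSketch launch(IntSupplier work, boolean parallel) {
        TaskRunnerSketch runner = new TaskRunnerSketch(work);
        if (parallel) {
            runner.start();
        } else {
            runner.runSequential();
        }
        return runner;
    }

    // Waits for the runner thread (a no-op if it was never started) and returns the exit value.
    int awaitExitVal() {
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return exitVal;
    }

    boolean ranOnOwnThread() {
        return ranOn == this;
    }

    public static void main(String[] args) {
        TaskRunnerSketch serial = launch(() -> 0, false);
        System.out.println("serial exit=" + serial.awaitExitVal() + " ownThread=" + serial.ranOnOwnThread());
        TaskRunnerSketch parallel = launch(() -> 0, true);
        System.out.println("parallel exit=" + parallel.awaitExitVal() + " ownThread=" + parallel.ranOnOwnThread());
    }
}
```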
2.5.8 The runSequential method
public void runSequential() {
int exitVal = -101;
try {
exitVal = tsk.executeTask(ss == null ? null : ss.getHiveHistory()); // see 2.5.9, the executeTask method
} catch (Throwable t) {
if (tsk.getException() == null) {
tsk.setException(t);
}
LOG.error("Error in executeTask", t);
}
result.setExitVal(exitVal);
if (tsk.getException() != null) {
result.setTaskError(tsk.getException());
}
}
2.5.9 The executeTask method
public int executeTask(HiveHistory hiveHistory) {
try {
this.setStarted();
if (hiveHistory != null) {
hiveHistory.logPlanProgress(queryPlan);
}
int retval = execute(driverContext); // see 2.5.10, the execute method
this.setDone();
if (hiveHistory != null) {
hiveHistory.logPlanProgress(queryPlan);
}
return retval;
} catch (IOException e) {
throw new RuntimeException("Unexpected error: " + e.getMessage(), e);
}
}
2.5.10 The execute method
protected abstract int execute(DriverContext driverContext);
We have now reached the abstract execute method of the abstract class "org.apache.hadoop.hive.ql.exec.Task", so we need to look at the execute method of an implementing class. Here I choose "org.apache.hadoop.hive.ql.exec.mr.MapRedTask".
public int execute(DriverContext driverContext) {
Context ctx = driverContext.getCtx();
boolean ctxCreated = false;
try {
... ...
if (!runningViaChild) {
// since we are running the mapred task in the same jvm, we should update the job conf
// in ExecDriver as well to have proper local properties.
if (this.isLocalMode()) {
// save the original job tracker
ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(job));
// change it to local
ShimLoader.getHadoopShims().setJobLauncherRpcAddress(job, "local");
}
// we are not running this mapred task via child jvm
// so directly invoke ExecDriver
// Set up the classes the MR job runs with, such as its InputFormat and OutputFormat
int ret = super.execute(driverContext);
// restore the previous properties for framework name, RM address etc.
if (this.isLocalMode()) {
// restore the local job tracker back to original
ctx.restoreOriginalTracker();
}
return ret;
}
... ...
// Build the command that runs the MR job
String isSilent = "true".equalsIgnoreCase(System
.getProperty("test.silent")) ? "-nolog" : "";
String jarCmd = hiveJar + " " + ExecDriver.class.getName() + libJarsOption;
String cmdLine = hadoopExec + " jar " + jarCmd + " -plan "
+ planPath.toString() + " " + isSilent + " " + hiveConfArgs;
... ...
// Run ExecDriver in another JVM
executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
}
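When the task is not run in the same JVM, the code above assembles a "hadoop jar ... -plan ..." command line and hands it to a child process. The sketch below shows the same assembly as a list of arguments instead of one concatenated string. It is an illustration only: buildCommand and its parameters are hypothetical, the paths and the extra flag in main are placeholders, the libJarsOption part of the original is left out, and the process is prepared but deliberately never started.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of assembling the child-JVM command as an argument list.
final class ChildJvmCommandSketch {
    static List<String> buildCommand(String hadoopExec, String hiveJar, String mainClass,
                                     String planPath, boolean silent, List<String> extraArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add(hadoopExec);
        cmd.add("jar");
        cmd.add(hiveJar);
        cmd.add(mainClass);
        cmd.add("-plan");
        cmd.add(planPath);
        if (silent) {
            cmd.add("-nolog"); // same switch the original derives from the test.silent property
        }
        cmd.addAll(extraArgs);
        return cmd;
    }

    public static void main(String[] args) {
        // All paths and the extra flag below are placeholders, not real defaults.
        List<String> cmd = buildCommand(
            "/path/to/hadoop",
            "/path/to/hive-exec.jar",
            "org.apache.hadoop.hive.ql.exec.mr.ExecDriver",
            "/path/to/plan.xml",
            true,
            Arrays.asList("-exampleFlag", "exampleValue"));
        // Prepared only; start() is intentionally not called in this sketch.
        ProcessBuilder builder = new ProcessBuilder(cmd).directory(new File("."));
        System.out.println(String.join(" ", builder.command()));
    }
}
```

An argument list avoids the whitespace tokenization that Runtime.exec applies to a single command string, so a path containing a space stays one argument. That is a design choice of this sketch, not a description of what MapRedTask does.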
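Chapter 3 Debugging the Hive Source Code
3.1 Preparing the Debug Environment
3.1.1 Download the source package
Download the Hive source package and build it yourself, preferably on Linux. Then copy the entire built tree into your IDEA working directory and open it with IDEA. This document is based on Hive 3.1.2. A prebuilt Hive source package is provided in the accompanying materials.
3.1.2 Open the project's configuration settings
3.1.3 Add a remote connection configuration
3.1.4 Fill in the configuration details
3.2 Testing
3.2.1 Set a breakpoint anywhere in the run method of the CliDriver class
3.2.2 Start the Hive client in debug mode
$HIVE_HOME/bin/hive --debug
3.2.3 Start the local project in debug mode
3.2.4 Run an HQL statement in the Hive client, then switch to IDEA to inspect it
1) View the breakpoint in IDEA
2) View the Hive client running in debug mode
Note: some of the images in this document come from a Meituan tech blog post.
Link: https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html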