分析源码步骤:
第一步程序入口:
第二步一直查看runjob方法,可以看出collect()是RDD行动算子,与Job运行提交相关
rdd.scala
sparkcontext.scala
sparkcontext.scala
sparkcontext.scala
第三步runJob()与DAG调度有关
sparkcontext.scala
第四步runJob()核心代码 - -查看其中提交作业submitJob()的代码
DAGScheduler.scala
第五步:搜索handleJobSubmitted,handleJobSubmitted中createResultStage()方法会创建ResultStage,即为最后一个阶段finalStage。补充:每一个行动算子都会调用runJob(),最后会new ActiveJob
DAGScheduler.scala
DAGScheduler.scala
DAGScheduler.scala
第六步createResultStage()方法中,先调用getOrCreateParentStages(),获得或创建父阶段,因为只有父阶段先执行完,才会执行当前的阶段。然后再创建ResultStage
DAGScheduler.scala
第七步:核心代码:进入getOrCreateParentStages(),调用getShuffleDependencies()返回值是HashSet,存放是的依赖关系,再对每一个shuffleDep,调用getOrCreateShuffleMapStage()创建shuffle阶段。即一个shuffle依赖就会创建一个shuffle阶段
DAGScheduler.scala
DAGScheduler.scala中 getShuffleDependencies此方法是获取依赖关系
第八步:进入getOrCreateShuffleMapStage(),调用createShuffleMapStage(),创建shuffle阶段new ShuffleMapStage
DAGScheduler.scala
DAGScheduler.scala
标签:sparkcontext,调用,shuffle,scala,RDD,划分,DAGScheduler,阶段,Spark From: https://www.cnblogs.com/huifeidezhuzai/p/18025534