RDD特殊的算子
cache、persist
将数据缓存到内存,第一次触发Action,才会将数据放入内存,以后在触发Action,可以复用前面内存中缓存的数据,可以提升技术效率
cache和persist的使用场景:一个application多次触发Action,为了复用前面RDD的数据,避免反复读取HDFS(数据源)中的数据和重复计算,可以将数据缓存到内存或磁盘【executor所在的磁盘】,第一次触发action才放入到内存或磁盘,以后会缓存的RDD进行操作可以复用缓存的数据。
一个RDD多次触发Action缓存才有意义,如果将数据缓存到内存,内存不够,以分区位单位,只缓存部分分区的数据,cache底层调用persist,可以指定更加丰富的存储基本,支持多种StageLevel,可以将数据序列化,默认放入内存使用的是java对象存储,但是占用空间大,优点速度快,也可以使用其他的序列化方式
cache和persist方法,严格来说,不是Transformation,应为没有生成新的RDD,只是标记当前rdd要cache或persist
checkpoint
checkpoint使用场景:适合复杂的计算【机器学习、迭代计算】,为了避免中间结果数据丢失重复计算,可以将宝贵的中间结果保存到hdfs中,保证中间结果安全。
在调用rdd的checkpint方法之前,一定要指定checkpoint的目录sc.setCheckPointDir,指的HDFS存储目录,为保证中间结果安全,将数据保存到HDFS中
第一次触发Action,才做checkpoint,会额外触发一个job,这个job的目的就是将结果保存到HDFS中
如果RDD做了checkpoint,这个RDD以前的依赖关系就不在使用了,触发多次Action,checkpoint才有意义,多用于迭代计算
checkpoint严格的说,不是Transformation,只是标记当前RDD要做checkpoint
Spark中的一些重要概念
Application
使用SparkSubmit提交的个计算应用,一个Application中可以触发多次Action,触发一次Action产生一个Job,一个Application中可以有一到多个Job
Job
Driver向Executor提交的作业,触发一次Acition形成一个完整的DAG,一个DAG对应一个Job,一个Job中有一到多个Stage,一个Stage中有一到多个Task
DAG
概念:有向无环图,是对多个RDD转换过程和依赖关系的描述,触发Action就会形成一个完整的DAG,一个DAG对应一个Job
Stage
概念:任务执行阶段,Stage执行是有先后顺序的,先执行前的,在执行后面的,一个Stage对应一个TaskSet,一个TaskSet中的Task的数量取决于Stage中最后一个RDD分区的数量
Task
概念:Spark中任务最小的执行单元,Task分类两种,即ShuffleMapTask和ResultTask
Task其实就是类的实例,有属性(从哪里读取数据),有方法(如何计算),Task的数量决定并行度,同时也要考虑可用的cores
TaskSet
保存同一种计算逻辑多个Task的集合,一个TaskSet中的Task计算逻辑都一样,计算的数据不一样
任务执行原理分析
WordCount程序有多个RDD
该Job中,有多少个RDD,多少个Stage,多少个TaskSet,多个Task,Task的类型有哪些?
object WordCount {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val sc: SparkContext = new SparkContext(conf)
sc.textFile(args(0))
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_+_)
.saveAsTextFile(args(1))
sc.stop()
}
}
RDD的数量分析
• 读取hdfs中的目录有两个输入切片,最原始的HadoopRDD的分区为2,以后没有改变RDD的分区数量,RDD的分区数量都是2
• 在调用reduceByKey方法时,有shuffle产生,要划分Stage,所以有两个Stage
• 第一个Stage的并行度为2,所以有2个Task,并且为ShuffleMapTask。第二个Stage的并行度也为2,所以也有2个Task,并且为ResultTask,所以一共有4个Task
spark的任务上次的逻辑计划图
下面的的物理执行计划图,会生成Task,生成的Task如下
Stage和Task的类型
Stage有两种类型,分别是ShuffleMapStage和ResultStage,ShuffleMapStage生成的Task叫做ShuffleMapTask,ResultStage生成的Task叫做ResultTask
• ShuffleMapTask
1.可以读取各种数据源的数据
2.可以读取Shuffle的中间结果(Shuffle Read)
3.为shuffle做准备,即应用分区器,将数据溢写磁盘(ShuffleWrite),后面一定还会有其他的Stage
• ResultTask
1.可以读取各种数据源的数据
2.可以读取Shuffle的中间结果(Shuffle Read)
3.是整个job中最后一个阶段对应的Task,一定会产生结果数据(就是将产生的结果返回的Driver或写入到外部的存储系统)
多种情况:
第一种:
第二种
第三种