首页 > 编程语言 >spark中的持久化机制以及lineage和checkpoint(简含源码解析)

spark中的持久化机制以及lineage和checkpoint(简含源码解析)

时间:2023-01-07 02:55:24浏览次数:35  
标签:lineage 存储 originalRDD rdd checkpoint RDD 源码 化机制

spark相比MapReduce最大的优势是,spark是基于内存的计算模型,有的spark应用比较复杂,如果中间出错了,那么只能根据lineage从头开始计算,所以为了避免这种情况,spark提供了两种持久化算子,如果用术语回答,持久化的目的就是为了容错。

存储级别

spark贴心地提供了多种存储级别供选择

这里解释一下StorageLevel的参数是什么
1._useDisk
2._useMemory
3._useOffHeap
4._deserialized
5._replication 默认值为1
这里解释一下什么事OffHeap,我们知道一般来java中对象是分配到jvm中的堆中,而OffHeap就是堆外内存,这片内存直接被操作系统管理而不是受jvm管理,这样的话可以减少GC的影响。
再解释下deserialized,是serialized的反过程,序列化和反序列化指得是对象与字节码文件间转换的过程。

如何选择存储级别

java、scala中的存储级别可以认为是DISK,MEMORY,SERIABLIZED和副本数之间排列组合,如何进行选择在官方文档上(文档地址可以直接搜spark或者去我上篇RDD算子介绍的博客中进入),我这里直接对原文进行硬翻

  1. 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的 RDD 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度运行

  2. 如果内存不能全部存储 RDD,那么使用 MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快

  3. 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。

  4. 如果想快速还原故障,建议使用多副本存储级别(例如,使用 Spark 作为 web 应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。

cache和persist算子的区别


可以看出,cache就是persist的偷懒版本,persist可以自定义StorageLevel,但是cache就是默认的MEMORY_ONLY
观察persist的源码

源码本身没什么好讲的,除了那个localcheckPointed,这个可以按下不表,在spark持久化中还有两个重要的概念,
checkpoint, lineage

checkpoint和lineage

checkpoint和lineage都是RDD容错的手段
lineage相当于族谱,记录了RDD变化的过程,这一点与mysql的redo log差不多,在发生错误的时候,通过lineage来重新运算获取数据,lineage描述的是RDD的变化链,有两种路径,一种是窄依赖,另一种是宽依赖,这两种依赖可以想像为独生和超生,一个父只对应一个孩子是窄依赖,一个父对应多个孩子叫做宽依赖。
checkpoint机制是来弥补lineage中宽依赖带来的冗余计算问题(理想中一个孩子丢了只需要“生”出丢失的孩子即可,但是实操如果要复原,只能父重新计算出所有孩子,带来了计算冗余”),对这种宽依赖或者说触发shuffle的算子加checkpoint是可以考虑的事情。checkpoint就是将数据导入可靠的具有容错的存储系统中,一般指定为hdfs,也可以是s3之类的,不过这样做的话会斩断依赖链

重要的代码也就是最后的赋值,这行代码的意思是标记这个RDD需要checkpointing,但是这个并没有立即执行,需要后面action触发才能执行,也就是

中的最后一行(runJob函数是每个action算子中必定执行的函数)


这段代码的意思一个RDD初始时doCheckPointCalled为false,经过一轮后为true,checkpointData.isDefined为true,则说明rdd已经被checkPoint过了,则真正执行进行checkpointData的真实checkpoint()操作,而上面的checkpointAllMarkedAncestors表示此RDD的祖先被checkpoint都通过这个job触发处理,默认是false,也就是说

这个时候只会触发RDD4的checkpoint转存,而不会触发RDD1的转存
下面是真正执行checkpoint操作的函数

首先把cpState(RDDCheckpointData中的一个属性)置为CheckpointingInProgress,然后进行doCheckPoint操作产生newRDD,这里的doCheckPoint操作并没有实现,那么就去寻找它的子类,这时应该想起

这里,ReliableRDDCheckpointData,这个就是RDDCheckPoint的子类,里面实现doCheckpoint操作

很明显,最重要的代码是 ReliableCheckpointRDD.writeRDDToCheckpointDirectory方法,下面给出它的代码,太长了就不截图了

点击查看代码
  def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    val checkpointStartTimeNs = System.nanoTime()

    val sc = originalRDD.sparkContext

    // Create the output path for the checkpoint
    val checkpointDirPath = new Path(checkpointDir)
    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    if (!fs.mkdirs(checkpointDirPath)) {
      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    }

    // Save to file, and reload it as an RDD
    val broadcastedConf = sc.broadcast(
      new SerializableConfiguration(sc.hadoopConfiguration))
    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }

    val checkpointDurationMs =
      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
    logInfo(s"Checkpointing took $checkpointDurationMs ms.")

    val newRDD = new ReliableCheckpointRDD[T](
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    if (newRDD.partitions.length != originalRDD.partitions.length) {
      throw new SparkException(
        "Checkpoint RDD has a different number of partitions from original RDD. Original " +
          s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
          s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
          s"${newRDD.partitions.length}].")
    }
    newRDD
  }
这个函数的作用就是将rdd写入checkpointDir中(其实是hdfs中一个路径),然后生产newRDD,newRDD就是以写入hdfs目录中的rdd数据生产的rdd

最后执行下半程序,更新rdd并斩断lineage,斩断也就是将rdd中dependency属性置为null,整个过程就结束了

总结就是,RDD在进行checkPoint操作的时候先进行标记,然后直到action算子中的runJob函数里的docheckPoint才真正执行checkpoint操作,而具体如何checkpoint,依靠子类ReliableRDDCheckpointData中重写的checkpoint函数进行更新和斩断,更新rdd根据写入hdfs中的rdd数据进行生成,而斩断就是将生成的rdd的dependency属性置位null

标签:lineage,存储,originalRDD,rdd,checkpoint,RDD,源码,化机制
From: https://www.cnblogs.com/spark-cc/p/17031953.html

相关文章