RDD
RDD初始参数:上下文和一组依赖
1. abstract class
2. @transient private
3. @transient private
4. extends
以下需要仔细理清:
A list of Partitions
Function to compute split (sub RDD impl)
A list of Dependencies
Partitioner for K-V RDDs (Optional)
Preferred locations to compute each spliton (Optional)
Dependency
Dependency代表了RDD之间的依赖关系,即血缘
RDD中的使用
RDD给子类提供了getDependencies方法来制定如何依赖父类RDD
1. protected
事实上,在获取first parent的时候,子类经常会使用下面这个方法
1. protected[spark] def firstParent[U: ClassTag] = {
2. dependencies.head.rdd.asInstanceOf[RDD[U]]
3. }
可以看到,Seq里的第一个dependency应该是直接的parent,从而从第一个dependency类里获得了rdd,这个rdd就是父RDD。
一般的RDD子类都会这么实现compute和getPartition方法,以SchemaRDD举例:
1. override def compute(split: Partition, context: TaskContext): Iterator[Row] =
2. firstParent[Row].compute(split, context).map(_.copy())
3.
4. override def getPartitions: Array[Partition] = firstParent[Row].partitions
compute()方法调用了第一个父类的compute,把结果RDD copy返回
getPartitions返回的就是第一个父类的partitions
下面看一下Dependency类及其子类的实现。
宽依赖和窄依赖
1. abstract class Dependency[T](val rdd: RDD[T]) extends
Dependency里传入的rdd,就是父RDD本身。
继承结构如下:
NarrowDependency代表窄依赖,即父RDD的分区,最多被子RDD的一个分区使用。所以支持并行计算。
子类需要实现方法:
1. def getParents(partitionId: Int): Seq[Int]
OneToOneDependency表示父RDD和子RDD的分区依赖是一对一的。
RangeDependency表示在一个range范围内,依赖关系是一对一的,所以初始化的时候会有一个范围,范围外的partitionId,传进去之后返回的是Nil。
下面介绍宽依赖。
1. class
2. @transient
3. val partitioner: Partitioner,
4. null)
5. extends
6.
7. // 上下文增量定义的Id
8. val shuffleId: Int = rdd.context.newShuffleId()
9.
10. // ContextCleaner的作用和实现在SparkContext章节叙述
11. this))
12. }
宽依赖针对的RDD是KV形式的,需要一个partitioner指定分区方式(下一节介绍),需要一个序列化工具类,序列化工具目前的实现如下:
宽依赖和窄依赖对失败恢复时候的recompute有不同程度的影响,宽依赖可能是要全部计算的。
Partition
Partition具体表示RDD每个数据分区。
Partition提供trait类,内含一个index和hashCode()方法,具体子类实现与RDD子类有关,种类如下:
在分析每个RDD子类的时候再涉及。
Partitioner
Partitioner决定KV形式的RDD如何根据key进行partition
1. abstract class Partitioner extends
2. // 总分区数
3. def getPartition(key: Any): Int
4. }
在ShuffleDependency里对应一个Partitioner,来完成宽依赖下,子RDD如何获取父RDD。
默认Partitioner
Partitioner的伴生对象提供defaultPartitioner方法,逻辑为:
传入的RDD(至少两个)中,遍历(顺序是partition数目从大到小)RDD,如果已经有Partitioner了,就使用。如果RDD们都没有Partitioner,则使用默认的HashPartitioner。而HashPartitioner的初始化partition数目,取决于是否设置了spark.default.parallelism,如果没有的话就取RDD中partition数目最大的值。
如果上面这段文字看起来费解,代码如下:
1. def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
2. val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
3. for (r <- bySize if
4. return
5. }
6. if (rdd.context.conf.contains("spark.default.parallelism")) {
7. new
8. else
9. new
10. }
11. }
HashPartitioner
HashPartitioner基于java的Object.hashCode。会有个问题是Java的Array有自己的hashCode,不基于Array里的内容,所以RDD[Array[_]]或RDD[(Array[_], _)]使用HashPartitioner会有问题。
顾名思义,getPartition方法实现如下
1. def getPartition(key: Any): Int = key match {
2. case null => 0
3. case
4. }
RangePartitioner
RangePartitioner处理的KV RDD要求Key是可排序的,即满足Scala的Ordered[K]类型。所以它的构造如下:
1. class
2. partitions: Int,
3. @transient
4. private val ascending: Boolean = true)
5. extends
内部会计算一个rangBounds(上界),在getPartition的时候,如果rangBoundssize小于1000,则逐个遍历获得;否则二分查找获得partitionId。
Persist
默认cache()过程是将RDD persist在内存里,persist()操作可以为RDD重新指定StorageLevel,
1. class StorageLevel private(
2. private
3. private
4. private
5. private
6. private var replication_ : Int = 1)
1. object StorageLevel {
2. new StorageLevel(false, false, false, false)
3. new StorageLevel(true, false, false, false)
4. new StorageLevel(true, false, false, false, 2)
5. new StorageLevel(false, true, false, true)
6. new StorageLevel(false, true, false, true, 2)
7. new StorageLevel(false, true, false, false)
8. new StorageLevel(false, true, false, false, 2)
9. new StorageLevel(true, true, false, true)
10. new StorageLevel(true, true, false, true, 2)
11. new StorageLevel(true, true, false, false)
12. new StorageLevel(true, true, false, false, 2)
13. new StorageLevel(false, false, true, false) // Tachyon
RDD的persist()和unpersist()操作,都是由SparkContext执行的(SparkContext的persistRDD和unpersistRDD方法)。
Persist过程是把该RDD存在上下文的TimeStampedWeakValueHashMap里维护起来。也就是说,其实persist并不是action,并不会触发任何计算。
Unpersist过程如下,会交给SparkEnv里的BlockManager处理。
1. private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
2. env.blockManager.master.removeRdd(rddId, blocking)
3. persistentRdds.remove(rddId)
4. listenerBus.post(SparkListenerUnpersistRDD(rddId))
5. }
Checkpoint
RDD Actions api里提供了checkpoint()方法,会把本RDD save到SparkContext CheckpointDir
目录下。建议该RDD已经persist在内存中,否则需要recomputation。
如果该RDD没有被checkpoint过,则会生成新的RDDCheckpointData。RDDCheckpointData类与一个RDD关联,记录了checkpoint相关的信息,并且记录checkpointRDD的一个状态,
[ Initialized --> marked for checkpointing--> checkpointing in progress --> checkpointed
内部有一个doCheckpoint()方法(会被下面调用)。
执行逻辑
真正的checkpoint触发,在RDD私有方法doCheckpoint()里。doCheckpoint()会被DAGScheduler调用,且是在此次job里使用这个RDD完毕之后,此时这个RDD就已经被计算或者物化过了。可以看到,会对RDD的父RDD进行递归。
1. private[spark] def doCheckpoint() {
2. if
3. true
4. if
5. checkpointData.get.doCheckpoint()
6. else
7. dependencies.foreach(_.rdd.doCheckpoint())
8. }
9. }
10. }
RDDCheckpointData的doCheckpoint()方法关键代码如下:
1. // Create the output path for the checkpoint
2. val path = new Path(rdd.context.checkpointDir.get, "rdd-"
3. val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
4. if
5. throw new SparkException("Failed to create checkpoint path "
6. }
7.
8. // Save to file, and reload it as an RDD
9. val broadcastedConf = rdd.context.broadcast(
10. new
11. // 这次runJob最终调的是dagScheduler的runJob
12. rdd.context.runJob(rdd,
13. CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
14. // 此时rdd已经记录到磁盘上
15. val newRDD = new
16. if
17. throw new SparkException("xxx")
18. }
runJob最终调的是dagScheduler的runJob。做完后,生成一个CheckpointRDD。
具体CheckpointRDD相关内容可以参考其他章节。
API
子类需要实现的方法
1. // 计算某个分区
2. def compute(split: Partition, context: TaskContext): Iterator[T]
3.
4. protected
5. // 依赖的父RDD,默认就是返回整个dependency序列
6. protected
7.
8. protected
Transformations
略。
Actions
SubRDDs
部分RDD子类的实现分析,包括以下几个部分:
1)子类本身构造参数
2)子类的特殊私有变量
3)子类的Partitioner实现
4)子类的父类函数实现
1. def compute(split: Partition, context: TaskContext): Iterator[T]
2. protected
3. protected
4. protected
CheckpointRDD
1. class
2. extends
CheckpointRDDPartition继承自Partition,没有什么增加。
有一个被广播的hadoop conf变量,在compute方法里使用(readFromFile的时候用)
1. val broadcastedConf = sc.broadcast(
2. new
getPartitions: Array[Partition]方法:
根据checkpointPath去查看Path下有多少个partitionFile,File个数为partition数目。getPartitions方法返回的Array[Partition]内容为New CheckpointRDDPartition(i),i为[0, 1, …, partitionNum]
getPreferredLocations(split:Partition): Seq[String]方法:
文件位置信息,借助hadoop core包,获得block location,把得到的结果按照host打散(flatMap)并过滤掉localhost,返回。
compute(split: Partition, context:TaskContext): Iterator[T]方法:
调用CheckpointRDD.readFromFile(file, broadcastedConf,context)方法,其中file为hadoopfile path,conf为广播过的hadoop conf。
Hadoop文件读写及序列化
伴生对象提供writeToFile方法和readFromFile方法,主要用于读写hadoop文件,并且利用env下的serializer进行序列化和反序列化工作。两个方法具体实现如下:
1. def writeToFile[T](
2. path: String,
3. broadcastedConf: Broadcast[SerializableWritable[Configuration]],
4. 1
5. )(ctx: TaskContext, iterator: Iterator[T]) {
创建hadoop文件的时候会若存在会抛异常。把hadoop的outputStream放入serializer的stream里,serializeStream.writeAll(iterator)写入。
writeToFile的调用在RDDCheckpointData类的doCheckpoint方法里,如下:
1. rdd.context.runJob(rdd,
2. CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
1. def readFromFile[T](
2. path: Path,
3. broadcastedConf: Broadcast[SerializableWritable[Configuration]],
4. context: TaskContext
5. ): Iterator[T] = {
打开Hadoop的inutStream,读取的时候使用env下的serializer得到反序列化之后的流。返回的时候,DeserializationStream这个trait提供了asIterator方法,每次next操作可以进行一次readObject。
在返回之前,调用了TaskContext提供的addOnCompleteCallback回调,用于关闭hadoop的inputStream。
NewHadoopRDD
1. class
2. sc : SparkContext,
3. inputFormatClass: Class[_ <: InputFormat[K, V]],
4. keyClass: Class[K],
5. valueClass: Class[V],
6. @transient
7. extends
8. with SparkHadoopMapReduceUtil
1. private[spark] class
2. rddId: Int,
3. val index: Int,
4. @transient
5. extends
6.
7. new
8.
9. 41 * (41
10. }
getPartitions操作:
根据inputFormatClass和conf,通过hadoop InputFormat实现类的getSplits(JobContext)方法得到InputSplits。(ORCFile在此处的优化)
这样获得的split同RDD的partition直接对应。
compute操作:
针对本次split(partition),调用InputFormat的createRecordReader(split)方法,
得到RecordReader<K,V>。这个RecordReader包装在Iterator[(K,V)]类内,复写Iterator的next()和hasNext方法,让compute返回的InterruptibleIterator[(K,V)]能够被迭代获得RecordReader取到的数据。
getPreferredLocations(split: Partition)操作:
1. theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
在NewHadoopPartition里SerializableWritable将split序列化,然后调用InputSplit本身的getLocations接口,得到有数据分布节点的nodes name列表。
WholeTextFileRDD
NewHadoopRDD的子类
1. private[spark] class
2. sc : SparkContext,
3. inputFormatClass: Class[_ <: WholeTextFileInputFormat],
4. keyClass: Class[String],
5. valueClass: Class[String],
6. @transient
7. minSplits: Int)
8. extends
复写了getPartitions方法:
NewHadoopRDD有自己的inputFormat实现类和recordReader实现类。在spark/input package下专门写了这两个类的实现。感觉是种参考。
InputFormat
WholeTextFileRDD在spark里实现了自己的inputFormat。读取的File以K,V的结构获取,K为path,V为整个file的content。
复写createRecordReader以使用WholeTextFileRecordReader
复写setMaxSplitSize方法,由于用户可以传入minSplits数目,计算平均大小(splits files总大小除以split数目)的时候就变了。
RecordReader
复写nextKeyValue方法,会读出指定path下的file的内容,生成new Text()给value,结果是String。如果文件正在被别的进行打开着,会返回false。否则把file内容读进value里。
使用场景
在SparkContext下提供wholeTextFile方法,
1. def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits):
2. RDD[(String, String)]
用于读取一个路径下的所有text文件,以K,V的形式返回,K为一个文件的path,V为文件内容。比较适合小文件。
标签:Core,false,rdd,子类,RDD,源码,new,true From: https://blog.51cto.com/u_2650279/6950073