首页 > 编程语言 >Spark Core源码分析: RDD基础

Spark Core源码分析: RDD基础

时间:2023-08-03 16:01:57浏览次数:33  
标签:Core false rdd 子类 RDD 源码 new true


RDD

 

RDD初始参数:上下文和一组依赖



1. abstract class
2. @transient private
3. @transient private
4. extends



 

以下需要仔细理清:

A list of Partitions
Function to compute split (sub RDD impl)
A list of Dependencies
Partitioner for K-V RDDs (Optional)
Preferred locations to compute each spliton (Optional)

 

Dependency

 

Dependency代表了RDD之间的依赖关系,即血缘

 

RDD中的使用



RDD给子类提供了getDependencies方法来制定如何依赖父类RDD





1. protected



事实上,在获取first parent的时候,子类经常会使用下面这个方法



1. protected[spark] def firstParent[U: ClassTag] = {  
2.   dependencies.head.rdd.asInstanceOf[RDD[U]]  
3. }


可以看到,Seq里的第一个dependency应该是直接的parent,从而从第一个dependency类里获得了rdd,这个rdd就是父RDD。



一般的RDD子类都会这么实现compute和getPartition方法,以SchemaRDD举例:


1. override def compute(split: Partition, context: TaskContext): Iterator[Row] =  
2.     firstParent[Row].compute(split, context).map(_.copy())  
3.   
4. override def getPartitions: Array[Partition] = firstParent[Row].partitions



compute()方法调用了第一个父类的compute,把结果RDD copy返回

getPartitions返回的就是第一个父类的partitions

 

下面看一下Dependency类及其子类的实现。



宽依赖和窄依赖



1. abstract class Dependency[T](val rdd: RDD[T]) extends


Dependency里传入的rdd,就是父RDD本身。

继承结构如下:



Spark Core源码分析: RDD基础_Dependency



 



NarrowDependency代表窄依赖,即父RDD的分区,最多被子RDD的一个分区使用。所以支持并行计算。

子类需要实现方法:



1. def getParents(partitionId: Int): Seq[Int]



OneToOneDependency表示父RDD和子RDD的分区依赖是一对一的。

 

RangeDependency表示在一个range范围内,依赖关系是一对一的,所以初始化的时候会有一个范围,范围外的partitionId,传进去之后返回的是Nil。



下面介绍宽依赖。



1. class
2. @transient
3.     val partitioner: Partitioner,  
4. null)  
5. extends
6.   
7. // 上下文增量定义的Id
8.   val shuffleId: Int = rdd.context.newShuffleId()  
9.   
10. // ContextCleaner的作用和实现在SparkContext章节叙述
11. this))  
12. }




宽依赖针对的RDD是KV形式的,需要一个partitioner指定分区方式(下一节介绍),需要一个序列化工具类,序列化工具目前的实现如下:



Spark Core源码分析: RDD基础_Dependency_02



 



宽依赖和窄依赖对失败恢复时候的recompute有不同程度的影响,宽依赖可能是要全部计算的。

 

Partition



Partition具体表示RDD每个数据分区。

Partition提供trait类,内含一个index和hashCode()方法,具体子类实现与RDD子类有关,种类如下:



Spark Core源码分析: RDD基础_子类_03



在分析每个RDD子类的时候再涉及。



Partitioner



Partitioner决定KV形式的RDD如何根据key进行partition



Spark Core源码分析: RDD基础_Spark_04





1. abstract class Partitioner extends
2. // 总分区数
3.   def getPartition(key: Any): Int  
4. }



在ShuffleDependency里对应一个Partitioner,来完成宽依赖下,子RDD如何获取父RDD。



默认Partitioner



Partitioner的伴生对象提供defaultPartitioner方法,逻辑为:

传入的RDD(至少两个)中,遍历(顺序是partition数目从大到小)RDD,如果已经有Partitioner了,就使用。如果RDD们都没有Partitioner,则使用默认的HashPartitioner。而HashPartitioner的初始化partition数目,取决于是否设置了spark.default.parallelism,如果没有的话就取RDD中partition数目最大的值。

如果上面这段文字看起来费解,代码如下:




1. def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {  
2.   val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse  
3. for (r <- bySize if
4. return
5.   }  
6. if (rdd.context.conf.contains("spark.default.parallelism")) {  
7. new
8. else
9. new
10.   }  
11. }






HashPartitioner



HashPartitioner基于java的Object.hashCode。会有个问题是Java的Array有自己的hashCode,不基于Array里的内容,所以RDD[Array[_]]或RDD[(Array[_], _)]使用HashPartitioner会有问题。

 

顾名思义,getPartition方法实现如下



1. def getPartition(key: Any): Int = key match {  
2. case null => 0
3. case
4. }



RangePartitioner



RangePartitioner处理的KV RDD要求Key是可排序的,即满足Scala的Ordered[K]类型。所以它的构造如下:





1. class
2.     partitions: Int,  
3. @transient
4. private val ascending: Boolean = true)  
5. extends



内部会计算一个rangBounds(上界),在getPartition的时候,如果rangBoundssize小于1000,则逐个遍历获得;否则二分查找获得partitionId。



Persist



默认cache()过程是将RDD persist在内存里,persist()操作可以为RDD重新指定StorageLevel,




1. class StorageLevel private(  
2. private
3. private
4. private
5. private
6. private var replication_ : Int = 1)



1. object StorageLevel {  
2. new StorageLevel(false, false, false, false)  
3. new StorageLevel(true, false, false, false)  
4. new StorageLevel(true, false, false, false, 2)  
5. new StorageLevel(false, true, false, true)  
6. new StorageLevel(false, true, false, true, 2)  
7. new StorageLevel(false, true, false, false)  
8. new StorageLevel(false, true, false, false, 2)  
9. new StorageLevel(true, true, false, true)  
10. new StorageLevel(true, true, false, true, 2)  
11. new StorageLevel(true, true, false, false)  
12. new StorageLevel(true, true, false, false, 2)  
13. new StorageLevel(false, false, true, false) // Tachyon


RDD的persist()和unpersist()操作,都是由SparkContext执行的(SparkContext的persistRDD和unpersistRDD方法)。

 

Persist过程是把该RDD存在上下文的TimeStampedWeakValueHashMap里维护起来。也就是说,其实persist并不是action,并不会触发任何计算。

Unpersist过程如下,会交给SparkEnv里的BlockManager处理。



1. private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {  
2.   env.blockManager.master.removeRdd(rddId, blocking)  
3.   persistentRdds.remove(rddId)  
4.   listenerBus.post(SparkListenerUnpersistRDD(rddId))  
5. }



Checkpoint



RDD Actions api里提供了checkpoint()方法,会把本RDD save到SparkContext CheckpointDir

目录下。建议该RDD已经persist在内存中,否则需要recomputation。

 

如果该RDD没有被checkpoint过,则会生成新的RDDCheckpointData。RDDCheckpointData类与一个RDD关联,记录了checkpoint相关的信息,并且记录checkpointRDD的一个状态,

[ Initialized --> marked for checkpointing--> checkpointing in progress --> checkpointed

内部有一个doCheckpoint()方法(会被下面调用)。



执行逻辑



真正的checkpoint触发,在RDD私有方法doCheckpoint()里。doCheckpoint()会被DAGScheduler调用,且是在此次job里使用这个RDD完毕之后,此时这个RDD就已经被计算或者物化过了。可以看到,会对RDD的父RDD进行递归。

1. private[spark] def doCheckpoint() {  
2. if
3. true
4. if
5.       checkpointData.get.doCheckpoint()  
6. else
7.       dependencies.foreach(_.rdd.doCheckpoint())  
8.     }  
9.   }  
10. }



RDDCheckpointData的doCheckpoint()方法关键代码如下:


1. // Create the output path for the checkpoint
2. val path = new Path(rdd.context.checkpointDir.get, "rdd-"
3. val fs = path.getFileSystem(rdd.context.hadoopConfiguration)  
4. if
5. throw new SparkException("Failed to create checkpoint path "
6. }  
7.   
8. // Save to file, and reload it as an RDD
9. val broadcastedConf = rdd.context.broadcast(  
10. new
11. // 这次runJob最终调的是dagScheduler的runJob
12. rdd.context.runJob(rdd,   
13. CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)  
14. // 此时rdd已经记录到磁盘上
15. val newRDD = new
16. if
17. throw new SparkException("xxx")  
18. }



runJob最终调的是dagScheduler的runJob。做完后,生成一个CheckpointRDD。

具体CheckpointRDD相关内容可以参考其他章节。



API



子类需要实现的方法




    1. // 计算某个分区
    2. def compute(split: Partition, context: TaskContext): Iterator[T]  
    3.   
    4. protected
    5. // 依赖的父RDD,默认就是返回整个dependency序列
    6. protected
    7.   
    8. protected



    Transformations



    略。



     



    Actions



     



     



    SubRDDs



    部分RDD子类的实现分析,包括以下几个部分:

    1)子类本身构造参数

    2)子类的特殊私有变量

    3)子类的Partitioner实现

    4)子类的父类函数实现



    1. def compute(split: Partition, context: TaskContext): Iterator[T]  
    2. protected
    3. protected
    4. protected



    CheckpointRDD




    1. class
    2. extends



    CheckpointRDDPartition继承自Partition,没有什么增加。



    有一个被广播的hadoop conf变量,在compute方法里使用(readFromFile的时候用)



    1. val broadcastedConf = sc.broadcast(  
    2. new



    getPartitions: Array[Partition]方法:

    根据checkpointPath去查看Path下有多少个partitionFile,File个数为partition数目。getPartitions方法返回的Array[Partition]内容为New CheckpointRDDPartition(i),i为[0, 1, …, partitionNum]

     

    getPreferredLocations(split:Partition): Seq[String]方法:

    文件位置信息,借助hadoop core包,获得block location,把得到的结果按照host打散(flatMap)并过滤掉localhost,返回。

     

    compute(split: Partition, context:TaskContext): Iterator[T]方法:

    调用CheckpointRDD.readFromFile(file, broadcastedConf,context)方法,其中file为hadoopfile path,conf为广播过的hadoop conf。



    Hadoop文件读写及序列化



    伴生对象提供writeToFile方法和readFromFile方法,主要用于读写hadoop文件,并且利用env下的serializer进行序列化和反序列化工作。两个方法具体实现如下:



      1. def writeToFile[T](  
      2.  path: String,  
      3.  broadcastedConf: Broadcast[SerializableWritable[Configuration]],  
      4. 1
      5. )(ctx: TaskContext, iterator: Iterator[T]) {



      创建hadoop文件的时候会若存在会抛异常。把hadoop的outputStream放入serializer的stream里,serializeStream.writeAll(iterator)写入。

       

      writeToFile的调用在RDDCheckpointData类的doCheckpoint方法里,如下:



      1. rdd.context.runJob(rdd,   
      2. CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)



      1. def readFromFile[T](  
      2.   path: Path,  
      3.   broadcastedConf: Broadcast[SerializableWritable[Configuration]],  
      4.   context: TaskContext  
      5. ): Iterator[T] = {


      打开Hadoop的inutStream,读取的时候使用env下的serializer得到反序列化之后的流。返回的时候,DeserializationStream这个trait提供了asIterator方法,每次next操作可以进行一次readObject。

      在返回之前,调用了TaskContext提供的addOnCompleteCallback回调,用于关闭hadoop的inputStream。



      NewHadoopRDD



      1. class
      2.     sc : SparkContext,  
      3.     inputFormatClass: Class[_ <: InputFormat[K, V]],  
      4.     keyClass: Class[K],  
      5.     valueClass: Class[V],  
      6. @transient
      7. extends
      8.   with SparkHadoopMapReduceUtil



      1. private[spark] class
      2.     rddId: Int,  
      3.     val index: Int,  
      4. @transient
      5. extends
      6.   
      7. new
      8.   
      9. 41 * (41
      10. }


      getPartitions操作:

      根据inputFormatClass和conf,通过hadoop InputFormat实现类的getSplits(JobContext)方法得到InputSplits。(ORCFile在此处的优化)

      这样获得的split同RDD的partition直接对应。

       

      compute操作:

      针对本次split(partition),调用InputFormat的createRecordReader(split)方法,

      得到RecordReader<K,V>。这个RecordReader包装在Iterator[(K,V)]类内,复写Iterator的next()和hasNext方法,让compute返回的InterruptibleIterator[(K,V)]能够被迭代获得RecordReader取到的数据。

       

      getPreferredLocations(split: Partition)操作:



      1. theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")


      在NewHadoopPartition里SerializableWritable将split序列化,然后调用InputSplit本身的getLocations接口,得到有数据分布节点的nodes name列表。



       



      WholeTextFileRDD



      NewHadoopRDD的子类




        1. private[spark] class
        2.     sc : SparkContext,  
        3.     inputFormatClass: Class[_ <: WholeTextFileInputFormat],  
        4.     keyClass: Class[String],  
        5.     valueClass: Class[String],  
        6. @transient
        7.     minSplits: Int)  
        8. extends



        复写了getPartitions方法:

        NewHadoopRDD有自己的inputFormat实现类和recordReader实现类。在spark/input package下专门写了这两个类的实现。感觉是种参考。



         



        InputFormat



        WholeTextFileRDD在spark里实现了自己的inputFormat。读取的File以K,V的结构获取,K为path,V为整个file的content。

         

        复写createRecordReader以使用WholeTextFileRecordReader

         

        复写setMaxSplitSize方法,由于用户可以传入minSplits数目,计算平均大小(splits files总大小除以split数目)的时候就变了。



         



        RecordReader



        复写nextKeyValue方法,会读出指定path下的file的内容,生成new Text()给value,结果是String。如果文件正在被别的进行打开着,会返回false。否则把file内容读进value里。



         



        使用场景



        在SparkContext下提供wholeTextFile方法,



        1. def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits):  
        2.   RDD[(String, String)]



        用于读取一个路径下的所有text文件,以K,V的形式返回,K为一个文件的path,V为文件内容。比较适合小文件。

        标签:Core,false,rdd,子类,RDD,源码,new,true
        From: https://blog.51cto.com/u_2650279/6950073

        相关文章

        • asp.net core之异常处理
          在开发过程中,处理错误是一个重要的方面。ASP.NETCore提供了多种方式来处理错误,以确保应用程序的稳定性和可靠性。TryCatchTryCatch是最常见也是最基础的一种异常处理方式,只需要用TryCatch把执行代码包起来,即可捕获异常。格式如下:try{//执行操作doAny();}catch......
        • 国标GB28181平台LntonGBS(源码版)国标视频平台在连接MySQL数据库时提示“can’t connect
          LntonGBS国标视频云服务平台不仅支持无缝、完整接入内网或者公网的国标设备,还能够实现全平台、全终端输出。该平台支持将GB/T28181的设备/平台推送的PS流转成ES流,并提供RTSP、RTMP、FLV、HLS、WebRTC等多种格式视频流的分发服务,实现Web浏览器、手机浏览器、微信端、PC客户端等各终......
        • AddMvcCore,AddControllers,AddControllersWithViews,AddRazorPages的区别
          AddMvc/AddMvcCore/AddControllers等区别1.services.AddMvcCore()只注册运行 Controller/RazorPages 必要的核心服务,确保 Pipeline 程序可动作,其馀如像 DataAnnotationModelValidation、身分验证等服务要自己加挂,除有特殊客制需求,一般不太常用。2.services.AddControl......
        • 国标GB28181视频平台LntonGBS(源码版)国标平台在登录页添加登录验证的具体操作步骤
          国标视频云服务LntonGBS支持设备/平台通过国标GB28181协议注册接入,实现视频的实时监控直播、录像、检索与回看、语音对讲、云存储、告警、平台级联等功能。该平台部署简单,可拓展性强,能够将接入的视频流进行全终端、全平台分发,支持的视频流格式包括RTSP、RTMP、FLV、HLS、WebRTC等。......
        • 海外直播APP源码的开发给商家们带来了什么
          目前,在海外市场中,TikTok和Facebook无疑是最受欢迎的社交网络直播平台,尤其是欧美社交APP,虽然直播内容更注重娱乐性,但其他地区例如东南亚,海外社交直播渗透率还较低,那么开发海外直播APP源码给商家们带来了哪些好处利益呢?1.海外直播APP源码开发本身具有较大的优势可以提高海外用户的参......
        • 实验室LIMS系统源码
          实验室LIMS系统采用国际规范的业务管理流程和严格的质量控制体系,对每个检测流程节点采用“人、机、料、法、环、测”进行质量控制,可记录,可追溯。强大的数据查询和统计分析能力,提高工作效率;自动化地采集实验室原始数据及处理结果,可减轻繁重的手工抄录数据和减少人为干扰检测结果,使......
        • [回馈]ASP.NET Core MVC开发实战之商城系统(五)
          经过一段时间的准备,新的一期【ASP.NETCoreMVC开发实战之商城系统】已经开始,在之前的文章中,讲解了商城系统的整体功能设计,页面布局设计,环境搭建,系统配置,及首页【商品类型,banner条,友情链接,降价促销,新品爆款】,商品列表页面,商品详情等功能的开发,今天继续讲解购物车功能开发,仅供学习......
        • StoneDB 源码解读系列|Tianmu 引擎工具类模块源码详解(一)
          StoneDB源码解读系列文章正式开启,预计以周更的形式跟大家见面,请多多支持~本篇源码解读内容已进行直播分享,可在视频号观看直播回放,也可点击阅读原文跳转至B站观看回放视频。PPT内容可在社区论坛中查看下载:https://forum.stonedb.io/t/topic/89各个工具类属于Tianmu引擎的......
        • 【HMS Core】【Push Kit】每天只能收到两条推送、状态码80100018
          【问题描述1】每天只能收到2条推送消息,其余的都无法收到【解决方案】1、请是否开通了消息自分类,因为现在是有咨询营销类消息限制的。没有使用自分类权益的话默认是资讯营销类消息。https://developer.huawei.com/consumer/cn/doc/development/HMSCore-Guides/message-restriction-d......
        • 【HMS Core】位置服务逆地理编码请求错误问题
          【关键字】HMS、位置服务、逆地理编码【问题描述】有开发者反馈在集成位置服务-逆地理编码时,出现了请求报错的问题。后端请求逆地理编码错误{   "returnCode":"010010",   "returnDesc":"INVALID_REQUEST"}【问题分析】1、一开始认为是cp的请求参数有误,缺少了必选的参数。......