首页 > 其他分享 >DStream与RDD关系

DStream与RDD关系

时间:2024-01-23 17:36:38浏览次数:19  
标签:关系 compute parent RDD time DStream def

RDD是怎么生成的?

RDD依靠什么生成?根据DStream来的

RDD生成的依据是什么?

Spark Streaming中RDD的执行是否和Spark Core中的RDD执行有所不同?

运行之后我们对RDD怎么处理?

ForEachDStream不一定会触发Job的执行,但是它一定会触发job的产生,和Job是否执行没有关系;

问:RDD依靠什么生成的?

      下面以案例来研究RDD是依靠DStream产生的

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val sc = new StreamingContext(conf, Duration(3000))
    val streamDs: ReceiverInputDStream[String] = sc.socketTextStream("localhost", 9999)//输入的DStream
    val words: DStream[String] = streamDs.flatMap(_.split(" "))//输入和输出之间的都是transformation的DStream
    val word: DStream[(String, Int)] = words.map((_, 1))
    val wordCount: DStream[(String, Int)] = word.reduceByKey((x, y) => {
      x + y
    })
    wordCount.print()// 内部会导致Action级别的触发  print()输出的DStream
    sc.start()
    sc.awaitTermination()

  }

从代码中分析代码中此案例依次产生了如下DStream,并且它们是从后往前依赖的:

ReceiverInputDStream-》new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))-》new MappedDStream(this, context.sparkContext.clean(mapFunc))-》new ShuffledDStream[K, V, C](self,cleanedCreateCombiner,cleanedMergeValue,cleanedMergeCombiner,partitioner, mapSideCombine)-》ForEachDStream
简化一下就是:ReceiverInputDStream-->FlatMappedDStream-->MappedDStream-->ShuffledDStream-->ForEachDStream
那么怎么证明Stream之间是相互依赖的,我们挑选一个Dstream为入口进行分析,比如MappedDStream:
  /** Return a new DStream by applying a function to all elements of this DStream. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }

MappedDStream类中
private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  MappedDStream类中的compute方法会获取parent dstream,然后基于其结果进行map操作,mapFunc就是我们需要传入的业务逻辑,这就证明了dstream的依赖关系 } }
问:DStream为什么要从后往前依赖呢?
因为DStream代表Spark Streaming业务逻辑,RDD是从后往前依赖的,DStream是lazy级别的。DStream的依赖关系必须和RDD的依赖关系保持高度一致
A DStream internally is characterized by a few basic properties:
A list of other DStreams that the DStream depends on
A time interval at which the DStream generates an RDD
A function that is used to generate an RDD after each time interval
大致意思是:

   1.DStream依赖于其他DStream,除了第一个DStream,因为第一个DStream基于数据源产生,用于接收数据,所以无其他依赖;进一步证明了DStream是从后往前依赖!!

  2.基于DStream怎么产生RDD?每隔BatchDuration,DStream生成一个RDD;

   3.每隔BatchDuration,DStream内部函数会生成RDD

abstract class DStream[T: ClassTag] (
@transient private[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
// RDDs generated, marked as private[streaming] so that testsuites can access it
//DStream是RDD的模板,每隔一个batchInterval会根据DStream模板生成一个对应的RDD。然后将RDD存储到DStream中的generatedRDDs数据结构中
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
}
 

到此,我们验证了RDD是DStream是产生的结论!

下一节我们分析DStream是到底怎么生成RDD的?

stream中RDD的生成:

//DStream是RDD的模板,每隔一个batchInterval会根据DStream模板生成一个对应的RDD。然后将RDD存储到DStream中的generatedRDDs数据结构中
 @transient
 private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()  generatedRDDs在哪里被实例化的?搞清楚了这里的HashMap在哪里被实例化的话,就知道RDD是怎么产生的

1.进入DStream的getOrCompute方法:

  /**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.先根据时间判断HashMap中是否已存在该时间对应的RDD,如果没有则调用compute得到RDD,并放入到HashMap中
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD/看缓存中是否有,有的话直接获取
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            compute(time)
      //根据时间计算产生RDD } }     //rddOption里面有RDD生成的逻辑,然后生成的RDD,会put到generatedRDDs中 rddOption.foreach { case newRDD => // Register the generated RDD for caching and checkpointing if (storageLevel != StorageLevel.NONE) { newRDD.persist(storageLevel) logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel") } if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { newRDD.checkpoint() logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing") } generatedRDDs.put(time, newRDD) } rddOption } else { None } } }

进入compute方法,发现其并没有具体的实现,说明在其子类中有重写并生成rdd

/** Method that generates a RDD for the given time */
def compute(validTime: Time): Option[RDD[T]]

2.进入ReceiverInputDStream的compute方法

  /**
   * Generates RDDs with blocks received by the receiver of this stream. */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
// 创建并返回BlockRDD,由于ReceiverInputDStream没有父依赖,所以自己生成RDD。<br>        // 如果没有输入数据会产生一系列空的RDD<br>        createBlockRDD(validTime, blockInfos)
createBlockRDD(validTime, blockInfos) } } Some(blockRDD) }

 

注意:Spark Streaming实际上在没有输入数据的时候仍然会产生RDD(空的BlockRDD),所以可以在此修改源码,提升性能。反过来仔细思考一下,流处理实际上就是时间极短的情况下完成的批处理!!

3.再进入MappedDStream的compute方法

class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {
  <br>  //除了第一个DStream产生RDD之外,其他的DStream都是从前面DStream产生的RDD开始计算
  override def dependencies: List[DStream[_]] = List(parent)
 
  override def slideDuration: Duration = parent.slideDuration
   
  override def compute(validTime: Time): Option[RDD[U]] = {   
}

    // getOrCompute是对RDD进行操作,后面的map就是对RDD进行操作
    // DStream里面的计算其实是对RDD进行计算,而mapFunc就是我们要操作的具体业务逻辑

 
  parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}

 


4.进入ForEachDStream的compute的方法:


  发现其compute方法没有任何操作,但是重写了generateJob方法!

private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {
 
  override def dependencies: List[DStream[_]] = List(parent)
 
  override def slideDuration: Duration = parent.slideDuration
 
  override def compute(validTime: Time): Option[RDD[Unit]] = None
 
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}

    //此时考虑jobFunc中一定有action操作
              //因此jobFunc被调用的时候就会触发action操

5.从Job生成入手,JobGenerator的generateJobs方法,内部调用的DStreamGraph的generateJobs方法:

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {<br>      <em>//根据特定的时间获取具体的数据</em>
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch<br><em>      //调用DStreamGraph的generateJobs生成Job</em>
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

 DStreamGraph的generateJobs方法调用了OutputStream的generateJob方法,OutputStream就是ForEachDStream:

def generateJobs(time: Time): Seq[Job] = {
   logDebug("Generating jobs for time " + time)
   val jobs = this.synchronized {
     outputStreams.flatMap { outputStream =>
       val jobOption = outputStream.generateJob(time)
       jobOption.foreach(_.setCallSite(outputStream.creationSite))
       jobOption
     }
   }
   logDebug("Generated " + jobs.length + " jobs for time " + time)
   jobs
 }

总结:DStream是RDD的模板,其内部generatedRDDs 保存了每个BatchDuration时间生成的RDD对象实例。DStream的依赖构成了RDD依赖关系,即从后往前计算时,只要对最后一个DStream计算即可。JobGenerator每隔BatchDuration调用DStreamGraph的generateJobs方法,调用了ForEachDStream的generateJob方法,其内部先调用父DStream的getOrCompute方法来获取RDD,然后在进行计算,从后往前推,第一个DStream是ReceiverInputDStream,其comput方法中从receiverTracker中获取对应时间段的metadata信息,然后生成BlockRDD对象,并放入到generatedRDDs中!!

转载:https://www.cnblogs.com/game-bigdata/p/5521660.html


标签:关系,compute,parent,RDD,time,DStream,def
From: https://www.cnblogs.com/huifeidezhuzai/p/17982959

相关文章

  • SQL构建表层次关系,递归累加数据
     构建表的上下级关系      有一个需求,表中数据没有关系,如同一个类型的,有多个出库时间。代码--构建表的上下级关系--可以对同一个产品的,有层次关系--使用ROW_NUMBER(),来构建,最上上一级为0INSERTINTOStock([no]--编号,[quantity]......
  • 实验 4 RDD 编程初级实践
    请到本教程官网的“下载专区”的“数据集”中下载chapter5-data1.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示:(1)该系总共有多少学生;(2)该系共开设来多少门课程;  (3)Tom同学的总成绩平均分是多少;(4)求每名同学的选修的课程门数;(Ford,3)(Enoch,3)(Kim,4)(C......
  • 一篇简短的文章把C++左右值关系讲的透透彻彻
     目录前言C++左值和右值二、右值引用二、右值引用 前言对于C++的左值和右值相信很多人都没有一个很透彻的了解,之前我也是不懂的时候查阅了好多文章,但是讲完我还是一头雾水,直到我遇到一篇宝藏文章,讲的左值右值的关系以及Move函数的用法是相当的清楚,文章链接......
  • Linux系统文件属性按列划分以及各个字段的关系及作用
    按列划分s-li#-i显示inode号码33575030-rw-r--r--1rootroot881Mar609:151.log1.第一列:33575030inode号码2.第二列:-文件类型3.第三列:rw-r--r--九位文件权限rwx4.第四列:1文件的硬链接个数5.第五列:root文件......
  • 鸿蒙原生应用/元服务实战-AGC中几个菜单栏的关系
    大家是否清楚AGC这几个菜单栏的相互关系?我的元服务:点击后跳转到“我的应用”列表中的“HarmonyOS”页签,并且过滤出元服务。开发者可以在此模块中管理和运营元服务,例如创建元服务、发布元服务等。我的应用:开发者可以在此模块中管理和运营应用,例如创建应用、配置应用信息、发布应用、......
  • 进程线程关系
    1、什么是进程什么是进程呢?进程是程序的一次启动执行。什么是程序呢?程序是存放在硬盘中的可执行文件,主要包括代码指令和数据。一个进程是一个程序的一次启动和执行,是操作系统将程序装入内存,给程序分配必要的系统资源,并且开始运行程序的指令。进程与程序是什么关系呢?同一......
  • Python、Anaconda、PyCharm和终端的关系及其作用
    Python是一种高级编程语言,广泛应用于数据分析、科学计算、Web开发等领域。为了便于开发和运行Python程序,我们通常会使用一些工具和环境。其中,Anaconda是一个Python发行版,提供了大量的科学计算和数据处理库;PyCharm是一款强大的Python集成开发环境(IDE);终端(或命令行)则是执行Python代码......
  • MySQL的聚簇索引,非聚簇索引,主键索引,唯一索引和普通索引关系
    关系简述MySQL聚簇索引只有一个,优先primarykey,没有就是uniquekey,两个都没有,innoDB自动生成GEN_CLUST_INDEX。唯一索引有可能是非聚簇的,也有可能聚簇的。唯一索引能建多个,是非聚簇的,也能为空,能多个都是空,但是不能重复。和普通索引区别在于不能重复。如果建立了主键索引,那么......
  • IntelliJ IDEA快速查询maven依赖关系
    IntelliJIDEA快速查询maven依赖关系1.在Maven窗口中点击Dependencies->showDependencies2.得到依赖关系图此时原有快捷键Ctrl+f可以查询jar包,如果没有查询菜单出来则设置快捷键方式为File->Settings->Keymap->搜索栏输入find->在MainMenu下Edit下Find下Find双击算则Ad......
  • 【论文笔记#2】Farseg++:用于高空间分辨率遥感图像地理空间对象分割的前景感知关系网络
    论文来源IEEETransactionsonPatternAnalysisandMachineIntelligence作者ZhuoZheng;YanfeiZhong;JunjueWang等发表年代2023使用方法多分支金字塔编码、前景-场景关系、前景感知解码、前景感知优化期刊层次CCFA;计算机科学1区;IF23.6原文链接......