首页 > 其他分享 >Spark中RDD的特殊算子和重要概念

Spark中RDD的特殊算子和重要概念

时间:2023-07-04 19:46:41浏览次数:30  
标签:触发 Task checkpoint RDD 算子 Action Spark Stage

RDD特殊的算子

cache、persist

将数据缓存到内存,第一次触发Action,才会将数据放入内存,以后在触发Action,可以复用前面内存中缓存的数据,可以提升技术效率
cache和persist的使用场景:一个application多次触发Action,为了复用前面RDD的数据,避免反复读取HDFS(数据源)中的数据和重复计算,可以将数据缓存到内存或磁盘【executor所在的磁盘】,第一次触发action才放入到内存或磁盘,以后会缓存的RDD进行操作可以复用缓存的数据。
一个RDD多次触发Action缓存才有意义,如果将数据缓存到内存,内存不够,以分区位单位,只缓存部分分区的数据,cache底层调用persist,可以指定更加丰富的存储基本,支持多种StageLevel,可以将数据序列化,默认放入内存使用的是java对象存储,但是占用空间大,优点速度快,也可以使用其他的序列化方式
cache和persist方法,严格来说,不是Transformation,应为没有生成新的RDD,只是标记当前rdd要cache或persist

checkpoint

checkpoint使用场景:适合复杂的计算【机器学习、迭代计算】,为了避免中间结果数据丢失重复计算,可以将宝贵的中间结果保存到hdfs中,保证中间结果安全。
在调用rdd的checkpint方法之前,一定要指定checkpoint的目录sc.setCheckPointDir,指的HDFS存储目录,为保证中间结果安全,将数据保存到HDFS中
第一次触发Action,才做checkpoint,会额外触发一个job,这个job的目的就是将结果保存到HDFS中
如果RDD做了checkpoint,这个RDD以前的依赖关系就不在使用了,触发多次Action,checkpoint才有意义,多用于迭代计算
checkpoint严格的说,不是Transformation,只是标记当前RDD要做checkpoint

Spark中的一些重要概念

Application

使用SparkSubmit提交的个计算应用,一个Application中可以触发多次Action,触发一次Action产生一个Job,一个Application中可以有一到多个Job

Job

Driver向Executor提交的作业,触发一次Acition形成一个完整的DAG,一个DAG对应一个Job,一个Job中有一到多个Stage,一个Stage中有一到多个Task

DAG

概念:有向无环图,是对多个RDD转换过程和依赖关系的描述,触发Action就会形成一个完整的DAG,一个DAG对应一个Job

Stage

概念:任务执行阶段,Stage执行是有先后顺序的,先执行前的,在执行后面的,一个Stage对应一个TaskSet,一个TaskSet中的Task的数量取决于Stage中最后一个RDD分区的数量

Task

概念:Spark中任务最小的执行单元,Task分类两种,即ShuffleMapTask和ResultTask
Task其实就是类的实例,有属性(从哪里读取数据),有方法(如何计算),Task的数量决定并行度,同时也要考虑可用的cores

TaskSet

保存同一种计算逻辑多个Task的集合,一个TaskSet中的Task计算逻辑都一样,计算的数据不一样

任务执行原理分析

WordCount程序有多个RDD

该Job中,有多少个RDD,多少个Stage,多少个TaskSet,多个Task,Task的类型有哪些?

 object WordCount {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    val sc: SparkContext = new SparkContext(conf)
    
    sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_+_)
      .saveAsTextFile(args(1))
    
    sc.stop()
  }
}

RDD的数量分析

• 读取hdfs中的目录有两个输入切片,最原始的HadoopRDD的分区为2,以后没有改变RDD的分区数量,RDD的分区数量都是2
• 在调用reduceByKey方法时,有shuffle产生,要划分Stage,所以有两个Stage
• 第一个Stage的并行度为2,所以有2个Task,并且为ShuffleMapTask。第二个Stage的并行度也为2,所以也有2个Task,并且为ResultTask,所以一共有4个Task

spark的任务上次的逻辑计划图

image

下面的的物理执行计划图,会生成Task,生成的Task如下

image

Stage和Task的类型

Stage有两种类型,分别是ShuffleMapStage和ResultStage,ShuffleMapStage生成的Task叫做ShuffleMapTask,ResultStage生成的Task叫做ResultTask

• ShuffleMapTask
1.可以读取各种数据源的数据
2.可以读取Shuffle的中间结果(Shuffle Read)
3.为shuffle做准备,即应用分区器,将数据溢写磁盘(ShuffleWrite),后面一定还会有其他的Stage

• ResultTask
1.可以读取各种数据源的数据
2.可以读取Shuffle的中间结果(Shuffle Read)
3.是整个job中最后一个阶段对应的Task,一定会产生结果数据(就是将产生的结果返回的Driver或写入到外部的存储系统)

多种情况:
第一种:
image

第二种
image

第三种
image

标签:触发,Task,checkpoint,RDD,算子,Action,Spark,Stage
From: https://www.cnblogs.com/paopaoT/p/17526802.html

相关文章

  • Spark中RDD的Transformation算子
    RDD的Transformation算子mapmap算子的功能为做映射,即将原来的RDD中对应的每一个元素,应用外部传入的函数进行运算,返回一个新的RDDvalrdd1:RDD[Int]=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),2)valrdd2:RDD[Int]=rdd1.map(_*2)flatMapflatMap算子的功能为扁平......
  • 18、【SparkStreaming】object not serializable (class: org.apache.kafka.clients.c
    背景:当SparkStream连接kafka,消费数据时,报错:objectnotserializable(class:org.apache.kafka.clients.consumer.ConsumerRecord,value:ConsumerRecord分析:消费者的消费记录序列化出现了问题,需要正确的进行序列化。措施:在设置sparkconf的时候,指定序列化方式就可以解......
  • Spark使用Python开发和RDD
    使用PySpark配置python环境在所有节点上按照python3,版本必须是python3.6及以上版本yuminstall-ypython3修改所有节点的环境变量exportJAVA_HOME=/usr/local/jdk1.8.0_251exportPYSPARK_PYTHON=python3exportHADOOP_HOME=/bigdata/hadoop-3.2.1exportHADOOP_CONF_D......
  • Spark编程基础
    Scala编写Spark的WorkCount创建一个Maven项目在pom.xml中添加依赖和插件<!--定义的一些常量--><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><encoding>UTF-8</e......
  • Spark架构体系
    Spark架构体系StandAlone模式是spark自带的集群运行模式,不依赖其他的资源调度框架,部署起来简单。StandAlone模式又分为client模式和cluster模式,本质区别是Driver运行在哪里,如果Driver运行在SparkSubmit进程中就是Client模式,如果Driver运行在集群中就是Cluster模式standalonecl......
  • Spark简介
    SparkSpark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache的顶级项目,2014年5月发布spark1.0,2016年7月发布spark2.0,2020年6月18日发布spark3.0.0特点Hadoop的MapReduce作为第一代分布......
  • Spark基础
    Spark是一种基于内存的快捷、通用、可扩展的大数据分析引擎1.Spark模块SparkCore:Spark核心模块,包含RDD、任务调度、内存管理、错误恢复、与存储系统交互等SparkSQL:用于处理结构化数据的一个模块,提供了2个编程抽象:DataFrameDataSet,并且作为分布式SQL查询引擎的作用。他将H......
  • 算法岗必读中文-0天吃掉pyspark实战
    pyspark......
  • 如何在Databricks中使用Spark进行数据处理与分析
    目录《如何在Databricks中使用Spark进行数据处理与分析》随着大数据时代的到来,数据处理与分析变得越来越重要。在数据处理与分析过程中,数据的存储、处理、分析和展示是不可或缺的关键步骤。在数据处理与分析中,Spark是一个强大的开源计算框架,它可以处理大规模分布式数据集,并提......
  • Apache Spark教程_编程入门自学教程_菜鸟教程-免费教程分享
    教程简介ApacheSpark是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UCBerkeleyAMPlab(加州大学伯克利分校的AMP实验室)所开源的类HadoopMapReduce的通用并行框架,Spark,拥有HadoopMapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存......