首页 > 其他分享 >Spark核心原理

Spark核心原理

时间:2023-02-03 09:44:05浏览次数:48  
标签:核心 RDD master executor spark 原理 Spark 节点

 1. 概要介绍

1.1 master节点和work节点

master和worker是物理节点  
  • spark集群有一个master节点和多个worker节点。Standalone模式下可以通过zookeeper对master做靠可用配置,当master宕机了之后重新选举一个master。
  • master节点常驻master守护进程,负责管理worker节点,我们从master节点提交应用
  • worker节点常驻worker守护进程,与master节点通信,并且管理executor进程。
  • 一台机器可以同时作为master和worker节点(举个例子:你有四台机器,你可以选择一台设置为master节点,然后剩下三台设为worker节点,也可以把四台都设为worker节点,这种情况下,有一个机器既是master节点又是worker节点)

1.2 driver与executor

driver与executor是进程 driver进程就是应用的main()函数并且构建SparkContext对象,当我们提交了应用之后,便会启动一个对应的driver进程,driver本身会根据设置的参数占有一定的资源(主要指cpu core和memory)。   driver可以运行在master上,也可以运行worker上(根据部署模式的不同)。driver首先会向集群管理者(standalone、yarn,mesos)申请spark应用所需的资源,也就是executor,然后集群管理者会根据spark应用所设置的参数在各个worker上分配一定数量的executor,每个executor都占用一定数量的cpu和memory。在申请到应用所需的资源以后,driver就开始调度和执行我们编写的应用代码了。driver进程会将我们编写的spark应用代码拆分成多个stage,每个stage执行一部分代码片段,并为每个stage创建一批tasks,然后将这些tasks分配到各个executor中执行。   executor进程宿主在worker节点上,一个worker可以有多个executor。每个executor持有一个线程池,每个线程可以执行一个task,executor执行完task以后将结果返回给driver,每个executor执行的task都属于同一个应用。此外executor还有一个功能就是为应用程序中要求缓存的 RDD 提供内存式存储,RDD 是直接缓存在executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算

1.3 application与client

  • 提交一次计算时,一般要提交一个jar包,包里会有一个main函数,这次计算任务叫application
  • 提交任务所在的那个机器就是client,client可以不属于集群内的节点
spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 1 \ --queue thequeue \ examples/target/scala-2.11/jars/spark-examples*.jar 10

1.4 运行架构

Spark运行架构基本由三部分组成,包括SparkContext(驱动程序)、ClusterManager(集群资源管理器)和Executor(任务执行进程)。 其中,SparkContext用于负责与ClusterManager通信,进行资源的申请、任务的分配和监控等,负责作业执行的全生命周期管理。ClusterManager提供了资源分配和管理,在不同的运行模式下,担任的角色有所不同,在Local与Standalone模式下由Master提供,在YARN运行模式下由Resource Manager担任,在Mesos运行模式下由Mesos Manager负责。当SparkContext对运行的作业划分并分配资源之后,会把任务发送Executor进行运行。 每个应用程序获取专属的Executor进程,这些进程在应用程序运行的过程中一直驻留,并以多线程方式运行任务。这种机制使得Driver的调度与Executor运行在不同的JVM中,进行了隔离。也意味着Spark不能跨Executor共享数据,除非将数据写入到外部存储系统。
  • Local运行模式:Spark运行在同一个进程中,该运行模式一般用于测试等用途。Spark默认就是Local运行模式。
  • Local-Cluster运行模式:Spark运行在同一台机器的多个进程中模拟集群,伪分布式运行流程同Standalone运行模式。
  • Standalone运行模式:正式的集群管理,资源调度由Master来完成,轻量。
  • YARN运行模式:YARN是大数据家族中专门用于资管管理的组件,YARN是集群模式比较重,Spark兼容了YARN的资源管理
  • Mesos运行模式:Apache旗下开源的类似YARN的资源管理组件,Mesos本身也是Master/Slave的集群模式

2 DEMO: WordCount

import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}   object sparkTest { def main(args: Array[String]): Unit = { //1.创建sparkConf val sparkConf = new SparkConf().setMaster("spark://192.168.17.151:7077") .setAppName("WordCount")   //2.创建sparkContext,代表一个application,会在一个driver上运行(不一定是master) val sc = new SparkContext(sparkConf)   //3.读取数据 var rdd0:RDD[String] = sc.textFile("hdfs://192.168.17.151:9000/word.txt")   //4.执行分布式计算 var result:Array[(String,Int)] = rdd0.flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_) .sortBy(_._2, false) .collect()   //8.打印结果 result.foreach(println(_)) } } 下面把第4步计算过程拆开(4-8步骤) //1.创建sparkConf val sparkConf = new SparkConf().setMaster("spark://192.168.17.151:7077") .setAppName("WordCount")   //2.创建sparkContext,代表一个application,会在一个driver上运行(不一定是master) val sc = new SparkContext(sparkConf)   //3.读取数据 var rdd0:RDD[String] = sc.textFile("hdfs://192.168.17.151:9000/word.txt")   //4.拆分数据 var rdd1:RDD[String] = rdd0.flatMap(_.split(" "))   //5.map var rdd2:RDD[(String,Int)] = rdd1.map((_,1))   //6.根据单词进行reduce,会执行shuffle操作 var rdd3:RDD[(String,Int)] = rdd2.reduceByKey(_+_)   //7.根据单词量排序 var rdd4:RDD[(String,Int)] = rdd3.sortBy(_._2, false)   //8.rdd的记录转成list发给driver,是一个action算子 var result:Array[(String,Int)] = rdd4.collect()   // 还可以 rdd4.saveAsTextFile("/output")   //9.打印结果 result.foreach(println(_))
  1. 编程模型

 
编号 概念 解释
01 RDD    
  • Resilient Distributed Dataset
  • 理解成单机程序中的List就可以,不过他是分布式存储的。
  • 常用RDD的类型:MapPartitionsRDD、ShuffledRDD、SchemaRDD
02 Operation RDD操作   RDD有四类操作(Operation):
  1. Create:把从内存和磁盘数据生成到RDD中
  • 从文件:sc.textFile(fileName:String),fileName可以是普通该文件如"file:\\xxx",也可以是这种hadoop文件如“hdfs:\\xxxx”
  • 从集合:sc.parallelize(list:Set[T])与sc.makeRDD(list:Set[T]),前者使用系统配置的分区数,后者使用自动计算的分区数。可以通过ORM从数据库读取实体集合后再生成RDD,一般如果数据比较多的情况下也是并行再多个Node上读取的。
  • 从其他RDD算子产生,包括Transformation如map(x->...)或Action如reduceByKey(v1,v2-> v1 + v2)
  1. Control:
  • rdd.cache(),比如首次从hadoop文件读取数据,然后缓存,下次再次计算时从缓存读数据,则会比直接从hadoop中读取性能大大提高。
  • rdd.persist(),也看成是一种缓存,只是缓存的选项比较多,一般时保存到磁盘上。如果选择缓存到内存中则等同于cache()。persit可以在宽依赖的时候设置checkpoint用于诊断。
  • 缓存的是RDD,Job清理之后RDD都会清除。下个Job再用到需要重新计算,所以使用cache避免重复计算。
  • application结束需要清除缓存的数据
  • Control操作时惰性操作
  1. Transformation:把RDD变换成新的RDD,是惰性操作并不立即执行
  2. Action:能够触发Spark运行的操作,是急迫性操作。一种是生成新的集合和变量,一种是把RDD保存到外部文件系统或数据库(区别于Control操作,这里是任务最终的保存)。
03 分区 (partition)
  • 每个RDD都会被划分成分布式的多个分区,保存到集群的多个节点
  • 一个节点可能有多个分区,一个分区一般和一个Task对应
04 算子 RDD四种操作中的Transformation与Action,spark中提供80多种算子
  1. Transformation(变换):
  • 把RDD变换成新的RDD,是惰性操作并不立即执行。等到有action算子才真正执行,是一个pipline。
  • 主要包括:map、mapPartitions、flatMap、filter、union、groupByKey、repartition、cache等。
  • RDD之间有依赖关系,根据父亲RDD生成子RDD时,是否父RDD的一条记录对应多个子RDD记录分成两种依赖情况:
    • 窄依赖(narrow dependency):如map、union等操作
    • 宽依赖(wide dependency):如groupByKey等操作,会产生大量的网络IO,内存交换等操作。
  1. Action(行动):是急迫性操作。分两类Action算子:
  • 一种是根据RDD生成scala普通的的集合或变量,返回给driver。如reduce、collect、show、count、foreach等。
  • 一种是把RDD保存到外部文件系统或数据库(区别于Control操作,这里是任务最终的保存)。如saveAsTextFile
05 DAGScheduler
  • 在driver上运行
  • 把一个application根据RDD的依赖关系生成Job、Stage、Task三级执行单位进行拆分,形成一个DAG
  • 并根据ShuffleDependency来进行stage的划分。stage包含多个tasks,个数由该stage的finalRDD决定,stage里面的task完全相同。
  • DAGScheduler完成stage的划分后基于每个Stage生成TaskSet,并提交给TaskScheduler。TaskScheduler负责具体的task的调度,在Executor上启动执行task。
06 Job
  • 当在程序中遇到一个action算子的时候,就会提交一个job,执行前面的一系列操作。
  • 通常一个application会有多个job,job之间是按照串行的方式执行的。一个job执行完成后,才会起下一个job。
  • Spark支持一个或多个Job失败后重新启动计算
  • 每当一个Job计算完成,其内部的所有RDD都会被清除,如果在下一个Job中有用到其他Job中的RDD,会引发该RDD的再次计算,为避免这种情况,我们可以使用Persist
07 Stage
  • Stage是执行的物理单位,且Stage节点组成了Job的DAG,所以一个Job通常会包括一个或多个Stage。
  • 一个job会有多个算子操作。这些算子都是将一个父RDD转换成子RDD。这个过程中,会有两种情况:父RDD中的数据是否进入不同的子RDD。如果一个父RDD的数据只进入到一个子RDD,比如map、union等操作,称之为narrow dependency(窄依赖)。否则,就会形成wide dependency( 宽依赖),一般也成为shuffle依赖,比如groupByKey等操作。
  • job中stage的划分就是根据shuffle依赖进行的。shuffle依赖是两个stage的分界点。shuffle操作一般都是任务中最耗时耗资源的部分。因为数据可能存放在HDFS不同的节点上,下一个stage的执行首先要去拉取上一个stage的数据(shuffle read操作),保存在自己的节点上,就会增加网络通信和IO。
  • 父RDD转成子RDD,形成的继承关系称之为血统
08 Task  
  • Stage继续往下分解,就是Task。Task应该是spark最细的执行单元了。Task的数量其实就是stage的并行度。
  • RDD在计算的时候,每个分区都会起一个task,所以rdd的分区数目决定了总的的task数目。每个Task执行的结果就是生成了目标RDD的一个partiton。
  • 在map阶段partition数目保持不变。在Reduce阶段,RDD的聚合会触发shuffle操作,聚合后的RDD的partition数目跟具体操作有关,例如repartition操作会聚合成指定分区数coalesce算子同样可以改变partition的数目,不过只能减少不能增加。repartition和coalesce算子的区别在于前者会引发shuffle,后者则不会。
  1. Spark SQL

SparkSQL产生的背景 RDD的API对于初学者来说复杂;数据分析人员所熟悉传统的R和Pandas工具API更直观,但是只能处理单机任务。基于上述两点,Spark提供了DataFrame与DataSet支持分布式大数据处理 Spark SQL相对于RDD的优点
  1. 学习成本低
基于SQL学习成本低
  1. 执行效率高
RDD基于不可变对象,每个变形都会产生新的RDD,使得Spark在运行过程中会产生大量的临时对象,对GC造成压力大。基于DataFrame打破了不变性,所以产生临时对象更少。 特别是基于Python RDD API操作时,因为Spark是使用scala语言开发的,要进行python VM与JVM需要进行大量的跨进程交换,而使用DataFrame API实际上仅仅组装了一段体积小巧的逻辑查询计划,Python只需要将查询计划发送到JVM端即可。
  1. 减少数据读取
DataFrame使用Row和Column来表示数据,那么就可以做更多的存取优化,比如使用列式的存储,索引效率更高。

4.1 DataFrame

DataFrame的每行数据记录类型是Row val spark = SparkSession.builder().appName("Spark SQL basic example") .enableHiveSupport() .getOrCreate()   import spark.implicits._ val   val df = spark.read.format("jdbc") .option("url", "jdbc:mysql://localhost:3306/test" ) .option("user", "root") .option("password", "admin") .option("dbtable", "pivot") .load()   df.select("user","type").show() df.filter("user=1 or type ='助手1'").show() df.orderBy(df("visittime").desc).show(false) df.groupBy("user")

4.2 DataSet

DataSet每行数据记录类型是自定义实体类 //样例类 case class Person(name: String, age: Int, height: Int) case class People(age: Int, names: String) case class Score(name: String, grade: Int)   //1、DataSet存储类型 val seq1 = Seq( Person("xzw", 24, 183), Person("yxy", 24, 178), Person("lzq", 25, 168)) val ds1 = spark.createDataset(seq1) ds1.show() ds1.checkpoint() ds1.cache() ds1.persist() ds1.count() ds1.unpersist(true)   //2、DataSet结构属性 ds1.columns ds1.dtypes ds1.explain()   //3、DataSet rdd数据互换 val rdd1 = ds1.rdd val ds2 = rdd1.toDS() ds2.show() val df2 = rdd1.toDF() df2.show()   //4、保存文件 df2.select("name", "age", "height").write.format("csv").save("./save")

4.3 Spark SQL运行架构

4.3.1与RDD的区别

Spark SQL不使用DAGScheduler,而是使用Catalyst优化器生成执行计划 RDD运行流程 Spark SQL运行流程

4.3.2 Catalyst优化器

  1. 基于规则优化/Rule Based Optimizer/RBO
  2. 基于代价优化/Cost Based Optimizer/CBO
        参考资料

标签:核心,RDD,master,executor,spark,原理,Spark,节点
From: https://www.cnblogs.com/Netsharp/p/17088095.html

相关文章

  • 686~687 Servlet执行原理 AND Servlet生命周期方法
    Servlet执行原理1.当服务器接受到客户端的请求后,会解析请求URL路径,获取访问的Servlet的资源路径2.查找web.xml文件,是否有对应的<url-pattern>标签体内容3.......
  • 《RPC实战与核心原理》学习笔记Day16
    23|如何在没有接口的情况下进行RPC调用?我们什么情况下需要在没有接口时进行RPC调用?列举2个典型场景:我们搭建一个测试平台,允许各个业务方在测试凭条上通过输入接口、......
  • 软考复盘:系统架构设计师核心考点总结
    大家好,我是Edison。去年(2022)复习备考参加了软考高级资格中的系统架构设计师考试。在系统架构设计师考试中,软件架构设计这一部分绝对是重点中的重点。这里,我总结了一下软......
  • Request-原理和Request继承体系
    Request-原理Request继承体系request对象继承体系结构:ServletRequest--接口|继承HttpServletRequest--接口......
  • WGCLOUD的原理和使用分享 - 实时监测服务器CPU温度
    WGCLOUD具备自动监测主机CPU温度的能力,不用配置,只要启动被控端agent就行了,它会自动采集CPU温度指标数据,如下图不过测试中,发现貌似虚拟机采集不到CPU温度,实体机是可以采集CPU......
  • 《谁有惠根斯原理的电磁场叠加证明?要完整数学证明过程》 回复
    《谁有惠根斯原理的电磁场叠加证明?要完整数学证明过程》     https://tieba.baidu.com/p/8241825902    。  用 麦克斯韦方程 证明啊,  这不......
  • 基于MT8788安卓核心板平板电脑
    联发科MT8788基带处理器介绍MT8788设备具有集成的蓝牙、fm、wlan和gps模块,是一个高度集成的基带平台,包括调制解调器和应用处理子系统,启用LTE/LTE-A和C2K智能设备应用程序。......
  • 计算机网络-DNS原理和解析过程
    1、DNS系统的简介:DNS是一套从域名到IP的映射系统。TCP/IP中使用IP地址和端口号来确定网络上的一台主机的一个程序,但是IP地址不方便记忆。于是人们发明了一种叫主机名的东西......
  • 小程序扫码登录网页端原理
    目录一、问题引入二、几个难题1.网页端是怎么知道哪个用户扫描的二维码?2.小程序扫码,扫出来的是什么东西?3.小程序扫到二维码以后,做了什么事情,怎么和网页端通讯的......
  • Mysql事务底层原理
    本文转载自https://www.cnblogs.com/yidengjiagou/p/16413825.html 事务有四大特性,分别是原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability),简称......