【理解】Spark内核原理
RDD 依赖
-
RDD的5大特性中,第三个是【与父RDD的依赖关系】
-
依赖关系可以按照是否有shuffle进一步分类
-
窄依赖:【没有】shuffle,父RDD的一个分区只会被子RDD的【1】个分区依赖,(或父RDD的一个分区的数据整个都进入到了子RDD的1个分区中)。特点是下面的父RDD的分区的出去的线条只有【1】条就行了,不能有分叉。
-
宽依赖:【有】shuffle,父RDD的一个分区会被子RDD的【多】个分区所依赖(或父RDD的一个分区的数据分发到了子RDD的多个分区中)。特点是下面的父RDD的分区的出去的线条有【多】条。
DAG和Stage
-
DAG:
-
有向无环图
-
有【1】个大方向,无【闭环】。如我们之前的wordcount的的执行流程就是一个DAG
-
DAG存在的形式很多样,有很多种画法,过程都一样。
-
-
DAG如何划分stage
- 从加载源数据开始,到【action】算子结束,这是一个计算链条。会生成一个【job】,一个job对应一个【DAG】。
- 自Action算子从后往前依次判断,如果遇到【宽】依赖算子(比如【reduceByKey】,【join】这些算子)就断开形成2个【stage】。继续往前,如果是【窄】依赖算子(比如【map】,【flapMap】,【filter】等算子)则加入到同一个【stage】中。
- 如下图,同一个stage内的多个RDD间都是【窄】依赖,分区数不变,可以对同一个分区内的多个窄依赖算子【合并】操作,形成pipeline(管道),每条数据都可以一次性计算完成,每个分区是一个【管道】,作为一个【task】被1个(线程、core)执行完成。
Spark基本概念/名词/术语解释
1.Application:应用,一个完整的main方法程序
有哪些命令会产生一个Application
- spark/bin/pyspark 【输入exit()就退出】
- spark/bin/spark-submit 【main方法跑完就退出】
- spark/bin/spark-shell 【输入:quit就退出】
- spark/bin/spark-sql 【输入exit; 就退出】
- 在Pycharm中写完代码,右键执行run
- 【了解】spark/sbin/start-thriftserver.sh
- 只要上面的命令都运行,就可以在webui页面查看运行情况。(一般运行时页面端口是【4040】,也可能会轮询到4041等)
- 只要上面的命令都运行完毕或强行突出,那么Application就结束。同时webui运行时页面也退出。可以去Spark历史日志页面查看,端口是【18080】。
2.Driver:JVM进程,驱动,就是用来执行main方法的, 里面会执行一些Driver端的代码,如创建SparkContext,设置应用名,设置日志级别...
3.Executor:运行在Worker Node中的JVM进程!,执行【由Driver发送过来的lambda或def函数,分布式代码】。
- 比如 rdd1.map(lambda x:x+1) 其中的lambda函数会被作为task分发到【Executor的core】上运行。一个RDD的分区有多少个,则就有多少个task,他们lambda逻辑一样,但计算的数据块不一样。
4.SparkContext: 任何Spark 的Application必须有一个SparkContext上下文, Spark运行时的上下文环境。旗下自动创建了2个对象,TaskScheduler和DAGScheduler。
5.ClusterManager:集群的资源管理器,【了解】对于Standalone模式,就是【Master】,对于Yarn模式就是【ResourceManager】,统一管理集群的资源。
6.Worker Node:工作节点,是拥有CPU/内存的机器,是真正干活的节点,【了解】对于Standalone模式,就是【Worker】进程,对于Yarn模式就是【NodeManager】进程
7.RDD:弹性分布式数据集
8.DAG:有向无环图,从加载数据到Action输出操作的计算链条。形成的RDD的执行流程图---静态的图
9.Job:作业,按照DAG进行执行就形成了Job---按照图动态的执行
10.Stage:DAG中,根据【宽】依赖划分出来的一个个的执行阶段!
11.Task:一个分区上的一系列操作(pipline上的一系列操作)就是一个Task,同一个Stage中的多个Task可以并行执行!(每一个Task由线程执行),所以也可以这样说:Task(线程)是运行在Executor(进程)中的最小单位!
12.TaskSet:任务集,就是同一个Stage中的各个【Task】组成的集合,其实可以认为它就是stage
Spark调度流程
- 在创建SparkContext对象时,就会自动生成2个内置对象,【DAGScheduler】和【TaskScheduler】。
- DAGScheduler:stage级的调度器:
- 主要是将DAG依据【shuffle】依赖切分成若干【stage】,
- 并将每个Stage打包成TaskSet交给TaskScheduler调度。
- TaskScheduler:Task级的调度器:将DAGScheduler每次传过来的一个TaskSet里面的task们按照指定的调度策略分发到【Executor上的线程池】上执行
结合yarn-cluster图片资料,全流程回顾调度过程(重要)
【了解】SparkShuffle
-
SparkShuffle
- Spark1.2版本前使用【HashShuffle机制】
- Spark1.2之后版本使用【SortShuffle机制】
-
MR的shuffle回顾
-
Spark的shuffle简介
- Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做【map】工作,下游Stage做【reduce】工作,其本质上还是MapReduce计算框架。【shuffle】是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等
- Spark的Shuffle的上游Stage的最后一步是【Shuffle Write】,下游Stage第一步是【Shuffle Read】。
-
HashshuffleManager(已经被舍弃)
-
Hash体现在会按照maptask处理的数据的key的Hash值 除以 下游reducetask的个数 的余数(取模)
-
未经优化的HashShuffle,缺点是中间层的小文件非常多,中间小文件个数=【上游的maptask数】*【下游的reducetask数】,非常之多
-
groupby和join都能触发shuffle。下图是group by的场景
-
优化后的HashShuffle(被舍弃),中间小文件个数=【上游的Executor数量】*【下游的reducetask数】,数目成倍减少。
- 引入了新概念shuffleFileGroup
- 相当于上面Hashshuffle,将中间层的小文件,都合并追加成一个大文件,以减少小文件数目.
-
-
sortshuffleManager(选用)
- 普通机制(需要排序)
-
1-定义数据结构:如果是【reduceByKey】这种聚合类的shuffle算子触发的,它会预聚合,那么会选用Map(dict)数据结构,如果是【join】这种普通shuffle算子触发的,那么会选用Array(list)数据结构,初始大小是【5】M
-
2-申请的内存=当前的数据内存情况*2-上一次的内存情况。如果达到临界阈值的话,则将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
-
3-排序:在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行【排序】。
-
4-溢写磁盘:排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。
-
5-合并:对上面的小文件进行合并成1个文件,并建立索引,在索引文件中的start offset与end offset表示文件索引
- ByPass机制(无需排序)
- 就是无需排序了
- 需要满足下面2点才会触发bypass机制:
- 1.当shuffle 的reducetask的数量小于等于参数【spark.shuffle.sort.bypassMergeThreshold】的值时(默认为【200】)(因为认为reducetask小于200时,不排序的性能更优)
- 2.且算子不能是【带有map端预聚合的算子】。
- reduceByKey是带有map端聚合的shuffle算子。(不会走bypass机制)
- groupBykey(只分组不聚合)不是map端聚合的shuffle算子。(有机会走bypass机制)
- 就是排序步骤去掉,拉取数据时还是使用hash值拉取,其他一样.
- 普通机制(需要排序)
Spark 并行度
资源并行度与数据并行度
-
资源的并行度
- 默认是提交应用时申请的总core数。
- 如果提交到的是local[k]: 资源的并行度默认就是【k】
- 如果提交到的是yarn:资源的并行度就是:【--num-executors】 *【--executor-cores】
-
【了解】默认的并行度参数:spark.default.parallelism,和shuffle有关。
-
可以手动指定比如数字n
-
conf.set("spark.defalut.parallelism", n)
-
他等于=n
-
rdd2=rdd1.reduceByKey(lambda a, b: a + b), rdd2的分区数=【n】
-
-
如果不手动指定,
-
他等于None
-
也就是 conf.get("spark.defalut.parallelism") 会返回None
-
rdd2=rdd1.reduceByKey(lambda a, b: a + b), rdd2的分区数=【资源的并行度】
-
-
-
数据的并行度,(运行时可以手动改变)
-
stage中的task的个数,或RDD的分区partition个数
task又分为map时的task和reduce(shuffle)时的task;
task的数目和很多因素有关:
- 申请的资源的总core数,
- (如果是SparkCore)spark.default.parallelism参数,
- (如果是SparkSQL)spark.sql.shuffle.partitions参数,
- 读取数据源的类型(数据源是很多个小文件的话,一个小文件一个task),
- shuffle方法的第二个参数, 如rdd2=rdd1.reduceByKey(lambda, num )
- repartition的数目等等。rdd2=rdd.repartition(num)
-
-
最好将RDD的分区数(即数据的并行度或称Task数量)设置为:资源的并行度的【2~3倍】
-
因为: 首先,如果task数少了,有资源(cpu core) 空闲,其次,刚好的话,由于每个task处理的文件大小不同,可能有些很大,有些很小,其他的处理完了,还得等那个没处理完的,又空闲了,还不如拆了,能继续接着处理,最后,分布式集群,每个资源机器不同,显然,有的core处理快一些(电脑好呀),有些很慢,所以task设置为2-3倍,是比较合理的,处理快的,当然要处理多一点,不能让它闲下来.
【了解】SparkSQL 概述
【了解】发展历史
- 在Spark1.0之前:开源项目【Shark】把hiveSQL语句翻译成【RDD】代码,取代MapReduce引擎,使用Spark引擎计算。但是因为底层基于【hive】的执行引擎修改,修改代码的过程很痛苦,所以很快维护不下去了。
- Spark1时,Spark推出SparkSQL,数据结构是【SchemaRDD】(RDD+字段名称,字段类型)
- Spark1.3时,SparkSQL推出数据抽象【DataFrame】,实现了RDD的大部分功能,并增加了SQL编程操作,但不支持【 泛型】。不用hive框架,用【catalyst】引擎。
- DataFrame=RDD+schema-泛型
- 【了解】Spark1.6:加入新的数据抽象【Dataset】,支持泛型,对jvm对象更高效。
- Dataset=DataFrame+【泛型】
- 【了解】Spark2.0,java版Spark将DataFrame和Dataset统一成【Dataset】,DataFrame只是作为Dataset的一个特例,即【DataFrame】=Dataset[Row](上面图片右上角写错了)
- pyspark不支持Dataset,因为不涉及jvm,pyspark只支持DataFrame,但是pyspark的DataFrame支持pyarrow矢量计算,也是独门武器
- hive和SparkSQL的关系:
- Spark1之前的思维:将hive框架的MapReduce引擎替换成【Spark RDD 】引擎,说白了就是如何优化hive。
- Spark1之后的思维:将hive、mysql等作为第三方数据来源,SparkSQL只提供【计算引擎】,不做单一数据库,而做平台。
SparkSQL数据抽象
将文件上传到hdfs上
hdfs dfs -put /export/server/spark/examples/src/main/resources/* /pydata
/export/server/spark/bin/pyspark
-
DataFrame
-
RDD的缺点是无从知道每个元素的【内部属性】信息。意思是下图不知道Person对象的姓名、年龄等。
-
DataFrame=RDD -【泛型】+ schema + 方便的SQL操作 + 【catalyst】优化
-
DataFrame本质是一个【弹性分布式数据表】
-
-
Schema信息
-
表示表结构,每个字段的【名称】和【类型】
-
查看DataFrame的的Schema信息
-
【df.printSchema()】
-
【df.schema】
-
-
【了解】使用类【StructType】和【StructField】来描述schema
-
上面的第二种写法
-
jsonSchema=StructType( [ StructField('id',StringType,True) , StructField('city',StringType), StructField('pop',LongType), StructField('state',StringType)] )
-
-
-
Row
-
表示每一行数据
-
df.first()
-
-
创建方式
-
Row(字段名1=值1,字段名2=值2,。。。)
-
-
获取字段方式
-
row[下标]
-
row.【字段名】
-
-
SparkSession 应用入口
-
上下文对象统一使用【SparkSession】,代替之前的SparkContext/【SQLContext/HiveContet 】。
- SQLContext/HiveContet这2个上下文对象曾经昙花一现,已经废弃。
- SparkContext尚未废弃。
- 也能从SparkSession对象中获得sparkContext对象,可以认为SparkSession是高级别的,他可以直接获取低级别对象sparkContext。代码为: sc=sparkSession.sparkContext。
-
创建方式采用建造者模式(详见设计模式)