首页 > 其他分享 >SparkCore(四)

SparkCore(四)

时间:2022-10-27 20:47:52浏览次数:57  
标签:task shuffle 分区 SparkCore RDD spark Spark

【理解】Spark内核原理

RDD 依赖

  • RDD的5大特性中,第三个是【与父RDD的依赖关系】

  • 依赖关系可以按照是否有shuffle进一步分类

  • 窄依赖:【没有】shuffle,父RDD的一个分区只会被子RDD的【1】个分区依赖,(或父RDD的一个分区的数据整个都进入到了子RDD的1个分区中)。特点是下面的父RDD的分区的出去的线条只有【1】条就行了,不能有分叉。

  • 宽依赖:【有】shuffle,父RDD的一个分区会被子RDD的【多】个分区所依赖(或父RDD的一个分区的数据分发到了子RDD的多个分区中)。特点是下面的父RDD的分区的出去的线条有【多】条。

DAG和Stage

  • DAG

  • 有向无环图

    • 有【1】个大方向,无【闭环】。如我们之前的wordcount的的执行流程就是一个DAG

    • DAG存在的形式很多样,有很多种画法,过程都一样。

  • DAG如何划分stage

    • 从加载源数据开始,到【action】算子结束,这是一个计算链条。会生成一个【job】,一个job对应一个【DAG】。
    • 自Action算子从后往前依次判断,如果遇到【宽】依赖算子(比如【reduceByKey】,【join】这些算子)就断开形成2个【stage】。继续往前,如果是【窄】依赖算子(比如【map】,【flapMap】,【filter】等算子)则加入到同一个【stage】中。
    • 如下图,同一个stage内的多个RDD间都是【窄】依赖,分区数不变,可以对同一个分区内的多个窄依赖算子【合并】操作,形成pipeline(管道),每条数据都可以一次性计算完成,每个分区是一个【管道】,作为一个【task】被1个(线程、core)执行完成。

Spark基本概念/名词/术语解释

1.Application:应用,一个完整的main方法程序

有哪些命令会产生一个Application

  • spark/bin/pyspark 【输入exit()就退出】
  • spark/bin/spark-submit 【main方法跑完就退出】
  • spark/bin/spark-shell 【输入:quit就退出】
  • spark/bin/spark-sql 【输入exit; 就退出】
  • 在Pycharm中写完代码,右键执行run
  • 【了解】spark/sbin/start-thriftserver.sh
  • 只要上面的命令都运行,就可以在webui页面查看运行情况。(一般运行时页面端口是【4040】,也可能会轮询到4041等)
  • 只要上面的命令都运行完毕或强行突出,那么Application就结束。同时webui运行时页面也退出。可以去Spark历史日志页面查看,端口是【18080】。

2.Driver:JVM进程,驱动,就是用来执行main方法的, 里面会执行一些Driver端的代码,如创建SparkContext,设置应用名,设置日志级别...

3.Executor:运行在Worker Node中的JVM进程!,执行【由Driver发送过来的lambda或def函数,分布式代码】。

  • 比如 rdd1.map(lambda x:x+1) 其中的lambda函数会被作为task分发到【Executor的core】上运行。一个RDD的分区有多少个,则就有多少个task,他们lambda逻辑一样,但计算的数据块不一样。

4.SparkContext: 任何Spark 的Application必须有一个SparkContext上下文, Spark运行时的上下文环境。旗下自动创建了2个对象,TaskScheduler和DAGScheduler。

5.ClusterManager:集群的资源管理器,【了解】对于Standalone模式,就是【Master】,对于Yarn模式就是【ResourceManager】,统一管理集群的资源。

6.Worker Node:工作节点,是拥有CPU/内存的机器,是真正干活的节点,【了解】对于Standalone模式,就是【Worker】进程,对于Yarn模式就是【NodeManager】进程

7.RDD:弹性分布式数据集

8.DAG:有向无环图,从加载数据到Action输出操作的计算链条。形成的RDD的执行流程图---静态的图

9.Job:作业,按照DAG进行执行就形成了Job---按照图动态的执行

10.Stage:DAG中,根据【宽】依赖划分出来的一个个的执行阶段!

11.Task:一个分区上的一系列操作(pipline上的一系列操作)就是一个Task,同一个Stage中的多个Task可以并行执行!(每一个Task由线程执行),所以也可以这样说:Task(线程)是运行在Executor(进程)中的最小单位!

12.TaskSet:任务集,就是同一个Stage中的各个【Task】组成的集合,其实可以认为它就是stage

Spark调度流程

  • 在创建SparkContext对象时,就会自动生成2个内置对象,【DAGScheduler】和【TaskScheduler】。
  • DAGScheduler:stage级的调度器:
    • 主要是将DAG依据【shuffle】依赖切分成若干【stage】,
    • 并将每个Stage打包成TaskSet交给TaskScheduler调度。
  • TaskScheduler:Task级的调度器:将DAGScheduler每次传过来的一个TaskSet里面的task们按照指定的调度策略分发到【Executor上的线程池】上执行

结合yarn-cluster图片资料,全流程回顾调度过程(重要)

【了解】SparkShuffle

  • SparkShuffle

    • Spark1.2版本前使用【HashShuffle机制】
    • Spark1.2之后版本使用【SortShuffle机制】
  • MR的shuffle回顾

  • Spark的shuffle简介

    • Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做【map】工作,下游Stage做【reduce】工作,其本质上还是MapReduce计算框架。【shuffle】是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等
    • Spark的Shuffle的上游Stage的最后一步是【Shuffle Write】,下游Stage第一步是【Shuffle Read】。
  • HashshuffleManager(已经被舍弃)

    • Hash体现在会按照maptask处理的数据的key的Hash值 除以 下游reducetask的个数 的余数(取模)

    • 未经优化的HashShuffle,缺点是中间层的小文件非常多,中间小文件个数=【上游的maptask数】*【下游的reducetask数】,非常之多

    • groupby和join都能触发shuffle。下图是group by的场景

    • 优化后的HashShuffle(被舍弃),中间小文件个数=【上游的Executor数量】*【下游的reducetask数】,数目成倍减少。

      • 引入了新概念shuffleFileGroup
      • 相当于上面Hashshuffle,将中间层的小文件,都合并追加成一个大文件,以减少小文件数目.
  • sortshuffleManager(选用)

    • 普通机制(需要排序)
      • 1-定义数据结构:如果是【reduceByKey】这种聚合类的shuffle算子触发的,它会预聚合,那么会选用Map(dict)数据结构,如果是【join】这种普通shuffle算子触发的,那么会选用Array(list)数据结构,初始大小是【5】M

      • 2-申请的内存=当前的数据内存情况*2-上一次的内存情况。如果达到临界阈值的话,则将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

      • 3-排序:在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行【排序】。

      • 4-溢写磁盘:排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。

      • 5-合并:对上面的小文件进行合并成1个文件,并建立索引,在索引文件中的start offset与end offset表示文件索引

    • ByPass机制(无需排序)
      • 就是无需排序了
      • 需要满足下面2点才会触发bypass机制:
      • 1.当shuffle 的reducetask的数量小于等于参数【spark.shuffle.sort.bypassMergeThreshold】的值时(默认为【200】)(因为认为reducetask小于200时,不排序的性能更优)
      • 2.且算子不能是【带有map端预聚合的算子】。
        • reduceByKey是带有map端聚合的shuffle算子。(不会走bypass机制)
        • groupBykey(只分组不聚合)不是map端聚合的shuffle算子。(有机会走bypass机制)
        • 就是排序步骤去掉,拉取数据时还是使用hash值拉取,其他一样.

Spark 并行度

资源并行度与数据并行度

  • 资源的并行度

    • 默认是提交应用时申请的总core数。
    • 如果提交到的是local[k]: 资源的并行度默认就是【k】
    • 如果提交到的是yarn:资源的并行度就是:【--num-executors】 *【--executor-cores】
  • 【了解】默认的并行度参数:spark.default.parallelism,和shuffle有关。

    • 可以手动指定比如数字n

      • conf.set("spark.defalut.parallelism", n)

      • 他等于=n

      • rdd2=rdd1.reduceByKey(lambda a, b: a + b), rdd2的分区数=【n】

    • 如果不手动指定,

      • 他等于None

      • 也就是 conf.get("spark.defalut.parallelism") 会返回None

      • rdd2=rdd1.reduceByKey(lambda a, b: a + b), rdd2的分区数=【资源的并行度】

  • 数据的并行度,(运行时可以手动改变)

    • stage中的task的个数,或RDD的分区partition个数

      task又分为map时的task和reduce(shuffle)时的task;

      task的数目和很多因素有关:

      • 申请的资源的总core数,
      • (如果是SparkCore)spark.default.parallelism参数,
      • (如果是SparkSQL)spark.sql.shuffle.partitions参数,
      • 读取数据源的类型(数据源是很多个小文件的话,一个小文件一个task),
      • shuffle方法的第二个参数, 如rdd2=rdd1.reduceByKey(lambda, num )
      • repartition的数目等等。rdd2=rdd.repartition(num)
  • 最好将RDD的分区数(即数据的并行度或称Task数量)设置为:资源的并行度的【2~3倍

  • 因为: 首先,如果task数少了,有资源(cpu core) 空闲,其次,刚好的话,由于每个task处理的文件大小不同,可能有些很大,有些很小,其他的处理完了,还得等那个没处理完的,又空闲了,还不如拆了,能继续接着处理,最后,分布式集群,每个资源机器不同,显然,有的core处理快一些(电脑好呀),有些很慢,所以task设置为2-3倍,是比较合理的,处理快的,当然要处理多一点,不能让它闲下来.

【了解】SparkSQL 概述

【了解】发展历史

  • 在Spark1.0之前:开源项目【Shark】把hiveSQL语句翻译成【RDD】代码,取代MapReduce引擎,使用Spark引擎计算。但是因为底层基于【hive】的执行引擎修改,修改代码的过程很痛苦,所以很快维护不下去了。
  • Spark1时,Spark推出SparkSQL,数据结构是【SchemaRDD】(RDD+字段名称,字段类型)
  • Spark1.3时,SparkSQL推出数据抽象【DataFrame】,实现了RDD的大部分功能,并增加了SQL编程操作,但不支持【 泛型】。不用hive框架,用【catalyst】引擎。
    • DataFrame=RDD+schema-泛型
  • 【了解】Spark1.6:加入新的数据抽象【Dataset】,支持泛型,对jvm对象更高效。
    • Dataset=DataFrame+【泛型】
  • 【了解】Spark2.0,java版Spark将DataFrame和Dataset统一成【Dataset】,DataFrame只是作为Dataset的一个特例,即【DataFrame】=Dataset[Row](上面图片右上角写错了)
    • pyspark不支持Dataset,因为不涉及jvm,pyspark只支持DataFrame,但是pyspark的DataFrame支持pyarrow矢量计算,也是独门武器
  • hive和SparkSQL的关系:
    • Spark1之前的思维:将hive框架的MapReduce引擎替换成【Spark RDD 】引擎,说白了就是如何优化hive。
    • Spark1之后的思维:将hive、mysql等作为第三方数据来源,SparkSQL只提供【计算引擎】,不做单一数据库,而做平台。

SparkSQL数据抽象

将文件上传到hdfs上

hdfs dfs -put /export/server/spark/examples/src/main/resources/* /pydata

/export/server/spark/bin/pyspark

  • DataFrame

    • RDD的缺点是无从知道每个元素的【内部属性】信息。意思是下图不知道Person对象的姓名、年龄等。

    • DataFrame=RDD -【泛型】+ schema + 方便的SQL操作 + 【catalyst】优化

    • DataFrame本质是一个【弹性分布式数据表】

  • Schema信息

    • 表示表结构,每个字段的【名称】和【类型】

    • 查看DataFrame的的Schema信息

      • 【df.printSchema()】

      • 【df.schema】

    • 【了解】使用类【StructType】和【StructField】来描述schema

      • 上面的第二种写法

        • jsonSchema=StructType( [ StructField('id',StringType,True) ,
          StructField('city',StringType), 
          StructField('pop',LongType), 
          StructField('state',StringType)] )
          
  • Row

    • 表示每一行数据

      • df.first()

    • 创建方式

      • Row(字段名1=值1,字段名2=值2,。。。)

    • 获取字段方式

      • row[下标]

      • row.【字段名】

SparkSession 应用入口

  • 上下文对象统一使用【SparkSession】,代替之前的SparkContext/【SQLContext/HiveContet 】。

    • SQLContext/HiveContet这2个上下文对象曾经昙花一现,已经废弃。
    • SparkContext尚未废弃。
    • 也能从SparkSession对象中获得sparkContext对象,可以认为SparkSession是高级别的,他可以直接获取低级别对象sparkContext。代码为: sc=sparkSession.sparkContext。
  • 创建方式采用建造者模式(详见设计模式)

标签:task,shuffle,分区,SparkCore,RDD,spark,Spark
From: https://www.cnblogs.com/nanguyhz/p/16833652.html

相关文章

  • SparkCore(二)
    RDD的API操作/方法/算子比如有一个100M的csv文件,需要对它的每个元素操作,比如先+1,再平方,结果保存另一个csv文件。如下图,如果用传统python思维,不仅每个中间容器占用内存,消......
  • SparkCore:累加器和广播变量
    累加器累加器(分布式共享只写变量):用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每......
  • sparkcore案例四:统计每个省份的用户访问量
    题目:/***统计每个省份的用户访问量,最终要求将不同省份用户访问量存放到不同的分区中分区存放规则如下*省份是以包含山0*如果省份包含海1*其他......
  • sparkcore案例三:获取每一种状态码对应的访问量
    题目描述:/***清洗完成的数据中包含一个用户的响应状态码,获取每一种状态码对应的访问量*1、读取清洗完成的数据成为RDD[String]*2、可以把上一步得到的RDD......
  • SparkCore系列(四)函数大全
    有了上面三篇的函数,平时开发应该问题不大了。这篇的主要目的是把所有的函数都过一遍,深入RDD的函数RDD函数大全数据准备        val sparkconf = new Spa......