首页 > 其他分享 >摸鱼大数据——Spark Core——Spark内核调度

摸鱼大数据——Spark Core——Spark内核调度

时间:2024-07-06 11:31:01浏览次数:3  
标签:Core Task shuffle 分区 RDD 摸鱼 Spark Stage

1、内容概述

Spark内核调度的任务:

  • 如何构建DAG执行流程图

  • 如何划分Stage阶段

  • Driver底层是如何运转

  • 确定需要构建多少分区(线程)

Spark内核调度的目的:尽可能用最少的资源高效地完成任务计算

2、RDD的依赖

RDD依赖:一个RDD的形成可能是由一个或者多个RDD得到的,此时这个RDD和之前的RDD之间产生依赖关系。

在Spark中,RDD之间的依赖关系,主要有二种类型:

  • 窄依赖:

作用: 能够让Spark程序并行计算。也就是一个分区数据计算出现问题以后,其他的分区计算不受到任何影响
特点: 父RDD的分区和子RDD的分区关系是一对一的关系。也就是父RDD分区的数据会整个被下游子RDD的分区接收

  • 宽依赖:

作用: 划分Stage的重要依据。宽依赖也叫做Shuffle依赖
特点: 父RDD的分区和子RDD的分区关系是一对多的关系。也就是父RDD的分区数据会被分成多份给到下游子RDD的多个分区所接收。
​
注意: 如果有宽依赖,shuffle下游的其他操作,必须等待shuffle执行完成以后才能够继续执行。为了避免数据不完整

说明:

在实际使用中,不需要纠结哪些算子会存在shuffle,以需求为目标。虽然shuffle的存在会影响一定的效率, 但是以完成任务为准则,该用那个算子,就使用那个算子即可,不要过分纠结

算子中一般以ByKey结尾的会发生shuffle;另外是重分区算子会发生shuffle

3、DAG和Stage

DAG:有向无环图,主要描述一段执行任务,从开始一直往下走,不允许出现回调操作

Spark应用程序中,遇到一个Action算子,就会触发形成一个Job任务的产生。

对于每一个Job的任务,都会产生一个DAG执行流程图,那么这个流程图是如何形成的呢?

层级关系:
1- 一个application应用程序 -> 遇到一个Action算子,就会触发形成一个Job任务
2- 一个Job任务只有一个DAG有向无环图
3- 一个DAG有向无环图 -> 有多个Stage
4- 一个Stage -> 有多个Task线程
​
5- 一个RDD -> 有多个分区
6- 一个分区会被一个Task线程所处理

DAG执行流程图形成和Stage划分:

1- Spark应用程序遇到Action算子后,就会触发一个Job任务的产生。Job任务会将它所依赖的所有算子全部加载进来,形成一个Stage
​
2- 接着从Action算子从后往前进行回溯,遇到窄依赖就将算子放在同一个Stage当中;如果遇到宽依赖,就划分形成新的Stage。最后一直回溯完成

细化剖析Stage内部的流程:

默认并行度的值确认:

因为是使用textFile读取HDFS上的文件,因此RDD分区数=max(文件的block块的数量, defaultMinPartition)。继续需要知道defaultMinPartition的值是多少。
​
defaultMinPartition=min(spark.default.parallelism,2)取最小值。最终我们确认spark.default.parallelism的参数值就能够最终确认RDD的分区数有多少个
​
spark.default.parallelism参数值确认过程如下:
1- 如果有父RDD,就取父RDD的最大分区数
2- 如果没有父RDD,根据集群模式进行取值:
   2.1- 本地模式:机器的最大CPU核数
   2.2- (了解)Mesos:默认是8
   2.3- 其他模式:所有执行节点上的核总数或2,以较大者为准

4、Spark Shuffle

Spark中shuffle的发展历程:

1- 在1.1版本以前,Spark采用Hash shuffle (优化前 和 优化后)
​
2- 在1.1版本的时候,Spark推出了Sort Shuffle
​
3- 在1.5版本的时候,Spark引入钨丝计划(优化为主)
​
4- 在1.6版本的时候,将钨丝计划合并到sortShuffle中
​
5- 在2.0版本的时候,将Hash Shuffle移除,将Hash shuffle方案移植到Sort Shuffle

未优化的Hash shuffle:

存在的问题:
    上游(map端)的每个Task会产生与下游Task个数相等的小文件个数。这种情况会导致上游有非常多的小文件。另外,下游(reduce端)来拉取文件的时候,会有大量的网络IO和磁盘IO过程,因为要打开和读取多个小文件。

经过优化后的Hash shuffle

变成了由每个Executor进程产生与下游Task个数相等的小文件数。这样可以大量减小小文件的产生,以及降低下游拉取文件时候的网络IO和磁盘IO过程

Sort shuffle:

Sort Shuffle分成了两种: 普通机制和bypass机制。具体使用哪种,由Spark底层决定。
​
普通机制的运行过程: 每个上游Task线程处理数据,数据处理完以后,先放在内存中。接着对内存中的数据进行分区、排序。将内存中的数据溢写到磁盘,形成一个个的小文件。溢写完成以后,会将多个小文件合并成一个大的磁盘文件。并且针对每个大的磁盘文件,会提供一个索引文件。接着是下游Task根据索引文件来读取相应的数据。
​
bypass机制: 就是在普通机制的基础上,省略了排序的过程
​
bypass机制的触发条件是:
1- 上游RDD的分区数量最多不能超过200个
2- 上游不能对数据进行提前聚合操作(因为提前聚合,需要先进行分组操作,而分组的操作实际上是有排序的操作)

5、Job调度流程

主要是讨论:在Driver内部,是如何调度任务

1- Driver进程启动后,底层PY4J创建SparkContext顶级对象。在创建该对象的过程中,还会创建另外两个对象,分别是: DAGScheduler和TaskScheduler
    DAGScheduler: DAG调度器。将Job任务形成DAG有向无环图和划分Stage的阶段
    TaskScheduler: Task调度器。将Task线程分配给到具体的Executor执行
​
2- 一个Spark程序遇到一个Action算子就会触发产生一个Job任务。SparkContext将Job任务给到DAG调度器,拿到Job任务后,会将Job任务形成DAG有向无环图和划分Stage的阶段。并且会确定每个Stage阶段有多少个Task线程,会将众多的Task线程放到TaskSet的集合中。DAG调度器将TaskSet集合给到Task调度器
​
3- Task调度器拿到TaskSet集合以后,将Task分配给到给到具体的Executor执行。底层是基于SchedulerBackend调度队列来实现的。
​
4- Executor开始执行任务。并且Driver会监控各个Executor的执行状态,直到所有的Executor执行完成,就认为任务运行结束
​
5- 后续过程和之前一样

6、Spark RDD并行度

整个Spark应用中,影响并行度的因素有以下两个原因:

  • 1- 资源的并行度: Executor数量 和 CPU核心数 以及 内存的大小

  • 2- 数据的并行度: Task的线程数 和 分区数量

一般将Task线程数设置为CPU核数的2-3倍。另外每个线程分配3-5GB的内存资源。

如何设置并行度:

语法: SparkConf().set("spark.default.parallelism", "num")
​
​
说明: spark.default.parallelism该参数是SparkCore中的参数。该参数只会影响shuffle以后的分区数量。另外该参数对parallelize并行化本地集合创建的RDD不起作用。

代码演示:

# 导包
from pyspark import SparkContext, SparkConf
import os

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
    # 6.1 TODO: 设置并行度
    conf = SparkConf().set('spark.default.parallelism', '6')

    # - 1.创建SparkContext对象
    sc = SparkContext(conf=conf)
    # - 2.数据输入
    textRDD = sc.textFile('file:///export/data/spark_project/spark_base/content.txt')
    # - 3.数据处理
    #   - 3.1文本内容切分
    flatMapRDD = textRDD.flatMap(lambda line: line.split(" "))
    #   - 3.2数据格式转换
    mapRDD = flatMapRDD.map(lambda word: (word, 1))

    # 6.2 TODO: shuffle之前查看分区数量
    print(f"shullfe之前分区数: {mapRDD.getNumPartitions()},分区内容:{mapRDD.glom().collect()}")

    #   - 3.3分组和聚合
    reduceRDD = mapRDD.reduceByKey(lambda agg, curr: agg + curr)
    
    # 6.3 TODO: shuffle之后查看分区数量
    print(f"shullfe之后分区数: {reduceRDD.getNumPartitions()},分区内容:{reduceRDD.glom().collect()}")

    # - 4.数据输出
    # print(reduceRDD.collect())
    # - 5.释放资源
    sc.stop()

标签:Core,Task,shuffle,分区,RDD,摸鱼,Spark,Stage
From: https://blog.csdn.net/weixin_65694308/article/details/139901999

相关文章

  • vue3【提效】使用 VueUse 高效开发(工具库 @vueuse/core + 新增的组件库 @vueuse/compo
    Vueuse是一个功能强大的Vue.js生态系统工具库,提供了可重用的组件和函数,帮助开发者更轻松地构建复杂的应用程序。官网:https://vueuse.org/core/useWindowScroll/安装VueUsenpmi@vueuse/core@vueuse/components(可选)安装自动导入,添加到imports中//需......
  • 记一次aspnetcore发布部署流程初次使用k8s
    主题:aspnetcorewebapi项目,提交到gitlab,通过jenkins(gitlab的ci/cd)编译、发布、推送到k8s。关于gitlab、jenkins、k8s安装,都是使用docker启动服务。首先新建一个项目,为了方便浏览就把swaggerr非开发环境不展示去掉 下面就是需要准备Dockerfile和k8s.yaml文件,这里不应该用......
  • .netcore微服务——项目搭建
    在.NETCore中,微服务是一种架构风格,它将应用程序构造为一组小型服务的集合,这些服务都通过HTTP-basedAPI进行通信。每个服务都是独立部署的,可以用不同的编程语言编写,并且可以使用不同的数据存储技术。微服务的主要优点包括:增强容错能力:一个服务的故障不会影响其他服务。增......
  • .NET Core 和 .NET 标准类库项目类型有什么区别?
    在VisualStudio中,至少可以创建三种不同类型的类库:类库(.NETFramework)类库(.NET标准)类库(.NETCore)虽然第一种是我们多年来一直在使用的,但一直感到困惑的一个主要问题是何时使用.NETStandard和.NETCore类库类型。那么,类库(.NETStandard)和类库(.NETCore)之间有什么......
  • 标准化(Z-score)
    标准化(Z-score)是用于将不同微生物的丰度数据进行标准化处理,以便在热图中更容易比较和解释不同样本之间的差异。具体来说,标准化的过程如下:abundance<-scale(abundance,center=TRUE,#减去均值scale=TRUE#除以标准差)标准化过程详解:减去均值(center=TRU......
  • China.NETConf2019 - 用ASP.NETCore构建可检测的高可用服务
    一、前言2019中国.NET开发者峰会(.NETConfChina2019)于2019年11月10日完美谢幕,校宝在线作为星牌赞助给予了峰会大力支持,我和项斌等一行十位同事以讲师、志愿者的身份公司参与到峰会的支持工作中,我自己很荣幸能够作为讲师与大家交流,分享了主题《用ASP.NETCore构建可检测的高......
  • 基于 .net core 8.0 的 swagger 文档优化分享-根据命名空间分组显示
    前言公司项目是是微服务项目,网关是手撸的一个.netcorewebapi项目,使用refit封装了20+服务SDK,在网关中进行统一调用和聚合等处理,以及给前端提供swagger文档在我两年前进公司的时候,文档还能够顺滑的打开,在去年的时候文档只能在本地打开,或者访问原始的swagger页面,knife......
  • Asp .Net Core 系列:基于 Castle DynamicProxy + Autofac 实践 AOP 以及实现事务、用户
    目录什么是AOP?.NetCore中有哪些AOP框架?基于CastleDynamicProxy实现AOPIOC中使用CastleDynamicProxy实现事务管理实现用户自动填充什么是AOP?AOP(Aspect-OrientedProgramming,面向切面编程)是一种编程范式,旨在通过将横切关注点(cross-cuttingconcerns)从主要业务逻辑......
  • C#面:ASP.NET Core ⽐ ASP.NET 更具优势的地⽅是什么?
    ASP.NETCore相对于ASP.NET具有以下几个优势:跨平台支持:ASP.NETCore是跨平台的,可以在Windows、Linux和macOS等多个操作系统上运行。这使得开发人员可以选择更适合他们的操作系统来进行开发和部署。更轻量级:ASP.NETCore是一个轻量级的框架,它具有更小的内存占用和更快的启动......
  • EFCore 在APS.NET MVC中使用
    1.创建一个APS.NETMVC项目 2.安装Nuget包Microsoft.EntityFrameworkCore.DesignMicrosoft.EntityFrameworkCore.SqlServer3.在Models中添加Book实体类publicclassBook{///<summary>///id///</summary>publicintId{get;set;}///......