首页 > 其他分享 >Spark

Spark

时间:2023-03-12 18:45:28浏览次数:23  
标签:task shuffle RDD 磁盘 Spark 数据

Spark

Spark基础

Spark的定义

Spark是当今大数据领域最活跃、最热门、最高效的大数据通用计算平台之一

Spark VS Hadoop

Spark对于Hadoop拥有巨大的优势,但是spark无法完全替代Hadoop,Spark主要替代的是Hadoop中的Mapreduce计算模型,存储依旧可以使用HDFS,但是Spark计算的中间结果存储在内存中,调度可以使用Spark自带的也可以使用Yarn。

Hadoop可以使用廉价的、异构的机器来进行分布式存储与计算,但是Spark对于硬件的要求更高,对于内存以及CPU有一定的要求。

Spark的优势以及特点

优秀的数据模型和丰富计算抽象

首先是MR,它提供了对数据访问和计算的抽象,但是对于数据的复用就是简单的将数据存储到一个稳定的文件系统中(HDFS),所以会产生数据的备份复制,磁盘的I/O以及数据的序列化,所以在遇到多个计算之间复用中间结果的情况下效率将会非常低下。但是这类的操作比较常见,例如:迭代计算、交互式数据挖掘等

所以提出了一个新的概念RDD

RDD是一个课哟容错且并行的数据结构,他可以显式的将中间结果数据集存储到内存中,并且通过控制数据集的分区来达到数据存放处理最优化,同时RDD也提供了丰富的API来操作数据集。

完整的生态圈

Spark拥有着完整的生态圈:

  1. Spark Core:实现了spark的基础功能,包括RDD、任务调度、内存管理、错误恢复、与存储系统交互等
  2. Spark Sql:spark用来操作结构化数据的程序包,通过spark sql我们可以使用sql进行操作数据
  3. Spark Streaming:Spark提供对实时数据进行流式计算的组件,提供了用来操作数据流的API
  4. Spark MLlib:提供常见的机器学习的功能的程序库,包括回归、分类、聚合、协同过滤等,还提供模型评估、数据导入等额外功能
  5. Graph X:Spark中用于图计算的API,性能良好,拥有丰富的功能和运算符,在海量数据上也能自如的运行复杂的图算法。
  6. 集群管理器:Spark设计的为可以高效的在一个计算节点到数千个计算节点之间伸缩计算。
  7. Structured Streaming:处理结构化流,统一了实时和离线的API

Spark的特点

  • 快:相比于MR,Spark的基于内存的处理快了100倍以上,基于硬盘的运算也快了10倍。Saprk实现了高效的DAG执行引擎,可以通过基于内存来高效的处理数据流。
  • 易用:Spark支持java、python、scala、R的API,还支持超过80种高级算法,Spark还支持交互式的python和Scala的shell,可以方便的在shell中使用spark集群来进行验证。
  • 通用:Spark提供了统一的解决方案,Spark可以应用于批处理、交互式查询、实时流处理、机器学习、图计算,这些类型的处理都可以在一个应用中无缝切换
  • 兼容性:Spark可以非常方便的与其他的开源产品进行融合。

Spark的运行模式

local 本地模式(单机)

  • 学习测试使用
  • 分为local单线程和local-cluster多线程

Standalone 独立集群模式

  • 学习测试使用
  • 典型的Master/Slave模式

standalone-HA 高可用模式

  • 生产环境使用
  • 基于standalone模式,使用zk搭建高可用,避免Master存在单点故障

on yarn 模式

  • 生产环境使用
  • 运行在yarn集群上,yarn负责资源管理,Spark负责任务调度和计算
  • 优点:计算资源按需伸缩,集群利用率高,共享底层存储,避免数据跨集群迁移

on mesos 模式

  • 国内使用较少
  • 运行在mesos资源管理器之上,mesos负责资源调度,Spark负责任务调度和计算

on cloud 模式

  • 中小公司未来会使用更多的云服务
  • 比如AWS的EC2,使用这个模式可以很轻松的访问Amazon的S3

Spark core

RDD详解

RDD的概念

因为MR框架采用的是非循环式数据流模型,把中间结果写到HDFS中,带来了大量的数据复制、磁盘IO、以及序列化开销,且只支持一些特定的计算模式,并没有提供一种特定的数据抽象。

RDD被称为分布式弹性数据集,是Spark中最基本的数据抽象,代表的是一个不可变、可分区、其中数据可以进行并行计算的集合。

RDD单词拆分:

Resilient:弹性的,RDD中的数据可以保存在内存中也可以存储在硬盘中。

Distributed:其中的元素是分布式存储的,可用于分布式计算

Dataset:是一个集合,可以存放许多元素

RDD的属性

其中的源码描述如下

A list of partitions:一组分片/一个分区列表,即数据的基本组成单位

A function for computing each split:一个函数会被作用到每一个分区上。Spark中的计算是以分片为单位,compute函数会被作用到每一个分区上

A list of dependencies on the RDDs:一个RDD会依赖其他多个RDD。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会产生类似于流水线一样的前后依赖,在部分分区数据丢失的时候可以通过依赖关系重新计算丢失的分区数据,而不是对所有的RDD分区进行计算,这是Spark的容错机制。

Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) :对于KV形式的RDD会存在一个partitioner,即RDD的分区函数,默认为hashpartitioner

Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) :一个列表,存储存取每个partitioner的优先位置,对于一个HDFS文件来说,这个文件就是存储的每一个Partition所在块的位置,按照移动数据不如移动计算的理念,Saprk在进行任务调度的时候,会尽可能选择那些存有数据的worker节点进行任务计算

总结:RDD是一个数据集的表示,不仅表示了数据集,还表示了数据集从哪里来的,如何计算

RDD的主要属性包括:

  • 分区列表
  • 计算函数
  • 依赖关系
  • 分区函数--默认是hash
  • 最佳位置

分区列表、分区函数、最佳位置,这三个属性其实就是在说数据集在哪里,在哪里计算更为合适,如何分区

计算函数、依赖关系,这两个属性说明数据集是怎么来的

RDD API

RDD的创建方式
  1. 由外部存储系统的数据集创建,包括本地的文件系统,还有Hadoop支持的文数据集,比如HDFS、HBase等

    val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
    
  2. 通过已有的RDD经过算子转换生成新的RDD

    val rdd2=rdd1.flatMap(_.split(" "))
    
  3. 由一个已经存在的Scala集合创建

    val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
    或者
    val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
    

RDD的makeRDD方法底层调用了parallelize方法

RDD算子

RDD的算子分为两类

  • Transformation转换操作:返回一个新的RDD
  • Action动作算子:返回值不是RDD(无返回值或者返回其他的)

注意:

  • RDD不实际存储真正需要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,以及传入了什么函数)
  • RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接进行计算。只有当发生一个要求返回结果给Driver的Action动作时,这些转换才会真正的运行
  • 之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行stage的划分和并行优化,这种设计可以使Spark更加有效率的运行

Transformation转换算子:

转换算子 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
fliter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但是独立的在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但是func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func函数类型必须为
(int,Interator[T] => Iterator[U])
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
distinct([numTasks]) 对源RDD去重后返回一个新的RDD
groupBykey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,Iterator[V])的RDD
reduceByKey(func,[numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同Key的值聚合到一起,与groupByKey类似,reduce任务
的个数可以通过第二个参数来设置
sortByKey([ascending],[numTasks]) 在一个(K,V)的RDD上调用,k必须实现Oredered接口,返回一个按照Key进行排序(K,V)的RDD
jion(otherDataset,[numsTask]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同Key对应的所有元素对在一起的(K,(V,W))的RDD
coalesce(numPartitons) 减少RDD的分区数到指定值。在过滤大量数据之后,可以执行此操作
reparttion(numPartitions) 重新给RDD分区

Action动作算子:

动作算子 含义
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每一个元素,Spark将会调用toString方法
,将它转化为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是HDFS或者其他Hadoop支持的文件系统
countByKey() 针对(K,V)类型的RDD,返回一个(K,int)类型的map,表示每一个key对应的元素个数
foreachPartition(func) 在数据集上的每一个分区,运行函数func

统计操作:

算子 含义
count 个数
mean 均值
sum 求和
max 最大值
min 最小值
variance 方差
sampleVariance 从采样中计算方差
stdev 标准差:衡量数据的离散程度
samplestdev 采样的标准差
stats 查看统计结果

RDD持久化/缓存

某些RDD的计算或者转化可能会比较耗费时间,如果这些RDD后续还会被频繁的使用,那么就可以对这些RDD进行持久化或者缓存操作:

val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了

presist方法和cache方法

RDD通过persist或者cache方法将前面的计算结果缓存,但是并不是这两个方法被调用时立刻缓存,而是触发后面的action时,该RDD将会被缓存在计算节点中,并且提供给后面使用。

通过查看源码可以发现cache最终也是调用了persist无参方法(默认存储只在内存中):

存储级别

默认的存储级别都是在内存中存储一份,Spark的存储级别还有好多种,存储级别在objectStorageLevel中定义的。

  • MORY_ONLY(默认):将RDD以非序列化的java对象存储在JVM中。如果没有足够的内存存储RDD,则某些分区将不会被存储,每次需要时都会重新计算,这是默认级别
  • MORY_AND_DISK(开发中使用):将RDD已非序列化的java对象存储带到JVM中。如果数据在内存中放不下,则溢写到磁盘上,需要时则会从磁盘上进行读取。

总结:

  • RDD持久化/缓存的目的是为了提高后续操作的速度
  • 缓存的级别由很多,默认只存储在内存中,开发使用memory_and_disk
  • 只有执行action操作的时候才会真正的将RDD数据精心持久化/缓存、
  • 实际开发中如果某一个RDD后续会被频繁的使用,可以将该RDD进行持久化/缓存

RDD的容错机制Checkpoint

持久化的局限:持久化/缓存可以把数据放在内存中,虽然快速但是也是不可靠的;也可以把数据存储到磁盘上,但是也是不可靠的

解决方法:

  • CheckPoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在HDFS上,借助HDFS的天然的高容错、高可靠来实现数据的安全,实现了RDD的容错和可靠性
  • SparkContext.setCheckpointDir("目录") //HDFS的目录
    
    RDD.checkpoint
    

总结:

  • 在开发中如何保证数据的安全性以及读取效率:可以对频繁使用的数据进行持久化/缓存,再做checkpoint操作

持久化和checkpoint的区别:

  • 位置:persist和cache只能保存在本地的磁盘和内存中,Checkpoint可以保存数据到HDFS上
  • 生命周期:cache和persist的RDD会在程序结束后被清除或者手动调用unpersist方法,Checkpoint的RDD在程序结束后依然会存在,不会被删除

RDD的依赖关系

RDD存在两种依赖关系:宽依赖和窄依赖

在上图中我们可以发现:

  • 窄依赖:父RDD的一个分区只会被子RDD的一个分区所继承
  • 宽依赖:父RDD的一个分区会被子RDD的多个分区依赖,其中涉及到shuffle

对于窄依赖:

  • 窄依赖的多个分区可以并行计算
  • 窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区数据就可以

对于宽依赖:

  • 划分阶段(stage)的依据:对于宽依赖,必须等到上一阶段完成才能计算下一个阶段

DAG的生成和stage的划分

DAG

DAG(有向无环图):指的是数据转换执行的过程,有方向,无闭环(其实就是RDD的执行流程)

原始的RDD通过一系列的转换操作就形成了DAG有向无环图,任务执行时,可以按照DAG的描述,执行真正的计算

DAG的边界:

  • 开始:通过SparkContext创建的RDD
  • 结束:触发Action,一旦触发Action就形成了一个完整的DAG
DAG划分stage

从上图我们可以看出:

  • 一个Spark程序可以有多个DAG,有几个Action就有几个DAG,上图最后只有一个Action,那么就是一个DAG。
  • 一个DAG可以有多个Stage,根据宽依赖/shuffle来进行stage的划分
  • 同一个Stage可以有多个task并行执行(task数 = 分区数,在上图中,stage1有三个分区P1,P2,,P3,对应的也有三个task)
  • 可以看到这个DAG中只有reduceByKey操作是一个宽依赖,Spark内核会以此为边界进行前后划分成不同的Stage
  • 在图中stage1中,从textFile到flatMap到马屁都是窄依赖,这几步操作可以形成一个流水线操作,通过flatMap操作生成的partition可以不用等到整个RDD计算结束,而是继续进行map操作,大大的提高了效率。
为什么要划分stage --- 并行计算
  • 一个复杂的逻辑业务如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算需要依赖上一个阶段的数据,那么我们按照shuffle进行划分(也就是按照宽依赖进行划分),就可以将一个DAG划分为多个stage/阶段,在同一个stage中,会有很多歌算子操作,形成一个pipeline流水线,流水线中多个平行的分区可以并行操作。
如何划分DAG的Stage
  • 对于窄依赖,partition的转换处理在stage中完成计算,不划分(将窄依赖尽量划分在同一个satge中,可以实现流水线计算)
  • 对于宽依赖,由于存在shuffle,只能等待父RDD处理完成之后,才能开始接下来的计算,也就是说需要划分Stage
总结:
  • Spark就根据shuffle/宽依赖使用回溯算法来对DAG进行stage进行划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的Stage/阶段中。
RDD累加器和广播变量

在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数的时候,他会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务task和任务控制节点之间共享变量。

为了满足这种需求,Spark提供了两种类型的变量:

  • 累加器:累加器支持在所有不同节点之间进行累加运算
  • 广播变量:广播变量用来把变量在所有节点的内存中进行共享,在每一个机器上缓存一个只读变量,而不是为机器上的每个任务都生成一个副本。
累加器

通常在向Spaek传递函数的时候,比如使用map( )函数或者是filter( )穿条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一个新的副本,更新这些副本也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果:

val xx: Accumulator[Int] = sc.accumulator(0)

示例代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object AccumulatorTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //使用scala集合完成累加
    var counter1: Int = 0;
    var data = Seq(1,2,3)
    data.foreach(x => counter1 += x )
    println(counter1)//6

    println("+++++++++++++++++++++++++")

    //使用RDD进行累加
    var counter2: Int = 0;
    val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
    dataRDD.foreach(x => counter2 += x)
    println(counter2)//0
    //注意:上面的RDD操作运行结果是0
    //因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量
    //而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2
    //最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系

    //那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊!
    //如果解决?---使用累加器
    val counter3: Accumulator[Int] = sc.accumulator(0)
    dataRDD.foreach(x => counter3 += x)
    println(counter3)//6
  }
}
广播变量

关键词:sc.broadcast( )

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastVariablesTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //不使用广播变量
    val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
    val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap
    //scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)
    val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))
    //根据水果编号取水果名称
    val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))
    fruitNames.foreach(println)
    //注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多,
    //那么会导致,被各个Task共用到的fruitMap会被多次传输
    //应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可
    //如何做到?---使用广播变量
    //注意:广播变量的值不能被修改,如需修改可以将数据存到外部数据源,如MySQL、Redis
    println("=====================")
    val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)
    val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))
    fruitNames2.foreach(println)

  }
}

Spark Sql

Spark Sql概述

hive是将sql转化为MapReduce

SparkSQL 可以理解为是将SQL解析为“RDD + 优化"再执行

在这里插入图片描述

在Spark Sql的学习之前我们需要学习一下数据分类:

数据分类

数据分为以下几类:

定义 特点 举例
结构化数据 由固定的Schema 由预定义的Schema 关系型数据库的表
半结构化数据 没有固定的Schema,但是有结构 没有固定的Schema,有结构信息,数据一般是自描述的 指一些有结构的文件格式,例如JSON
非结构化数据 没有固定的Schema,也没有结构 没有固定的Schema,也没有结构 指图片/音频之类的格式

总结:

  • RDD主要用来处理非结构化数据、半结构化数据、结构化数据
  • SparkSql 是一个即支持Sql又支持命令式数据处理的工具
  • SparkSql 主要用来处理结构化数据,较为规范的半结构化数据也可以处理

Spark Sql数据抽象

DataFrema 和 DataSet

SparkSql数据抽象可以分为两类:

  1. DataFrema:一种以RDD为基础的分布式数据集,类似于传统的数据库的二维表格,带有Schema信息(可以理解为数据库的列名和类型)-- DataFrema = RDD + 泛型 + Sql的操作 + 优化
  2. DataSet : DataSet是DataFrema的进一步发展,他比RDD保存了更多的描述信息,概念上等同于关系数据库的二维表,它保存了类型信息,是强类型的,提供了编译时的类型检查。调用DataSet时会先生成逻辑计划,然后被Spark的优化器进行优化,最终生成物理计划,然后提交到集群中运行。DataFrema = DataSet[Row]

RDD、DataFrema、DataSet的关系如下:

在这里插入图片描述

RDD[person] : 以person为类型参数,但不了解其内部结构

DataFrema: 提供了详细的结构信息schema列名称和类型。看起来像一张表

DataSet:不只有Schema信息,同样还有类型信息

举例

假设RDD[Person]中的数据长这样:

RDD[Person]:

在这里插入图片描述

那么DataFrema中的数据长这样

DataFrame = RDD[Person] - 泛型 + Schema + SQL 操作 + 优化

在这里插入图片描述

那么DataSet中的数据长这样

Dataset[Person] = DataFrame + 泛型:

在这里插入图片描述

也可能是:

即 DataFrame = DataSet[Row]:

在这里插入图片描述

总结:

  • DataFrema = RDD - 泛型 + Schema + Sql + 优化
  • DataSet = DataFrema + 泛型
  • DataSet= RDD + schema + Sql + 优化

Spark Sql的应用

创建DataFrema和DataSet

方式一:读取本地文件

1)在本地创建一个文件,有id、name、age三列,用空格分开,然后上传到HDFS上

vim /root/person.txt

内容如下:

1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40

2)打开Spark-shell

spark/bin/spark-shell

##创建 RDD

val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]]

3)定义case class(相当于Schema)

case class Person(id:Int, name:String, age:Int)

4)将RDD与case class关联

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]

5)将RDD转化为DataFrema

val personDF = personRDD.toDF //DataFrame

6)查看数据和Schema

personDF.show

7)注册表

personDF.createOrReplaceTempView("t_person")

8)执行Sql

spark.sql("select id,name from t_person where id > 3").show

9)也可以通过SparkSession构建DataFrema

val dataFrame=spark.read.text("hdfs://node1:8020/person.txt")
dataFrame.show //注意:直接读取的文本文件没有完整schema信息
dataFrame.printSchema
方法二:读取json文件
val jsonDF= spark.read.json("file:///resources/people.json")

接下来就可以使用DataFrema的函数操作

jsonDF.show

注意:直接读取json文件有Schema信息,因为json文件本身包含Schema信息,SparkSql可以自动解析

方法三:读取parquet文件
val parquetDF=spark.read.parquet("file:///resources/users.parquet")

接下来就可以使用Datafrema的函数操作

parquetDF.show

注意:直接读取parquet文件有Schema信息,因为parquet文件保存了列的信息

两种查询风格:DSL 和 SQL

DSL风格示例:

personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show

SQL风格示例

spark.sql("select * from t_person").show

总结:

  • DataFrema和DataSet都可以通过RDD进行创建
  • 也可以通过读取普通文本进行创建,注意直接读取没有完整的约束,需要RDD+SChema
  • 通过json和parquet有完整约束
  • 不管是DataFrema还是DataSet都可以注册成表,之后就可以通过使用SQL进行查询,也可以使用DSL

Spark Sql 多数据源交互

读取json文件:

spark.read.json("D:\\data\\output\\json").show()

读取csv文件

spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()

读取parquet文件

spark.read.parquet("D:\\data\\output\\parquet").show()

读取mysql表

val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","root")
spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()

写入json文件

personDF.write.json("D:\\data\\output\\json")

写入csv文件

personDF.write.csv("D:\\data\\output\\csv")

写入parquet文件

personDF.write.parquet("D:\\data\\output\\parquet")

写入mysql表

val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","root")
personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)

Spark 两种核心shuffle

  • 一种是基于hash的shuffle
  • 一种是基于sort的shuffle

Hash shuffle 解析

以下的讨论都假设每一个Executor都有一个cpu core

HashShuffleManager

shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如 reduceBykey),而将每个task处理的数据按key进行划分。所谓的划分就是对相同key执行hash算法,从而将相同的key写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入到内存缓冲中,当内存缓冲写满之后才会溢写到磁盘文件中去。

下一个stage的task有多少那么当前的stage的每个task就要创建多少的磁盘文件,比如:

  • 下一个stage总共有100个task,那么当前阶段的stage就要创建100份磁盘文件
  • 如果当前stage共有50个task,总共有10个Executor,每个Executor执行5个task,那么每个Executor上总共就要创建500个磁盘文件,所有的Executor上会创建5000个磁盘文件。

所以由此可见,未经优化的shuffle write 操作所产生的磁盘文件的数量是惊人的。

shuffle read 阶段,通常就是一个stage刚开始要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点通过网络拉取到自己所在的节点,然后进行key的聚合或者连接等操作。由于shuffle wirte 的过程中,map task 给下游stage的每个reduce task 都创建了磁盘文件,所以shuffle read的过程中,每个reduce task只需要从上游stage的所有马屁task所在节点上,拉取术语自己的那一个磁盘文件即可。

shuffle read 的拉取过程是一边拉取一遍聚合的,每一个shuffle read tsk都有一个属于自己的buffer 缓冲,每次拉取的时候都只能拉取与buffer缓冲大小相同的数据,然后通过内存的map进行聚合等操作。聚合完一批数据之后再拉取下一批数据,并放到buffer缓冲中进行聚合操作,以此类推最后直到数据拉取完,得到最终的结果。

HashShuffleManager的工作原理如下:

优化的HashShuffleManager

为了优化HashShuffleManager 我们可以设置一个参数:saprk.shuffle.consolidateFiles,该参数的默认值为false,将其设置为true,就可以开启优化机制,通常来说的话我们在使用HashShuffleManager的时候都建议开启此参数。

开启consolidate机制之后,在shuffle write 阶段,task就不是为了下游stage的每个task创建一个磁盘文件了,此时会出现shuffleFile Group的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task的数量是相同的。一个Executor 上有多少个对应的cpu core,就可以并行的执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的cpu core 执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件,也就是说此时task会将数据写入到已有的磁盘文件,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效的将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提高shuffle write的性能。

进过优化的hashShuffleManagere:

在这里插入图片描述

优缺点

优点:

  • 可以省略不必要的排序开销
  • 避免了排序所需的内存开销

缺点:

  • 产生的文件数过多,会对文件系统造成压力
  • 大量小文件的随机读写带来一定的磁盘开销
  • 数据块写入时所需的缓存空间也会随之增加,对内存造成压力

SortShuffle 解析

SortShufflemanager 的运行机制主要分为三种:

  • 普通运行机制
  • bypass 运行机制 : 当shuffle read task 的数量小于等于 spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200)就会启用bypass机制
  • Tungsten Sort 机制:开启此运行机制需要设置配置项 spark.shuffle.manager = tungsten-sort。而且开启此项配置也不一定保证就会采用此机制运行

普通运行机制

在该模式下,数据会先写入到一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceBykey这种聚合类的算子,那么就会选择Map数据结构,一遍通过Map进行聚合,一遍写入到内存中,如果是join这种普通的shuffle算子,那么就会选用Array数据结构,直接写入内存。接着,每写入一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值,如果达到了临界阈值,那么就会尝试将内存数据结构中的数据溢写到磁盘中,然后清空内存数据结构。

在溢写到磁盘之前,会根绝key对内存数据结构中的已有数据进行排序。排序之后,会分批将数据写入到磁盘中,默认的batch数量是10000条,也就是说,排序之后的数据,会以每批1万条的数据形式分批写入到磁盘文件中,写入磁盘文件是通过Java的BufferedoutputStream实现的。bufferedOutputdStream 是java的缓冲输出流,当内存满溢之后再一次性写入到磁盘文件中,这样可以减少磁盘IO,提升性能。

一个task将所有数据写入到内存数据结构的过程中,发生多次磁盘溢写操作,也就会产生多个临时文件,最后会将所有的临时磁盘文件进行合并,也就是Merge的过程,此时会将之前所有的临时磁盘文件读取出来,然后依次写入最终的磁盘文件,此外由于一个task只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中索引文件中标识了下游各个task的数据在文件中的start offset和end offset。

普通运行机制的SortShufflemanager工作原理:

在这里插入图片描述

Bypass运行机制

reduce端任务较少的情况下,基于Hash Shuffle 实现机制明显比基于Sort Shuffle 实现机制快,因此基于Sort Shuffle 实现机制提供了一个回退方案。这就是bypass运行机制。对与reduce 端任务少于配置属性Spark.shuffle.sort.bypassMergeThreshold设置的个数时,使用带有hash风格的回退机制。

bypass机制运行的触发条件:

  • shuffle map task的数量少于Spark.shuffle.sort.bypassMergeThreshold = 200参数设置的值
  • 不是聚合类的shuffle 算子

此时,每个task会为下游的task创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件中。当然,写入磁盘文件也是先写入到内存缓冲中,缓冲写满之后再溢写到磁盘上,最后,所有的临时磁盘文件都合并为一个磁盘文件,并且单独创建一个索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一样的,因为都要创建数量惊人的磁盘文件,只是最后会做一个磁盘文件的合并操作,因此少量的磁盘文件,也让该机制相对于未经优化的HashShuffleManager来说,Shuffle read的性能会更好。

而该机制与普通的SortShufflemanager运行机制的不同在于:第一,磁盘的写机制不同,第二不会进行排序。也就是说,启用该机制的最大好处在于shuffle write的过程,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

bypass 运行机制的sortShufflemanager 工作原理如下:

在这里插入图片描述

Tungsten Sort Shuffle 运行机制

基于Tungsten Sort 的Shuffle实现机制主要是借助Tungsten 项目所做的优化来高效处理Shuffle。

Spark提供了配置属性,用于选择开启具体的Shuffle实现机制,但是需要说明的是,虽然在默认的情况下Spark开启的是Sort Shuffle实现机制,但实际上,参考Shuffle 的框架内核部分可以得知SortShuffle 的实现机制与基于Tunsgten Sort Shuffle 实现机制都是使用SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的:

对应非基于Tungsten Sort时,通过 SortShuffleWriter.shouldBypassMergeSort 方法判断是否需要回退到Hash 风格的Shuffle实现机制,当该方法返回的条件不满足时,则通过 SortShuffleManager.canUseSerializedShuffle 方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当两个方法都返回false,即都不满足对应的条件时,会自动采用普通运行机制。

因此,当设置Spark.sufflr.manager = tunsgten-sort时,也不能保证就一定采用基于Tungsten Sort 放入shuffle 实现机制。

要实现Tunsgten Sort Shuffle 机制需要满足一下的条件:

  • Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求
  • Shuffle 的序列化器支持序列化值的重定位
  • shuffle 过程中的输出分区个数少于16777216个
  • 实际上,使用过程中还存在其他的一些限制,比如引用Page 形式的管理模型后,内部单条记录的长度不能超过128M,另外分区个数的限制也是该模型导致的。

优缺点

优点:

  • 小文件的数量大量减少,Mapper端的内存占用变少
  • Spark 不仅可以处理小规模的数据,即使处理大规模的数据,也不会很容易达到性能瓶颈

缺点:

  • 如果mapper 中的Task 数量过大,依旧会产生很多小文件,此时在shuffle 传数据的过程中到Reducer端,Reducer 会需要同时大量的记录进行反序列化,导致大量内存的消耗和GC负担巨大,导致系统缓慢,甚至崩溃。
  • 强制了在Mapper 端必须要进行排序,即使数据本身不需要排序
  • 他要基于记录本身进行排序,这是Sort -Based Shuffle 的对致命的性能消耗

Spark的底层执行原理

Spark的运行流程

在这里插入图片描述

具体的执行流程如下:

  1. Spark Context向资源管理器注册并向资源管理器申请运行Exector
  2. 资源管理器分配 Executor,然后资源管理器启动Executor
  3. Executor 发送心跳至资源管理器
  4. Sparkcontext 构建DAG有向无环图
  5. 将DAG分解为Stage(TaskSet)
  6. 把Stage 发送给taskScheduler
  7. Executor 向SparkContext 申请task
  8. task Scheduler 将Task 发送给Executor 运行
  9. TaskScheduler 将应用程序代码发放给 Executor
  10. task 在Executor 上运行,运行完毕释放所有的资源

从代码角度来看DAG 图的构建

Val lines1 = sc.textFile(inputPath1).map(...).map(...)

Val lines2 = sc.textFile(inputPath2).map(...)

Val lines3 = sc.textFile(inputPath3)

Val dtinone1 = lines2.union(lines3)

Val dtinone = lines1.join(dtinone1)

dtinone.saveAsTextFile(...)

dtinone.filter(...).foreach(...)

上述代码的DAG图如下所示:

在这里插入图片描述

Spark的计算发生在RDD的Action操作,而对Action之前的所有Transformation,Spark只是记录下RDD的生成的轨迹,而不会真正的触发计算。

将DAG划分为Stage核心算法

一个Application 可以有多个job 多个Stage:

Spark Application 中可以因为不同的Action 触发众多的job,一个Application 中可以有很多的job,每个job 是由一个或者多个stage构成的,后面的stage依赖于前面的stage,也就是说只有前面依赖的stage计算完毕后,后面的stage才会运行。

划分依据:Stage的划分依据就是宽依赖,向reduceBykey、groupBykey等算子,会导致宽依赖的产生。

核心算法:回溯算法

从后往前回溯/反向解析,遇到窄依赖加入本Stage,遇到宽依赖进行Stage划分。

Spark 内核会从触发Action 操作的那个RDD开始从后往前推,首先会为最后的RDD创建一个Stage,然后继续倒推,如果发现某个RDD依然是宽依赖,那个RDD就是新的Stage的最后一个RDD,然后依次类推,继续倒推。根据窄依赖或者宽依赖进行Stage的划分,知道所有的RDD遍历完成。

将DAG 划分为Stage 剖析

在这里插入图片描述

  • 一个Saprk 程序可以有多个DAG,有几个Action 就有几个DAG,上图最后只有一个Action,那么就只有一个DAG
  • 一个DAG可有多个Stage
  • 同一个Stage可以有多个task并行执行
  • 可以看到上图的DAG中只有reduceBykey一个宽依赖,Spark内核会依据此为边界将其进行前后的划分成不同的Stage。
  • 从上图中,可以看出在Stage1中,从textFile 到flatMap到map都是窄依赖的,这几步操作可以形成一个流水线操作,通过flatMap 操作生成的partition 可以不用等整个RDD的计算结束,而是继续进行map操作,这样大大提高了计算的效率。

提交Stages

调度阶段的提交,最终会被转变为一个任务集的提交,DAG Scheduler 通过 Task SCheduler 接口提交任务集,这个任务集最终会触发TaskScheduler 构建一个TaskSetManager 的实例来管理这个任务集的生命周期,对于DAG Scheduler 来说,提交阶段的工作就到此为止了。

而TaskScheruler的具体实现则会在的到计算资源的时候,进一步通过TaskSetManager 调度具体的任务到对应的EXecutor 节点上进行计算

在这里插入图片描述

监控Job、Task、Executor

DAG Scheduler 监控Job 与 Task:

  • 要保证相互依赖的作业调度阶段能够得到顺利的调度执行,DAGScheduler 需要监控当前作业调度阶段乃至任务的完成情况。
  • 这通过对外暴露一系列的回调函数来实现的

标签:task,shuffle,RDD,磁盘,Spark,数据
From: https://www.cnblogs.com/baiyunhao/p/17208733.html

相关文章

  • spark
    对hive表操作spark-shell进入界面spark.sql("").show-----sql语句spark.table("").show查看表spark.sql("selectguid,count(1)asnumfromphonecgroupbyguid......
  • Spark性能调优
    1、概述在大数据领域,肯定有很多小伙伴跟笔者一样为了让生产中数据执行速度更快、性能更高而去使用Spark,当我们用Spark程序实现功能开发并使程序正常稳定运行起来的时候,一定......
  • Java应用【XIV】使用Apache Spark MLlib构建机器学习模型【下】
    如果您觉得本博客的内容对您有所帮助或启发,请关注我的博客,以便第一时间获取最新技术文章和教程。同时,也欢迎您在评论区留言,分享想法和建议。谢谢支持!​四、无监督学习4.1聚......
  • spark SQL 连接hive
    将hive的conf下的hive-site.xml复制到spark的conf下cp/usr/local/hive/apache-hive-1.2.2-bin/conf/hive-site.xml/usr/local/spark/spark-2.0.2-bin-hadoop2.6/conf/......
  • Java应用【XIII】使用Apache Spark MLlib构建机器学习模型【上】
    如果您觉得本博客的内容对您有所帮助或启发,请关注我的博客,以便第一时间获取最新技术文章和教程。同时,也欢迎您在评论区留言,分享想法和建议。谢谢支持!​​一、引言1.1Spark......
  • Spark系列 - (6) Spark 内存管理
    6.Spark内存管理在执行Spark的应用程序时,Spark集群会启动Driver和Executor两种JVM线程,前者为主控进程,负责创建Spark上下文,提交Spark作业(Job),并将作业转化为计算任务(Task),在......
  • spark运行架构
    spark的核心框架是一个计算引擎,它采用了标准的中从结构 Driverspark的驱动器节点,主要执行spark中的main方法,负责实际代码的执行工作 Driver可以理解为使整个应用......
  • spark 提交的参数说明
    以bin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masterspark://linux1:7077\./examples/jars/spark-examples_2.12-3.0.0.jar\10为例1)--......
  • Spark系列 - (5) Spark Shuffle
    目前已经更新完《Java并发编程》,《JVM性能优化》,《Spring核心知识》《Docker教程》和《Spark基础知识》,都是多年面试总结。欢迎关注【后端精进之路】,轻松阅读全部文章。......
  • Hadoop&Spark-Lec-Something-New
    NarrowandWideTransformationsNarrowtransformation:Asingleoutputpartitioncanbecomputedfromasingleinputpartition不需要考虑数据分区eg.filter(),......