Spark
core
spark作业执行的特点
* spark作业执行的特点:
* 1、只有遇到行动算子的时候,整个spark作业才会被触发执行
* 2、遇到几次,执行几次
算子
RDD
RDD: 弹性分布式数据集
* 弹性:数据量可大可小
* RDD类似于容器,但是本身存储的不是数据,是计算逻辑
* 当遇到行动算子的时候,整个spark作业才会被触发执行,是从第一个RDD开始执行,数据才开始产生流动
* 数据在RDD之间只是流动关系,不会存储
* 流动的数据量可以很大,也可以很小,所以称为弹性
* 分布式:
* spark本质上它是需要从HDFS中读取数据的,HDFS是分布式,数据block块将来可能会在不同的datanode上
* RDD中流动的数据,可能会来自不同的datanode中的block块数据
* 数据集:
* 计算流动过程中,可以短暂地将RDD看成一个容器,容器中有数据,默认情况下在内存中不会进行存储
* 后面会有办法将一个RDD的数据存储到磁盘中
RDD的5大特性
RDD的5大特性:(面试必问!)
* 1、RDD是由一系列分区构成
* 注意:
* 1)读文件时的minPartitions参数只能决定最小分区数,实际读取文件后的RDD分区数,由数据内容本身以及集群的分布来共同决定的
* 2)若设置minPartitions的大小比block块数量还少的话,实际上以block块数量来决定分区数
* 3)产生shuffle的算子调用时,可以传入numPartitions,实际真正改变RDD的分区数,设置多少,最终RDD就有多少分区
*
* 2、算子是作用在每一个分区上的
*
* 3、RDD与RDD之间存在一些依赖关系
* 1)窄依赖 前一个RDD中的某一个分区数据只会到后一个RDD中的某一个分区 一对一的关系
* 2)宽依赖 前一个RDD中的某一个分区数据会进入到后一个RDD中的不同分区中 一对多的关系 也可以通过查看是否产生shuffle来判断
* 3)整个spark作业会被宽依赖的个数划分若干个stage, Num(stage) = Num(宽依赖) + 1
* 4)当遇到产生shuffle的算子的时候,涉及到从前一个RDD写数据到磁盘中,从磁盘中读取数据到后一个RDD的现象,
* 注意:第一次触发执行的时候,磁盘是没有数据的,所以会从第一个RDD产生开始执行
* 当重复触发相同的执行的时候,对于同一个DAG有向无环图而言,会直接从shuffle之后的RDD开始执行,可以直接从磁盘读取数据。
* 5)一个阶段中,RDD有几个分区,就会有几个并行task任务
*
* 4、kv算子只能作用在kv的RDD上
*
* 5、spark会提供最优的任务计算方式,只移动计算,不移动数据。
map
//map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中
//将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑
filter
//filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条
flatMap
flatMap: 将rdd中的数据每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
Sample
/**
* sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
* 这个函数主要在机器学习的时候会用到
*/
GroupBy
/**
* groupBy算子的使用
*
* 1、groupBy的算子,后面的分组条件是我们自己指定的
* 2、spark中groupBy之后的,所有值会被封装到一个Iterable迭代器中存储
*/
GroupByKey
/**
* GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上
* 也就说,只有kv格式的RDD才能调用kv格式的算子
*/
groupBy算子与groupByKey算子的区别
/**
* 面试题:spark core中 groupBy算子与groupByKey算子的区别?
* 1、代码格式上:
* groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD
* groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式RDD
*
* 2、执行shuffle数据量来看
* groupBy产生的shuffle数据量在一定程度上要大于groupByKey产生的shuffle数据量
* 所以groupByKey算子的执行效率要比groupBy算子的执行效率要高
*/
groupByKey与reduceBykey的区别
/**
* 面试题:
* groupByKey与reduceBykey的区别?
* 相同点:
* 它们都是kv格式的算子,只有kv格式的RDD才能调用
* 不同点:
* 1)groupByKey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
* 2)reduceByKey 相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
* 3)groupByKey的灵活度要比reduceByKey灵活度要高,reduceBykey无法做一些复杂的操作,比如方差。但是groupByKey可以在分组之后的RDD进行方差操作
*
*/
Union
//两个RDD要想进行union合并,必须保证元素的格式和数据类型是一致的
//分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定
Join
join算子也要作用在kv格式的RDD上
内连接 两个rdd共同拥有的键才会进行关联
右连接 保证右边rdd键的完整性
MapValues
/**
* mapValues函数也是作用在kv格式的算子上
* 将每个元素的值传递给后面的函数,进行处理得到新的值,键不变,这个处理后的组合重新返回到新的RDD中
*/
MapPartitions
/**
* mapPartitions:一次处理一个分区中的数据
* 它与map的区别在于,map是每次处理一条数据就返回一条数据到下一个rdd
* 而mapPartitions一次处理一个分区的数据,处理完再返回
* 最后的处理效果和map的处理效果是一样的
*
* mapPartition可以优化与数据库连接的次数
*/
SortBy
对一个集合中的元素进行排序
foreach
/**
* 行动算子,就可以触发一次作业执行,有几次行动算子调用,就会触发几次
*
* rdd是懒加载的性质
*/
collect
collect将rdd转成合适的scala中的数据结构
Cache
/**
* 缓存:
* 缓存的目的是为了spark core作业执行的时候,缩短rdd的执行链,能够更快的得到结果
* 缓存的实现方式:
* 1、需要缓存的rdd调用cache函数
* 2、persist(StorageLevel.MEMORY_ONLY) 修改缓存级别
*
*/
Checkpoint
/**
* 永久将执行过程中RDD中流动的数据存储到磁盘(hdfs)中
* checkpoint
*
* 需要设置checkpoint的路径,统一设置的
*
* checkpoint也相当于一个行动算子,触发作业执行
* 第二次DAG有向无环图执行的时候,直接从最后一个有检查点的rdd开始向下执行
*/
checkpoint和cache的区别
/**
* checkpoint和cache的区别?
* cache是将一个复杂的RDD做缓存,将来执行的时候,只是这个rdd会从缓存中取
* checkpoint是永久将rdd数据持久化,将来执行的时候,直接从检查点的rdd往后执行
*/
standalone
/**
* standalone
* - client模式提交命令:
* spark-submit --class com.shujia.core.Demo18Standalone --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-1.0.jar 10
*
* - cluster模式提交命令:
* spark-submit --class com.shujia.core.Demo18Standalone --master spark://master:7077 --executor-memory 512M --total-executor-cores 1 --deploy-mode cluster spark-1.0.jar 10
*/
累加器Accumulator
必须要有行动算子触发作业执行
//创建累加器变量
// val c1: LongAccumulator = sc.longAccumulator("c1")
// linesRDD.foreach((e: String) => {
// num += 1
// println("-----------------------")
// println(num)
// })
// println(s"num的值为:$num") // 0 1000
//使用累加器
// val c1: LongAccumulator = sc.longAccumulator("c1")
// linesRDD.foreach((e:String)=>{
// c1.add(1)
// })
// println(s"累加之后的值为:${c1.value}")
//使用累加器
// val c1: LongAccumulator = sc.longAccumulator("c1")
// linesRDD.map((e: String) => {
// c1.add(1)
// }).collect()
// println(s"累加之后的值为:${c1.value}")
* 写spark core程序的注意事项
* 1、RDD中无法嵌套使用RDD
* 2、RDD中无法使用SparkContext
广播大变量
使用SparkContext中的一个功能,将Driver端的变量广播到executor执行的节点上的blockManager中
val bc: Broadcast[mutable.Map[String, String]] = sc.broadcast(map1)
val scoreRDD: RDD[String] = sc.textFile("spark/data/score.txt")
//未使用广播变量
// val resRDD: RDD[(String, String, String)] = scoreRDD.map((line: String) => {
// val array1: Array[String] = line.split(",")
// val id: String = array1(0)
// // 通过map1的变量,通过键获取值
// val info: String = map1.getOrElse(id, "查无此人")
// val score: String = array1(2)
// (id, info, score)
// })
//使用广播变量
val resRDD: RDD[(String, String, String)] = scoreRDD.map((line: String) => {
val array1: Array[String] = line.split(",")
val id: String = array1(0)
//通过广播过来的大变量,进行关联数据
val map2: mutable.Map[String, String] = bc.value
val info: String = map2.getOrElse(id, "查无此人")
val score: String = array1(2)
(id, info, score)
})
SQL
/**
* spark sql处理数据的步骤
* 1、读取数据源
* 2、将读取到的DF注册成一个临时视图
* 3、使用sparkSession的sql函数,编写sql语句操作临时视图,返回的依旧是一个DataFrame
* 4、将结果写出到hdfs上
*/
/**
* spark sql是spark core的上层api,如果要想使用rdd的编程
* 可以直接通过sparkSession获取SparkContext对象
*/
spark sql的核心数据类型是DataFrame
/**
* sql语句是无法直接作用在DataFrame上面的
* 需要提前将要使用sql分析的DataFrame注册成一张表(临时视图)
*/
/**
* 如果要想使用DSL语法编写spark sql的话,需要导入两个隐式转换
*/
/**
* 读取json数据文件,转成DF
* 读取json数据的时候,是不需要指定表结构,可以自动根据json的键值来构建DataFrame
*/
/**
* spark core的核心数据结构是:RDD
* spark sql的核心数据结构是DataFrame
*/
/**
* 开窗:over
* 聚合开窗函数:sum count lag(取上一条) lead(取后一条)
* 排序开窗函数:row_number rank dense_rank
*
Streaming
/**
* Spark core: SparkContext 核心数据结构:RDD
* Spark sql: SparkSession 核心数据结构:DataFrame
* Spark streaming: StreamingContext 核心数据结构:DStream(底层封装了RDD)
*/
val conf = new SparkConf()
conf.setMaster("local[2]") // 给定核数
conf.setAppName("spark Streaming 单词统计")
val sparkContext = new SparkContext(conf)
//创建Spark Streaming的运行环境,和前两个模块是不一样的
//Spark Streaming是依赖于Spark core的环境的
//this(sparkContext: SparkContext, batchDuration: Duration)
//Spark Streaming处理之前,是有一个接收数据的过程
//batchDuration,表示接收多少时间段内的数据
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
//Spark Streaming程序理论上是一旦启动,就不会停止,除非报错,人为停止,停电等其他突然场景导致程序终止
//监控一个端口号中的数据,手动向端口号中打数据
val rids: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
//hello world
val wordsDS: DStream[String] = rids.flatMap(_.split(" "))
val kvDS: DStream[(String, Int)] = wordsDS.map((_, 1))
val resDS: DStream[(String, Int)] = kvDS.reduceByKey(_ + _)
// val resDS: DStream[(String, Int)] = rids.flatMap(_.split(" "))
// .map((_, 1))
// .reduceByKey(_ + _)
println("--------------------------------------")
resDS.print()
println("--------------------------------------")
/**
* sparkStreaming启动的方式和前两个模块启动方式不一样
*/
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
foreachRDD
/**
* foreachRDD:在DS中使用rdd的语法操作数据
* 缺点:该函数是没有返回值的
* 需求:我们在想使用DS中的RDD的同时,想要使用结束后,会得到一个新的DS
*/
tranformat
面试题:foreachRDD与transform的区别
SaveFile
//将结果存储到磁盘中
//只能设置文件夹的名字和文件的后缀
//每一批次运行,都会产生新的小文件夹,文件夹中有结果数据文件
优化
Cache
/**
* 缓存的目的是避免每一次job作业执行的时候,都需要从第一个rdd算起
* 对重复使用RDD进行缓存
* cache 设置不了缓存级别
* persist 可以设置缓存级别
*/
// stuRDD.cache() // 默认的缓存级别是MEMORY_ONLY
stuRDD.persist(StorageLevel.MEMORY_ONLY_SER)
AggregateByKey
//aggregateByKey
//aggregateByKey(zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)
//zeroValue: 初始值,这个参数只会被后面第一个参数函数所使用
//seqOp: 相当于map端预聚合的逻辑
//combOp: 相当于reduce端的聚合逻辑
val resRDD3: RDD[(String, Int)] = clazzKVRDD.aggregateByKey(0)(
//相当于map端预聚合的逻辑
(a1: Int, a2: Int) => a1 + a2,
//相当于reduce端的聚合逻辑
(b1: Int, b2: Int) => b1 + b2
)
MapPartitions
/**
* 实际上针对上面的案例,我们可以针对rdd中的每一个分区创建一个工具对象,在每条数据上使用
* mapPartitions,将每一个分区中的数据封装成了一个迭代器
*/
val resRDD: RDD[(String, String, String)] = lineRDD.mapPartitions((itr: Iterator[String]) => {
println("===================创建一次对象=============================")
val sdf = new SimpleDateFormat("yyyy/MM/dd")
val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
itr.map((line: String) => {
val info: Array[String] = line.split("\t")
val t1: String = info(1)
val date: Date = sdf.parse(t1)
val t2: String = sdf2.format(date)
(info(0), t2, info(2))
})
})
Coalese
/**
* coalesce
*
* 1、默认增大分区是不会产生shuffle的
* 2、合并分区直接给分区数,不会产生shuffle
*/
val resRDD1: RDD[String] = lineRDD.coalesce(10, shuffle = true)
println(s"resRDD1的分区数:${resRDD1.getNumPartitions}")
// resRDD1.foreach(println)
val resRDD2: RDD[String] = resRDD1.coalesce(1,shuffle = true)
println(s"resRDD2的分区数:${resRDD2.getNumPartitions}")
resRDD2.foreach(println)
/**
* Coalesce算子通常是用在合并小文件时候使用
* 对应的spark core中的话,通常使用该算子进行合并分区
*/
client模式提交作业资源调度流程
1,首先,由客户端上的Driver提交作业命令
2,向ResourceManger申请资源
3,ResourceManger会检查一些权限,资源空间,在一个相对空闲的子结点上开启一个ApplicationMaster进程
4,然后ApplicationMaster向ResourceMaster申请资源,启动Executer
5,最后由Execter反向注册给Driver,告诉Driver资源申请完毕,可以发送任务
client模式提交作业资源调度以及任务调度流程
6,当任务遇到action算子时,触发整个作业调度
7,首先,将任务提交给DAG有向无环图(DAG Scheduler)
8,在DAG Scheduler里根据rdd之间的血统,找出宽窄依赖
9,通过宽窄依赖划分stage阶段
10,根据stage阶段,将stage中的task任务封装成一个taskSet对象
11,将这些taskSet对象发送给后续的Task Scheduler
12,从DAG Scheduler中发送过来的taskSet对象中取出task任务
13,根据RDD5大特性的最后一个特性,只移动计算不移动数据,将task任务发送到对应的executor的线程池中执行。
cluster模式提交作业资源调度
cluster模式提交作业不会在客户端产生Driver进程,提交之后,提交进程就可以关闭
1、客户端向ResourceManger申请资源
2、ResourceManger会检查一些权限,资源空间,在一个i相对空闲的子结点上开启一个ApplicationMaster进程
3、这里的ApplicationMaster相当于一个Driver,在启动完之后会申请Executor资源
4、然后将task任务发送到Executor中执行
spark yarn cluster模式相较于client模式而言,
不会在提交作业的节点上产生Driver进程,而是选择一个相对空闲的子节点ApplicationMaster,这里的ApplicationMaster相当于一个Driver这样的话,就减轻了客户端的压力,可以提交更多的spark作业
提交作业的节点与Driver的启动节点不是同一个的,就会导致我们无法在提交作业的节点上看到日志,如果是yarn cluster模式提交的话,我们只能够通过yarn logs -applicationld的命令查看yarn的作业日志来看到运行结里
提高数据本地化级别
PROCESS_LOACL进程本地化
task要计算的数据在同一个Executor的进程中
NODE_LOCAL节点本地化
task要计算的数据是在同一个Worker的不同Executor进程中
task计算的数据在同一个Worker的磁盘上
Spark计算的数据来源于HDFS,那么最好的数据本地化级别就是NODE_LOCAL
NODE_PREF
比如说sparkSQL读取MySql中的数据
RACK_LOCAL机架本地化
(1)、TASK计算的数据在Worker 2的Executor进程中
(2)、TASK计算的数据在Worker 2的磁盘上
ANY
跨机架
Spark数据的本地化:计算移动,数据不移动
Spark里面的数据本地化分为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY
Spark里面数据本地化有谁负责计算?
DAG Scheduler和Task Scheduler
Spqrk调优
避免创建重复的RDD
尽可能复用同一个RDD
对多次使用的RDD进行持久化 <===
尽量避免使用shuffle类算子
使用map-side预聚合的shuffle操作
使用高性能的算子 <===
广播大变量<===
使用Kryo优化序列化性能
优化数据结构
使用高性能的库fastutil
数据倾斜的七种解决方案(面试)
使用Hive ETL预处理数据===》
在Spark作业进行之前,通过Hive来对数据进行预处理,将Spark需要做的join或者聚合操作提前到Hive中就可以避免数据倾斜了,不过这种情况适用于Hive表中数据分布不均,且对表频繁的执行分析操作。
过滤少数导致倾斜的key===》
如果发现导致倾斜的key就少数几个,对计算本身影响不大,可以通过Spark SQL中的Where对其进行过滤或者在Spark Core中对RDD执行filter算子过滤掉这些key
提高shuffle操作的并行度
通过设置spark.sql.shuffle.partitions,增加shuffle read task 的数量,让每个task处理更少的数据,从而缩短task的执行时间
双重聚合
这个方案就是实现两阶段聚合,先进行局部聚合,先给key打上一个随机数,然后对其先进行一次reduceByKey操作,再将得到的结果进行全局聚合,就可以得到全局聚合了
将reduce join转为map join
不使用join算子进行连接操作,而通过Broadcast变量与map类算子实现join操作, 进而规避掉shuffle类的操作,避免数据倾斜的发生
采样倾斜key并分拆join操作==》
1、对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个 key的数量,计算出来数据量最大的是哪几个key。
2、然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以 内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
3、接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数 据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个 RDD。
4、再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打 散成n份,分散到多个task中去进行join了。
5、而另外两个普通的RDD就照常join即可。
6、最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。
使用随机前缀和扩容RDD进行join
通过查看RDD/Hive表中的数据分布情况,找到那个造成 数据倾斜的RDD/Hive表,
然后将该RDD的每条数据都打上一个n以内的随机前缀,同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一 个0~n的前缀。
最后将两个处理后的RDD进行join即可