spark第二天
1、打包代码到yarn上运行
将代码提交到Yarn.上运行
1、将setMaster代码注释,使用提交命令设置运行方式
2、修改输入输出路径,并准备数据
3、打包上传至服务器
4、使用spark-submit命令提交任务
2、spark客户端模式&集群模式
client模式
cluster模式
3、流程示意
分布式文件系统(fliesystem)--加载数据集
transformation延迟执行--针对RDD的操作
Action触发执行
spark转化算子
* 在Spark所有的操作可以分为两类:
* 1、Transformation操作(算子)
* 2、Action操作(算子)
* 转换算子是懒执行的,需要由Action算子触发执行
* 每个Action算子会触发一个Job
*
* Spark的程序的层级划分:
* Application --> Job --> Stage --> Task
*
* 怎么区分Transformation算子和Action算子?
* 看算子的返回值是否还是RDD,如果是由一个RDD转换成另一个RDD,则该算子是转换算子
* 如果由一个RDD得到其他类型(非RDD类型或者没有返回值),则该算子是行为算子
* 在使用Spark处理数据时可以大体分为三个步骤:
* 1、加载数据并构建成RDD
* 2、对RDD进行各种各样的转换操作,即调用转换算子
* 3、使用Action算子触发Spark任务的执行
/**
* map算子:转换算子
* 需要接受一个函数f
* 函数f:参数的个数只有一个,类型为RDD中数据的类型 => 返回值类型自己定义
* 可以将函数f作用在RDD中的每一条数据上,需要函数f必须有返回值,最终会得到一个新的RDD
* 传入一条数据得到一条数据
*/
/**
* flatmap算子:转换算子
* 同map类似,只不过所接受的函数f需要返回一个可以遍历的类型
* 最终将函数的返回值进行展开(扁平化处理),最终会得到一个新的RDD
* 传入一条数据得到多条数据
*/
/**
* fliter算子:转换算子
* 一般用于过滤数据
* 需要接受一个函数f
* 函数f:参数的个数只有一个,类型为RDD中数据的类型 => 返回值类型必须为Boolean
* 最终会基于函数f返回的Boolean值进行过滤,最终会得到一个新的RDD
* 如果为TRUE,返回数据
* 如果为FLASE,过滤数据
*/
/**
* sample:转换算子
* 用于对数据进行取样
* 总共有三个参数:
* withReplacement:有无放回
* fraction:抽样的比例(这个比例并不是精确的,因为抽样是随机的)
* seed:随机数种子
*/
/**
* mapValues:转换算子
* 同map类似,只不过mapValues需要对KV格式的RDD的Value进行遍历处理
*/
/**
* join:转换算子
* 需要作用在两个KV格式的RDD上,会将相同的Key的数据关联在一起
*/
/**
* union:转换算子,用于将两个相类型的RDD进行连接
*/
/**
* groupBy:按照某个字段进行分组
*/
/**
* groupByKey:转换算子,需要作用在KV格式的RDD上
*/
/**
* reduceByKey:转换算子,需要作用在KV格式的RDD上,不仅能实现分组,还能实现聚合
* 需要接受一个函数f
* 函数f:两个参数,参数的类型同RDD的Value的类型一致,最终需要返回同RDD的Value的类型一致值
* 实际上函数f可以看成一个聚合函数
* 常见的聚合函数(操作):max、min、sum、count、avg
* reduceByKey可以实现Map端的预聚合,类似MR中的Combiner
* 并不是所有的操作都能使用预聚合,例如avg就无法实现
*/
/**
* aggregateByKey:转换算子,可以实现将多个聚合方式放在一起实现,并且也能对Map进行预聚合
* 可以弥补reduceByKey无法实现avg操作
* zeroValue:初始化的值,类型自定义,可以是数据容器
* seqOp:在组内(每个分区内部即每个Map任务)进行的操作,相当是Map端的预聚合操作
* combOp:在组之间(每个Reduce任务之间)进行的操作,相当于就是最终每个Reduce的操作
*/
/**
* cartesian:转换算子,可以对两个RDD做笛卡尔积
* 当数据重复时 很容易触发笛卡尔积 造成数据的膨胀
*/
/**
* sortBy:转换算子 可以指定一个字段进行排序 默认升序
*/
/**
* // 当某些情况下 想在算子内部对算子外部的变量进行操作时 需要使用累加器
// 在Driver端创建一个累加器
val longAcc: LongAccumulator = sc.longAccumulator
// 在算子内部使用算子外部变量时,外部的变量会以副本的形式跟着Task发送到Executor中去执行
// 在算子内部操作其实是外部变量的副本
// 使用累加器
longAcc.add(1)
println("累加器的值为:" + longAcc.value)
spark行为算子
/**
* take : Action算子,可以将指定条数的数据转换成Scala中的Array
*
*/
/**
* collect : Action算子,可以将RDD中所有的数据转换成Scala中的Array
*/
/**
* count : Action算子,统计RDD中数据的条数
*/
/**
* reduce : Action算子,将所有的数据作为一组进行聚合操作
*/
/**
* save相关:
* saveAsTextFile、saveAsObjectFile
*/
为什么算子外部创建的连接不能够在算子内部使用??
/**
* Spark代码虽然在编程时看起来是一个完整的程序就一个部分
* 而且在IDEA中也不会报错,也能正常package,一运行时才会报错
* 原因:
* Spark代码是分为两个部分:
* 1、算子外部:Driver端
* 2、算子内部:以Task的形式发送到Executor中去执行
*
* 连接是不能够被序列化的,Driver端和Executor属于不同的JVM进程,甚至在不同的节点上,
* 所以在算子外部创建的连接不能够在算子内部使用
*/
高性能转化算子
groupByKey:转换算子,需要作用在KV格式的RDD上
aggregateByKey:转换算子,可以实现将多个聚合方式放在一起实现,并且也能对Map进行预聚合,可以弥补reduceByKey无法实现avg操作
优化:
如何选择foreachPartition/mapPartitions?
foreachpartition
减少连接的问题,通过分区进行连接
foreachpartition对每个分区进行一次操作
mapPartitions
如果只是想将数据保存到外部系统-->foreachPartition
使用mapPartitions进行优化,减少建立连接的次数,做到每个Partition公用一个连接
当需要同外部系统建立连接时:
如果需要从外部获取数据并进行后续操作-->mapPartitions
Reduce Join
直接关联是Reduce Join效率比较低
当大表关联小表的时,可以将小表的数据广播给每个Executor,实现MapJoin
当大表的数据量比较大的时候-->切片的数量很多-->Task的数量很多
如果直接在算子内部使用算子外部的变量,会导致算子外部的变量复制的次数等于Task的次数
又因为Task最终是在每个Executor中执行的,所以可以将变量广播到每个Executor中
当Task执行时 只需要向Executor获取数据即可
前提条件:
Executor的数量 << Task的数量
4、spark五大特性
RDD五大特性:
1、RDD是由一系列分区组成的
2、Task是作用在每一个分区上的,每个task任务至少处理一个分区
3、RDD之间是有一系列的依赖关系的,依赖可以基于有无shuffle阶段分为:宽依赖和窄依赖,通过宽窄依赖可以划分不同的stage(一组可以进行并行计算的Task,可以使map任务,也可以是reduce任务,在spark中统称为stage)
4、算子中有几个特殊的算子只能作用在KV格式的RDD上
5、spark会给每个服务提供最佳的计算位置,移动计算不移动数据
5、Cache和Broadcast以及CheckPoint的区别:
checkpoint:
broadcast:
Cache:当一个RDD被使用多次时,可以进行缓存,如果内存不够,容易造成OOM,需要考虑缓存的策略,如果内存足够 --> cache --> MEMORY_ONLY,如果内存不够 --> persist --> MEMORY_AND_DISK_SER (尽可能将数据放入内存)。常选用内存,序列化,磁盘的。
Broadcast:直接关联是reduce join,效率比较低,当大表关联小表时,可以将小表的数据广播给每个executor,实现mapjoin; 当大表的数据量比较大的时候-->切片的数量很多-->Task的数量很多, 如果直接在算子内部使用算子外部的变量,会导致算子外部的变量复制的次数等于Task的次数, 又因为Task最终是在每个Executor中执行的,所以可以将变量广播到每个Executor中,当Task执行时 只需要向Executor获取数据即可。
前提条件:
Executor的数量 << Task的数量>>
CheckPoint:将多次使用的数据启动一个job将数据写入高可靠的文件系统中(HDFS),安全可靠,占用内存多,效率低,cache基于内存,读写效率高,不可靠,容易丢失。
6、血统(lineage)
每一个看做一个RDD,存在一定依赖关系进行恢复数据
利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。为了保证RDD 中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。 相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的 特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时 ,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制 了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。
RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错的高 效性。Narrow Dependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父 RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父 RDD的一个分区不可能对应一个子RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父 RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与 Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出 节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上 其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开 销要远小于Wide Dependencies的数据重算开销。
容错
在RDD计算,通过checkpoint进行容错,做checkpoint有两种方式,一个是checkpoint data,一个是 logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通 过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算 生成丢失的分区数据。
7、专业术语
Application:基于Spark的应用程序,包含了driver程序和 集群上的executor
DriverProgram:运行main函数并且新建SparkContext的程序
ClusterManager:在集群上获取资源的外部服务(例如 standalone,Mesos,Yarn )
WorkerNode:集群中任何可以运行应用用代码的节点
Executor:是在一个workernode上为某应用用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用用都有各自自独立的executors
Task:被送到某个executor上的执行单元
Job包含很多任务的并行计算的task,可以看做和Spark的action对应,每个action都会触发一个job任务
Stage一个Job会被拆分很多组任务,每组任务被称为Stage(就像MapReduce分map任务和reduce任务一样)
8、任务调度
spark调度器:
调度器根据RDD的结构信息为每个动作确定有效的执行计划 。调度器的接口是runJob函数,参数为RDD及其分区集,和 一个RDD分区上的函数。该接口足以表示Spark中的所有动作 (即count、collect、save等)。
总的来说,我们的调度器跟Dryad类似,但我们还考虑了哪些 RDD分区是缓存在内存中的。调度器根据目标RDD的Lineage 图创建一个由 stage构成的无回路有向图(DAG)。每个 stage内部尽可能多地包含一组具有窄依赖关系的转换,并将 它们流水线并行化(pipeline)。 stage的边界有两种情况: 一是宽依赖上的Shuffle操作;二是已缓存分区,它可以缩短 父RDD的计算过程。例如图6。父RDD完成计算后,可以在 stage内启动一组任务计算丢失的分区。
job调度流程:
DAG scheduler:
基于Stage构建DAG,决定每个任务的最佳位置
记录哪个RDD或者Stage输出被物化
将taskset传给底层调度器TaskScheduler
重新提交shuffle输出丢失的stage
Task scheduler:
提交taskset(一组并行task)到集群运行并汇报结果
出现shuffle输出lost要报告fetchfailed错误
碰到straggle任务需要放到别的节点上重试 ,如果碰到任务运行比较久,然后卡住了,这时候可以换一个executor进行运行,这时候谁运行出来就用谁作为结果。
为每一一个TaskSet维护一一个TaskSetManager(追踪本地性及错误信息) .
练习
9、PageRank
含义:
算法原理:
➢入链====投票
PageRank让链接来"投票",到-一个页面的超链接相当于对该页投一票
➢入链数量
如果一个页面节点接收到的其他网页指向的入链数量越多,那么这个页面越重要。
➢入链质量
指向页面A的入链质量不同,质量高的页面会通过链接向其他页面传递更多的权重。所以越是质量高的页面指向页面A,则页面A越重要。
计算过程:
➢初始值
●每个页面设置相同的PR值
Google的PageRank算法给每个页面的PR初始值为1。
➢迭代递归计算(收敛)
● Google不断的重复计算每个页面的PageRank。 那么经过不断的重复计算,这
些页面的PR值会趋向于稳定,也就是收敛的状态。
●在具体企业应用中怎么样确定收敛标准?
1.每个页面的PR值和上一次计算的PR相等
2.设定一个差值指标(0.0001)。当所有页面和上一次计算的PR差值平
均小于该标准时,则收敛。
3.设定一个百分比(99%) ,当99%的页面和上一次计算的PR相等
源码:
package sql
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo25PageRank {
def main(args: Array[String]): Unit = {
// 构建Spark Context环境
// 创建SparkConf
val conf: SparkConf = new SparkConf()
conf.setAppName("Demo25PageRank")
conf.setMaster("local") // 设置Spark的运行方式
// 构建Spark的上下文环境
val sc: SparkContext = new SparkContext(conf)
// 定义收敛条件:当PR值的平均差值小于该值时就停止循环
val stopVal: Double = 0.0001
var flag: Boolean = true
// 加载数据
val pageRDD: RDD[String] = sc.textFile("F:\\IdeaProjects\\Spark\\src\\main\\data\\pageRank.txt")
// 切分数据,将指向的页面用List/Set接受
val pagesSetRDD: RDD[(String, Set[String])] = pageRDD.map(line => {
val splits: Array[String] = line.split("->")
val page: String = splits(0)
val pageSet: Set[String] = splits(1).split(",").toSet
(page, pageSet)
})
// 给每个页面赋上一个初始的PR值(默认置为1),最终返回一个三元组
var pagePRRDD: RDD[(String, Set[String], Double)] = pagesSetRDD.map(kv => (kv._1, kv._2, 1.0))
while (flag) {
// 将页面的PR值平均分给每一个指向的页面
val pageAvgPRRDD: RDD[(String, Double)] = pagePRRDD.flatMap(t3 => {
val pageAvgPR: Double = t3._3 / t3._2.size
t3._2.map(page => (page, pageAvgPR))
})
// 统计每个页面新的PR值
val newPagePRRDD: RDD[(String, Double)] = pageAvgPRRDD.reduceByKey(_ + _)
// 计算每个页面新的PR值和上一次的PR值的平均差值,当平均差值小于0.0001时停止循环,即收敛
val pageCnt: Long = pageRDD.count() // 页面的数量
// 关联上一次每个页面的PR值
val oldPRKVRDD: RDD[(String, Double)] = pagePRRDD.map(t3 => (t3._1, t3._3))
val sumPRDiff: Double = newPagePRRDD.join(oldPRKVRDD)
// 计算每个页面PR值的差值
.map {
case (page: String, (newPR: Double, oldPR: Double)) =>
Math.abs(newPR - oldPR)
}.sum() // 计算差值之和
// 计算平均差值
val avgPRDiff: Double = sumPRDiff / pageCnt
println(s"平均差值为:$avgPRDiff")
if (avgPRDiff < stopVal) {
flag = false
}
// 将新的PR值赋给每个页面,重复计算,直到收敛
pagePRRDD = pagesSetRDD.join(newPagePRRDD).map(t2 => (t2._1, t2._2._1, t2._2._2))
pagePRRDD.foreach(println)
}
}
}
10、sparkPi
1、在正方形中随机打点:只要保证生成的点在x和y坐标都在(-1,1)范围内即可
2、根据正方形面积公式:推导:圆的面积/正方形的面积=pi/4=落在圆内的点的数量/生成的所有点的数量
当生成点的数量无限多时,最终pi更加精确
3、判断落在圆形内:计算点到原点的距离,当距离小于等于1(圆的半径)时,则该点在圆内,
4、pi=4*落在圆内点的数量/生成的所有点的数量
源码:
package sql
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
object Demo17SparkPi {
def main(args: Array[String]): Unit = {
/**
* 通过Spark代码实现PI的计算
*/
val conf: SparkConf = new SparkConf()
conf.setAppName("Demo17SparkPi")
conf.setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val pointNum: Int = 100000000
val seqRDD: RDD[Int] = sc.parallelize(1 to pointNum)
val inCirclePointNum: Long = seqRDD
// 随机生成范围在-1,1的点
.map(i => {
(Random.nextDouble() * 2 - 1, Random.nextDouble() * 2 - 1)
})
// 过滤出在圆内的点
.filter(xy => {
// 计算到原点的距离
val distance: Double = xy._1 * xy._1 + xy._2 * xy._2
distance <= 1
})
// 统计在圆内的点的数量
.count()
// 根据公式推导Π的值
println("Π的值为:" + inCirclePointNum * 4.0 / pointNum)
}
}
标签:val,分区,sparkCore,RDD,算子,数据,页面
From: https://www.cnblogs.com/tanggf/p/16840524.html