首页 > 其他分享 >sparkCore

sparkCore

时间:2022-10-30 09:59:06浏览次数:62  
标签:val 分区 sparkCore RDD 算子 数据 页面

spark第二天

1、打包代码到yarn上运行

将代码提交到Yarn.上运行
1、将setMaster代码注释,使用提交命令设置运行方式
2、修改输入输出路径,并准备数据
3、打包上传至服务器
4、使用spark-submit命令提交任务

2、spark客户端模式&集群模式

client模式

yarn-client

cluster模式

yarn-cluster

3、流程示意

分布式文件系统(fliesystem)--加载数据集
transformation延迟执行--针对RDD的操作
Action触发执行
spark转化算子
 	 * 在Spark所有的操作可以分为两类:
     * 1、Transformation操作(算子)
     * 2、Action操作(算子)

     * 转换算子是懒执行的,需要由Action算子触发执行
     * 每个Action算子会触发一个Job
     *
     * Spark的程序的层级划分:
     * Application --> Job --> Stage --> Task
     *
     * 怎么区分Transformation算子和Action算子?
     * 看算子的返回值是否还是RDD,如果是由一个RDD转换成另一个RDD,则该算子是转换算子
     * 如果由一个RDD得到其他类型(非RDD类型或者没有返回值),则该算子是行为算子

     * 在使用Spark处理数据时可以大体分为三个步骤:
     * 1、加载数据并构建成RDD
     * 2、对RDD进行各种各样的转换操作,即调用转换算子
     * 3、使用Action算子触发Spark任务的执行

/**
     * map算子:转换算子
     * 需要接受一个函数f
     * 函数f:参数的个数只有一个,类型为RDD中数据的类型 => 返回值类型自己定义
     * 可以将函数f作用在RDD中的每一条数据上,需要函数f必须有返回值,最终会得到一个新的RDD
     * 传入一条数据得到一条数据
     */
/**
     * flatmap算子:转换算子
     * 同map类似,只不过所接受的函数f需要返回一个可以遍历的类型
     * 最终将函数的返回值进行展开(扁平化处理),最终会得到一个新的RDD
     * 传入一条数据得到多条数据
     */
/**
     * fliter算子:转换算子
     * 一般用于过滤数据
     * 需要接受一个函数f
     * 函数f:参数的个数只有一个,类型为RDD中数据的类型 => 返回值类型必须为Boolean
     * 最终会基于函数f返回的Boolean值进行过滤,最终会得到一个新的RDD
     * 如果为TRUE,返回数据
     * 如果为FLASE,过滤数据
     */
     
/**
     * sample:转换算子
     * 用于对数据进行取样
     * 总共有三个参数:
     * withReplacement:有无放回
     * fraction:抽样的比例(这个比例并不是精确的,因为抽样是随机的)
     * seed:随机数种子
     */
/**
     * mapValues:转换算子
     * 同map类似,只不过mapValues需要对KV格式的RDD的Value进行遍历处理
     */
/**
     * join:转换算子
     * 需要作用在两个KV格式的RDD上,会将相同的Key的数据关联在一起
     */
/**
     * union:转换算子,用于将两个相类型的RDD进行连接
     */
/**
     * groupBy:按照某个字段进行分组
     */
/**
     * groupByKey:转换算子,需要作用在KV格式的RDD上
     */
/**
     * reduceByKey:转换算子,需要作用在KV格式的RDD上,不仅能实现分组,还能实现聚合
     * 需要接受一个函数f
     * 函数f:两个参数,参数的类型同RDD的Value的类型一致,最终需要返回同RDD的Value的类型一致值
     * 实际上函数f可以看成一个聚合函数
     * 常见的聚合函数(操作):max、min、sum、count、avg
     * reduceByKey可以实现Map端的预聚合,类似MR中的Combiner
     * 并不是所有的操作都能使用预聚合,例如avg就无法实现
     */
/**
     * aggregateByKey:转换算子,可以实现将多个聚合方式放在一起实现,并且也能对Map进行预聚合
     * 可以弥补reduceByKey无法实现avg操作 
     * zeroValue:初始化的值,类型自定义,可以是数据容器
     * seqOp:在组内(每个分区内部即每个Map任务)进行的操作,相当是Map端的预聚合操作
     * combOp:在组之间(每个Reduce任务之间)进行的操作,相当于就是最终每个Reduce的操作
     */     
/**
     * cartesian:转换算子,可以对两个RDD做笛卡尔积
     * 当数据重复时 很容易触发笛卡尔积 造成数据的膨胀
     */    
/**
     * sortBy:转换算子 可以指定一个字段进行排序 默认升序
     */
/**
	* // 当某些情况下 想在算子内部对算子外部的变量进行操作时 需要使用累加器
      // 在Driver端创建一个累加器
      val longAcc: LongAccumulator = sc.longAccumulator
       // 在算子内部使用算子外部变量时,外部的变量会以副本的形式跟着Task发送到Executor中去执行
      // 在算子内部操作其实是外部变量的副本
      // 使用累加器
      longAcc.add(1)
      println("累加器的值为:" + longAcc.value)
spark行为算子
/**
     * take : Action算子,可以将指定条数的数据转换成Scala中的Array
     *
     */
/**
     * collect : Action算子,可以将RDD中所有的数据转换成Scala中的Array
     */     
/**
     * count : Action算子,统计RDD中数据的条数
     */
/**
     * reduce : Action算子,将所有的数据作为一组进行聚合操作
     */ 
/**
     * save相关:
     * saveAsTextFile、saveAsObjectFile
     */     
为什么算子外部创建的连接不能够在算子内部使用??
/**
     * Spark代码虽然在编程时看起来是一个完整的程序就一个部分
     * 而且在IDEA中也不会报错,也能正常package,一运行时才会报错
     * 原因:
     * Spark代码是分为两个部分:
     * 1、算子外部:Driver端
     * 2、算子内部:以Task的形式发送到Executor中去执行
     *
     * 连接是不能够被序列化的,Driver端和Executor属于不同的JVM进程,甚至在不同的节点上,
     * 所以在算子外部创建的连接不能够在算子内部使用
     */
高性能转化算子
groupByKey:转换算子,需要作用在KV格式的RDD上

aggregateByKey:转换算子,可以实现将多个聚合方式放在一起实现,并且也能对Map进行预聚合,可以弥补reduceByKey无法实现avg操作


优化:
如何选择foreachPartition/mapPartitions?

foreachpartition
减少连接的问题,通过分区进行连接
foreachpartition对每个分区进行一次操作

mapPartitions
如果只是想将数据保存到外部系统-->foreachPartition
使用mapPartitions进行优化,减少建立连接的次数,做到每个Partition公用一个连接

当需要同外部系统建立连接时:
如果需要从外部获取数据并进行后续操作-->mapPartitions

Reduce Join
直接关联是Reduce Join效率比较低
当大表关联小表的时,可以将小表的数据广播给每个Executor,实现MapJoin

当大表的数据量比较大的时候-->切片的数量很多-->Task的数量很多
如果直接在算子内部使用算子外部的变量,会导致算子外部的变量复制的次数等于Task的次数
又因为Task最终是在每个Executor中执行的,所以可以将变量广播到每个Executor中
当Task执行时 只需要向Executor获取数据即可
     
前提条件:
Executor的数量 << Task的数量
     

4、spark五大特性

Spark RDD五大特性

RDD五大特性:
1、RDD是由一系列分区组成的
2、Task是作用在每一个分区上的,每个task任务至少处理一个分区
3、RDD之间是有一系列的依赖关系的,依赖可以基于有无shuffle阶段分为:宽依赖和窄依赖,通过宽窄依赖可以划分不同的stage(一组可以进行并行计算的Task,可以使map任务,也可以是reduce任务,在spark中统称为stage)
4、算子中有几个特殊的算子只能作用在KV格式的RDD上
5、spark会给每个服务提供最佳的计算位置,移动计算不移动数据

5、Cache和Broadcast以及CheckPoint的区别:

checkpoint:

checkpoint

broadcast:

BlockManager

Cache:当一个RDD被使用多次时,可以进行缓存,如果内存不够,容易造成OOM,需要考虑缓存的策略,如果内存足够 --> cache --> MEMORY_ONLY,如果内存不够 --> persist --> MEMORY_AND_DISK_SER (尽可能将数据放入内存)。常选用内存,序列化,磁盘的。


Broadcast:直接关联是reduce join,效率比较低,当大表关联小表时,可以将小表的数据广播给每个executor,实现mapjoin; 当大表的数据量比较大的时候-->切片的数量很多-->Task的数量很多, 如果直接在算子内部使用算子外部的变量,会导致算子外部的变量复制的次数等于Task的次数, 又因为Task最终是在每个Executor中执行的,所以可以将变量广播到每个Executor中,当Task执行时 只需要向Executor获取数据即可。
     前提条件:
     Executor的数量 << Task的数量>>
     
     
CheckPoint:将多次使用的数据启动一个job将数据写入高可靠的文件系统中(HDFS),安全可靠,占用内存多,效率低,cache基于内存,读写效率高,不可靠,容易丢失。

6、血统(lineage)

image-20221026144905947

每一个看做一个RDD,存在一定依赖关系进行恢复数据
利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。为了保证RDD 中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。 相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的 特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时 ,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制 了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。 
RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错的高 效性。Narrow Dependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父 RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父 RDD的一个分区不可能对应一个子RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父 RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与 Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出 节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上 其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开 销要远小于Wide Dependencies的数据重算开销。 
容错 
在RDD计算,通过checkpoint进行容错,做checkpoint有两种方式,一个是checkpoint data,一个是 logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通 过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算 生成丢失的分区数据。 

7、专业术语

Application:基于Spark的应用程序,包含了driver程序和 集群上的executor

DriverProgram:运行main函数并且新建SparkContext的程序 

ClusterManager:在集群上获取资源的外部服务(例如 standalone,Mesos,Yarn )

WorkerNode:集群中任何可以运行应用用代码的节点 

Executor:是在一个workernode上为某应用用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用用都有各自自独立的executors 

Task:被送到某个executor上的执行单元

Job包含很多任务的并行计算的task,可以看做和Spark的action对应,每个action都会触发一个job任务 

Stage一个Job会被拆分很多组任务,每组任务被称为Stage(就像MapReduce分map任务和reduce任务一样) 

8、任务调度

spark调度器:

调度器根据RDD的结构信息为每个动作确定有效的执行计划 。调度器的接口是runJob函数,参数为RDD及其分区集,和 一个RDD分区上的函数。该接口足以表示Spark中的所有动作 (即count、collect、save等)。

总的来说,我们的调度器跟Dryad类似,但我们还考虑了哪些 RDD分区是缓存在内存中的。调度器根据目标RDD的Lineage 图创建一个由 stage构成的无回路有向图(DAG)。每个 stage内部尽可能多地包含一组具有窄依赖关系的转换,并将 它们流水线并行化(pipeline)。 stage的边界有两种情况: 一是宽依赖上的Shuffle操作;二是已缓存分区,它可以缩短 父RDD的计算过程。例如图6。父RDD完成计算后,可以在 stage内启动一组任务计算丢失的分区。

job调度流程:

image-20221026155455531

image-20221026160518937

image-20221026160430848

image-20221026160557385

image-20221026150458644

DAG scheduler:

基于Stage构建DAG,决定每个任务的最佳位置
记录哪个RDD或者Stage输出被物化
将taskset传给底层调度器TaskScheduler
重新提交shuffle输出丢失的stage

Task scheduler:

提交taskset(一组并行task)到集群运行并汇报结果

出现shuffle输出lost要报告fetchfailed错误

碰到straggle任务需要放到别的节点上重试 ,如果碰到任务运行比较久,然后卡住了,这时候可以换一个executor进行运行,这时候谁运行出来就用谁作为结果。

为每一一个TaskSet维护一一个TaskSetManager(追踪本地性及错误信息) .

练习

9、PageRank

含义:
算法原理:
➢入链====投票
PageRank让链接来"投票",到-一个页面的超链接相当于对该页投一票
➢入链数量
如果一个页面节点接收到的其他网页指向的入链数量越多,那么这个页面越重要。
➢入链质量
指向页面A的入链质量不同,质量高的页面会通过链接向其他页面传递更多的权重。所以越是质量高的页面指向页面A,则页面A越重要。

计算过程:
➢初始值
●每个页面设置相同的PR值
Google的PageRank算法给每个页面的PR初始值为1。
➢迭代递归计算(收敛)
● Google不断的重复计算每个页面的PageRank。 那么经过不断的重复计算,这
些页面的PR值会趋向于稳定,也就是收敛的状态。
●在具体企业应用中怎么样确定收敛标准?
1.每个页面的PR值和上一次计算的PR相等
2.设定一个差值指标(0.0001)。当所有页面和上一次计算的PR差值平
均小于该标准时,则收敛。
3.设定一个百分比(99%) ,当99%的页面和上一次计算的PR相等

image-20221026211951119

源码:
package sql

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo25PageRank {
  def main(args: Array[String]): Unit = {
    // 构建Spark Context环境
    // 创建SparkConf
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo25PageRank")
    conf.setMaster("local") // 设置Spark的运行方式
    // 构建Spark的上下文环境
    val sc: SparkContext = new SparkContext(conf)

    // 定义收敛条件:当PR值的平均差值小于该值时就停止循环
    val stopVal: Double = 0.0001
    var flag: Boolean = true

    // 加载数据
    val pageRDD: RDD[String] = sc.textFile("F:\\IdeaProjects\\Spark\\src\\main\\data\\pageRank.txt")

    // 切分数据,将指向的页面用List/Set接受
    val pagesSetRDD: RDD[(String, Set[String])] = pageRDD.map(line => {
      val splits: Array[String] = line.split("->")
      val page: String = splits(0)
      val pageSet: Set[String] = splits(1).split(",").toSet
      (page, pageSet)
    })

    // 给每个页面赋上一个初始的PR值(默认置为1),最终返回一个三元组
    var pagePRRDD: RDD[(String, Set[String], Double)] = pagesSetRDD.map(kv => (kv._1, kv._2, 1.0))

    while (flag) {

      // 将页面的PR值平均分给每一个指向的页面
      val pageAvgPRRDD: RDD[(String, Double)] = pagePRRDD.flatMap(t3 => {
        val pageAvgPR: Double = t3._3 / t3._2.size
        t3._2.map(page => (page, pageAvgPR))
      })

      // 统计每个页面新的PR值
      val newPagePRRDD: RDD[(String, Double)] = pageAvgPRRDD.reduceByKey(_ + _)

      // 计算每个页面新的PR值和上一次的PR值的平均差值,当平均差值小于0.0001时停止循环,即收敛
      val pageCnt: Long = pageRDD.count() // 页面的数量

      // 关联上一次每个页面的PR值
      val oldPRKVRDD: RDD[(String, Double)] = pagePRRDD.map(t3 => (t3._1, t3._3))
      val sumPRDiff: Double = newPagePRRDD.join(oldPRKVRDD)
        // 计算每个页面PR值的差值
        .map {
          case (page: String, (newPR: Double, oldPR: Double)) =>
            Math.abs(newPR - oldPR)
        }.sum() // 计算差值之和

      // 计算平均差值
      val avgPRDiff: Double = sumPRDiff / pageCnt
      println(s"平均差值为:$avgPRDiff")
          
      if (avgPRDiff < stopVal) {
        flag = false
      }
      // 将新的PR值赋给每个页面,重复计算,直到收敛
      pagePRRDD = pagesSetRDD.join(newPagePRRDD).map(t2 => (t2._1, t2._2._1, t2._2._2))
      pagePRRDD.foreach(println)
    }
  }
}

10、sparkPi

Spark PI

1、在正方形中随机打点:只要保证生成的点在x和y坐标都在(-1,1)范围内即可
2、根据正方形面积公式:推导:圆的面积/正方形的面积=pi/4=落在圆内的点的数量/生成的所有点的数量
当生成点的数量无限多时,最终pi更加精确
3、判断落在圆形内:计算点到原点的距离,当距离小于等于1(圆的半径)时,则该点在圆内,
4、pi=4*落在圆内点的数量/生成的所有点的数量

源码:
package sql

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

object Demo17SparkPi {
  def main(args: Array[String]): Unit = {
    /**
     * 通过Spark代码实现PI的计算
     */
    
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo17SparkPi")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
               
    val pointNum: Int = 100000000
    val seqRDD: RDD[Int] = sc.parallelize(1 to pointNum)
    val inCirclePointNum: Long = seqRDD
      // 随机生成范围在-1,1的点
      .map(i => {
        (Random.nextDouble() * 2 - 1, Random.nextDouble() * 2 - 1)
      })
      // 过滤出在圆内的点
      .filter(xy => {
        // 计算到原点的距离
        val distance: Double = xy._1 * xy._1 + xy._2 * xy._2
        distance <= 1
      })
      // 统计在圆内的点的数量
      .count()

    // 根据公式推导Π的值
    println("Π的值为:" + inCirclePointNum * 4.0 / pointNum)
  }
}

标签:val,分区,sparkCore,RDD,算子,数据,页面
From: https://www.cnblogs.com/tanggf/p/16840524.html

相关文章

  • SparkCore(四)
    【理解】Spark内核原理RDD依赖RDD的5大特性中,第三个是【与父RDD的依赖关系】依赖关系可以按照是否有shuffle进一步分类窄依赖:【没有】shuffle,父RDD的一个分......
  • SparkCore(二)
    RDD的API操作/方法/算子比如有一个100M的csv文件,需要对它的每个元素操作,比如先+1,再平方,结果保存另一个csv文件。如下图,如果用传统python思维,不仅每个中间容器占用内存,消......
  • SparkCore:累加器和广播变量
    累加器累加器(分布式共享只写变量):用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每......
  • sparkcore案例四:统计每个省份的用户访问量
    题目:/***统计每个省份的用户访问量,最终要求将不同省份用户访问量存放到不同的分区中分区存放规则如下*省份是以包含山0*如果省份包含海1*其他......
  • sparkcore案例三:获取每一种状态码对应的访问量
    题目描述:/***清洗完成的数据中包含一个用户的响应状态码,获取每一种状态码对应的访问量*1、读取清洗完成的数据成为RDD[String]*2、可以把上一步得到的RDD......
  • SparkCore系列(四)函数大全
    有了上面三篇的函数,平时开发应该问题不大了。这篇的主要目的是把所有的函数都过一遍,深入RDD的函数RDD函数大全数据准备        val sparkconf = new Spa......