首页 > 其他分享 >SparkCore:累加器和广播变量

SparkCore:累加器和广播变量

时间:2022-10-05 15:12:59浏览次数:55  
标签:map word String val sum SparkCore 累加器 广播

累加器

累加器(分布式共享只写变量):用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 Task 更新这些副本的值之后,会传回 Driver 端进行 merge。

ACC原理1

list(1,2,3,4) 进行普通的累加,Driver 端会将任务分给 Executor 端,Executor 端会对自己的数据进行累加,但是并不会将累加完的结果返回给 Driver 端。所以 Driver 端输出的结果为 0。

ACC原理2

而将 sum 变量声明为累加器数据结构,Executor 端执行完累加之后会自动将结果返回给 Driver 端进行 merge.

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
// 获取系统累加器
val sum: LongAccumulator = sc.longAccumulator("sum")
rdd.foreach(
  number => {
    sum.add(number)
  }
)
println(sum.value)

// 如果将foreach改成map,会出现少加现象。
// 少加:转换算子中调用累加器,如果没有行动算子的话,就不会执行。
rdd.map(
  number => {
    sum.add(number)
    number
  }
)
println(sum.value)

// 如果执行两次collect,会出现多加现象,因为累加器是全局共享的。
val mapRdd = rdd.map(
  number => {
    sum.add(number)
    number
  }
)
mapRdd.collect()
mapRdd.collect()
println(sum.value)

自定义累加器

object Acc {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("acc")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[String] = sc.makeRDD(List("hello", "hello", "spark"))
    val accumulator: MyAccumulator = new MyAccumulator()
    // 向Spark注册
    sc.register(accumulator, "accumulator")
    rdd.foreach(
      word => {
        accumulator.add(word)
      }
    )
    println(accumulator.value)

    sc.stop()
  }


  // 继承AccumulatorV2,定义泛型。
  // IN:累加器输入的数据类型 String
  // OUT:累加器返回的数据类型 mutable.Map[String, Long]
  class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

    private var map = mutable.Map[String, Long]()

    // 判断是否为初始状态
    override def isZero: Boolean = {
      map.isEmpty
    }

    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
      new MyAccumulator()
    }

    override def reset(): Unit = {
      map.clear()
    }

    // 获取累加器需要计算的值
    override def add(word: String): Unit = {
      val newCnt = map.getOrElse(word, 0L) + 1
      map.update(word, newCnt)
    }

    // Driver 合并多个累加器
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
      val map1 = this.map
      val map2 = other.value
      map2.foreach{
        case (word, count) => {
          val newCount = map1.getOrElse(word, 0L) + count
          map1.update(word, newCount)
        }
      }
    }

    // 累加器结果
    override def value: mutable.Map[String, Long] = map
  }
}

广播变量

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大只读值,以供一个或者多个 Spark 操作使用。

广播变量

  • 闭包数据,都是以 Task 为单位发送的,每个任务中包含闭包数据。这样会导致一个 Executor 中含有大量重复的数据,并且占用大量的内存。
  • Executor 其实就是一个 JVM,所以在启动的时候,会自动分配内存,将任务中的闭包数据放置在 Executor 内存中,达到共享的目的。
  • Spark 中的广播变量就可以将闭包数据保存到 Executor 的内存中。
  • Spark 中的广播变量不能够更改:分布式共享只读变量。
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(
  ("a", 1), ("b", 2), ("c", 3)
))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(
  ("a", 4), ("b", 5), ("c", 6)
))
// join 会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用。
val joinRdd: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRdd.collect().foreach(println)

val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))
// 封装广播变量
val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
rdd1.map{
  case (word, count) => {
    val tmp: Int = bc.value.getOrElse(word, 0)
    (word, (count, tmp))
  }
}.collect().foreach(println)

标签:map,word,String,val,sum,SparkCore,累加器,广播
From: https://www.cnblogs.com/fireonfire/p/16755609.html

相关文章

  • 【Numpy总结】第五节:Numpy的广播(更易理解的版本)
    Numpy的广播的三种情况广播(Broadcast)是numpy对不同形状(shape)的数组进行数值计算的方式,当运算中的2个数组的形状不同时,numpy将自动触发广播机制;即:可扩充较小数组中......
  • (转载)【RocketMQ 课程笔记】16.实现集群消费模式与广播消费模式
    集群消费模式与广播消费模式环境准备生产者CmProducer生产者是一致的,循环生成10条普通消息投给给Broker,主题为:cm-sample-data,Tag:test,Key:n@Slf4jpublicclassCmPro......
  • 10. NumPy广播机制
    1.前言NumPy中的广播机制(Broadcast)旨在解决不同形状数组之间的算术运算问题。我们知道,如果进行运算的两个数组形状完全相同,它们直接可以做相应的运算。示例如下:import......
  • Numpy 的广播机制高效计算矩阵之间两两距离
    利用numpy可以很方便的计算两个二维数组之间的距离。二维数组之间的距离定义为:X的维度为(m,c),Y的维度为(m,c),Z为X到Y的距离数组,维度为(m,n)。且Z[0,0]是X[0]到Y[0]的距......
  • QT UDP通信聊天程序(单播、广播、组播)
    QTUDP通信(单播、广播、组播)  日期:2021-03-26    浏览:126    评论:0    核心提示:1.QUdpSocketUDP是轻量的、不可靠的、面向数据报、无连接的协议,它可以用......
  • sparkcore案例四:统计每个省份的用户访问量
    题目:/***统计每个省份的用户访问量,最终要求将不同省份用户访问量存放到不同的分区中分区存放规则如下*省份是以包含山0*如果省份包含海1*其他......
  • sparkcore案例三:获取每一种状态码对应的访问量
    题目描述:/***清洗完成的数据中包含一个用户的响应状态码,获取每一种状态码对应的访问量*1、读取清洗完成的数据成为RDD[String]*2、可以把上一步得到的RDD......
  • 四大组件之广播接收者BroadcastReceiver
    参考:Android开发基础之广播接收者BroadcastReceiver什么是广播接收者?我们小时候都知道,听广播,收听广播!什么是收听广播呢?打开收音机,调频就可以收到对应的广播节目了。其实......
  • SparkCore系列(四)函数大全
    有了上面三篇的函数,平时开发应该问题不大了。这篇的主要目的是把所有的函数都过一遍,深入RDD的函数RDD函数大全数据准备        val sparkconf = new Spa......
  • 累加器
    1.累加器objectAccCode{defmain(args:Array[String]):Unit={valsparkConf=newSparkConf().setMaster("local[2]").setAppName("sum")valsc=......