首页 > 其他分享 >Spark的共享变量

Spark的共享变量

时间:2024-05-30 10:58:55浏览次数:24  
标签:task 变量 val driver 累加器 广播 Spark 共享

传递给Spark的函数,如map()或者filter()的判断条件函数,能够利用定义在函数之外的变量,
但是集群中的每一个task都会得到变量的一个副本,并且task对变量进行的更新则不会被返回给driver.
而Spark的两种共享变量:累加器(accumulator)和广播变量(broadcast variable).

累加器

累加器可以很简便地对各个worker返回给driver的值进行聚合(有些类似aggregate);
累加器最常见的用途之一就是对一个job执行期间发生的事件进行计数.
======累加器原理======

Driver端: 
Driver端构建Accumulator并初始化; 
同时完成了Accumulator注册:Accumulators.register(this); 
同时Accumulator会在序列化后发送到Executor端;
Driver接收到ResultTask完成的状态更新后,会去更新Value的值; 
然后在Action操作执行后就可以获取到Accumulator的值了.

Executor端: 
Executor端接收到Task之后会进行反序列化操作,反序列化得到RDD和function; 
同时在反序列化Task的同时也去反序列化Accumulator(在readObject方法中完成); 
同时也会向TaskContext完成注册,完成任务计算之后,随着Task结果一起返回给Driver端.

广播变量

广播变量通常情况下,当一个RDD的很多操作都需要使用driver中定义的变量时,
每次操作,driver都要把变量发送给worker上的executor中的所有task一次;
如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低;
使用广播变量可以使程序高效地将一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输.
累加器、广播变量的应用
// 因为广播变量是只读的,所以无法在每个节点上进行修改.
// 更新广播变量: 只能修改后重新发送广播变量,可以通过unpersist删除原有的广播变量;br.unpersist

package rdd
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object SparkBroadCastDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkBroadCastDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // 定义本地变量(在算子中对其操作不会影响它的值,因为不在同一个进程中,本地变量在driver进程中,而操作本地变量则是在executor进程中)
    val ints = ListBuffer(1,2,3,4,5)
    var num = 0
    // 创建RDD
    val par: RDD[Int] = sc.parallelize(ints)
    /* 
    将本地变量定义为广播变量
    广播变量:类似于mr的distribute,它是通过网络发送到每个executor里面,然后每个task不会重复的进行发送,因为广播变量是通过网络传输给executor的,所以广播变量里面的数据必须是可序列化的对象.
    */
    val br: Broadcast[ListBuffer[Int]] = sc.broadcast(ints)
    /*
    定义累加器
    累加器:类似于mr的count,它是在task中进行累加,然后在driver里进行汇总.
    */
    val acc: Accumulator[Int] = sc.accumulator(0)   // 定义累加器写法1
    val acc1 = sc.longAccumulator                   // 定义累加器写法2
-----------------------------------------------------------------------------------------------------------------------------------------------------------
    val mapRDD: RDD[Int] = par.map(f => {
      /*
      为了凸显累加器的作用,先使用自定义的本地变量进行累加,注意这个外部变量是每个task都有其副本,
      所以task在对其操作的时候,实际上操作的不是本地变量本身,而是在task中的副本,所以达不到在每个task中汇总数据的目的.
      所以这里不能使用外部变量求累加值,只能使用累加器,因为在分布式环境下,外部变量数据是不能同步的,而累加器可以同步加driver并在dirver端进行汇总的.
      */  
      num += 1
      /*
      这里由于这个application有两个action操作,所以执行了两个job,那这个累加器就累加了两次;也就是说如果RDD有N个Action并且它们都同时执行了这个function的时候,那这个累加器将累加N次.
      */
      acc.add(1)   // 相当于 acc += 1 的写法
      //这是使用的广播变量,executor中的task都共享了一个副本.取广播变量的值使用.value,因为广播变量从driver端经过网络传输到executor端是需要序列化的,否者传输不过去,而我们使用的时候需要反序列化.
      println(s"MapBroadCast:${br.value}")    
      f
    })  
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    /*
    Action1: 执行foreach操作. 
    */
    mapRDD.filter(f => {
      // 在不同的算子中应该使用不同的累加器,除非这个累加器的含义相同这样才在不同的算子中能用.
      acc1.add(1L)
      //广播变量可以在DAG中的任务算子中拿到
      println(s"fileBroadCast:${br.value}")
      true
    }).foreach(println)
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------     /*
    Action2: 执行count操作. 
    */
    mapRDD.count()
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------     // 打印了外部变量值,在分布式环境下是task对其操作,是不会影响driver端的变量的.
    println(num)
    // 打印累加器,在分布式环境下会对其进行汇总.
    println(s"acc:${acc}")
  }
}
// spark的广播变量可以通过unpersist方法删除,修改,然后重新广播:
val map = sc.textFile("/test.txt").map(line => {
    val arr = line.split(",")
    (arr(0), arr(2).toInt)
}).distinct
var mapBC = sc.broadcast(map.take(10).toMap)
mapBC.unpersist
mapBC = sc.broadcast(map.take(2).toMap)

标签:task,变量,val,driver,累加器,广播,Spark,共享
From: https://blog.csdn.net/weixin_44872470/article/details/139253052

相关文章

  • 如何在 PHP 8.3 中声明变量
    在php7.3中,我做了如下操作。$TitelString=$_GET["TitelString"];If(!$TitelString)$TitelString="";$AuteurString=$_GET["AuteurString"];If(!$AuteurString)$AuteurString="";$JaarString=$_GET["JaarS......
  • spark sql导出数据为excel文件和csv文件
    一、利用to_csv函数导出数据为csv文件:df=spark.sql('''select*fromtable;''')df.toPandas().to_csv('table.csv',index=False)其中:index=False参数表示在保存时不包括行索引。二、利用to_excel函数导出数据为excel文件:df=spark.sql('''select*from......
  • spark sql中的FORMAT_NUMBER和ROUND函数
    一、例子:FORMAT_NUMBER(ROUND(value,2),'0.00')二、ROUND函数的作用:用于将数值字段舍入到指定的小数位数,如果未指定小数位数,则默认将数字舍入到最接近的整数。三、FORMAT_NUMBER函数的作用:用于将数字格式化为指定的格式,而不是进行舍入。四、两者的区别:如果小数点后面的数字,最......
  • spark sql中的几种数据库join
    一、连接类型:InnerJoin:内连接;FullOuterJoin:全外连接;LeftOuterJoin:左外连接;RightOuterJoin:右外连接;LeftSemiJoin:左半连接;LeftAntiJoin:左反连接;NaturalJoin:自然连接;Cross(orCartesian)Join:交叉(或笛卡尔)连接。二、crossjoin的例子:WITH......
  • spark sql实现“平均月活”和“平均周活”及相关函数
    一、平均月活:SELECTdate_format(time,'yyyy-MM')AScurrent_month,COUNT(DISTINCTuser_id)ASmonth_active_user_numFROMtableWHEREtime>=trunc(now(),'YEAR')GROUPBYdate_format(time,'yyyy-MM');二、平均周活:WITHweek_......
  • 共享单车数据可视化分析|附代码数据
    全文下载链接 http://tecdat.cn/?p=1951最近我们被客户要求撰写关于共享单车的研究报告,包括一些图形和统计输出。随着智能手机的普及和手机用户的激增,共享单车作为城市交通系统的一个重要组成部分,以绿色环保、便捷高效、经济环保为特征蓬勃发展作为城市共享交通系统的一个重要......
  • HTML拆分与共享方式——多HTML组合技术
    作者:私语茶馆1.应用场景    如果是一个产品级的Web项目,往往非常多的页面部分是重复的(为保持风格一致),每个HTML页面将这些重复部分重新写一次,既带来极大的工作量,也造成后续修改不便。    因此会考虑到将一个HTML的不同部分拆分为多个HTML页面,利用类似Include......
  • vite配置自动引入全局scss变量文件
    全局自动引入scss变量文件当定义了全局的scss变量文件并且而其他很多页面都需要使用的时候,都需要显式的使用@import或者@use引用一遍全局scss文件,很是麻烦。使用以下配置这样能有效避免造成大量重复工作,可以在任何scss文件中任意使用全局变量。定义全局scss变量文件配置vi......
  • oracle的排序函数以及mysql使用变量实现排序
    oracle的排序函数rank()函数:跳跃排序,如果两个第一,则后边是第3dense_rank()函数:连续排序,,再如两个第一,则后边是第2row_number()函数:连续排序,没有并列的情况createtableccx_test( coursevarchar(10), scoreint);insertintoccx_testvalues(1,70);insertintoccx_......
  • 关于java的环境变量配置
    java概念1.sun,oraclejdk,openJdk2.jdk:javadevkit(java开发工具包)3.jre:jave运行时环境4.jvm:java虚拟机2.为啥要配置环境变量?让操作系统找到jave/bin目录位置,这样在任何目录都可以使用javecjavajavap,能够让依赖java的软件系统也能找到java配置环境变量:在w......