传递给Spark的函数,如map()或者filter()的判断条件函数,能够利用定义在函数之外的变量,
但是集群中的每一个task都会得到变量的一个副本,并且task对变量进行的更新则不会被返回给driver.
而Spark的两种共享变量:累加器(accumulator)和广播变量(broadcast variable).
累加器
累加器可以很简便地对各个worker返回给driver的值进行聚合(有些类似aggregate);
累加器最常见的用途之一就是对一个job执行期间发生的事件进行计数.
======累加器原理======
Driver端:
Driver端构建Accumulator并初始化;
同时完成了Accumulator注册:Accumulators.register(this);
同时Accumulator会在序列化后发送到Executor端;
Driver接收到ResultTask完成的状态更新后,会去更新Value的值;
然后在Action操作执行后就可以获取到Accumulator的值了.
Executor端:
Executor端接收到Task之后会进行反序列化操作,反序列化得到RDD和function;
同时在反序列化Task的同时也去反序列化Accumulator(在readObject方法中完成);
同时也会向TaskContext完成注册,完成任务计算之后,随着Task结果一起返回给Driver端.
广播变量
广播变量通常情况下,当一个RDD的很多操作都需要使用driver中定义的变量时,
每次操作,driver都要把变量发送给worker上的executor中的所有task一次;
如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低;
使用广播变量可以使程序高效地将一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输.
累加器、广播变量的应用
// 因为广播变量是只读的,所以无法在每个节点上进行修改.
// 更新广播变量: 只能修改后重新发送广播变量,可以通过unpersist删除原有的广播变量;br.unpersist
package rdd
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object SparkBroadCastDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkBroadCastDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
// 定义本地变量(在算子中对其操作不会影响它的值,因为不在同一个进程中,本地变量在driver进程中,而操作本地变量则是在executor进程中)
val ints = ListBuffer(1,2,3,4,5)
var num = 0
// 创建RDD
val par: RDD[Int] = sc.parallelize(ints)
/*
将本地变量定义为广播变量
广播变量:类似于mr的distribute,它是通过网络发送到每个executor里面,然后每个task不会重复的进行发送,因为广播变量是通过网络传输给executor的,所以广播变量里面的数据必须是可序列化的对象.
*/
val br: Broadcast[ListBuffer[Int]] = sc.broadcast(ints)
/*
定义累加器
累加器:类似于mr的count,它是在task中进行累加,然后在driver里进行汇总.
*/
val acc: Accumulator[Int] = sc.accumulator(0) // 定义累加器写法1
val acc1 = sc.longAccumulator // 定义累加器写法2
-----------------------------------------------------------------------------------------------------------------------------------------------------------
val mapRDD: RDD[Int] = par.map(f => {
/*
为了凸显累加器的作用,先使用自定义的本地变量进行累加,注意这个外部变量是每个task都有其副本,
所以task在对其操作的时候,实际上操作的不是本地变量本身,而是在task中的副本,所以达不到在每个task中汇总数据的目的.
所以这里不能使用外部变量求累加值,只能使用累加器,因为在分布式环境下,外部变量数据是不能同步的,而累加器可以同步加driver并在dirver端进行汇总的.
*/
num += 1
/*
这里由于这个application有两个action操作,所以执行了两个job,那这个累加器就累加了两次;也就是说如果RDD有N个Action并且它们都同时执行了这个function的时候,那这个累加器将累加N次.
*/
acc.add(1) // 相当于 acc += 1 的写法
//这是使用的广播变量,executor中的task都共享了一个副本.取广播变量的值使用.value,因为广播变量从driver端经过网络传输到executor端是需要序列化的,否者传输不过去,而我们使用的时候需要反序列化.
println(s"MapBroadCast:${br.value}")
f
})
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/*
Action1: 执行foreach操作.
*/
mapRDD.filter(f => {
// 在不同的算子中应该使用不同的累加器,除非这个累加器的含义相同这样才在不同的算子中能用.
acc1.add(1L)
//广播变量可以在DAG中的任务算子中拿到
println(s"fileBroadCast:${br.value}")
true
}).foreach(println)
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- /*
Action2: 执行count操作.
*/
mapRDD.count()
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- // 打印了外部变量值,在分布式环境下是task对其操作,是不会影响driver端的变量的.
println(num)
// 打印累加器,在分布式环境下会对其进行汇总.
println(s"acc:${acc}")
}
}
// spark的广播变量可以通过unpersist方法删除,修改,然后重新广播:
val map = sc.textFile("/test.txt").map(line => {
val arr = line.split(",")
(arr(0), arr(2).toInt)
}).distinct
var mapBC = sc.broadcast(map.take(10).toMap)
mapBC.unpersist
mapBC = sc.broadcast(map.take(2).toMap)
标签:task,变量,val,driver,累加器,广播,Spark,共享
From: https://blog.csdn.net/weixin_44872470/article/details/139253052