首页 > 编程语言 >Spark源码解析(二):Spark闭包检查

Spark源码解析(二):Spark闭包检查

时间:2023-03-31 19:55:36浏览次数:47  
标签:闭包 函数 scala Int 源码 Spark 变量 more

一、理解 Scala 闭包:Closures

1.1 闭包的定义

闭包就是一个函数和与其相关的引用环境组合的一个整体(实体)。进一步说,闭包是绑定了自由变量的函数实例。

通常来讲,闭包的实现机制是定义一个特殊的数据结构,保存了函数地址指针与闭包创建时的函数的词法环境以及绑定自由变量。

对于闭包最好的解释,莫过于《流程的Python》里给出的“它是延伸了作用域的函数,其中包括函数定义体引用,以及不在定义体定义的非全局变量。核心在于闭包能够访问定义体之外定义的非全局变量。”

在Scala中,函数引入传入的参数是再正常不过的事情了,比如 (x: Int) => x > 0中,唯一在函数体x > 0中用到的变量是x,即这个函数的唯一参数。

除此之外,Scala还支持引用其他地方定义的变量: (x: Int) => x + more,这个函数将more也作为入参,不过这个参数是哪里来的?从这个函数的角度来看,more是一个 自由变量 ,因为函数字面量本身并没有给more赋予任何含义。相反,x是一个绑定变量,因为它在该函数的上下文里有明确的定义:它被定义为该函数的唯一参数。如果单独使用这个函数,而没有在任何处于作用域内的地方定more,编译器将报错:

image-20230224152448542

另一方面,只要能找到名为more的变量,同样的函数就能正常工作:

image-20230224153042153

运行时从任何带有自由变量的函数字面量,比如(x: Int) => x + more创建的函数,按照定义,要求捕获到它的自由变量more的绑定。相应的函数值结果(包含指向被捕获的more变量的引用)就被称为闭包,该名称源于“捕获”其自由变量从而“闭合”该函数字面量的动作

1.2 修改自由变量

上面的例子可能会使我们出现一个疑问:如果more在闭包创建以后被改变会发生什么?

在scala中,答案是闭包可以看到这个改变。

我们继续上面的代码,例子如下:

image-20230224160121423

Scala的闭包捕获的是变量本身,而不是变量引用的值。正如前面示例所展示的,为(x: Int) => x + more创建的闭包能够看到闭包外对more的修改。

反之也是成立的:闭包对于自由变量的修改,闭包外也是可以看到的。

1.3 自由变量多副本

那么,如果一个闭包访问了某个随着程序运行会产生多个副本的变量会如何呢?

例如,如果一个闭包使用了某个函数的局部变量,而这个函数又被调用了多次,会怎么样?闭包每次访问到的是这个变量的哪一个实例呢?

我们通过代码来进行验证:

// 第一次声明 more 变量
scala> var more = 10
more: Int = 10

// 创建闭包函数
scala> val addMore10 = (x: Int) => {x + more}
addMore10: Int => Int = $Lambda$1223/0x000000080079f840@36810c06

// 调用闭包函数
scala> addMore10(9)
res13: Int = 19

// 重新声明 more 变量
scala> var more = 100
more: Int = 100

// 创建新的闭包函数
scala> val addMore100 = (x: Int) => {x + more}
addMore100: Int => Int =  $Lambda$1237/0x00000008007b1840@ac94285

// 引用的是重新声明 more 变量
scala> addMore100(9)
res14: Int = 109

// 引用的还是第一次声明的 more 变量
scala> addMore10(9)
res15: Int = 19

// 对于全局而言 more 还是 100
scala> more
res16: Int = 100

从上面的示例可以看出重新声明 more 后,全局的 more 的值是 100,但是对于闭包函数 addMore10 还是引用的是值为 10 的 more,这是由虚拟机来实现的,虚拟机会保证 more 变量在重新声明后,原来的被捕获的变量副本继续在堆上保持存活。

1.4 闭包的实现

具体的实现细节较为复杂(其实是自己也不是很懂,看字节码有点头疼),这里简单介绍一下

对于普通方法,局部变量在调用的时候是压入栈中的,计算完成之后就会 pop 出,所以在函数的调用完成后就不能再访问这个变量

而Scala中的闭包:实际上是在Heap上创建了一个闭包函数的相关对象,在传入参数后进行初始化,并将最终得到的实例对象的引用压入到栈中

总结:Scala 实现闭包的方法是在 heap 中保存了使用不同参数初始化而产生的不同对象,对象中保存了变量的状态,然后调用具体对象的 apply 方法而最后产生不同的结果。

编译后会生成一个Closures$$anonfun$addMore$1的类文件

二、Spark中对于闭包的处理

Spark中闭包检测的目的:

为了防止spark程序中将闭包发送到Executor节点时发生序列化失败,因为闭包函数有可能引用了未序列化的外部变量

调用链

在进入转换算子(例如map)的时候,会先调用

val cleanF = sc.clean(f)

进入clean函数:

private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
  ClosureCleaner.clean(f, checkSerializable)
  f
}

闭包检测的核心类:ClosureCleaner

最终调用的clean方法:

def clean(
    closure: AnyRef,  //这个就是我们检测的函数
    checkSerializable: Boolean = true,	//在闭包clean后对闭包能否序列化进行检测
    cleanTransitively: Boolean = true): Unit = {	//是否传递性的清理闭包
  clean(closure, checkSerializable, cleanTransitively, Map.empty)
}

核心方法:clean()

首先通过isClosure方法判断是否为闭包类

if (!isClosure(func.getClass) && maybeIndylambdaProxy.isEmpty) {
  logDebug(s"Expected a closure; got ${func.getClass.getName}")
  return
}

isClosure这个函数怎么判断是否为闭包类呢?

发现和我们上文scala的class文件对应起来了

private def isClosure(cls: Class[_]): Boolean = {
  cls.getName.contains("$anonfun$")
}

然后通过getOuterClassesAndObejct()函数获取func中所有对于外部闭包对象的引用

// A list of classes that represents closures enclosed in the given one
val innerClasses = getInnerClosureClasses(func)

// A list of enclosing objects and their respective classes, from innermost to outermost
// An outer object at a given index is of type outer class at the same index
val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)

遍历class文件中的所有field,找到$outer的就是外部的闭包引用,同时,如果外部闭包引用也是闭包类,那么同样获取其外部,直到最外层的非闭包类。

private def getOuterClassesAndObjects(obj: AnyRef): (List[Class[_]], List[AnyRef]) = {
  for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
    f.setAccessible(true)
    val outer = f.get(obj)
    // The outer pointer may be null if we have cleaned this closure before
    if (outer != null) {
      if (isClosure(f.getType)) {
        val recurRet = getOuterClassesAndObjects(outer)
        return (f.getType :: recurRet._1, outer :: recurRet._2)
      } else {
        return (f.getType :: Nil, outer :: Nil) // Stop at the first $outer that is not a closure
      }
    }
  }
  (Nil, Nil)
}

而后,禁止func中出现return返回值,在这里扫描,如果出现return直接报错。

// Fail fast if we detect return statements in closures
getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)

private class ReturnStatementFinder(targetMethodName: Option[String] = None)
  extends ClassVisitor(ASM9) {
  override def visitMethod(access: Int, name: String, desc: String,
      sig: String, exceptions: Array[String]): MethodVisitor = {

    // $anonfun$ covers indylambda closures
    if (name.contains("apply") || name.contains("$anonfun$")) {
      // A method with suffix "$adapted" will be generated in cases like
      // { _:Int => return; Seq()} but not { _:Int => return; true}
      // closure passed is $anonfun$t$1$adapted while actual code resides in $anonfun$s$1
      // visitor will see only $anonfun$s$1$adapted, so we remove the suffix, see
      // https://github.com/scala/scala-dev/issues/109
      val isTargetMethod = targetMethodName.isEmpty ||
        name == targetMethodName.get || name == targetMethodName.get.stripSuffix("$adapted")

      new MethodVisitor(ASM9) {
        override def visitTypeInsn(op: Int, tp: String): Unit = {
          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl") && isTargetMethod) {
            throw new ReturnStatementInClosureException
          }
        }
      }
    } else {
      new MethodVisitor(ASM9) {}
    }
  }
}

todo:后面理解深了继续看,已经不是很能看得懂了

标签:闭包,函数,scala,Int,源码,Spark,变量,more
From: https://www.cnblogs.com/river97/p/17277331.html

相关文章

  • Spark源码解析(一):RDD之Transfrom算子
    一、延迟计算RDD代表的是分布式数据形态,因此,RDD到RDD之间的转换,本质上是数据形态上的转换(Transformations)在RDD的编程模型中,一共有两种算子,Transformations类算子和Actions类算子。开发者需要使用Transformations类算子,定义并描述数据形态的转换过程,然后调用Actions......
  • ETCD源码阅读(三)
    DAY2:阅读raftexample:etcd/contrib/raftexampleserveChannels()func(rc*raftNode)serveChannels(){ snap,err:=rc.raftStorage.Snapshot() iferr!=nil{ panic(err) } rc.confState=snap.Metadata.ConfState rc.snapshotIndex=snap.Metadata.Index r......
  • ETCD源码阅读(二)
    DAY1:阅读raftexample:etcd/contrib/raftexampleraftexample包括三个组件:一个基于raft的kvstore、一个RESTAPIServer、一个基于etcdraft实现的RaftNode。其中RaftNode也拥有一个Httpserver,用于与peer节点进行通信。RESTAPIServer则是这个RaftNode的client。kvs......
  • ETCD源码阅读(一)
    DAY0:ETCD架构下图中展示了etcd如何处理一个客户端请求涉及到的模块和流程。图中淡紫色的矩阵表示etcd,它包括如下几个模块:etcdserver:对外接受客户端的请求,请求etcd代码中的etcdserver目录,其中还有一个raft.go的模块与etcdraft库进行通信。etcdserver中与......
  • vue+leaflet示例:克里金插值渲染显示(附源码下载)
    demo源码运行环境以及配置运行环境:依赖Node安装环境,demo本地Node版本:14.19.1。运行工具:vscode或者其他工具。配置方式:下载demo源码,vscode打开,然后顺序执行以下命令:(1)下载demo环境依赖包命令:npmi(2)启动demo命令:npmrundev(3)打包demo命令:npmrunbuild:release示例效果......
  • 直播网站源码,Android中点击图片放大的简单方法
    直播网站源码,Android中点击图片放大的简单方法简单的思路就是把要放大的图片显示在一个对话框中显示出来 Java代码: publicvoidonThumbnailClick(Viewv){//finalAlertDialogdialog=newAlertDialog.Builder(this).create();//ImageViewimgView=getView();//di......
  • 手把手带你玩转Spark机器学习-深度学习在Spark上的应用
    文章目录系列文章目录前言一、ApacheSparkTimeline二、开发步骤1.在jupyter中启动SparkSession和SparkContext2.下载数据3.用Spark读取图片3.TransferLearning总结前言本文将介绍深度学习在Spark上的应用,我们将聚焦于深度学习Pipelines库,并讲解使用DLPipelines的方式。我们......
  • ChatGPT 微信接入 C#完整源码
    1.无需搭建服务器,操作极其简单。  2.winform运行程序扫码进行微信登录,勾上自动回复,就可以充当机器人调用chatGPT可实现自动回复,可以申请小号操作。  3.可以识别会话消息和群聊消息,拉入群聊@机器人可以进行群聊的消息回复,可以得到@自己的回复消息。4.代码是完整的也......
  • C#上位机开发源码 上位机项目源代码 采用基于RS485通讯总线的ModbusRtu协议
    C#上位机开发源码上位机项目源代码采用基于RS485通讯总线的ModbusRtu协议,支持用户权限管理、sqlite数据库、实时曲线、历史曲线、历史报表、导出Excel、主界面布局可调带记忆等功能YID:81150611746679046......
  • libvirt9源码编译rpm
    注1:libvirt8及以上需要meson版本不小于5.6,目前没发现合适的升级方法(libvirt编译时不识别pip安装的meson,尽管pip3安装的meson已经达到要求了)rpm-ivhrpm-ivhhttps://download.libvirt.org/libvirt-7.10.0-1.src.rpm启用powertools源yuminstalldnf-plugins-coreepel-r......