首页 > 其他分享 >图书《数据资产管理核心技术与应用》核心章节节选-3.1.2. 从Spark 执行计划中获取数据血缘

图书《数据资产管理核心技术与应用》核心章节节选-3.1.2. 从Spark 执行计划中获取数据血缘

时间:2024-08-02 15:32:12浏览次数:9  
标签:节选 获取数据 计划 3.1 apache org spark 执行 Spark

本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著。

从Spark 执行计划中获取数据血缘

因为数据处理任务会涉及到数据的转换和处理,所以从数据任务中解析血缘也是获取数据血缘的渠道之一,Spark 是大数据中数据处理最常用的一个技术组件,既可以做实时任务的处理,也可以做离线任务的处理。Spark在执行每一条SQL语句的时候,都会生成一个执行计划,这一点和很多数据库的做法很类似,都是SQL语句在执行时,先生成执行计划。如下图3-1-10所示,在Spark的官方文档链接https://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html#content中,有明确提到,可以根据EXPLAIN关键字来获取执行计划,这和很多数据库查看执行计划的方式很类似。

图3-1-10

Spark底层生成执行计划以及处理执行计划的过程如下图3-1-11所示。本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著。

 

图3-1-11

从图中可以看到,

1、 执行SQL语句或者Data Frame时,会先生成一个Unresolved Logical Plan,就是没有做过任何处理和分析的逻辑执行计划,仅仅会从SQL语法的角度做一些基础性的校验。

2、 之后通过获取Catalog的数据,对需要执行的SQL语句做表名、列名的进一步分析校验,从而生成一个可以直接运行的逻辑执行计划。

3、 但是Spark底层会有个优化器来生成一个最优的执行操作方式,从而生成一个优化后的最佳逻辑执行计划。

4、 将最终确定下来的逻辑执行计划转换为物理执行计划,转换为最终的代码进行执行。

Spark的执行计划其实就是数据处理的过程计划,会将SQL语句或者DataFrame 做解析,并且结合Catalog一起,生成最终数据转换和处理的代码。所以可以从Spark的执行计划中,获取到数据的转换逻辑,从而解析到数据的血缘。但是spark的执行计划都是在spark底层内部自动处理的,如何获取到每次Spark任务的执行计划的信息呢?其实在Spark底层有一套Listener的架构设计,可以通过Spark Listener 来获取到spark 底层很多执行的数据信息。

在spark的源码中,以Scala的形式提供了一个org.apache.spark.sql.util.QueryExecutionListener  trait (类似Java 语言的接口),来作为Spark SQL等任务执行的监听器。在org.apache.spark.sql.util.QueryExecutionListener  中提供了如下表3-1-2所示的两个方法。

表3-1-2

方法名

描述

def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit

执行成功时,调用的方法,其中包括了执行计划参数,这里的执行计划可以是逻辑计划或者物理计划

def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit

执行失败时,调用的方法,其中同样也包括了执行计划参数,这里的执行计划可以是逻辑计划或者物理计划

因此可以借用QueryExecutionListener  来主动让Spark在执行任务时,将执行计划信息推送到自己的系统或者数据库中,然后再做进一步的解析,如下图3-1-12所示。本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著。

图3-1-12

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
case class PlanExecutionListener(sparkSession: SparkSession) extends QueryExecutionListener with Logging{

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) {
    // 执行成功时,调用解析执行计划的方法
    planParser(qe)
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) {

  }

  private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = {
    try
      body
    catch {
      case NonFatal(e) =>
        val ctx = qe.sparkSession.sparkContext
        logError(s"Unexpected error occurred during lineage processing for application: ${ctx.appName} #${ctx.applicationId}", e)
    }
  }


  def planParser(qe: QueryExecution): Unit = {
    logInfo("----------- start to get spark  analyzed LogicPlan--------")
      //解析执行计划,并且将执行计划的数据发送到自有的系统或者数据库中
      ......
  }
} 

上面的代码中,实现了QueryExecutionListener 这个trait中的onSuccess和onFailure这两个方法,只有在onSuccess时,才需要获取执行计划的数据,因为只有onSuccess时的血缘才是有效的。

实现好了自定义的QueryExecutionListener后,可以通过sparkSession.listenerManager.register来将自己实现的PlanExecutionListener 注册到Spark会话中,listenerManager是Spark中Listener的管理器。

在获取到执行计划时,需要再结合Catalog一起,来进一步解析血缘的数据,如下图3-1-13所示

图3-1-13

Spark 中常见的执行计划实现类如下表3-1-3所示,获取数据血缘时,就是需要从如下的这些执行计划中解析血缘关系。本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著。

表3-1-3

执行计划实现类

描述

org.apache.spark.sql.execution.datasources.LogicalRelation

一般用于解析字段级的关联关系

org.apache.spark.sql.catalyst.catalog.HiveTableRelation

Hive 表关联关系的执行计划,一般用于SQL执行时,存在关联查询的情况会出现该执行计划。

org.apache.spark.sql.hive.execution.InsertIntoHiveTable

一般是在执行insert into 的SQL 语句时才会产生的执行计划,例如insert into xxx_table(colum1,column2) values("4","zhangsan")

org.apache.spark.sql.execution.datasources

.InsertIntoHadoopFsRelationCommand

一般用于执行类似    sparkSession

      .read

      .table("xx_source_table ")

      .limit(10)

      .write

      .mode(SaveMode.Append)

      .insertInto("xx_target_table ")产生的执行计划。

org.apache.spark.sql.hive.execution.

CreateHiveTableAsSelectCommand

一般是在执行create table xxx_table as的SQL 语句时才会产生的执行计划,例如create table xx_target_table as select * from xx_source_table

org.apache.spark.sql.execution.command

.CreateDataSourceTableAsSelectCommand

一般用于执行类似sparkSession

      .read

      .table("xx_source_table")

      .limit(10)

      .write

      .mode(SaveMode.Append)

      .saveAsTable("xx_target_table")产生的执行计划。

org.apache.spark.sql.execution.datasources

.InsertIntoDataSourceCommand

一般用于将SQL查询结果写入到一张表中,比如insert into xxx_target_table select * from xxx_source_table

 

如下是以org.apache.spark.sql.execution.datasources

.InsertIntoHadoopFsRelationCommand 为例的spark 执行计划的数据,如下数据已经将原始的执行计划转换为了json格式的数据,方便做展示。

.................更多内容,请参考清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著

 

 

标签:节选,获取数据,计划,3.1,apache,org,spark,执行,Spark
From: https://www.cnblogs.com/laoqing/p/18338683

相关文章

  • 吴恩达深度学习deeplearning.ai学习笔记(一)3.9 3.10 3.11
    3.9神经网络的梯度下降法对于单隐层神经网络而言,主要参数就是,并且输入特征的维度可以记为,第一层有个隐藏单元,第二层有个输出单元,目前仅仅见过只有一个输出单元的情况;的维度是,的维度是,的维度是,的维度是,成本函数为:训练神经网络时,随机初始化参数很重要,而不是全令其为0;每个梯......
  • Langchain-Chatchat3.1——搜索引擎bing与DuckDuckGo
    Langchain-Chatchat3.1——搜索引擎bing与DuckDuckGo1.前提是咱们的Chatchat服务一起部署好了,可以参考Langchain-Chatchat3.1版本docker部署流程——知识库问答2.搜索引擎DuckDuckGo:该搜索引擎不需要key,但是需要全球上网服务,挂代理。pipinstall-Uduckduckgo_search......
  • Langchain——chatchat3.1版本docker部署流程Langchain-Chatchat
    Langchain——chatchat3.1版本docker部署流程Langchain-Chatchat1.项目地址#项目地址https://github.com/chatchat-space/Langchain-Chatchat#dockerhub地址https://hub.docker.com/r/chatimage/chatchat/tags2.docker部署参考官方文档#官方文档https://github.com/......
  • UFS4.0/UFS3.1/Unipro总线协议分析仪
    UFS4.0/UFS3.1/Unipro总线协议分析仪(Analyzer)&训练器(Exerciser):全球市场占有率排名第一的UFS/Unipro总线协议分析仪厂商,支持MIPIM-PHYv5.0GEAR5,UniProv2.0andUFSv4.0等规格测试并支持向下兼容。ProtocolInsight为开发移动设备的客户提供测试和测量工具,并为UFS的......
  • GPU训Llama 3.1疯狂崩溃,竟有大厂用CPU服务器跑千亿参数大模型?
    马斯克19天建成由10万块H100串联的世界最大超算,已全力投入Grok3的训练中。与此同时,外媒爆料称,OpenAI和微软联手打造的下一个超算集群,将由10万块GB200组成。在这场AI争霸赛中,各大科技公司们卯足劲加大对GPU的投资,似乎在暗示着拥有更多、更强大的GPU,就能让自己立于不败之......
  • fpc 3.3.1使用rtti
    生产不建议使用fpc3.3.1(Trunk)近日QQ群的SunGod和啊D等发现fpc3.3.1(Trunk)添加了和delphi一样的rtti功能,但fpc默认是没启用的。启用RTTI的条件:1、编译FPC时添加-dENABLE_DELPHI_RTTI2、在fpc.cfg最后一行添加-dENABLE_DELPHI_RTTI因编译fpc对我来说还是太麻烦,我喜欢用fpcudelu......
  • 从服务器获取数据后更新 DataTable 的列
    我需要一个解决方案来解决我的问题。我有一个Django应用程序,我在服务器端处理中使用DataTables,表模板对于我拥有的所有模型都是动态的,我只需传递列(带有data,||的字典列表|和name)在模板的上下文中,非常简单...title之后,我需要对列进行一些......
  • 怎么解决pytnon爬虫遇到需滚动才能获取数据(selenium)
    需要滑轮滚动才能显示元素获取数据当你遇到网页数据需要滚动到底部或滚动到某个位置才能加载出来时,这通常是因为网页采用了懒加载(LazyLoading)技术来优化页面加载速度和性能。在这种情况下,使用Python爬虫时,你需要模拟滚动行为以触发数据的加载。这可以通过几种方式实现,包括使......
  • LLAMA3.1 8B 本地部署并配合Obsidian建立本地AI知识管理系统
    目前,LLAMA3.1模型分为8B、70B、405B三个版本,其中70B和405B对于显存的要求均已超过了一般家用电脑的配置(或者换个说法,用一张4090也是带不起来的),所以运行8B即可。LLAMA3.18B的性能约相当于ChatGPT3.5。经过我的测试4080、2080、intelultra9185H(无独立显卡,其能力约相当于1060)......
  • 为什么 string.maketrans 在 Python 3.1 中不起作用?
    我是Python新手。怎么了这个在Python3.1中不起作用?fromstringimportmaketrans#Requiredtocallmaketransfunction.intab="aeiou"outtab="12345"trantab=maketrans(intab,outtab)str="thisisstringexample....wow!......