
Implementing Spark SQL Data Lineage with QueryExecutionListener


Background

Data lineage is a very important part of data asset management. The team already reports lineage for Hive SQL jobs through a Hook, and obtains lineage for Impala jobs from the impala lineage log. Now that the Spark SQL engine is also in use, this post designs a workable scheme for capturing lineage in that scenario.

Solution

Analysis

The Spark source code provides the Scala trait org.apache.spark.sql.util.QueryExecutionListener (similar to an interface in Java) as a listener for the execution of Spark SQL queries. The trait declares two methods, onSuccess and onFailure.

The QueryExecutionListener trait in the Spark source:
/**
 * The interface of query execution listener that can be used to analyze execution metrics.
 *
 * @note Implementations should guarantee thread-safety as they can be invoked by
 * multiple different threads.
 */
trait QueryExecutionListener {

  /**
   * A callback function that will be called when a query executed successfully.
   *
   * @param funcName name of the action that triggered this query.
   * @param qe the QueryExecution object that carries detail information like logical plan,
   *           physical plan, etc.
   * @param durationNs the execution time for this query in nanoseconds.
   *
   * @note This can be invoked by multiple different threads.
   */
  @DeveloperApi
  def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit

  /**
   * A callback function that will be called when a query execution failed.
   *
   * @param funcName the name of the action that triggered this query.
   * @param qe the QueryExecution object that carries detail information like logical plan,
   *           physical plan, etc.
   * @param exception the exception that failed this query. If `java.lang.Error` is thrown during
   *                  execution, it will be wrapped with an `Exception` and it can be accessed by
   *                  `exception.getCause`.
   * @note This can be invoked by multiple different threads.
   */
  @DeveloperApi
  def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
}

We develop a class that implements the onSuccess method of QueryExecutionListener: after a Spark SQL statement executes successfully, the LogicalPlan is extracted from the QueryExecution and parsed to obtain table-level lineage (the team's current requirement does not go down to the column level), and the result is reported over HTTP to the backend service of the data asset platform.

Implementation

The following code mainly implements the onSuccess method of QueryExecutionListener:

import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper
import common.dto.{LineageInfo, LineageReport, TableInfo}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
import org.apache.spark.sql.util.QueryExecutionListener

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal
import java.util

class LineageReportListener extends QueryExecutionListener with Logging {

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) {

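    // Only handle executions whose triggering action is reported as "command"
    // (e.g. SQL statements such as INSERT run as commands); skip everything else.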
    if (!funcName.equals("command"))
      return

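    // The raw SQL text is stashed under this custom conf key by the patched
    // SparkSQLDriver (see below); only INSERT statements are reported.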
    val queryCommand = qe.sparkSession.sparkContext.getConf.get("spark.sql.query.command")
    if (!queryCommand.toLowerCase.contains("insert"))
      return

    val sourceTableSet: mutable.HashSet[TableInfo] = mutable.HashSet()
    val targetTableSet: mutable.HashSet[TableInfo] = mutable.HashSet()

    val plan: LogicalPlan = qe.analyzed

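    // Walk the analyzed logical plan: relations that are read become source tables,
    // InsertIntoHiveTable / CreateHiveTableAsSelectCommand nodes yield the target table.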
    plan.collect {
      case plan: LogicalRelation =>
        val catalogTable = plan.catalogTable.get
        plan.output.foreach(_ => {
          sourceTableSet += new TableInfo(catalogTable.database, catalogTable.identifier.table)
        })

      case plan: HiveTableRelation =>
        plan.output.foreach(_ => {
          sourceTableSet += new TableInfo(plan.tableMeta.database, plan.tableMeta.identifier.table)
        })

      case plan: InsertIntoHiveTable =>
        plan.query.output.foreach(_ => {
          targetTableSet += new TableInfo(plan.table.database, plan.table.identifier.table)
        })

      case plan: CreateHiveTableAsSelectCommand =>
        plan.query.output.foreach(_ => {
          targetTableSet += new TableInfo(plan.tableDesc.database, plan.tableDesc.identifier.table)
        })
    }

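    // Report only the simple case: at least one source table and exactly one target table.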
    if (sourceTableSet.nonEmpty && targetTableSet.size == 1) {
      val objectMapper: ObjectMapper = new ObjectMapper()
      objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)

      val sourceTableList: util.List[TableInfo] = sourceTableSet.toList.asJava
      val targetTable: TableInfo = targetTableSet.toList.asJava.get(0)
      val lineageDTO: LineageReport = new LineageReport("sparkSQL", new LineageInfo(queryCommand, sourceTableList, targetTable))

      val url = qe.sparkSession.sparkContext.getConf.get("spark.sql.lineage.report.url")
      reportLineageInfo(url, objectMapper.writeValueAsString(lineageDTO))
    }
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) {

  }


  private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = {
    try
      body
    catch {
      case NonFatal(e) =>
        val ctx = qe.sparkSession.sparkContext
        logWarning(s"Lineage Parsing Failed for Application: ${ctx.appName} #${ctx.applicationId}", e)
    }
  }


  private def reportLineageInfo(url: String, jsonStr: String): Unit = {
    val httpClient = HttpClients.createDefault()
    val post = new HttpPost(url)
    post.setHeader("Content-Type", "application/json")
    post.setEntity(new StringEntity(jsonStr, "UTF-8"))
    var response: CloseableHttpResponse = null
    try {
      response = httpClient.execute(post)
      if (response.getStatusLine.getStatusCode == 200)
        println("Lineage Report Success: " + jsonStr)
      else
        println("Lineage Report Failed: " + jsonStr)
    } finally {
      try {
        if (response != null) response.close()
        httpClient.close()
      } catch {
        case e: Exception =>
          e.printStackTrace()
      }
    }
  }
}
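
The TableInfo, LineageInfo and LineageReport classes imported from common.dto are project-internal and are not shown here. Below is a minimal sketch of what they might look like; the field names (sql, sourceTables, targetTable, engineType, lineageInfo) are hypothetical and inferred from how the listener above constructs them, with @BeanProperty added so Jackson's ObjectMapper can serialize them without the Scala module:

package common.dto

import java.util

import scala.beans.BeanProperty

// One table in the lineage graph. A case class so that the HashSet
// de-duplication in the listener works through structural equals/hashCode.
case class TableInfo(@BeanProperty database: String,
                     @BeanProperty table: String)

// A single lineage edge: the submitted SQL, all source tables and the target table.
case class LineageInfo(@BeanProperty sql: String,
                       @BeanProperty sourceTables: util.List[TableInfo],
                       @BeanProperty targetTable: TableInfo)

// Top-level report payload; engineType distinguishes sparkSQL from the
// existing Hive and Impala lineage reports on the platform side.
case class LineageReport(@BeanProperty engineType: String,
                         @BeanProperty lineageInfo: LineageInfo)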

Capturing the SQL text

The data asset platform backend also needs the SQL text submitted by the user to be reported along with the lineage.
As seen in the listener code above, val queryCommand = qe.sparkSession.sparkContext.getConf.get("spark.sql.query.command") reads the configuration item spark.sql.query.command from the SparkConf via the sparkContext, which yields the complete SQL script.
Populating this configuration item requires a change to the Spark source code so that the SQL text is put into the SparkConf when the statement is submitted.
In sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala, add one line: context.sparkContext.conf.set("spark.sql.query.command", command)

override def run(command: String): CommandProcessorResponse = {
    try {
      val substitutorCommand = SQLConf.withExistingConf(context.conf) {
        new VariableSubstitution().substitute(command)
      }
      context.sparkContext.setJobDescription(substitutorCommand)
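      // Added for lineage: expose the raw SQL text to the LineageReportListener via SparkConf.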
      context.sparkContext.conf.set("spark.sql.query.command", command)
      val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
      hiveResponse = SQLExecution.withNewExecutionId(execution, Some("cli")) {
        hiveResultString(execution.executedPlan)
      }
      tableSchema = getResultSetSchema(execution)
      new CommandProcessorResponse(0)
    } catch {
        case st: SparkThrowable =>
          logDebug(s"Failed in [$command]", st)
          new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(st), st.getSqlState, st)
        case cause: Throwable =>
          logError(s"Failed in [$command]", cause)
          new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null, cause)
    }
  }
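
Finally, the listener has to be registered with the Spark session before onSuccess is ever invoked. Spark supports this either through the static configuration spark.sql.queryExecutionListeners (the listener only needs a no-arg constructor, which LineageReportListener has) or programmatically via spark.listenerManager.register. Below is a minimal wiring sketch, assuming the listener jar is on the driver classpath; the report URL and table names are hypothetical:

import org.apache.spark.sql.SparkSession

object LineageWiring {
  def main(args: Array[String]): Unit = {
    // spark.sql.queryExecutionListeners is a static config, so it must be set
    // before the session is created (here, or via --conf / spark-defaults.conf).
    val spark = SparkSession.builder()
      .appName("lineage-wiring-demo")
      .enableHiveSupport()
      .config("spark.sql.queryExecutionListeners", "LineageReportListener")
      // hypothetical endpoint of the data asset platform backend
      .config("spark.sql.lineage.report.url", "http://asset-platform/api/lineage")
      .getOrCreate()

    // Alternative: register an instance on an already running session.
    // spark.listenerManager.register(new LineageReportListener())

    // Statements executed on this session now trigger LineageReportListener.onSuccess.
    spark.sql("INSERT INTO TABLE dw.target_tbl SELECT * FROM ods.source_tbl")
    spark.stop()
  }
}

Note that the SQL text itself is only available through the spark.sql.query.command mechanism described above, which is populated by the patched SparkSQLDriver. In practice, therefore, the two configs are passed to the patched spark-sql CLI (for example via --conf) rather than set in application code.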

From: https://www.cnblogs.com/ikinson/p/18527526

    1.什么是SQLite?  官方定义:SQLite是一个C语言库,它实现了一个小型、快速、自包含、高可靠性、全功能的SQL数据库引起。SQLite是世界上使用最多的数据库引擎。SQLite文件格式是稳定的、跨平台的、向后兼容的,SQLite数据库文件通常用作在系统之间传输丰富内容的容器以及作为数据的......