Background
Data lineage is an essential part of data asset management. The team already reports lineage for Hive SQL jobs through a Hive hook, and extracts lineage for Impala jobs from the Impala lineage log. With Spark SQL now in use as a compute engine, this post designs a practical way to collect lineage for that scenario as well.
Solution
Approach
Spark's source code provides a Scala trait, org.apache.spark.sql.util.QueryExecutionListener (roughly the equivalent of a Java interface), that acts as a listener for Spark SQL query execution. The trait declares two callbacks: onSuccess and onFailure.
Spark QueryExecutionListener source
/**
 * The interface of query execution listener that can be used to analyze execution metrics.
 *
 * @note Implementations should guarantee thread-safety as they can be invoked by
 * multiple different threads.
 */
trait QueryExecutionListener {

  /**
   * A callback function that will be called when a query executed successfully.
   *
   * @param funcName name of the action that triggered this query.
   * @param qe the QueryExecution object that carries detail information like logical plan,
   *           physical plan, etc.
   * @param durationNs the execution time for this query in nanoseconds.
   *
   * @note This can be invoked by multiple different threads.
   */
  @DeveloperApi
  def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit

  /**
   * A callback function that will be called when a query execution failed.
   *
   * @param funcName the name of the action that triggered this query.
   * @param qe the QueryExecution object that carries detail information like logical plan,
   *           physical plan, etc.
   * @param exception the exception that failed this query. If `java.lang.Error` is thrown during
   *                  execution, it will be wrapped with an `Exception` and it can be accessed by
   *                  `exception.getCause`.
   * @note This can be invoked by multiple different threads.
   */
  @DeveloperApi
  def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
}
The idea is to implement the onSuccess method of QueryExecutionListener: once a Spark SQL statement has executed successfully, parse the LogicalPlan carried by the QueryExecution to extract table-level lineage (the team's current requirement does not go down to column level), and report the result to the backend service of the data asset platform over HTTP.
Implementation
The code below implements mainly the onSuccess method of QueryExecutionListener:
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper
import common.dto.{LineageInfo, LineageReport, TableInfo}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
import org.apache.spark.sql.util.QueryExecutionListener
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal
import java.util
class LineageReportListener extends QueryExecutionListener with Logging {

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) {
    // Only "command" actions (INSERT ..., CREATE TABLE ... AS SELECT, etc.) can produce lineage.
    if (funcName == "command") {
      // The full SQL text is injected into SparkConf by the modified SparkSQLDriver (see below).
      val queryCommand = qe.sparkSession.sparkContext.getConf.get("spark.sql.query.command", "")
      if (queryCommand.toLowerCase.contains("insert")) {
        val sourceTableSet: mutable.HashSet[TableInfo] = mutable.HashSet()
        val targetTableSet: mutable.HashSet[TableInfo] = mutable.HashSet()
        // Walk the analyzed logical plan: catalog relations are lineage sources,
        // insert/CTAS commands are lineage targets.
        val plan: LogicalPlan = qe.analyzed
        plan.collect {
          case relation: LogicalRelation =>
            relation.catalogTable.foreach { catalogTable =>
              sourceTableSet += new TableInfo(catalogTable.database, catalogTable.identifier.table)
            }
          case relation: HiveTableRelation =>
            sourceTableSet += new TableInfo(relation.tableMeta.database, relation.tableMeta.identifier.table)
          case insert: InsertIntoHiveTable =>
            targetTableSet += new TableInfo(insert.table.database, insert.table.identifier.table)
          case ctas: CreateHiveTableAsSelectCommand =>
            targetTableSet += new TableInfo(ctas.tableDesc.database, ctas.tableDesc.identifier.table)
        }
        // Report only the unambiguous case: at least one source table and exactly one target table.
        if (sourceTableSet.nonEmpty && targetTableSet.size == 1) {
          val objectMapper: ObjectMapper = new ObjectMapper()
          objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
          val sourceTableList: util.List[TableInfo] = sourceTableSet.toList.asJava
          val targetTable: TableInfo = targetTableSet.head
          val lineageDTO: LineageReport =
            new LineageReport("sparkSQL", new LineageInfo(queryCommand, sourceTableList, targetTable))
          val url = qe.sparkSession.sparkContext.getConf.get("spark.sql.lineage.report.url")
          reportLineageInfo(url, objectMapper.writeValueAsString(lineageDTO))
        }
      }
    }
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) {
    // No lineage to report for failed queries.
  }

  // Lineage reporting must never fail the user's job: swallow and log any non-fatal error.
  private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = {
    try {
      body
    } catch {
      case NonFatal(e) =>
        val ctx = qe.sparkSession.sparkContext
        logWarning(s"Lineage Parsing Failed for Application: ${ctx.appName} #${ctx.applicationId}", e)
    }
  }

  // POST the lineage JSON to the data asset platform backend.
  private def reportLineageInfo(url: String, jsonStr: String): Unit = {
    val httpClient = HttpClients.createDefault()
    val post = new HttpPost(url)
    post.setHeader("Content-Type", "application/json")
    post.setEntity(new StringEntity(jsonStr, "UTF-8"))
    var response: CloseableHttpResponse = null
    try {
      response = httpClient.execute(post)
      if (response.getStatusLine.getStatusCode == 200) {
        logInfo("Lineage Report Success: " + jsonStr)
      } else {
        logWarning("Lineage Report Failed: " + jsonStr)
      }
    } finally {
      try {
        if (response != null) {
          response.close()
        }
        httpClient.close()
      } catch {
        case e: Exception =>
          logWarning("Failed to close http resources", e)
      }
    }
  }
}
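The DTO classes under common.dto are not included in the post. Below is a minimal sketch of what they might look like; the field names (engine, sql, sourceTables, targetTable) are assumptions. @BeanProperty is used so the plain ObjectMapper above (no jackson-module-scala registered) can serialize them through Java-style getters, and case classes provide the equals/hashCode that the HashSet deduplication relies on.
// Hypothetical sketch of the DTOs referenced from common.dto; field names are assumptions.
package common.dto

import java.util
import scala.beans.BeanProperty

// A source or target table, identified by database and table name.
case class TableInfo(@BeanProperty database: String, @BeanProperty table: String)

// One lineage record: the SQL text, its source tables, and the single target table.
case class LineageInfo(@BeanProperty sql: String,
                       @BeanProperty sourceTables: util.List[TableInfo],
                       @BeanProperty targetTable: TableInfo)

// Top-level report payload: the engine name plus the lineage details.
case class LineageReport(@BeanProperty engine: String, @BeanProperty lineage: LineageInfo)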
Obtaining the SQL statement
The data asset platform backend also needs the SQL text submitted by the user to be reported along with the lineage.
As shown in the listener above, queryCommand is read from the configuration item spark.sql.query.command on the SparkConf held by the sparkContext; this is how the complete SQL script is obtained.
Vanilla Spark does not set this configuration item, so a small change to the Spark source code is needed to add the SQL text to the SparkConf when the statement is submitted.
In sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala, add one line: context.sparkContext.conf.set("spark.sql.query.command", command)
override def run(command: String): CommandProcessorResponse = {
  try {
    val substitutorCommand = SQLConf.withExistingConf(context.conf) {
      new VariableSubstitution().substitute(command)
    }
    context.sparkContext.setJobDescription(substitutorCommand)
    // Added: expose the original SQL text to QueryExecutionListener implementations via SparkConf.
    context.sparkContext.conf.set("spark.sql.query.command", command)
    val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
    hiveResponse = SQLExecution.withNewExecutionId(execution, Some("cli")) {
      hiveResultString(execution.executedPlan)
    }
    tableSchema = getResultSetSchema(execution)
    new CommandProcessorResponse(0)
  } catch {
    case st: SparkThrowable =>
      logDebug(s"Failed in [$command]", st)
      new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(st), st.getSqlState, st)
    case cause: Throwable =>
      logError(s"Failed in [$command]", cause)
      new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null, cause)
  }
}
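Finally, the listener has to be registered before it receives any callbacks. A minimal sketch of the two registration options follows, assuming the listener jar is already on the driver classpath; the application name and report URL used here are placeholders.
import org.apache.spark.sql.SparkSession

object LineageDemo {
  def main(args: Array[String]): Unit = {
    // Option 1: register declaratively via the static SQL conf spark.sql.queryExecutionListeners
    // (the same properties can also be passed as --conf options to spark-sql / spark-submit).
    val spark = SparkSession.builder()
      .appName("lineage-demo") // placeholder application name
      .config("spark.sql.queryExecutionListeners", "LineageReportListener")
      .config("spark.sql.lineage.report.url", "http://asset-platform/api/lineage/report") // placeholder URL
      .enableHiveSupport()
      .getOrCreate()

    // Option 2: register programmatically on an existing SparkSession.
    spark.listenerManager.register(new LineageReportListener())
  }
}
Note that because spark.sql.query.command is only populated by the modified SparkSQLDriver, only statements submitted through that code path (e.g. the spark-sql CLI) will actually be reported.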