
Collecting Iceberg (Spark Job) Lineage with spline-spark-agent


I. Background

  We use Spark to read and write Iceberg tables (via HiveCatalog) and use the Spline Spark agent to collect lineage from those Spark jobs.

II. Source Code

1. Download the source code: https://github.com/AbsaOSS/spline-spark-agent.git

2. During testing, a few bugs were found that broke normal execution of Spark jobs, so some fixes were made. The fully patched agent source is available at the address below; just download and build it (be sure to pick the 2.0 version!!!):

https://gitee.com/LzMingYueShanPao/spline-spark-agent.git

3. Version notes

Spark:3.0.3
Iceberg:0.12.1

III. Compilation

1. Upload the code to a Linux machine, enter the spline-spark-agent directory, and run the build command:

mvn clean install -DskipTests

2. Once the first build succeeds, enter the spline-spark-agent/bundle-3.0 directory and run the build command:

mvn clean install -DskipTests

3. Check the build output
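
If both builds succeed, the agent bundle jar should be produced in the bundle module's Maven target directory (the path below assumes the default Maven output layout):

ls spline-spark-agent/bundle-3.0/target/spark-3.0-spline-agent-bundle_2.12-1.0.0-SNAPSHOT.jar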

4. Reference: https://github.com/AbsaOSS/spline-spark-agent

IV. Usage

1. Create a directory to hold the jar:
mkdir -p /opt/module/spline

2. Upload spark-3.0-spline-agent-bundle_2.12-1.0.0-SNAPSHOT.jar to /opt/module/spline
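
If the agent was built on the same machine, copying the bundle into place is enough (the source path assumes the Maven output directory from the previous section):

cp spline-spark-agent/bundle-3.0/target/spark-3.0-spline-agent-bundle_2.12-1.0.0-SNAPSHOT.jar /opt/module/spline/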

3. Send lineage logs to Kafka

spark-shell \
--jars /opt/module/spline/spark-3.0-spline-agent-bundle_2.12-1.0.0-SNAPSHOT.jar \
--conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener \
--conf spark.spline.lineageDispatcher=kafka \
--conf spark.spline.lineageDispatcher.kafka.topic=spline_test \
--conf spark.spline.lineageDispatcher.kafka.producer.bootstrap.servers=xxx.xxx.xxx.100:9092,xxx.xxx.xxx.101:9092,xxx.xxx.xxx.102:9092 \
--conf spark.spline.lineageDispatcher.kafka.sasl.mechanism=SCRAM-SHA-256 \
--conf spark.spline.lineageDispatcher.kafka.security.protocol=PLAINTEXT \
--conf spark.spline.lineageDispatcher.kafka.sasl.jaas.config=

4. Print lineage logs to the console

spark-shell \
--jars /opt/module/spline/spark-3.0-spline-agent-bundle_2.12-1.0.0-SNAPSHOT.jar \
--conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener \
--conf spark.spline.lineageDispatcher=console
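
With the console dispatcher, any write action executed in this shell should make the listener print the captured lineage JSON to the console. A minimal smoke test could look like the following (the output path /tmp/spline_smoke_test is only illustrative):

// Run inside the spark-shell started above; the write triggers the
// SplineQueryExecutionListener, which prints the lineage plan to the console.
spark.range(10).toDF("id").write.mode("overwrite").parquet("/tmp/spline_smoke_test")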

5. Collect lineage from Iceberg

  Download iceberg-spark3-runtime-0.12.1.jar from: https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark3-runtime/0.12.1/iceberg-spark3-runtime-0.12.1.jar

bin/spark-shell --deploy-mode client --jars /opt/module/iceberg/iceberg-spark3-runtime-0.12.1.jar,/opt/module/spline/spark-3.0-spline-agent-bundle_2.12-1.0.0-SNAPSHOT.jar  \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.hive_prod.type=hive \
    --conf spark.sql.catalog.hive_prod.uri=thrift://192.168.xxx.xxx:9083 \
    --conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener \
    --conf spark.spline.lineageDispatcher=kafka \
    --conf spark.spline.lineageDispatcher.kafka.topic=spline_test \
    --conf spark.spline.lineageDispatcher.kafka.producer.bootstrap.servers=192.168.xxx.xxx:9092,192.168.xxx.xxx:9092,192.168.xxx.xxx:9092 \
    --conf spark.spline.lineageDispatcher.kafka.sasl.mechanism=SCRAM-SHA-256 \
    --conf spark.spline.lineageDispatcher.kafka.security.protocol=PLAINTEXT \
    --conf spark.spline.lineageDispatcher.kafka.sasl.jaas.config=
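
The execution plan shown in the next step comes from overwriting one Iceberg table with the contents of another. A statement of roughly the following shape, run inside this shell, produces such a lineage event (the table names are taken from the JSON below, and both tables are assumed to already exist in hive_prod.iceberg_hadoop_db):

// Run inside the spark-shell started above; the overwrite is captured by the
// listener and an execution-plan message is sent to the spline_test topic.
spark.sql("INSERT OVERWRITE hive_prod.iceberg_hadoop_db.tbresclientfollower65 SELECT * FROM hive_prod.iceberg_hadoop_db.tbresclientfollower64")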

6. Check the Kafka topic spline_test

  The following data is produced:

{
  "id": "3d2c9289-ac47-5672-babf-d4a67bde5d14",
  "name": "Spark shell",
  "operations": {
    "write": {
      "outputSource": "hdfs://192.168.xxx.xxx:8020/user/hive/warehouse/iceberg_hadoop_db.db/tbresclientfollower65",
      "append": false,
      "id": "op-0",
      "name": "OverwriteByExpression",
      "childIds": [
        "op-1"
      ],
      "params": {
        "table": {
          "identifier": "hive_prod.iceberg_hadoop_db.tbresclientfollower65",
          "output": [
            {
              "__attrId": "attr-8"
            },
            {
              "__attrId": "attr-9"
            },
            {
              "__attrId": "attr-10"
            },
            {
              "__attrId": "attr-11"
            },
            {
              "__attrId": "attr-12"
            },
            {
              "__attrId": "attr-13"
            },
            {
              "__attrId": "attr-14"
            },
            {
              "__attrId": "attr-15"
            }
          ]
        },
        "isByName": false,
        "deleteExpr": {
          "__exprId": "expr-0"
        }
      },
      "extra": {
        "destinationType": "iceberg"
      }
    },
    "reads": [
      {
        "inputSources": [
          "hdfs://192.168.xxx.xxx:8020/user/hive/warehouse/iceberg_hadoop_db.db/tbresclientfollower64"
        ],
        "id": "op-3",
        "name": "DataSourceV2Relation",
        "output": [
          "attr-0",
          "attr-1",
          "attr-2",
          "attr-3",
          "attr-4",
          "attr-5",
          "attr-6",
          "attr-7"
        ],
        "params": {
          "table": {
            "identifier": "hive_prod.iceberg_hadoop_db.tbresclientfollower64"
          },
          "identifier": "iceberg_hadoop_db.tbresclientfollower64",
          "options": "org.apache.spark.sql.util.CaseInsensitiveStringMap@1f"
        },
        "extra": {
          "sourceType": "iceberg"
        }
      }
    ],
    "other": [
      {
        "id": "op-2",
        "name": "SubqueryAlias",
        "childIds": [
          "op-3"
        ],
        "output": [
          "attr-0",
          "attr-1",
          "attr-2",
          "attr-3",
          "attr-4",
          "attr-5",
          "attr-6",
          "attr-7"
        ],
        "params": {
          "identifier": "hive_prod.iceberg_hadoop_db.tbresclientfollower64"
        }
      },
      {
        "id": "op-1",
        "name": "Project",
        "childIds": [
          "op-2"
        ],
        "output": [
          "attr-0",
          "attr-1",
          "attr-2",
          "attr-3",
          "attr-4",
          "attr-5",
          "attr-6",
          "attr-7"
        ],
        "params": {
          "projectList": [
            {
              "__attrId": "attr-0"
            },
            {
              "__attrId": "attr-1"
            },
            {
              "__attrId": "attr-2"
            },
            {
              "__attrId": "attr-3"
            },
            {
              "__attrId": "attr-4"
            },
            {
              "__attrId": "attr-5"
            },
            {
              "__attrId": "attr-6"
            },
            {
              "__attrId": "attr-7"
            }
          ]
        }
      }
    ]
  },
  "attributes": [
    {
      "id": "attr-0",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "bd_tbresclientfollower_uuid"
    },
    {
      "id": "attr-1",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "clientid"
    },
    {
      "id": "attr-2",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "id"
    },
    {
      "id": "attr-3",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "objid"
    },
    {
      "id": "attr-4",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "objname"
    },
    {
      "id": "attr-5",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "type"
    },
    {
      "id": "attr-6",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "bd_createtime"
    },
    {
      "id": "attr-7",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "bd_lst_upd_time"
    },
    {
      "id": "attr-8",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "bd_tbresclientfollower_uuid"
    },
    {
      "id": "attr-9",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "clientid"
    },
    {
      "id": "attr-10",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "id"
    },
    {
      "id": "attr-11",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "objid"
    },
    {
      "id": "attr-12",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "objname"
    },
    {
      "id": "attr-13",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "type"
    },
    {
      "id": "attr-14",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "bd_createtime"
    },
    {
      "id": "attr-15",
      "dataType": "e63adadc-648a-56a0-9424-3289858cf0bb",
      "name": "bd_lst_upd_time"
    }
  ],
  "expressions": {
    "constants": [
      {
        "id": "expr-0",
        "dataType": "75fe27b9-9a00-5c7d-966f-33ba32333133",
        "extra": {
          "simpleClassName": "Literal",
          "_typeHint": "expr.Literal"
        },
        "value": true
      }
    ]
  },
  "systemInfo": {
    "name": "spark",
    "version": "3.0.3"
  },
  "agentInfo": {
    "name": "spline",
    "version": "1.0.0-SNAPSHOT+unknown"
  },
  "extraInfo": {
    "appName": "Spark shell",
    "dataTypes": [
      {
        "_typeHint": "dt.Simple",
        "id": "e63adadc-648a-56a0-9424-3289858cf0bb",
        "name": "string",
        "nullable": true
      },
      {
        "_typeHint": "dt.Simple",
        "id": "75fe27b9-9a00-5c7d-966f-33ba32333133",
        "name": "boolean",
        "nullable": false
      }
    ],
    "appId": "application_1665743023238_0122",
    "startTime": 1666249345168
  }
}

7. Parsing the JSON data

  After being processed by the Spline agent, each Spark job's lineage information is sent to the spline_test topic; the data in this topic can then be consumed in whatever way fits your organization's needs.

  On my side, I consume the spline_test topic, clean and parse the messages, and extract the upstream and downstream information of each Spark job. From that information you can tell which tables a Spark job read and which tables it wrote, and then load the result into the HugeGraph graph database. A minimal parsing sketch is shown below.
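
As a minimal sketch of that parsing step (assuming each Kafka message is an execution-plan JSON like the one above, using placeholder broker addresses, and taking only the first read for simplicity), a Spark Structured Streaming job could extract the source and target tables like this:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object

val spark = SparkSession.builder().appName("spline-lineage-parser").getOrCreate()
import spark.implicits._

// Read the raw lineage messages from the spline_test topic.
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.xxx.xxx:9092")
  .option("subscribe", "spline_test")
  .load()

// Extract the application id and the upstream/downstream table identifiers.
// Plans with several reads would need the whole "reads" array; this sketch
// only looks at the first entry.
val lineage = raw.selectExpr("CAST(value AS STRING) AS json")
  .select(
    get_json_object($"json", "$.extraInfo.appId").as("appId"),
    get_json_object($"json", "$.operations.reads[0].params.table.identifier").as("inputTable"),
    get_json_object($"json", "$.operations.write.params.table.identifier").as("outputTable")
  )

// Print the extracted lineage edges; in practice they would be written to HugeGraph.
lineage.writeStream.format("console").start().awaitTermination()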

8. I have previously written another article on collecting Spark job lineage, which may also be worth a look:

  https://www.cnblogs.com/qq1035807396/p/16774469.html

