首页 > 编程语言 >Spark DataSource 源码解析

Spark DataSource 源码解析

时间:2023-02-27 17:02:29浏览次数:42  
标签:provider1 创建 DataSourceProvider json 源码 DataSource BaseRelation Spark

SparkSession.read()

 

 

 

创建DataFrameReader对象,进行数据读取任务。

DataFrameReader

format

schema


option

 

 

 

json、csv、text…

通过format函数设置格式,并调用load函数加载数据。

load

 

 

调用DataSource.lookupDataSource方法获取source(参数为format传入的字符串)对应的Class对象。

如果该Class对象继承自DataSourceV2,且实现了ReadSupport接口,则调用DataSourceV2Relation.create方法创建DataSourceV2Relation逻辑计划,传入Dataset.ofRows方法,生成DataSet<Row>返回。

否则,调用loadV1Source方法,返回DataSet<Row>。

DataSource.lookupDataSource

创建provider1 : 从backwardCompatibilityMap获取provider(DataframeRead的source字符串)所对应的默认的DataSourceProvider的全类名。如果provider为jdbc、json、parquet、orc等相关的全类名,则可直接返回对应的DataSourceProvider的全类名,否则返回provider本身。

举例:
(1)provider = “json” => provider1 = “json”,因为 backwardCompatibilityMap没有key为"json";

(2)provider = “org.apache.spark.sql.json” => provoider1 = “org.apache.spark.sql.execution.datasources.json.JsonFileFormat”

创建provider2 = s"$provider1.DefaultSource"
举例:
(1)provider1 = “json” => provider2 = “json.DefaultSource”;
(2) provoider1 = “org.apache.spark.sql.execution.datasources.json.JsonFileFormat” => provider2 = “org.apache.spark.sql.execution.datasources.json.JsonFileFormat.DefaultSource”)

使用ServiceLoader加载所有继承DataSourceRegister的类,得到serviceLoader

serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList判断provider1是否为shortName(继承DataSourceRegister可以为DataSource注册shortName)。

provider1不是shortName,则使用SparkClassLoader加载provider1,失败则加载provider2
举例:
provider1 = org.apache.spark.sql.execution.datasources.json.JsonFileFormat,加载成功

provider1是shortName,则直接返回其对应的DataSource Class。
举例:provider1 = json,

从以上步骤我们可以看出自定义DataSource的方法,则有以下两种方式:

用provider1正确加载DataSourceProvider类。provider(即传入format方法中的字符串)= shortName,但是DataSourceProvider必须继承DataSourceRegister接口。或者provider = 自定义DataSourceProvider的全类名。
用provider2正确加载DataSourceProvider类。provider = 自定义DataSourceProvider类的包名,DataSourceProvider类名为DefaultSource。
DataSourceProvider一般分为两类:继承FileFormat或RelationProvider接口.
DataSource API扩展借鉴
https://github.com/apache/hbase-connectors/tree/master/spark
https://github.com/IGNF/spark-iqmulus

DataFrameReader.loadV1Source

创建DataSource对象,并调用resolveRelation方法返回BaseRelation对象,传入sparkSession.baseRelationToDataFrame方法,返回RDD<Row>

DataSource.resolveRelation

 

 

 

 

 

 

 

providingClass即是调用DataSource.lookupDataSource返回的DataSource Class。

 

  • 如果DataSource是SchemaRelationProvider类型,且userSpecifiedSchema不为null,则调用dataSource.createRelation(传入schema)创建BaseRelation。
  • 如果DataSource是RelationProvider类型,且userSpecifiedSchema为null,则调用dataSource.createRelation(不传入schema)创建BaseRelation。
  • 如果DataSource是RelationProvider类型,且userSpecifiedSchema不为null,则调用dataSource.createRelation(不传入schema)创建BaseRelation,如果baseRelation.schema != userSpecifiedSchema则报出异常,否则返回
  • 如果DataSource是FileFormat,则创建HadoopFsRelation对象。

对于创建的BaseRelation对象进行checkColumnNameDuplication,然后返回。

SparkSession.baseRelationToDataFrame

利用BaseRelation创建LogicalRelation逻辑计划。

数据源有关的Strategy

FileSourceStrategy

SparkSQL执行过程中利用Strategy会将逻辑计算转换为物理计划。

 

FileSourceStrategy会传入HadoopFsRelation创建FileSourceScanExec物理计划。

FileSourceScanExec

doExecute()判断inputRDD<InternalRow>是否需要进行unsafeRow的转换。

 

 

 

 

 

 

inputRDD中利用relation.fileFormat.buildReaderWithPartitionValues作为readFile的方法,创建BucketedReadRDD或者NonBucketedReadRDD。

 

buildReader方法实现在FileFormat子类当中,比如TextFileFormat:

 

 

 

 

TextFileFormat使用HadoopFileLinesReader或者HadoopFileWholeTextReader读取PartitionedFile文件中的数据,返回Iterator[UnsafeRow] 。

 

 

 

 

createNonBucketedReadRDD和createBucketedReadRDD会创建FileScanRDD。FileScanRDD的compute方法会利用readFile方法读取文件数据。

DataSourceStrategy

 

InMemoryScans

DataSourceStrategy和InMemoryScans策略最后都会生成RowDataSourceScanExec,最终会调用CatalystScan\PrunedScan\TableScan的buildScan方法生成RDD[Row],再调用toCatalystRDD将RDD[Row]转换为RDD[InternalRow]。

 

总结

format方法传入source字符串

DataSource.lookupDataSource会找到source对应的DataSource类(一般包括FileFormat和 RelationProvider两类)

DataSource.resolveRelation会根据DataSource类型创建BaseRelation(一般包括HadoopFsRelation和继承BaseRelation且实现以下接口的类:TableScan、PrunedScan、PrunedFilteredScan、InsertableRelation、CatalystScan )。

SparkSession.baseRelationToDataFrame将BaseRelation传入创建LogicalRelation逻辑计划,并利用LogicalRelation创建DataSet。

FileSourceScanExec\DataSourceStrategy\InMemoryScans将LogicalRelation逻辑计划转换为物理计划,生成具体的DataSourceRDD,compute函数实现真正的读取逻辑。
————————————————
原文链接:https://blog.csdn.net/qq_41775852/article/details/112359682

标签:provider1,创建,DataSourceProvider,json,源码,DataSource,BaseRelation,Spark
From: https://www.cnblogs.com/limaosheng/p/17160368.html

相关文章

  • freeswitch对接移动IMS参数指定和源码修改
    freeswitch对接移动IMS参数指定和源码修改 因为移动的IMS对接都是注册的模式对接的,所以废话不多说,直接上gateway配置数据<include><gatewayname="8610xxxxxxx">/......
  • 【优分享】JMeter源码解析之结果收集器
    本文作者优测性能测试专家高源。简介:本文以最新的JMeter5.5版本源代码为例详细介绍了单机模式和分布式模式下结果收集器的工作原理。通篇干货,还不快来了解一下! 一、JMete......
  • app直播源码,利用原生JS实现回到顶部以及吸顶效果
    app直播源码,利用原生JS实现回到顶部以及吸顶效果  <style>    .box1{      width:1200px;      height:800px;      ......
  • vue源码分析-动态组件
    前面花了两节的内容介绍了组件,从组件的原理讲到组件的应用,包括异步组件和函数式组件的实现和使用场景。众所周知,组件是贯穿整个Vue设计理念的东西,并且也是指导我们开发的......
  • vue源码分析-响应式系统(一)
    从这一小节开始,正式进入Vue源码的核心,也是难点之一,响应式系统的构建。这一节将作为分析响应式构建过程源码的入门,主要分为两大块,第一块是针对响应式数据props,methods,da......
  • slate源码解析(二)- 基本框架与数据模型
    源码架构首先来看下最核心的slate包下的目录:可以看到,作为一个开源富文本库,其源码是相当之少。在第一篇文章中说过,Slate没有任何开箱即用的功能,只提供给开发者用于构建富......
  • 【转】如何阅读源码
    为何要阅读源码在聊如何去阅读源码之前,先来简单说一下为什么要去阅读源码,大致可分为以下几点原因:最直接的原因,就是面试需要,面试喜欢问源码,读完源码才可以跟面试官battle......
  • 【RocketMQ】Dledger日志复制源码分析
    消息存储在【RocketMQ】消息的存储一文中提到,Broker收到消息后会调用CommitLog的asyncPutMessage方法写入消息,在DLedger模式下使用的是DLedgerCommitLog,进入asyncPutMess......
  • 使用Kyuubi 解锁 Spark SQL on CDH 6
    背景CDH最后一个免费版6.3.2发布一年有余,离线计算核心组件版本停在了Hadoop3.0.0,Hive2.1.1,Spark2.4.0。随着Spark3.0的重磅发布,在性能方面又迎来了一次飞跃,本文......
  • 【Mybatis】【配置文件解析】【四】Mybatis源码解析-mappers的解析一
    1 前言这节我们分析一个大头,也是我们平时写的最多的,就是我们写的增删改查了,我们来看下它的解析。既然MyBatis的行为已经由上述元素配置完了,我们现在就要来定义SQL......