作者: Billmay
Spark API
Spark 主要提供以下四种拓展方式
API | 局限 | 版本 |
Customized function or RDD | 无法支持 Spark SQL | 任意 |
DataSource API | API 变动会比较频繁 | Before Spark 2.3: v1Spark 2.3-3.0: v1+v2After Spark 3.0: v1+新版 v2 |
Catalyst Extension | 无法自定义 rule 的顺序 | After Spark 2.2 |
Catalog plugin | Spark 3 之后才可使用 | After Spark 3.0 |
Customized function or RDD
当使用 RDD 编程时,我们完全可以基于 Customized function or RDD 实现外部数据源的读写,完成数据源的拓展以及各种操作。但当 Spark SQL 出现,这样的方式就失效了。
Example
- Create customized function for existing RDD
class CustomFunctions(rdd:RDD[SalesRecord]) {
def totalSales = rdd.map(_.itemValue).sum
}
- Create customized RDD
class DiscountRDD(prev:RDD[SalesRecord],discountPercentage:Double)
extends RDD[SalesRecord](prev){
// override compute method to calculate the discount
override def compute(split: Partition, context: TaskContext): Iterator[SalesRecord] = {
firstParent[SalesRecord].iterator(split, context).map(salesRecord => {
val discount = salesRecord.itemValue*discountPercentage
new SalesRecord(salesRecord.transactionId,
salesRecord.customerId,salesRecord.itemId,discount)
})}
override protected def getPartitions: Array[Partition] =
firstParent[SalesRecord].partitions
}
DataSource API
Define the way read and write from other datasource.
DataSource API 是 Spark 中非常活跃的一个 API,几乎每个 Spark 版本都会对其进行优化。其主要的版本对照如表
Spark version | DataSource API |
Before Spark 2.3 | v1 |
Spark 2.3 - Spark 3.0 | v2 |
After Spark 3.0 | v2 升级版 |
Before Spark 2.3:DataSource API v1
优点
- It is simple,适用于大多数场景
缺点
- Coupled with other APIs (RDD, DataFrame, SQLContext)
- 和上层 API 耦合过重,就比如 SQLContext 被废弃,那意味着 DataSource API 也得废弃
- Hard to push down
- 下推通过组合 trait 的方式,那么就会有笛卡尔积的 trait。比如要增加 limit 能力, 就得增加 limitscan, limitprunedscan, limitfilterscan, limitprunedfilterscan 多个 trait
- 可下推的算子也有限,比如不支持下推 Aggregate
- Write API too simple
- No streaming support
// Read
trait RelationProvider {
def createRelation(sqlContext: SQLContext,parameters: Map[String, String],schema: StructType): BaseRelation
}
abstract class BaseRelation{}
trait TableScan {}
trait PrunedFilteredScan {}
//write
trait InsertableRelation {}
Spark 2.3 - Spark 3.0: DataSource API v2
优点
- Friendly to Java for DS V2 APIs is Java interface
- Does not coupled with other APIs
- Easier to push down
- 每种 pushdown 都有自己的 interface
- Streaming support
// Read
public interface DataSourceV2 {}
public interface ReadSupport extends DataSourceV2 {
DataSourceReader createReader(DataSourceOptions options);
}
public interface DataSourceReader {
List<InputPartition<Row>> planInputPartitions();
}
public interface SupportsPushDownFulters extends DataSourceReader {
Filter[] pushFilters(Filter[] filters)
}
public interface InputPartition<T> extends Serializable {
InputPartitionReader<T> createPartitionReader();
}
public interface InputPartitionReader<T> extends Closeable {
boolean next() throws IOEXception;
T get();
}
After Spark 3.0 datasource api v2
摘自 Data Source V2 API Improvement design doc
升级版的 v2 优化了以下问题
- Scan execution order is not obvious
- Splitting and reading data partitions should be independent
- Columnar Scan API should not be a mixin trait
- Streaming API doesn’t play well with the batch API
- Interface name is confusing
Catalyst extension
Spark SQL 最重要的部分就是 catalyst ,它负责 SQL 的解析分析优化等操作,其主要流程如下。
从 Spark 2.2 之后,Spark 支持拓展 catalyst。拓展点如下表
Stage | Extension | description |
Parser | injectParser | 负责 SQL 解析 |
Analyzer | injectResolutionRuleinjectPostHocResolutionRuleinjectCheckRule | 负责逻辑执行计划生成,catalog 绑定,以及进行各种检查 |
Optimizer | injectOptimizerRule | 负责逻辑执行计划的优化 |
Planner | injectPlannerStrategy | 负责物理执行计划的生成 |
其中 Analyzer 有三个拓展点,分别的用处为
- injectResolutionRule:可以在这鉴权,检查元信息
- injectPostHocResolutionRule:可以在这检查 insert
- injectCheckRule:check 是否还有 unresolved 的
在 Spark 3 之后,又额外提供了一些其他拓展点
- e.injectColumnar
- e.injectFunction
- e.injectQueryStagePrepRule
Catalyst 拓展只能在 catalyst 框架下进行,具体表现为
- 拓展点的位置被限制
- 无法修改其他 rule
这其实会有一些问题:比如我们无法修改原有 rule 的行为,原有 rule 会 block 一些我们特殊需求,如特殊的类型转换等。
Catalog plugin
更好的支持多数据源
在 spark 3 之后,Spark 提供了 Catalog plugin,能够
- Provide schema
- DDL
- Multiple catalog
TiSpark 2.5
TiKV 为数据源进行 Read + Write。
TiSpark Read
API: Customized RDD + Catalyst Extension + Catalog plugin
TiSpark Read 使用 Catalyst Extension ,拓展 injectPlannerStrategy
拓展点。在该拓展点中,TiSpark 会截取可下推的 sub-plan 进行下推至 TiKV,并获取数据。无法下推的部分交给 Spark 完成。
TiSpark 2.5 之后,还使用了 Spark 3.0 提供的 catalog plugin ,侵入性更小。在没有 catalog plugin 时,我们需要 hacker catalog 以及更多的执行计划,还需要提供一些额外机制(dbprefix)用于判断属于哪一个 DataSource,非常不方便。
为什么不使用 DataSource API 呢?
- DS API 无法精确下推
- Spark 3.2 之前不支持下推 Aggration
- 无法根据数据类型进行是否下推的判断
TiSpark Write
DataSource API V1 + RDD API
TiSpark Write 的实现基于 DataSource API V1 拓展。TiSpark 会从 Dataframe 出发,利用 RDD API 进行数据的各种处理,最后使用 TiKV-java-client 提供的 2PC 接口保证整体事务的原子性
TiSpark master
TiSpark 2.5 主要基于 DS V1 实现,其问题有
- Write 拓展的粒度不够细
- No streaming support
- Can’t use v2 API in user view (如 writeto 无法使用)
- Can’t apply new features of spark(很多新特性都是基于 DSV2,如delete)
- No Catalog support,比如需要通过 option 传入 db 与 table 信息
- No Catalyst optimize,Write 逻辑节点在 catalyst 中不会有任何优化/检查
因此 TiSpark 在 master 分支进行了 DS V1 -> DS V2 的升级,主要做的事情如下
Support catalog plugin
Spark 3 之后,提供了 catalog plugin,使得 mutile catalog 成为可能。
使用 catalog plugin 可以带来以下优点
- Less hacker code
原来我们需要自定义混合的 catalog ( tidb schema + hive schema),逻辑混杂在一起。
- Need not dbprefix
为了防止同名库表,原来我们需要 dbprefix 区分不同的 datasoue
- Make read simple
由于原来我们需要在 catalyst 额外进行一些拓展,使用混合 catalog 去判断库表的存在性。难以维护开发
Read on DSV2
在 V1 API 中,Read 需要拓展 catalyst,将逻辑节点替换为 TiDBRelation,然后由 TiDBRelation 提供 schema(而不是 catalog)
在 V2 AP2 中,schema 由 TiDBTable 提供。更重要的是无需拓展 catalyst 并去替换为 TiDBRelation,Spark 会使用 catalog.loadTable 帮我们加载 TiDBTable。
因此,Read 可以进行如下优化
- 不必在 Analyzer 中改写相关逻辑节点了为 TiDBRelation 了。
- 不必在 Planner 下推时使用 TiDBRelation 的 schema 了,直接从 Spark 原生逻辑节点 DataSourceV2ScanRelation2(TiDBTable) 中获取即可
Write on DSV2
对开发者来说,Spark 提供了 v1,v2 API 用于拓展。
对于用户来说,Spark 也向用户暴露了两类API。如 df.write
是早期的 api,df.writeto.append
为新版 api。
Spark 会判断开发者是拓展了 v1 还是 v2
- df.writeto.append:开发者只能使用 v2 API 实现。
- df.write:需要向前兼容,因此 v1 与 v2 API 都可以用。Spark 主要通过实现的接口判断开发者使用的 API 是 v1 还是 v2。不满足任一以下情况的都会被判断为 v1 API
- Source extends SupportsCatalogOption/TableProvider
- Table extends supportsWrite
- Table has batchwrite capabilities
Problems
原来 write 由 DSV1 实现,在转向 V2的过程中我们发现了一些问题
- 无法处理整体数据
- V2 write framework can’t process global Data
- 和 Catalyst 的优化有冲突,如
- Data convert:boolean -> long (unsafe)
- Autorandom:mismatch
Temporary solution
由于 DSV2 存在一些问题,我们的处理方式是:先进行整体 v2 的切换,在 v2 框架中,write 仍先使 v1 API 实现
总结
DSV2 support 带来的好处
- Less hacker code, easier to develop
- The benefit of DSV2
- Streaming support
- New user API support(writeto)
- Closer to catalyst
- The new feature of Spark
- Delete
- More pushdown support
TiSpark prospect
- Read
- 随着 Spark 对下推的支持,我们可以逐步使用 DataSource API ,而不再是拓展 catalyst 的方式。
- Write
- 能够使用 DSV2 改写write
- Spark SQl
- 支持更多的 SQL:Insert,delete,update,mergeinto