Spark学习(二):RDD详解
RDD API
一般来说有三种创建RDD的方式
- 由外部存储系统的数据集创建(比如本地的文件系统,同时兼容所有Hadoop支持的数据集):
sc.textFile
- 通过已有的RDD通过算子转换生成新的RDD:
rdd1.flatMap(_.split(" "))
- 通过一个Scala集合创建:
sc.parallelize()
RDD原理
MapReduce框架采用了一种非循环式的数据流模型,把中间结果写入到HDFS(GFS的工业界实现),带来了大量的数据复制、磁盘IO和序列化的开销,且只能支持map和reduce两种特定的计算。RDD就是为了解决这些问题而产生的
RDD的全称叫:Resilient Distributed Dataset,翻译过来就是弹性 分布式 数据集,它是一个数据集的表示,不仅表示了数据集,也同时表示了这个数据集从哪来,以及如何计算,主要包含了以下几个属性:
表示数据集位置:分区列表,分区函数,最佳位置列表
-
分区列表:一个RDD有很多分区,每个分区内部包含了该RDD的部分数据,在Spark中任务以task线程的方式运行,一个分区就对应着一个task线程,这个重要属性保障了分布式运算的实现
用户可以在创建RDD时指定RDD的分区个数,比如
var rdd = sc.textFile("/word.txt", 8)
-
分区函数:当数据为KV类型数据时,可以通过设定分区函数来自定义数据的分区(可选项)
当前Spark实现了两种类型的分区函数:基于哈希的HashPartitioner,基于范围的RangePartitioner
-
最佳位置列表:存储每个分区的优先位置,涉及到数据的本地性、数据块位置最优,Spark任务在调度的时候会优先考虑存有数据的节点开始计算任务,减少数据的网络传输,来提高计算效率
表示数据集来源:计算函数,依赖关系
- 计算函数:在Spark中RDD的运算是以分区为单位的,每个RDD都会实现compute计算函数来对每个分区进行计算
- 依赖关系:一个RDD往往会依赖多个RDD,这里涉及到RDD与RDD之间的依赖关系,Spark任务的容错机制就是根据这个特性实现的
RDD并不实际存储真正要计算的数据,而是记录的数据的位置,数据的转换关系;同时,所有的RDD转换都是一个惰性求值的过程,只有当发生一个要求返回结果给driver的action动作时,才会真正地运行
使用惰性求值/延迟执行的优点:在Action时形成DAG进行stage的划分和并行优化,提高效率
RDD持久化/缓存
某些RDD的计算或转换可能比较耗时,如果这些RDD后续会被频繁地使用到,可以考虑将这些RDD进行持久化/缓存,使用方式如:rdd.cache
,在调用cache方法之后,触发了后面的action时,该RDD就会被缓存在计算节点的内存中,供后面重用
缓存的级别有很多,默认只存储在内存当中,在开发中通常使用memory_and_disk
弊端:虽然持久化/缓存可以把数据放在内存当中,速度快但可靠性较差,即使选择将其持久化到磁盘上也可能遭遇磁盘损坏的情况
改进:想要获得更可靠的数据持久化效果,可以考虑使用checkpoint,将数据放到HDFS上,天然地借助HDFS的天生高容错、高可靠来实现数据最大程度的安全,实现RDD的容错和高可用,使用方法如下:
sc.setCheckpointDir("HDFS目录")
RDD.checkpoint
RDD对于Spark容错的支持
一般来说,支持容错有两种常见的实现方式:数据复制或日志记录,对于以数据为中心的系统来说,这两种方式都非常昂贵,因为它需要跨集群网络拷贝大量数据,涉及到大量的耗时操作
而RDD是天然支持容错的:它自身是一个不变的只读数据集,可以通过依赖关系、计算函数等记住构建它的操作图,因此当执行任务的worker执行失败时,完全可以通过操作图来获得之前执行的操作,进行重新计算。这种方式下无需采用复制的方式支持容错,很好地降低了跨网络的数据传输成本
不过,在某些场景下,Spark 也需要利用记录日志的方式来支持容错。例如,在 Spark Streaming 中,针对数据进行 update 操作,或者调用 Streaming 提供的 window 操作时,就需要恢复执行过程的中间状态。此时,需要通过 Spark提供的 checkpoint 机制,以支持操作能够从 checkpoint 得到恢复。
RDD 算子
Transformation:转换操作,返回一个新的RDD
Action:动作操作,无返回值或返回非RDD数据、
标签:分区,容错,RDD,详解,计算,Spark,数据 From: https://www.cnblogs.com/pinoky/p/18435294