RDD详解
- 前提:MapReduce框架采用非循环式的数据流模型,把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销,且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象,因此出现了RDD这个概念
概念
- RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合
- Resilient:它是弹性的,RDD中的数据可以保存在内存或者磁盘里面
- Distributed:它里面的元素是分布式存储的,可以用于分布式计算
- Dataset: 它是一个集合,可以存放很多元素
并行计算能力:指Spark框架将大型数据集自动分割成多个小块,并将这些小块分散到多个计算节点上,每个节点可以独立地对其分配到的数据块进行处理和计算,从而实现计算任务的并行执行,类似于多个人同时完成不同的部分工作,最后将各自的结果汇总起来,以提高数据处理的速度和效率
属性
- 分区列表:数据集的基本组成单位,对于RDD来说,每个分片都会被一个计算任务处理,分片数决定并行度,用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值
- 计算函数:一个函数会被作用在每一个分区,Spark中RDD的计算是以分片为单位的,compute函数会被作用到每个分区上
- 依赖关系:一个RDD会依赖于其他多个RDD,RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系,在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算,这也是Spark的容错机制
- 分区函数:可选项,对于kv类型的RDD会有一个Partitioner,即RDD的分区函数,默认为HashPartitioner
- 最佳位置:可选项,一个列表,存储存取每个分区的优先位置,对于一个HDFS文件来说,这个列表保存的就是每个分区所在的块的位置,按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能选择那些存有数据的worker节点来进行任务计算
RDD是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来,如何计算
RDD API
RDD的创建方式
(1)由外部存储系统的数据集创建,包括本地的文件系统,还有Hadoop支持的数据集,比例HDFS、Cassandra、HBase等
val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
- val关键字用于声明一个不可变的变量rdd1,
- sc是一个SparkContext对象的实例,它是与Spark集群交互的主要入口,通过SparkContext,可以创建RDD、执行作业、获取集群的信息等
- .textFile()是SparkContext对象的一个方法,用于读取存储在给定URI(统一资源标识符)的文本文件,这里的URI是一个HDFS的路径,指向了文件words.txt的位置
- 这段代码执行结束后,rdd1变量就会指向一个包含文件words.txt中所有行的RDD,每个元素都是文件中的一行文本,可以被用于后续的转换(map、filter、reduceByKey等)和行动(count、collect、saveAsTextFile等)操作
(2)通过已有的RDD经过算子转换生成新的RDD
val rdd2=rdd1.flatMap(_.split(" "))
- .flatMap():RDD的转换操作,它将函数应用于RDD的每个元素,然后将结果扁平化为一个新的RDD,这里传递给flatMap的是一个匿名函数 _.split(" ")
- ** _.split(" ")**:这个函数对RDD中的每个元素应用split方法,以空格为分隔符将行分割成单词数组,flatMap将这些数组扁平化成一个单一的RDD,_在scala语言中是一个通配符,表示匿名函数的参数,这里指rdd1中的每一行文本
(3)由一个已经存在的Scala集合创建
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
- parallelize方法通常用于将本地数据集快速转换为RDD,以便于在Spark中进行并行计算
RDD算子
注意
- RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数)
- RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算,只有当发生一个要求返回结果给Driver的Action动作时,这些转换才会真正运行
- 之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行
(1)Transformation转换操作:返回一个新的RDD
转换算子 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素,所以func应该返回一个序列,而不是单一元素 |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分区上运行,因此在类型为T的RDD上运行时,返回一个类型为U的RDD |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值 |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD,不会去除重复元素 |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
coalesce(numPartitions) | 减少RDD的分区数到指定值,在过滤大量数据之后,可以执行此操作 |
repartition(numPartitions) | 重新给RDD分区 |
(2)Action动作操作:返回值不是RDD(无返回值或返回其他的)
动作算子 | 含义 |
---|---|
reduce(func) | 通过fun 函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前n个元素 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是HDFS或者其他Hadoop支持的文件系统 |
saveAsObjectFile(path) | 将数据集的元素,以Java序列化的方式保存到指定的目录下 |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数 |
foreach(func) | 在数据集的每一个元素上,运行函数 func 进行更新 |
foreachPartition(func) | 在数据集的每一个分区上,运行函数func |
统计操作
算子 | 含义 |
---|---|
count | 个数 |
mean | 均值 |
sum | 求和 |
max | 最大值 |
min | 最小值 |
variance | 方差 |
sampleVariance | 从采样中计算方差 |
stdev | 标准差:衡量数据的离散程度 |
sampleStdev | 采样的标准差 |
stats | 查看统计结果 |