概念与特性
RDD (Resilient Distributed Dataset)弹性分布式数据集,是 Spark 中最基本的数据处理模型。
- 弹性
- 存储:内存和磁盘的自动切换
- 容错:数据丢失可以自动恢复
- 计算:计算出错重试机制
- 分片:可根据需要重新分片
- 分布式:数据存储在大数据集群的不同节点上
- 数据集:RDD 封装了计算逻辑,并不保存数据
- 数据抽象:RDD 是一个抽象类,需要子类来具体实现
- 不可变:RDD 封装的计算逻辑是不可改变的。如果非要改变,只能重新生成新的 RDD
- 可分区、并行计算
五大特性
A list of partitions
- RDD 由多个
partition
(某个节点中的某一片连续的数据)组成的list
- 用于执行任务时并行计算,有多少
partition
就对应多少个task
- RDD 的并行度默认从父 RDD 传给子 RDD
- 默认情况下,HDFS 上的一个数据分片就是一个 partiton
// getPartitions 返回的必然是一系列 Partition 类型的数据组成的数组
protected def getPartitions: Array[Partition]
A function for computing each split
- RDD 的计算是以分片为单位的,每个 RDD 都会实现
compute
函数 compute
函数会对迭代器进行复合,不需要保存每次计算的结果
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
A list of dependencies on other RDDs
- RDD 的每次转换都会生成一个新的 RDD,最终会形成类似于有向无环图
- 依赖具体分为宽依赖和窄依赖,并不是所有的 RDD 都有依赖
- 窄依赖:每一个父 RDD 的
Partition
最多被子 RDD 的一个Partition
所使用 - 宽依赖:多个子 RDD 的
Partion
会依赖于同一个父 RDD 的Partition
- 窄依赖:每一个父 RDD 的
- 当某 RDD 中数据丢失时,只需要根据依赖关系重算丢失分区数据,而不需要重算全部 RDD
protected def getDependencies: Seq[Dependency[_]] = deps
Optionally, a Partitioner for key-value RDDs
key-value
型的 RDD 是根据哈希来分区的,类似于 mapreduce 当中的paritioner
接口,决定数据到哪个分区里面- 两种分片函数:
- 基于哈希的
HashPartitioner
- 基于范围的
RangePartitioner
- 基于哈希的
- 只有对于
key-value
的RDD,才会有Partitioner
,非key-value
的 RDD 的Partitioner
的值是None
Partitioner
函数不但决定了 RDD 本身的分片数量,也决定了parent RDD Shuffle
输出时的分片数量
@transient val partitioner: Option[Partitioner] = None
Optionally, a list of preferred locations to compute each split on
- 判断计算发送到哪个节点,效率最高
- 移动数据不如移动计算:数据在哪台机器上,任务就启在哪个机器上,本地数据不用走网络
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
执行原理
在 Yarn 的集群环境下,介绍 RDD 的工作原理
- 启动 Yarn 集群环境
- Spark 通过申请资源创建调度节点(Driver)和计算节点(Executor)
- Spark 框架根据需求将计算逻辑(RDD之间的依赖关系)根据分区划分为不同的任务,再将所有任务加入到任务池中供于调度
- 调度节点(Driver)将任务池中的任务根据计算节点状态发送到对应的计算节点进行计算
基础编程
RDD 的创建
- 从集合中创建 RDD
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd")
val sc = new SparkContext(sparkConf)
// 从内存中创建 RDD,将内存集合的数据作为处理的数据源
val seq = Seq[Int](1, 2, 3)
// val rdd: RDD[Int] = sc.parallelize(seq)
// makeRDD 就是调用 parallelize 方法
val rdd: RDD[Int] = sc.makeRDD(seq)
rdd.collect().foreach(println)
sc.stop()
- 从文件中创建 RDD:本地文件系统、所有 Hadoop 支持的数据集(HDFS、HBase)
// path 路径以当前环境的根路径为基准
val rdd: RDD[String] = sc.textFile("datas/1.txt")
// path 路径可以是目录的路径,读 datas 目录下所有 txt
val rdd: RDD[String] = sc.textFile("datas")
// path 路径可以使用通配符 * 读以‘1’开头的 txt
val rdd: RDD[String] = sc.textFile("datas/1*.txt")
// path 路径可以使用 hdfs 的分布式文件路径
val rdd: RDD[String] = sc.textFile("hdfs://linux:8020/1.txt")
// textFile 方法以行为单位,wholeTextFiles 方法以文件为单位
val rdd: RDD[(String, String)] = sc.wholeTextFiles("datas")
rdd.collect().foreach(println)
- 从其他 RDD 中创建,通过一个 RDD 运算产生一个新的 RDD
- 直接创建 RDD(new),一般由 Spark 框架自身使用
分区的设定
// 第二个参数 numSlices 表示分区的数量(选,默认并行度:defaultParallelism)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// 将处理的数据按照分区进行保存
rdd.saveAsTextFile("output")
分区数据的分配
// list 长度为4,分区数量为3,不能均分
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 3)
/**
* length:数组长度,4
* numSlices:分区数量 3
* i = 0:start = (0 * 4) / 3 = 0;end = ((0 + 1)* 4) / 3 = 1 => (0,1)
* i = 1:start = (1 * 4) / 3 = 1;end = ((1 + 1)* 4) / 3 = 2 => (1,2)
* i = 2:start = (2 * 4) / 3 = 2;end = ((2 + 1)* 4) / 3 = 4 => (2,4)
*/
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
// textFile 函数中 minPartitions(最小分区数量)= math.min(defaultParallelism, 2)
val rdd: RDD[String] = sc.textFile("datas/1.txt")
// 第二参数就是设置 minPartitions
val rdd: RDD[String] = sc.textFile("datas/1.txt", 3)
// 文件分区规则
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
.......
// totalSize 文件字节数
long totalSize = 0L;
// goalSize 每个分区应该存放多少字节
long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
// 定义了一个最小的切片字节大小
long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
.......
}
算子
RDD 方法(算子):
- 转换算子(Transformation):功能的补充和封装,将旧的RDD包装成新的RDD(flatMap,map)
- Value 类型
- 双 Value 类型
- Key - Value 类型
- 行动算子(Action):触发任务的调度和作业的执行(collect)
Value 类型
- map:将处理的数据逐条进行映射转换,可以是值的转换,也可以是类型的转换
// 一个分区的 rdd 计算:数据是一个一个执行逻辑。
// 多个分区的 rdd 计算:分区内数据的执行是有序的,不同分区数据计算是无序的。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD: RDD[Int] = rdd.map(_ * 2)
mapRDD.collect().foreach(println)
- mapPartitions:可以以分区为单位进行数据的转换,但会将整个分区的数据加载到内存进行引用。处理完的数据不会被释放,因为存在对象的引用。该算子需要传递一个迭代器,返回一个迭代器。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mpRdd: RDD[Int] = rdd.mapPartitions(
iter => {
println("====")
iter.map(_ * 2)
}
)
mpRdd.collect().foreach(println)
- mapPartitionsWithIndex:将待处理的数据以分区为单位进行处理,同时可以获得当前分区的索引。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD: RDD[Int] = rdd.mapPartitionsWithIndex(
(index, iter) => {
if (index == 1) iter
else Nil.iterator
}
)
mapRDD.collect().foreach(println)
- flatMap:将处理的数据进行扁平化再进行映射处理,称作扁平映射。
val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))
val flatRDD: RDD[Int] = rdd.flatMap(
list => {
list
}
)
flatRDD.collect().foreach(println)
- glom:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val glomRdd: RDD[Array[Int]] = rdd.glom()
glomRdd.collect().foreach(data => println(data.mkString(",")))
- groupBy:将数据根据指定的规则进行分组,分区数量默认不变,但是数据会被打乱重新组合(shuffle)。一个组的数据可以在一个分区中,但一个分区中并不是只有一组。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val groupRdd: RDD[(Int, Iterable[Int])] = rdd.groupBy(num => num % 2)
groupRdd.collect().foreach(println)
- filter:符合规则的数据保留,筛选过滤时,分区不变,但是分区内的数据可能不均衡,会出现数据倾斜。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 1)
val filterRdd: RDD[Int] = rdd.filter(_ % 2 != 0)
filterRdd.collect().foreach(println)
- sample:根据指定的规则从数据集中抽取数据。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 1)
// 第一个参数表示样本是否放回
// 第二个参数表示数据被采样的概率,基准值
// 第三个参数为随机种子
val sampleRdd: RDD[Int] = rdd.sample(true, 0.4, 1)
println(sampleRdd.collect().mkString(","))
- distinct:将数据集中数据进行去重。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4), 1)
// map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
val distinctRdd: RDD[Int] = rdd.distinct()
distinctRdd.collect().foreach(println)
- coalesce:根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
// coalesce 默认不会将分区的数据打乱重新组合,所以会出现数据倾斜。
// 如果想避免,可以进行 shuffle 处理
// 该算子可以用来扩大分区,但必须要打开 shuffle
val coalesceRdd: RDD[Int] = rdd.coalesce(2, true)
coalesceRdd.saveAsTextFile("output")
- repartition:扩大分区数量。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
// shuffle 是默认打开的
val repartitionRdd: RDD[Int] = rdd.repartition(3)
repartitionRdd.saveAsTextFile("output")
- sortBy:对分区数据进行排序,默认升序。
val rdd: RDD[Int] = sc.makeRDD(List(6, 2, 5, 3, 4, 1), 2)
// shuffle 是默认打开的,第二个参数控制升降序
val sortRdd: RDD[Int] = rdd.sortBy(number => number)
sortRdd.saveAsTextFile("output")
双 Value 类型
- intersection:对源 RDD 和参数 RDD 求交集后返回一个新的 RDD,要求两个 RDD 元素类型相同。
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))
val intersectionRdd: RDD[Int] = rdd1.intersection(rdd2)
println(intersectionRdd.collect().mkString(","))
- union:并集
val unionRdd: RDD[Int] = rdd1.union(rdd2)
println(unionRdd.collect().mkString(","))
- subtract:差集
val subtractRdd: RDD[Int] = rdd1.subtract(rdd2)
println(subtractRdd.collect().mkString(","))
- zip:拉链操作可以支持两个 RDD 元素类型不同,但要求两个 RDD 的分区数量和每个分区中的元素数量均相同。
val zipRdd: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(zipRdd.collect().mkString(","))
Key - Value 类型
- partitionBy:将数据按照指定
Partitioner
重新进行分区,Spark 默认的分区器是HashPartitioner
。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRdd: RDD[(Int, Int)] = rdd.map((_, 1))
// 如果重新分区的分区器和当前 RDD 的分区器相同(类型、分区数量)则不会重新分区。
mapRdd.partitionBy(new HashPartitioner(2)).saveAsTextFile("path")
- reduceByKey:将数据按照相同的 Key 对 Value 进行聚合。
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 1)))
// scala 中聚合一般是两两结合,spark 基于 scala,故也是如此。
// reduceByKey 中,如果一个 key 的数据只有一个,是不会参与计算的。
val reduceRdd: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => { x + y })
reduceRdd.collect().foreach(println)
- groupByKey:相同 Key 的数据分在一个组中,形成一个对偶元组,远组的第一个元素就是 Key,第二个元素就是相同 Key 的 Value 集合。
/**
* reduceByKey 和 groupByKey 的区别:
* 从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据
* 进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
* 从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,
* 如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey
*/
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 1)))
val groupRdd: RDD[(String, Iterable[Int])] = rdd.groupByKey()
groupRdd.collect().foreach(println)
- aggregateByKey:将数据根据不同的规则进行分区内计算和分区间计算。
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
// aggregateByKey 算子存在函数柯里化
// 第一个括号:需要传递一个参数,表示初始值,用于分区内第一个值进行二元计算
// 第二个括号:第一个参数表示分区内计算规则,第二个参数表示分区间计算规则
val aggregateRdd: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max(_, _), _ + _)
aggregateRdd.collect().foreach(println)
- foldByKey:当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey。
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
val foldRdd: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _)
foldRdd.collect().foreach(println)
- combineByKey:与 aggregateByKey 类似,但不需要指定初始值。
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
// 第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
// 第二个参数表示:分区内的计算规则
// 第三个参数表示:分区间的计算规则
val combineRdd : RDD[(String, (Int, Int))] = rdd.combineByKey(
v => (v, 1),
(t: (Int, Int), v) => {(t._1 + v, t._2 + 1)},
(t1: (Int, Int), t2: (Int, Int)) => {(t1._1 + t2._1, t1._2 + t2._2)}
)
val resultRdd: RDD[(String, Int)] = combineRdd.mapValues {
case (num, cnt) => num / cnt
}
resultRdd.collect().foreach(println)
- join:两个不同数据源的数据,相同的 Key 的 Value 会被连接在一起(类似笛卡尔积),形成元组。
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))
// 如果匹配不上,就不会出现在结果中。
val joinRdd: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRdd.collect().foreach(println)
- leftOuterJoin(rightOuterJoin):左连接和右连接。
- cogroup:在类型为 (K,V) 和 (K,W) 的 RDD 上调用,返回一个 (K,(Iterable
,Iterable )) 类型的 RDD。
val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2)))
val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6), ("c", 7)))
// cogroup : connect + group (分组,连接)
val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
cgRDD.collect().foreach(println)
/*
(a,(CompactBuffer(1),CompactBuffer(4)))
(b,(CompactBuffer(2),CompactBuffer(5)))
(c,(CompactBuffer(),CompactBuffer(6, 7)))
*/
行动算子
触发作业 (Job) 执行的方法,底层代码调用的是环境对象的 runJob
方法 ,底层代码中会创建 ActiveJob
,并提交执行。
- reduce:聚集 RDD 中所有元素,先聚合分区内的数据,再聚合分区间的数据。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val sum: Int = rdd.reduce(_ + _)
println(sum)
- collect:将不同分区的数据按照分区顺序采集到 Driver 端内存中,形成数组。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val array: Array[Int] = rdd.collect()
println(array.mkString(","))
- count: 统计数据源中数据的个数。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val countNum: Long = rdd.count()
- first:返回数据源第一个元素。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val firstNum: Int = rdd.first()
- take:获取前 n 个元素。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val array: Array[Int] = rdd.take(2)
- takeOrdered:获取排序后的前 n 个元素组成的数组。
val rdd: RDD[Int] = sc.makeRDD(List(4, 2, 1, 3))
// 降序
val array: Array[Int] = rdd.takeOrdered(2)(Ordering.Int.reverse)
- aggregate:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的聚合。
val rdd: RDD[Int] = sc.makeRDD(List(4, 2, 1, 3))
// aggregateByKey:初始值只参与分区内计算
// aggregate:初始值不仅会参加分区内也会参加分区间计算
val sum: Int = rdd.aggregate(0)(_ + _, _ + _)
- fold:aggregate 简化版,当分区内和分区间聚合函数相同时。
val rdd: RDD[Int] = sc.makeRDD(List(4, 2, 1, 3), 2)
val sum: Int = rdd.fold(10)(_ + _)
- countByValue 和 countByKey
val rdd: RDD[Int] = sc.makeRDD(List(4, 1, 1, 1), 2)
val intToLong: collection.Map[Int, Long] = rdd.countByValue()
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3)))
val stringToLong: collection.Map[String, Long] = rdd.countByKey()
- save:将数据保存到不同格式的文件中。
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2)), 2)
// 保存成Text文件
rdd.saveAsTextFile("output1")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output2")
// 保存成Sequencefile文件 必须是(k,v)类型
rdd.saveAsSequenceFile("output3")
- foreach
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// Driver 端内存集合中的循环遍历方法
rdd.collect().foreach(println)
println("===")
// Executor 端内存数据打印,是分布式打印
rdd.foreach(println)
序列化
闭包检查
算子以外的代码都是在 Driver 端进行,算子里的代码都是在 Executor 端执行。在 Scala 的函数式编程中,就会导致算子内经常会使用到算子外的数据,这样就形成了闭包的效果。如果使用的算子没有序列化,就无法通过网络 IO 从 Driver 端传到 Executor 端。因此,在执行任务之前,会检查闭包内的对象是否可以进行序列化,这个操作叫做闭包检查。
序列化的方法和属性
object Serial {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("serial")
val sc = new SparkContext(sparkConf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "scala"))
val search = new Search("h")
val value1: RDD[String] = search.getMatch1(rdd)
val value2: RDD[String] = search.getMatch2(rdd)
value1.collect().foreach(println)
value2.collect().foreach(println)
sc.stop()
}
// 类的构造参数是类的属性,构造参数需要进行闭包检测,等同于类需要进行闭包检测
// 可以 extends Serializable 也可以转化为 case
class Search(query: String) extends Serializable {
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 函数序列化
def getMatch1(rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
// 属性序列化
def getMatch2(rdd: RDD[String]): RDD[String] = {
// 将 query 转换成局部属性
val s: String = query
rdd.filter(x => x.contains(s))
}
}
}
Kryo 序列化框架
- Java 的序列化可以序列化任何的类,但是比较重(字节多),序列化后之后的对象比较大。
- Kryo 速度是 Serializable 的 10 倍,当 RDD 在 shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 序列化了。
- 即使使用 Kryo 序列化,也要继承 Serializable。
血缘关系
RDD 血缘关系
将创建 RDD 的一系列 Lineage(元数据信息和转换行为)记录下来,以便恢复丢失的分区。
宽依赖
同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle。
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]]
窄依赖
窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用。
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
阶段划分
- DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形。
- 当 RDD 中存在 shuffle 依赖时,阶段会自动增加一个。
- 阶段的数量 = shuffle 依赖的数量 + 1
- ResultStage 只有一个,最后需要执行的阶段。
任务划分
- Application:初始化一个 SparkContext
- Job:一个 Action 算子就会生成一个 Job
- Stage:等于宽依赖(ShuffleDependency)的个数加 1
- Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数
- Application -> Job -> Stage -> Task 每一层都是 1 对 n 的关系。
持久化
如果一个 RDD 需要重复使用,那么需要从头再次执行来获取数据。虽然 RDD 对象是可重用的,但是数据是无法重用的。
适用场景:
- 多个操作使用到了同一个 RDD 数据(重用)。
- 数据执行时间长,或者某些数据很重要。
val rdd: RDD[String] = sc.makeRDD(List("hello world", "hello spark"))
val flatRdd: RDD[String] = rdd.flatMap(_.split(" "))
val mapRdd: RDD[(String, Int)] = flatRdd.map(word => {(word, 1)})
// cache 默认持久化操作只能将数据保存到内存(JVM 的堆内存)中,会在血缘关系中添加新的依赖,如果要保存到磁盘中,需要用 persist。
// 持久化操作是在行动算子执行时完成的。
// persist 保存到磁盘的操作,当执行完毕后,会被删除,所以不需要指定具体路径。
mapRdd.cache()
mapRdd.persist(StorageLevel.DISK_ONLY)
val reduceRdd: RDD[(String, Int)] = mapRdd.reduceByKey(_ + _)
reduceRdd.collect().foreach(println)
val groupRdd: RDD[(String, Iterable[Int])] = mapRdd.groupByKey()
groupRdd.collect().foreach(println)
检查点
CheckPoint 检查点就是将 RDD 的中间结果写入磁盘。
由于血缘依赖过长会造成容错成本过高,所以会在中间阶段做检查点容错。如果检查点之后的节点出现错误,只需要从检查点开始重做,减少开销。
对 RDD 进行 checkpoint 操作不会立马执行,必须执行 Action 操作才能触发。
sc.setCheckpointDir("checkPoint")
val rdd: RDD[String] = sc.makeRDD(List("hello world", "hello spark"))
val flatRdd: RDD[String] = rdd.flatMap(_.split(" "))
val mapRdd: RDD[(String, Int)] = flatRdd.map(word => {(word, 1)})
// checkpoint 需要落盘,所以要指定检查点保存路径
// 检查点路径保存的文件,当执行完毕后,不会被删除
// 一般保存路径都是在分布式文件系统中
// 为了保证数据安全,所以一般情况下会独立执行作业,因此,为了提高效率,会和cache一起使用
// 执行过程中会切断血缘关系,重新建立新的血缘关系
mapRdd.cache()
mapRdd.checkpoint()
val reduceRdd: RDD[(String, Int)] = mapRdd.reduceByKey(_ + _)
reduceRdd.collect().foreach(println)
val groupRdd: RDD[(String, Iterable[Int])] = mapRdd.groupByKey()
groupRdd.collect().foreach(println)
分区器
- 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值为 None。
- 每个 RDD 的分区 ID 范围:0~(numPartitions - 1),决定这个值属于哪个分区。
object Part {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd")
val sc = new SparkContext(sparkConf)
val rdd: RDD[(String, String)] = sc.makeRDD(List(("nba", "xxx"), ("cba", "xxx"), ("wnba", "xxx"), ("nba", "xxx")), 3)
val partRdd: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)
partRdd.saveAsTextFile("output")
sc.stop()
}
class MyPartitioner extends Partitioner {
// 分区数量
override def numPartitions: Int = 3
// 根据数据的 Key 返回数据的分区索引,从0开始
override def getPartition(key: Any): Int = {
key match {
case "cba" => 0
case "nba" => 1
case _ => 2
}
}
}
}
参考资料
视频:https://www.bilibili.com/video/BV11A411L7CK
博客:
- https://blog.csdn.net/weixin_38750084/article/details/82955062
- https://blog.csdn.net/zym1117/article/details/79532458
- https://blog.csdn.net/weixin_36040866/article/details/122810519