5.3.算子
5.3.1.何为算子
算子是一个函数空间到另一个函数空间的映射。广义的讲,对任何函数进行某一项操作都可以认为是一个算子。
5.3.2.分类
转换算子
行动算子
控制算子
5.3.3.转换算子(单Value)
5.3.3.1.map
将处理的数据逐条进行映射转换,将返回值构成新的 RDD。这里的转换可以是类型的转换,也可以是值的转换。
5.3.3.2. mapPartitions & mapPartitionsWithIndex
mapPartitions 是以分区为单位进行数据转换操作,会将整个分区的数据加载到内存。处理完的数据不会立刻释放,因为存在引用关系,所以在内存较小,数据量较大的情况下,容易出现OOM 内存溢出。
能用 mapPartitions 的地方都可以用 map 解决,两者的主要区别是调用的粒度不一样:map 的输入变换函数是应用于RDD 中的每个元素,而 mapPartitions 的输入函数是应用于每个分区。
有些时候比如连接数据库时用 mapPartitions 比较好,因为每次连接开销很大,每个分区连一次比每调用一次连接一次要好。mapPartitionsWithIndex 跟 mapPartitions 差不多,只是参数多了个分区号而已。
5.3.3.3.flatMap
扁平化映射,将嵌套的集合数据展开到最外层的集合中
如List(1,2,3,List(4,5,6,List(7,8,9)))
扁平化后 List(1,2,3,4,5,6,7,8,9)
// 案例练习:对 List(List(1, 2), 3, List(4, 5)) 进行扁平化映射处理。
val mkRDD02: RDD[Any] = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
val resultRDD: RDD[Any] = mkRDD02.flatMap {
case i: List[_] => i
case i => List(i)
}
resultRDD.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:${record}"))
5.3.3.4. glom
将同一个分区的多个单个数据直接转换为相同类型的单个数组进行处理,适用于以分区为单位的数据统计。
// 案例练习:求每个分区的最大值,再求所有分区的最大值总和。
val mkRDD02: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 2)
val glomRDD02: RDD[Array[Int]] = mkRDD02.glom()
val mapRDD: RDD[Int] = glomRDD02.map(_.max)
mapRDD.foreach(recore => println(s"分区:${TaskContext.getPartitionId()},最大值:${recore}"))
println(s"所有分区的最大值总和:${mapRDD.collect().sum}")
5.3.3.5. groupBy(shuffle)
分组是指将数据按指定条件进行分组,从而方便我们对数据进行统计分析。按照传入函数的返回值进行分组,将相同的 Key 对应的值放入一个迭代器中。使用 groupBy 算子后数据会被打乱重新组合,我们将这样的操作称之为 Shuffle。
// 案例练习:将 List("张三" -> "男", "李四" -> "女", "王五" -> "男") 按性别分组并统计人数。
val list02 = List("张三" -> "男", "李四" -> "女", "王五" -> "男")
val mkRDD02: RDD[(String, String)] = sc.makeRDD(list02, 2)
// 按照性别进行分组
val groupByRDD02: RDD[(String, Iterable[(String, String)])] = mkRDD02.groupBy(_._2)
// 统计不同性别学生的人数
val mapRDD: RDD[(String, Int)] = groupByRDD02.map(kv => kv._1 -> kv._2.size)
5.3.3.6. filter
过滤是指过滤出符合一定条件的元素。将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(1 to 9, 2)
// filter 转换算子
val filterRDD: RDD[Int] = mkRDD.filter(_ % 2 == 0)
// foreach 行动算子
filterRDD.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:${record}"))
5.3.3.7.sample
取样,采样就是从大量的数据中获取少量的数据,获取的方法可以依据某种策略,得到的数据用于分析,企图使用少量数据的分析结果代替全局。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(1 to 10, 2)
// sample 转换算子
val sampleRDD01: RDD[Int] = mkRDD.sample(false, 0.3)
// foreach 行动算子
sampleRDD01.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:${record}"))
val sampleRDD02: RDD[Int] = mkRDD.sample(true, 3)
sampleRDD02.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:${record}"))
5.3.3.8.distinct(shuffle)
将数据集中重复的数据去重。Scala 的 distinct 底层采用了 HashSet 的方式,而 Spark 的 distinct 则是采用了 mapreduceByKey map 的方式进行去重。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(1, 1, 2, 2, 3, 4, 5), 2)
// distinct 转换算子
val mapRDD: RDD[Int] = mkRDD.distinct()
// 指定并行度为 2
// val mapRDD: RDD[Int] = mkRDD.distinct(2)
// foreach 行动算子
mapRDD.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:${record}"))
5.3.2.9. coalesce(shuffle)
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。(可以简单地理解为合并分区,可能导致数据倾斜)。当 Spark 程序中存在过多小任务时,可以通过 coalesce 方法合并分区,减少分区的个数,减小任务调度成本。
// 创建 RDD 并设置 3 个分区
val mkRDD: RDD[Int] = sc.makeRDD(1 to 10, 3)
mkRDD.saveAsTextFile("3")
println(mkRDD.partitions.length)
// coalesce 转换算子
val coalesceRDD01: RDD[Int] = mkRDD.coalesce(2)
coalesceRDD01.saveAsTextFile("2-1")
println(coalesceRDD01.partitions.length)
// 为了缩减分区后数据均衡,可以进行 Shuffle 处理
val coalesceRDD02: RDD[Int] = mkRDD.coalesce(2, true)
coalesceRDD02.saveAsTextFile("2-2")
println(coalesceRDD02.partitions.length)
5.3.2.10. repartition(shuffle)
重新分配分区数,一定会产生 Shuffle 操作,底层就是调用了 coalesce 。
// 创建 RDD 并设置 3 个分区
val mkRDD: RDD[Int] = sc.makeRDD(1 to 10, 3)
mkRDD.saveAsTextFile("3")
println(mkRDD.partitions.length)
// repartition 转换算子
val repartitionRDD: RDD[Int] = mkRDD.repartition(4)
repartitionRDD.saveAsTextFile("4")
println(repartitionRDD.partitions.length)
5.3.2.11. sortBy(shuffle)
sortBy 函数可以根据指定的规则对数据源中的数据进行排序,默认为升序,该函数会产生 Shuffle 操作。
即是转换算子又是行动算子,所以sortBy会产生Job
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(3, 1, 2, 9, 7), 2)
// sortBy 转换算子,默认为正序,字符串按字典序排序
val sortByAscRDD: RDD[Int] = mkRDD.sortBy(num => num)
// saveAsTextFile 行动算子
sortByAscRDD.saveAsTextFile("asc")
// ascending = false 时为降序
val sortByDescRDD: RDD[Int] = mkRDD.sortBy(num => num, false)
sortByDescRDD.saveAsTextFile("desc")
5.3.3. 转换算子(双 Value)
5.3.3.1. 并集/交集/差集/笛卡尔积
union :表示对两个 RDD 进行并集(合并)操作,且不去重
intersection :表示对两个 RDD 取交集(相同)
subtract :表示对两个 RDD 取差集(不同)
cartesian :表示对两个 RDD 进行笛卡尔积操作,尽量避免使用
// 创建 RDD
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6), 2)
// union 并集,转换算子
val unionRDD: RDD[Int] = rdd1.union(rdd2)
// foreach 行动算子
unionRDD.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:${record}"))
// 并集后去重
unionRDD.distinct().foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:
${record}"))
// 交集,转换算子
val intersectionRDD: RDD[Int] = rdd1.intersection(rdd2)
intersectionRDD.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:
${record}"))
// subtract 差集,转换算子
val subtractRDD: RDD[Int] = rdd1.subtract(rdd2)
subtractRDD.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:${record}"))
// cartesian 笛卡尔积(尽量避免使用),转换算子
val cartesianRDD: RDD[(Int, Int)] = rdd1.cartesian(rdd2)
cartesianRDD.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:${record}"))
5.3.3.2. 拉链
将两个 RDD 合并成一个 RDD,两个 RDD 的 Partition 数量以及元素数量都必须相同,否则会抛出异常。使用 zip 将 rdd1 和 rdd2 拉链成一个 rdd
// 创建 RDD
val rdd1: RDD[String] = sc.makeRDD(List("张三", "李四", "王五"), 2)
val rdd2: RDD[Int] = sc.makeRDD(List(18, 19, 20), 2)
// zip 拉链,转换算子
val zipRDD: RDD[(String, Int)] = rdd1.zip(rdd2)
// foreach 行动算子
zipRDD.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:${record}"))
5.3.4. 转换算子(Key-Value)
5.3.4.1. partitionBy(shuffle)
将数据按照指定分区数重新进行分区,Spark 默认采用 HashPartitioner。
// 创建 RDD
val mkRDD: RDD[(Int, String)] = sc.makeRDD(List((1, "张三"), (2, "李四"), (3, "王五"), (4, "赵
六")), 4)
println(mkRDD.partitions.length)
mkRDD.foreach(record =>
println(s"分区:${TaskContext.getPartitionId()},记录:(${record._1}, ${record._2})"))
// partitionBy 转换算子
val partitionByRDD: RDD[(Int, String)] = mkRDD.partitionBy(new HashPartitioner(2))
println(partitionByRDD.partitions.length)
partitionByRDD.foreach(record =>
println(s"分区:${TaskContext.getPartitionId()},记录:(${record._1}, ${rlecord._2})"))
5.3.4.2. sortByKey(shuffle)
将 K, V 格式数据的 Key 根据指定的规则进行排序,默认为升序。如果 Key 是元组,如 (x1, x2, x3, ...),会先按照 x1 排序,若 x1 相同,再按 x2 排序,依次类推。sortByKey 同样是来自 PairRDDFunctions 的函数。
即是转换算子,又是行动算子。
// 创建 RDD
val mkRDD: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (1, 2), (3, 4), (2, 3), (3, 6), (3, 8)), 2)
// sortByKey 转换算子,默认为正序,字符串按字典序排序
val sortByKeyAscRDD: RDD[(Int, Int)] = mkRDD.sortByKey()
// saveAsTextFile 行动算子
sortByKeyAscRDD.saveAsTextFile("asc")
// ascending = false 时为降序
val sortByKeyDescRDD: RDD[(Int, Int)] = mkRDD.sortByKey(ascending = false)
sortByKeyDescRDD.saveAsTextFile("desc")
5.3.4.3. reduceByKey(shuffle)
将相同 Key 的值聚合到一起,Reduce 任务的个数可以通过 numPartitions 参数来设置。reduceByKey 同样是来自PairRDDFunctions 的函数。
有预聚合,减少数据落盘
// 创建 RDD
val mkRDD: RDD[(String, Int)] = sc.makeRDD(List(("Hello", 1), ("Hadoop", 1),
("Hello", 1), ("ZooKeeper", 1),
("Hello", 1), ("Hadoop", 1), ("Hive", 1)), 2)
// reduceByKey 转换算子
val reduceByKeyRDD: RDD[(String, Int)] = mkRDD.reduceByKey(_ + _)
// foreach 行动算子
reduceByKeyRDD.foreach(record =>
println(s"分区:${TaskContext.getPartitionId()},记录:(${record._1}, ${record._2})"))
5.3.4.4. groupByKey(shuffle)
前面学习了 groupBy 现在又来一个 groupByKey,两者有什么区别呢?
groupBy:按传入的函数逻辑进行分组,如果传入的函数中数据格式为 K, V 格式数据,会返回 (K, Iterable[(K, V) 格式数据;
groupByKey:按 K, V 格式数据的 Key 进行分组,会返回 (K, Iterable[V]) 格式数据。groupByKey 同样是来自PairRDDFunctions 的函数。
// 创建 RDD
val mkRDD: RDD[(String, Int)] = sc.makeRDD(List(("Hello", 1), ("Hadoop", 1),
("Hello", 1), ("ZooKeeper", 1),
("Hello", 1), ("Hadoop", 1), ("Hive", 1)), 2)
// groupByKey 转换算子
val groupByKeyRDD: RDD[(String, Iterable[Int])] = mkRDD.groupByKey()
// foreach 行动算子
groupByKeyRDD.foreach(record =>
println(s"分区:${TaskContext.getPartitionId()},记录:(${record._1}, ${record._2})"))
5.3.4.5. aggregateByKey(shuffle)
reduceByKey 还有一个特点就是分区内和分区间的计算逻辑必须一致,那不一致的案例是怎样的呢?
前面学习的 glom 算子的案例练习:计算每个分区的最大值(分区内),再求所有分区的最大值总和(分区间)。
但是 glom 算子的案例练习还搭配了 map 算子才实现了最终需求,所以 Spark 为了让程序员更方便的使用,提供了aggregateByKey 算子,该算子无需搭配其他算子就可以实现分区内和分区间不同的计算逻辑。aggregateByKey 同样是来自PairRDDFunctions 的函数。
// 创建 RDD
val mkRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6),
("c", 8)), 2)
mkRDD.foreach(record =>
println(s"分区:${TaskContext.getPartitionId()},记录:(${record._1}, ${record._2})"))
// aggregateByKey 转换算子
val aggregateByKeyRDD: RDD[(String, Int)] = mkRDD.aggregateByKey(0)(math.max(_, _), _ + _)
// foreach 行动算子
aggregateByKeyRDD.foreach(record =>
println(s"分区:${TaskContext.getPartitionId()},记录:(${record._1}, ${record._2})"))
5.3.4.6. mapValues
针对于 K, V 形式的数据只对 V 进行操作。
// 需求:计算每个分区的平均值。
// 创建 RDD
val mkRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6),
("c", 8)), 2)
// aggregateByKey 转换算子
// 分区内计算返回结果为:(分区内数据相加, 相加次数)
val aggregateByKeyRDD: RDD[(String, (Int, Int))] = mkRDD.aggregateByKey((0, 0))(
// t 是 0 -> 0,v 是 kv 的 v
(t, v) => (t._1 + v, t._2 + 1),
// 所有分区的 t1._1 + t2._1 数据相加,t1._2 + t2._2 次数相加
(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
)
// mapValues 转换算子
// 求平均值
val mapValuesRDD: RDD[(String, Int)] = aggregateByKeyRDD.mapValues(t => t._1 / t._2)
// foreach 行动算子
mapValuesRDD.foreach(record =>
println(s"分区:${TaskContext.getPartitionId()},记录:(${record._1}, ${record._2})"))
5.3.4.7. foldByKey(shuffle)
当分区内和分区间计算逻辑相同时,Spark 为了让程序员更方便的使用,提供了 foldByKey 算子。foldByKey 同样是来自 PairRDDFunctions 的函数。
// 创建 RDD
val mkRDD: RDD[(String, Int)] = sc.makeRDD(List(("Hello", 1), ("Hadoop", 1),
("Hello", 1), ("ZooKeeper", 1),
("Hello", 1), ("Hadoop", 1), ("Hive", 1)), 2)
// foldByKey 转换算子,foldByKey 实现 WordCount
val aggregateByKeyRDD: RDD[(String, Int)] = mkRDD.foldByKey(0)(_ + _)
// foreach 行动算子
aggregateByKeyRDD.foreach(record =>
println(s"分区:${TaskContext.getPartitionId()},记录:(${record._1}, ${record._2})"))
5.3.4.8. combineByKey(shuffle)
aggregateByKey 算子和 foldByKey 算子都是给每个分区的第一个 Key 的 Value 一个初始值,如果想要再灵活点,可以使用 combineByKey 算子,因为它的第一个参数是给每个分区的第一个 Key 的 Value 一个初始值规则(使用createCombiner 函数来初始化第一个 Key 的初始值)。combineByKey 同样是来自 PairRDDFunctions 的函数。
// 创建 RDD
val mkRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6),
("c", 8)), 3)
// combineByKey 转换算子
val aggregateByKeyRDD: RDD[(String, (Int, Int))] = mkRDD.combineByKey(
// 给每个分区的第一个 Key 一个初始值规则
(_, 1),
// t 是 0 -> 0,v 是 kv 的 v
(t, v) => (t._1 + v, t._2 + 1),
// 所有分区的 t1._1 + t2._1 数据相加,t1._2 + t2._2 次数相加
(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
)
// mapValues 转换算子
// 求平均值
val mapValuesRDD: RDD[(String, Int)] = aggregateByKeyRDD.mapValues(t => t._1 / t._2)
// foreach 行动算子
mapValuesRDD.foreach(record =>
println(s"分区:${TaskContext.getPartitionId()},记录:(${record._1}, ${record._2})"))
5.3.4.9. 聚合算子之间的区别
通过源码可以得知五种聚合算子最终都会调用到 combineByKeyWithClassTag 这个函数。所以只需要把入参解读一下就大概知道它们之间的区别了。
5.3.4.10. join
在类型为 K, V 和 K, W 的 RDD 上调用,返回一个相同 Key 对应的所有元素对在一起的 K, (V, W) 的 RDD。
// 创建 RDD
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)), 2)
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("c", 7), ("a", 5), ("b", 6)), 2)
// join 转换算子
val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
// foreach 行动算子
joinRDD.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:${record}"))
5.3.4.11. cogroup
在类型为 K, V 和 K, W 的 RDD 上调用,返回一个 K, (Iterable,Iterable) 类型的 RDD。
// 创建 RDD
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("d", 3), ("a", 4)), 2)
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("c", 7), ("a", 5), ("b", 6), ("a", 8), ("b", 9)),
2)
// cogroup 转换算子
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
// foreach 行动算子
cogroupRDD.foreach(record => println(s"分区:${TaskContext.getPartitionId()},记录:${record}"))
5.3.5. 行动算子
5.3.5.1. reduce
通过函数聚合 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(3, 1, 2, 4, 5), 2)
// reduce 行动算子
println(mkRDD.reduce(_ + _)) // 15
5.3.5.2. count
返回 RDD 中元素的个数。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(3, 1, 2, 4, 5), 2)
// count 行动算子
val result02: Long = mkRDD.count()
println(result02) // 5
5.3.5.3. take
返回 RDD 的前 n 个元素组成的数组。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(3, 1, 2, 4, 5), 2)
// take 行动算子
val result03: Array[Int] = mkRDD.take(3)
println(result03.mkString(",")) // 3,1,2
5.3.5.4. takeOrdered
返回 RDD 排序后的前 n 个元素组成的数组,默认正序。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(3, 1, 2, 4, 5), 2)
// takeOrdered 行动算子
val result04: Array[Int] = mkRDD.takeOrdered(3)(Ordering.Int.reverse)
println(result04.mkString(",")) // 5,4,3
5.3.5.5. first
返回 RDD 中的第一个元素,底层就是 take(1) 。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(3, 1, 2, 4, 5), 2)
// first 行动算子
val result05: Int = mkRDD.first()
println(result05) // 3
5.3.5.6. foreach
循环遍历数据集中的每个元素,运行相应的计算逻辑(函数)。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(3, 1, 2, 4, 5), 2)
// foreach 行动算子
// 在 Executor 中打印数据,有并行的概念,所以会是乱序
mkRDD.foreach(println)
// 在 Driver 中打印数据
mkRDD.collect().foreach(println)
5.3.5.7. foreachPartition
按分区循环遍历数据集中的每个元素,并运行相应的计算逻辑(函数)。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(3, 1, 2, 4, 5), 2)
// foreach 行动算子
// 在 Executor 中打印数据,有并行的概念,所以会是乱序
mkRDD.foreachPartition(partitionOfRecords =>
println(s"分区:${TaskContext.getPartitionId()},记录:${partitionOfRecords.toArray.mkString(",")}"))
分区:0,记录:3,1
分区:1,记录:2,4,5
5.3.5.8. collect
将不同分区的数据收集到 Driver 并以数组的形式返回数据。
只用作小型数据的观察,因为:将大量数据汇集到一个 Driver 节点上,并将数据用数组存放,会占用大量 JVM 堆内存,非常容易造成内存溢出。单机环境下使用 collect 问题并不大,但分布式环境下应尽量规避。
如何规避?
若需要遍历 RDD 中元素,大可不必使用 collect,可以使用 foreach 语句;
若需要打印 RDD 中元素,可先用 take 语句,返回数据集前 N 个元素;
若需要查看其中内容,可用 saveAsTextFile 方法。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(3, 1, 2, 4, 5), 2)
// collect 行动算子
val result06: Array[Int] = mkRDD.collect()
println(result06.mkString(","))
5.3.5.9. aggregate
aggregate 函数会将每个分区里面的元素通过 seqOp 函数和初始值进行聚合,然后用 combOp 函数将每个分区的结果和初始值(zeroValue)进行 combine 操作。这个函数最终返回的类型不需要和 RDD 中元素类型一致。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// aggregate 行动算子
val result: Int = mkRDD.aggregate(10)(_ + _, _ + _)
println(result)
5.3.5.10. fold
当 aggregate 的分区内和分区间计算逻辑相同时,Spark 为了让程序员更方便的使用,提供了 fold 算子。
// 创建 RDD
val mkRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// aggregate 行动算子
val result: Int = mkRDD.fold(10)(_ + _)
println(result)
5.3.5.11. countByValue & countByKey
针对 K, V 类型的 RDD,返回一个 K, Int 的 map,表示每一个 Key 对应的元素个数。countByValue 底层调用的是map().countByKey()。countByKey 底层调用的是 reduceByKey。
// 创建 RDD
val mkRDD: RDD[(String, Int)] = sc.makeRDD(List(("Hello", 1), ("Hadoop", 1),
("Hello", 1), ("ZooKeeper", 1),
("Hello", 1), ("Hadoop", 1), ("Hive", 1)), 2)
// countByValue & countByKey 行动算子
val result01: collection.Map[(String, Int), Long] = mkRDD.countByValue()
println(result01)
val result02: collection.Map[(String, Int), Long] = mkRDD.map(_ -> 1).countByKey()
println(result02)
5.3.5.12. save 系列
saveAsTextFile:将数据集的元素以 TextFile 的形式保存到 HDFS 文件系统或者其他文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本。
saveAsSequenceFile:将数据集中的元素以 Hadoop SequenceFile 的格式保存到 HDFS 文件系统或者其他文件系统。
saveAsObjectFile:将 RDD 中的元素序列化成对象,存储到文件中。
// 创建 RDD
val mkRDD: RDD[(String, Int)] = sc.makeRDD(List(("Hello", 1), ("Hadoop", 1),
("Hello", 1), ("ZooKeeper", 1),
("Hello", 1), ("Hadoop", 1), ("Hive", 1)), 2)
// saveAsTextFile & saveAsObjectFile & saveAsSequenceFile 行动算子
mkRDD.saveAsTextFile("textFile")
mkRDD.saveAsObjectFile("objectFile")
// saveAsSequenceFile 算子要求数据必须为 K, V 类型
mkRDD.saveAsSequenceFile("sequenceFile")
5.3.6. 案例练习:WordCount
13种实现方法
5.3.7. 控制算子
控制算子可以将 RDD 持久化,持久化的单位是 Partition 。
5.3.7.1. cache
适用场景:应用只提交一次,且某个 RDD 的数据会被多次使用,例如刚才案例中的 rdd3,为了防止应用多次重新执行,将关键数据进行缓存,提高效率。
val rdd3: RDD[(String, Int)] = rdd2.map(word => {
println("==================== map ====================")
word -> 1
}).cache()
5.3.7.2. persist
我们学的是大数据学科,如果刚才提到的这个关键数据很大内存存不下怎么办?落盘呗。但是这个应用只执行一次就拉倒了,落盘以后执行完作业还要去手动删除就很烦。别着急,Spark 提供了 persist 算子可以解决这个问题。
persist 算子支持修改存储级别,存储级别如下,最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK :
object StorageLevel {
// 不存储任何数据
val NONE = new StorageLevel(false, false, false, false)
// 仅存磁盘
val DISK_ONLY = new StorageLevel(true, false, false, false)
// 仅存磁盘且拥有备份
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
// 仅存内存
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
// 仅存内存且拥有备份
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
// 仅存内存并序列化
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
// 仅存内存并序列化且拥有备份
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
// 内存存不下则溢写到磁盘
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
// 内存存不下则溢写到磁盘并拥有备份
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
// 内存存不下则溢写到磁盘并序列化
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
// 内存存不下则溢写到磁盘并序列化且拥有备份
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
// 使用堆外内存,堆外内存意味着把内存对象分配在 Java 虚拟机的堆以外的内存,这些内存直接受操作系统管理(非虚
拟机)。这样做的结果就是能保护一个较小的堆,以减少垃圾回收对应用的影响。
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
def apply(
useDisk: Boolean,
useMemory: Boolean,
useOffHeap: Boolean,
deserialized: Boolean,
replication: Int): StorageLevel = {
getCachedStorageLevel(
new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
}
}
修改存储级别代码如下:
val rdd3: RDD[(String, Int)] = rdd2.map(word => {
println("==================== map ====================")
word -> 1
}).persist(StorageLevel.MEMORY_AND_DISK)
适用场景:应用只提交一次,且某个 RDD 的数据会被多次使用,例如刚才案例中的 rdd3。并且该数据非常大,内存存不下,为了防止应用多次重新执行,将关键数据进行临时落盘,提高效率。
5.3.7.3. checkpoint
如果刚才提到的这个关键数据需要被多个应用使用怎么办?持久落盘。Spark 提供了 checkpoint 算子可以实现该功能。 checkpoint 算子会将数据保存到磁盘(永久保存,一般存储在分布式文件系统中,例如 HDFS),效率低,数据安全。
checkpoint 可以理解为改变了数据源,因为关键数据已经计算完成,没有必要重头进行读取,所以 checkpoint算子不仅能将 RDD 持久化到磁盘,还能切断 RDD 之间的依赖关系。
// 创建 Checkpoint 目录
sparkContext.setCheckpointDir("hdfs://node02:8020/yjx/cp")
// RDD 持久落盘
rdd3.checkpoint()
checkpoint 算子为了保证数据安全,会自动启动一个新的 Job ,重新计算这个 RDD 的数据,将数据持久化到 Checkpint 目录中。
5.3.7.4. persist 与 unpersist
概念:Spark Persist 是一种将数据持久化到内存中的操作,以便在后续的计算中重复使用。它可以提高计算性能,减少数据读取和写入的开销。
分类:Spark Persist 有多种级别,包括 MEMORY_ONLY、MEMORY_AND_DISK、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER 等。每种级别都有不同的内存和磁盘使用方式。
优势:通过将数据持久化到内存中,Spark Persist 可以加速后续的计算操作,避免重复的数据读取和写入操作,提高性能和效率。
应用场景:Spark Persist 适用于需要多次使用同一份数据进行计算的场景,例如迭代算法、交互式数据分析和机器学习等。
概念:Spark UnPersist 是一种释放内存中持久化数据的操作,用于在不再需要数据时释放内存资源。
分类:Spark UnPersist 没有具体的分类,它只是用于释放通过 Spark Persist 持久化的数据。
优势:通过释放内存中的持久化数据,Spark UnPersist 可以释放内存资源,避免内存溢出和资源浪费。
应用场景:Spark UnPersist 适用于在不再需要持久化数据时释放内存资源的场景,例如在数据计算完成后或者内存资源紧张时。
默认情况下,Spark 会自动检测每个 persist() 和 cache() 操作,它会检测各个结点的使用情况,如果数据不再使用会把持久化(persisted)的数据删掉,依据的是最近最少使用(least-recently-used LRU)算法。你也可以手动使用 unpersist() 将持久化的数据从内存和磁盘中删掉。
标签:Core,17,val,record,Int,RDD,mkRDD,算子,Spark From: https://blog.csdn.net/qq_64407249/article/details/143885986