val count: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
count.collect()
val value: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupBy(_._1)
value.collect()
在上面代码中mapRDD转换两次:reduceByKey 和 groupBy,这种情况下RDD是从头执行两遍;性能不高
解决方法:RDD持久化
mapRDD.cache() 并不会立即持久化,而是在触发后面的action算子时,才会缓存在计算节点的内存中
mapRDD.persist(StorageLevel.MEMORY_AND_DISK) 可以设置存储级别 内存 或 磁盘
自己使用,用完后丢弃
检查点:检查点可以切断血缘关系,检查点其实就是将RDD结果写入磁盘(一般是写入HDFS分布式环境)
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发
检查点需要指定磁盘目录,为了安全,检查点会执行两遍RDD(优化方法:持久化和检查点结合使用,先持久化,再检查点)
mapRDD.cache()
mapRDD.checkpoint()