1、RDD的缓存
当RDD被重复使用,或者计算该RDD比较容易出错,而且需要消耗比较多的资源和时间的时候,我们就可以将该RDD缓存起来。 主要作用: 提升Spark程序的计算效率 注意事项: RDD的缓存可以存储在内存或者是磁盘上,甚至可以存储在Executor进程的堆外内存中。主要是放在内存中,因此缓存的数据是不太稳定可靠。 由于是临时存储,可能会存在丢失,所以缓存操作,并不会将RDD之间的依赖关系给截断掉(丢失掉),因为当缓存 失效后,可以全部重新计算 缓存的API都是Lazy惰性的,如果需要触发缓存操作,推荐调用count算子,因为运行效率高
缓存相关API:
设置缓存的API: rdd.cache(): 将RDD的数据缓存储内存中 rdd.persist(缓存的级别/位置): 将RDD的数据存储在指定位置 手动清理缓存API: rdd.unpersist() 默认情况下,当整个Spark应用程序执行完成后,缓存数据会自动失效,会被自动删除 缓存的级别/位置: DISK_ONLY: 只存储在磁盘 DISK_ONLY_2: 只存储在磁盘,并且有2个副本 DISK_ONLY_3: 只存储在磁盘,并且有3个副本 MEMORY_ONLY: 只存储在内存中 MEMORY_ONLY_2: 只存储在内存中,并且有2个副本 MEMORY_AND_DISK: 存储在内存和磁盘中,先放在内存,再放在磁盘 MEMORY_AND_DISK_2: 存储在内存和磁盘中,先放在内存,再放在磁盘,并且有2个副本 OFF_HEAP: Executor进程的堆外内存 工作中最常用的是: MEMORY_AND_DISK和MEMORY_AND_DISK_2
演示缓存代码:
# 导包 import os import time import jieba from pyspark import SparkConf, SparkContext, StorageLevel # 绑定指定的python解释器 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' def get_topN_keyword(etlRDD, n): r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \ .filter(lambda word: word not in ('.', '+', '的')) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda agg, curr: agg + curr) \ .top(n, lambda t: t[1]) print(r1) def get_topN_search(etlRDD, n): r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \ .reduceByKey(lambda agg, curr: agg + curr) \ .top(n, lambda t: t[1]) print(r2) # 创建main函数 if __name__ == '__main__': # 1.创建SparkContext对象 conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]') sc = SparkContext(conf=conf) # 2.数据输入 textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample') # 3.数据处理(切分,转换,分组聚合) etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter( lambda line_list: len(line_list) >= 6) # 去除搜索内容两端的 [ ] etlRDD = etlRDD.map(lambda line_list: [ line_list[0], line_list[1], line_list[2][1:-1], line_list[3], line_list[4], line_list[5] ]) # 不加缓存 # etlRDD.count() # 7.TODO: cache添加缓存,注意: 只能把缓存添加内存!相对用的少 # etlRDD.cache().count() # 8.TODO: persist添加缓存,注意: 可以修改缓存级别 etlRDD.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_2).count() # 4.数据输出 # 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的' 都需要过滤 # 伪SQL:select 关键词 ,count(*) from 搜狗表 group by 关键词 get_topN_keyword(etlRDD, 10) # 8.TODO: 如果不想用缓存,可以使用unpersist释放缓存,给哪个rdd加的,就给哪个释放 etlRDD.unpersist() # 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据 # 伪SQL:select 用户,搜索内容,count(*) from 搜狗表 group by 用户,搜索内容 get_topN_search(etlRDD, 5) # 6.为了方便查看页面,可以让程序多睡会儿 time.sleep(500) # 5.关闭资源 sc.stop()
图解缓存效果:
无缓存的DAG流程图显示:
有缓存的DAG流程图显示:
cache基于内存:
persist可以修改缓存级别: 同时基于内存和磁盘
2、RDD的checkpoint检查点
RDD缓存主要是将数据存储在内存中,是临时存储,不太稳定,它主要是用来提升程序运行效率的。RDD的checkpoint(检查点)主要是将数据存储在HDFS上,是持久化存储。而HDFS存储数据有3副本的机制,让数据更加安全可靠。 checkpoint认为使用磁盘或者HDFS存储数据之后,数据非常的安全可靠,因此checkpoint会将RDD间的依赖关系给删除/丢弃掉。因此如果checkpoint的数据真的出现了问题,是无法在从头开始计算。 checkpoint主要作用: 提高程序的容错性 注意事项: checkpoint可以将数据存储在磁盘或者HDFS上,主要是将数据存储在HDFS上。 相关API: sc.setCheckpointDir(存储路径): 设置checkpoint数据存放路径 rdd.checkpoint(): 对指定RDD启用checkpoint rdd.count(): 触发checkpoint
代码演示:
# 导包 import os import time import jieba from pyspark import SparkConf, SparkContext, StorageLevel # 绑定指定的python解释器 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' def get_topN_keyword(etlRDD, n): r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \ .filter(lambda word: word not in ('.', '+', '的')) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda agg, curr: agg + curr) \ .top(n, lambda t: t[1]) print(r1) def get_topN_search(etlRDD, n): r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \ .reduceByKey(lambda agg, curr: agg + curr) \ .top(n, lambda t: t[1]) print(r2) # 创建main函数 if __name__ == '__main__': # 1.创建SparkContext对象 conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]') sc = SparkContext(conf=conf) # 2.数据输入 textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample') # 3.数据处理(切分,转换,分组聚合) etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter( lambda line_list: len(line_list) >= 6) # 去除搜索内容两端的 [ ] etlRDD = etlRDD.map(lambda line_list: [ line_list[0], line_list[1], line_list[2][1:-1], line_list[3], line_list[4], line_list[5] ]) # 不加缓存 # etlRDD.count() # 7.TODO: 先拿着sc对象设置检查点保存位置, 建议用hdfs,这样能利用hdfs的高可靠高可用性 sc.setCheckpointDir('hdfs://node1:8020/ckpt') # 8.TODO: 添加检查点checkpoint etlRDD.checkpoint() etlRDD.count() # 4.数据输出 # 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的' 都需要过滤 # 伪SQL:select 关键词 ,count(*) from 搜狗表 group by 关键词 get_topN_keyword(etlRDD, 10) # 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据 # 伪SQL:select 用户,搜索内容,count(*) from 搜狗表 group by 用户,搜索内容 get_topN_search(etlRDD, 5) # 6.为了方便查看页面,可以让程序多睡会儿 time.sleep(500) # 5.关闭资源 sc.stop()
没有设置检查点正常的DAG执行流图:
设置检查点后:
3、缓存和checkpoint的区别
面试题:Spark提供了两种持久化方案。一种为缓存操作,一种为checkpoint方案。请问有什么区别呢?
1- 数据存储位置不同 缓存: 存储在内存或者磁盘 或者 堆外内存中 checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上 2- 数据生命周期: 缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除 checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除 3- 血缘关系: 缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作 checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行 4- 主要作用不同: 缓存: 提高Spark程序的运行效率和容错性 checkpoint检查点: 提高Spark程序的容错性和高可用,高可靠性
思考:既然持久化的方案有两种,那么在生产环境中应该使用哪种方案呢?
在同一个项目中,推荐缓存和checkpoint(检查点)同时配合使用。 使用顺序如下: 在代码中设置缓存和checkpoint检查点,然后再一同使用Action算子触发!!! 使用count算子触发 实际过程如下: 程序会优先从缓存中读取数据,如果发现缓存中没有数据。再从checkpoint中读取数据,并且接着将读取到的数据重新在内存中放置一份,后续还是优先从缓存中读取
测试:
# 导包 import os import time import jieba from pyspark import SparkConf, SparkContext, StorageLevel # 绑定指定的python解释器 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' def get_topN_keyword(etlRDD, n): r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \ .filter(lambda word: word not in ('.', '+', '的')) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda agg, curr: agg + curr) \ .top(n, lambda t: t[1]) print(r1) def get_topN_search(etlRDD, n): r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \ .reduceByKey(lambda agg, curr: agg + curr) \ .top(n, lambda t: t[1]) print(r2) # 创建main函数 if __name__ == '__main__': # 1.创建SparkContext对象 conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]') sc = SparkContext(conf=conf) # 2.数据输入 textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample') # 3.数据处理(切分,转换,分组聚合) etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter( lambda line_list: len(line_list) >= 6) # 去除搜索内容两端的 [ ] etlRDD = etlRDD.map(lambda line_list: [ line_list[0], line_list[1], line_list[2][1:-1], line_list[3], line_list[4], line_list[5] ]) # 不加缓存 # etlRDD.count() # 7.TODO: persist添加缓存,注意: 可以修改缓存级别 etlRDD.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_2) # 8.TODO: 先拿着sc对象设置检查点保存位置, 建议用hdfs,这样能利用hdfs的高可靠高可用性 sc.setCheckpointDir('hdfs://node1:8020/ckpt') etlRDD.checkpoint() etlRDD.count() # TODO:触发缓存和检查点 etlRDD.count() # 4.数据输出 # 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的' 都需要过滤 # 伪SQL:select 关键词 ,count(*) from 搜狗表 group by 关键词 get_topN_keyword(etlRDD, 10) # 7.TODO: 如果不想用缓存,可以使用unpersist释放缓存,给哪个rdd加的,就给哪个释放 # etlRDD.unpersist() # 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据 # 伪SQL:select 用户,搜索内容,count(*) from 搜狗表 group by 用户,搜索内容 get_topN_search(etlRDD, 5) # 6.为了方便查看页面,可以让程序多睡会儿 time.sleep(500) # 5.关闭资源 sc.stop()
DAG有向无环图:
标签:Core,缓存,list,checkpoint,etlRDD,Spark,line,lambda From: https://blog.csdn.net/weixin_65694308/article/details/139901903