首页 > 其他分享 >摸鱼大数据——Spark Core——缓存和checkpoint

摸鱼大数据——Spark Core——缓存和checkpoint

时间:2024-07-06 11:31:17浏览次数:20  
标签:Core 缓存 list checkpoint etlRDD Spark line lambda

1、RDD的缓存

当RDD被重复使用,或者计算该RDD比较容易出错,而且需要消耗比较多的资源和时间的时候,我们就可以将该RDD缓存起来。
​
主要作用: 提升Spark程序的计算效率
注意事项: RDD的缓存可以存储在内存或者是磁盘上,甚至可以存储在Executor进程的堆外内存中。主要是放在内存中,因此缓存的数据是不太稳定可靠。
​
由于是临时存储,可能会存在丢失,所以缓存操作,并不会将RDD之间的依赖关系给截断掉(丢失掉),因为当缓存
失效后,可以全部重新计算
缓存的API都是Lazy惰性的,如果需要触发缓存操作,推荐调用count算子,因为运行效率高

缓存相关API:
设置缓存的API: 
    rdd.cache(): 将RDD的数据缓存储内存中
    rdd.persist(缓存的级别/位置): 将RDD的数据存储在指定位置
​
手动清理缓存API:
    rdd.unpersist()
默认情况下,当整个Spark应用程序执行完成后,缓存数据会自动失效,会被自动删除
​
​
缓存的级别/位置:
    DISK_ONLY: 只存储在磁盘
    DISK_ONLY_2: 只存储在磁盘,并且有2个副本
    DISK_ONLY_3: 只存储在磁盘,并且有3个副本
    MEMORY_ONLY: 只存储在内存中
    MEMORY_ONLY_2: 只存储在内存中,并且有2个副本
    MEMORY_AND_DISK: 存储在内存和磁盘中,先放在内存,再放在磁盘
    MEMORY_AND_DISK_2: 存储在内存和磁盘中,先放在内存,再放在磁盘,并且有2个副本
    OFF_HEAP: Executor进程的堆外内存
    
工作中最常用的是: MEMORY_AND_DISK和MEMORY_AND_DISK_2
演示缓存代码:
# 导包
import os
import time
​
import jieba
from pyspark import SparkConf, SparkContext, StorageLevel
​
# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
​
​
def get_topN_keyword(etlRDD, n):
    r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \
        .filter(lambda word: word not in ('.', '+', '的')) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r1)
​
​
def get_topN_search(etlRDD, n):
    r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r2)
​
​
# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    # 2.数据输入
    textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample')
    # 3.数据处理(切分,转换,分组聚合)
    etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter(
        lambda line_list: len(line_list) >= 6)
    # 去除搜索内容两端的 [ ]
    etlRDD = etlRDD.map(lambda line_list:
                        [
                            line_list[0],
                            line_list[1],
                            line_list[2][1:-1],
                            line_list[3],
                            line_list[4],
                            line_list[5]
                        ])
    # 不加缓存
    # etlRDD.count()
    # 7.TODO: cache添加缓存,注意: 只能把缓存添加内存!相对用的少
    # etlRDD.cache().count()
    # 8.TODO: persist添加缓存,注意: 可以修改缓存级别
    etlRDD.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_2).count()
​
    # 4.数据输出
    # 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的'  都需要过滤
    # 伪SQL:select 关键词 ,count(*)  from  搜狗表 group by 关键词
    get_topN_keyword(etlRDD, 10)
​
    # 8.TODO: 如果不想用缓存,可以使用unpersist释放缓存,给哪个rdd加的,就给哪个释放
    etlRDD.unpersist()
​
    # 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据
    # 伪SQL:select 用户,搜索内容,count(*)  from  搜狗表 group by 用户,搜索内容
    get_topN_search(etlRDD, 5)
    # 6.为了方便查看页面,可以让程序多睡会儿
    time.sleep(500)
    # 5.关闭资源
    sc.stop()
​
图解缓存效果:

无缓存的DAG流程图显示:

有缓存的DAG流程图显示:

cache基于内存:

persist可以修改缓存级别: 同时基于内存和磁盘

2、RDD的checkpoint检查点

RDD缓存主要是将数据存储在内存中,是临时存储,不太稳定,它主要是用来提升程序运行效率的。RDD的checkpoint(检查点)主要是将数据存储在HDFS上,是持久化存储。而HDFS存储数据有3副本的机制,让数据更加安全可靠。
​
checkpoint认为使用磁盘或者HDFS存储数据之后,数据非常的安全可靠,因此checkpoint会将RDD间的依赖关系给删除/丢弃掉。因此如果checkpoint的数据真的出现了问题,是无法在从头开始计算。
​
checkpoint主要作用: 提高程序的容错性
注意事项: checkpoint可以将数据存储在磁盘或者HDFS上,主要是将数据存储在HDFS上。
​
相关API:
    sc.setCheckpointDir(存储路径): 设置checkpoint数据存放路径
    rdd.checkpoint(): 对指定RDD启用checkpoint
    rdd.count(): 触发checkpoint

代码演示:

# 导包
import os
import time
​
import jieba
from pyspark import SparkConf, SparkContext, StorageLevel
​
# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
​
​
def get_topN_keyword(etlRDD, n):
    r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \
        .filter(lambda word: word not in ('.', '+', '的')) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r1)
​
​
def get_topN_search(etlRDD, n):
    r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r2)
​
​
# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    # 2.数据输入
    textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample')
    # 3.数据处理(切分,转换,分组聚合)
    etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter(
        lambda line_list: len(line_list) >= 6)
    # 去除搜索内容两端的 [ ]
    etlRDD = etlRDD.map(lambda line_list:
                        [
                            line_list[0],
                            line_list[1],
                            line_list[2][1:-1],
                            line_list[3],
                            line_list[4],
                            line_list[5]
                        ])
    # 不加缓存
    # etlRDD.count()
    # 7.TODO: 先拿着sc对象设置检查点保存位置, 建议用hdfs,这样能利用hdfs的高可靠高可用性
    sc.setCheckpointDir('hdfs://node1:8020/ckpt')
    # 8.TODO: 添加检查点checkpoint
    etlRDD.checkpoint()
    etlRDD.count()
​
    # 4.数据输出
    # 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的'  都需要过滤
    # 伪SQL:select 关键词 ,count(*)  from  搜狗表 group by 关键词
    get_topN_keyword(etlRDD, 10)
​
    # 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据
    # 伪SQL:select 用户,搜索内容,count(*)  from  搜狗表 group by 用户,搜索内容
    get_topN_search(etlRDD, 5)
    # 6.为了方便查看页面,可以让程序多睡会儿
    time.sleep(500)
    # 5.关闭资源
    sc.stop()
​

没有设置检查点正常的DAG执行流图:

设置检查点后:

3、缓存和checkpoint的区别

面试题:Spark提供了两种持久化方案。一种为缓存操作,一种为checkpoint方案。请问有什么区别呢?

1- 数据存储位置不同
    缓存: 存储在内存或者磁盘 或者 堆外内存中
    checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上
​
2- 数据生命周期:
    缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除
    checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除
​
3- 血缘关系:
    缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作
    checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行
    
4- 主要作用不同:
    缓存: 提高Spark程序的运行效率和容错性
    checkpoint检查点: 提高Spark程序的容错性和高可用,高可靠性

思考:既然持久化的方案有两种,那么在生产环境中应该使用哪种方案呢?

在同一个项目中,推荐缓存和checkpoint(检查点)同时配合使用。
​
使用顺序如下: 在代码中设置缓存和checkpoint检查点,然后再一同使用Action算子触发!!! 使用count算子触发
​
实际过程如下: 程序会优先从缓存中读取数据,如果发现缓存中没有数据。再从checkpoint中读取数据,并且接着将读取到的数据重新在内存中放置一份,后续还是优先从缓存中读取

测试:

# 导包
import os
import time
​
import jieba
from pyspark import SparkConf, SparkContext, StorageLevel
​
# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
​
​
def get_topN_keyword(etlRDD, n):
    r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \
        .filter(lambda word: word not in ('.', '+', '的')) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r1)
​
​
def get_topN_search(etlRDD, n):
    r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(n, lambda t: t[1])
    print(r2)
​
​
# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    # 2.数据输入
    textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample')
    # 3.数据处理(切分,转换,分组聚合)
    etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter(
        lambda line_list: len(line_list) >= 6)
    # 去除搜索内容两端的 [ ]
    etlRDD = etlRDD.map(lambda line_list:
                        [
                            line_list[0],
                            line_list[1],
                            line_list[2][1:-1],
                            line_list[3],
                            line_list[4],
                            line_list[5]
                        ])
    # 不加缓存
    # etlRDD.count()
    # 7.TODO: persist添加缓存,注意: 可以修改缓存级别
    etlRDD.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_2)
    # 8.TODO: 先拿着sc对象设置检查点保存位置, 建议用hdfs,这样能利用hdfs的高可靠高可用性
    sc.setCheckpointDir('hdfs://node1:8020/ckpt')
    etlRDD.checkpoint()
    etlRDD.count()
    # TODO:触发缓存和检查点
    etlRDD.count()
​
    # 4.数据输出
    # 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的'  都需要过滤
    # 伪SQL:select 关键词 ,count(*)  from  搜狗表 group by 关键词
    get_topN_keyword(etlRDD, 10)
​
    # 7.TODO: 如果不想用缓存,可以使用unpersist释放缓存,给哪个rdd加的,就给哪个释放
    # etlRDD.unpersist()
​
    # 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据
    # 伪SQL:select 用户,搜索内容,count(*)  from  搜狗表 group by 用户,搜索内容
    get_topN_search(etlRDD, 5)
    # 6.为了方便查看页面,可以让程序多睡会儿
    time.sleep(500)
    # 5.关闭资源
    sc.stop()
​

DAG有向无环图:

标签:Core,缓存,list,checkpoint,etlRDD,Spark,line,lambda
From: https://blog.csdn.net/weixin_65694308/article/details/139901903

相关文章

  • 摸鱼大数据——Spark Core——Spark内核调度
    1、内容概述Spark内核调度的任务:如何构建DAG执行流程图如何划分Stage阶段Driver底层是如何运转确定需要构建多少分区(线程)Spark内核调度的目的:尽可能用最少的资源高效地完成任务计算2、RDD的依赖RDD依赖:一个RDD的形成可能是由一个或者多个RDD得到的,此时这个RDD和......
  • vue3【提效】使用 VueUse 高效开发(工具库 @vueuse/core + 新增的组件库 @vueuse/compo
    Vueuse是一个功能强大的Vue.js生态系统工具库,提供了可重用的组件和函数,帮助开发者更轻松地构建复杂的应用程序。官网:https://vueuse.org/core/useWindowScroll/安装VueUsenpmi@vueuse/core@vueuse/components(可选)安装自动导入,添加到imports中//需......
  • 记一次aspnetcore发布部署流程初次使用k8s
    主题:aspnetcorewebapi项目,提交到gitlab,通过jenkins(gitlab的ci/cd)编译、发布、推送到k8s。关于gitlab、jenkins、k8s安装,都是使用docker启动服务。首先新建一个项目,为了方便浏览就把swaggerr非开发环境不展示去掉 下面就是需要准备Dockerfile和k8s.yaml文件,这里不应该用......
  • .netcore微服务——项目搭建
    在.NETCore中,微服务是一种架构风格,它将应用程序构造为一组小型服务的集合,这些服务都通过HTTP-basedAPI进行通信。每个服务都是独立部署的,可以用不同的编程语言编写,并且可以使用不同的数据存储技术。微服务的主要优点包括:增强容错能力:一个服务的故障不会影响其他服务。增......
  • .NET Core 和 .NET 标准类库项目类型有什么区别?
    在VisualStudio中,至少可以创建三种不同类型的类库:类库(.NETFramework)类库(.NET标准)类库(.NETCore)虽然第一种是我们多年来一直在使用的,但一直感到困惑的一个主要问题是何时使用.NETStandard和.NETCore类库类型。那么,类库(.NETStandard)和类库(.NETCore)之间有什么......
  • 标准化(Z-score)
    标准化(Z-score)是用于将不同微生物的丰度数据进行标准化处理,以便在热图中更容易比较和解释不同样本之间的差异。具体来说,标准化的过程如下:abundance<-scale(abundance,center=TRUE,#减去均值scale=TRUE#除以标准差)标准化过程详解:减去均值(center=TRU......
  • China.NETConf2019 - 用ASP.NETCore构建可检测的高可用服务
    一、前言2019中国.NET开发者峰会(.NETConfChina2019)于2019年11月10日完美谢幕,校宝在线作为星牌赞助给予了峰会大力支持,我和项斌等一行十位同事以讲师、志愿者的身份公司参与到峰会的支持工作中,我自己很荣幸能够作为讲师与大家交流,分享了主题《用ASP.NETCore构建可检测的高......
  • 基于 .net core 8.0 的 swagger 文档优化分享-根据命名空间分组显示
    前言公司项目是是微服务项目,网关是手撸的一个.netcorewebapi项目,使用refit封装了20+服务SDK,在网关中进行统一调用和聚合等处理,以及给前端提供swagger文档在我两年前进公司的时候,文档还能够顺滑的打开,在去年的时候文档只能在本地打开,或者访问原始的swagger页面,knife......
  • Asp .Net Core 系列:基于 Castle DynamicProxy + Autofac 实践 AOP 以及实现事务、用户
    目录什么是AOP?.NetCore中有哪些AOP框架?基于CastleDynamicProxy实现AOPIOC中使用CastleDynamicProxy实现事务管理实现用户自动填充什么是AOP?AOP(Aspect-OrientedProgramming,面向切面编程)是一种编程范式,旨在通过将横切关注点(cross-cuttingconcerns)从主要业务逻辑......
  • C#面:ASP.NET Core ⽐ ASP.NET 更具优势的地⽅是什么?
    ASP.NETCore相对于ASP.NET具有以下几个优势:跨平台支持:ASP.NETCore是跨平台的,可以在Windows、Linux和macOS等多个操作系统上运行。这使得开发人员可以选择更适合他们的操作系统来进行开发和部署。更轻量级:ASP.NETCore是一个轻量级的框架,它具有更小的内存占用和更快的启动......