1.缓存的基本介绍
缓存介绍:
1. 当一个RDD的产生过程(计算过程), 是比较昂贵的(生成RDD整个计算流程比较复杂), 并且这个RDD可能会被多方(RDD会被重复使用)进行使用,
2. 此时为了提升计算效率, 可以将RDD的结果设置为缓存, 这样后续在使用这个RDD的时候, 无需在重新计算了, 直接获取缓存中数据即可.
3. 缓存可以提升Spark的容错的能力, 正常情况, 当Spark中某一个RDD计算失败的时候, 需要对整个RDD链条进行整体的回溯计算.
4. 有了缓存后, 可以将某些阶段的RDD进行缓存操作, 这样当后续的RDD计算失败的时候, 可以从最近的一个缓存中恢复数据 重新计算即可, 无需在回溯所有链条.
应用场景:
1. 当一个RDD被重复使用的时候, 可以使用缓存来解决
2. 当一个RDD产生非常昂贵的时候, 可以将RDD设置为缓存
3. 当需要提升容错能力的时候, 可以在局部设置一些缓存来提升容错能力
注意事项:
1. 缓存仅仅是一种临时存储, 可以将RDD的结果数据存储到内存(executor) 或者 磁盘, 甚至可以存储到堆外内存(executor以外系统内存)中.
2. 由于缓存的存储是一种临时存储, 所以缓存的数据有可能丢失的, 所以缓存操作并不会将RDD之间的依赖关系给截断掉(清除掉),
以防止当缓存数据丢失的时候, 可以让程序进行重新计算操作
3. 缓存的API都是lazy的, 设置缓存后, 并不会立即触发, 如果需要立即触发, 后续必须跟一个action算子, 建议使用 count
如何使用缓存呢?
设置缓存的相关API:
rdd.cache() //执行设置缓存的操作, cache在设置缓存的时候, 仅能将缓存数据放置到内存中
rdd.persist(设置缓存级别) //执行设置缓存的操作, 默认情况下, 将缓存数据放置到内存中, 同时支持设置其他缓存方案
清理缓存的相关API:
1. 默认情况下, 当程序执行完成后, 缓存会被自动清理
2. 如需手动清理缓存, 则写法为: rdd.unpersist() //清理缓存
常用的缓存级别有那些呢?
NONE //表示不缓存
MEMORY_ONLY //仅缓存到内存中,直接将整个对象保存到内存中
MEMORY_ONLY_SER //仅缓存到内存中, 同时在缓存数据的时候, 会对数据进行序列化(从对象 --> 二进制数据)操作, 可以在一定程序上减少内存的使用量
MEMORY_AND_DISK:
MEMORY_AND_DISK_2 //优先将数据保存到内存中, 当内存不足的时候, 可以将数据保存到磁盘中, 带2的表示保存二份
MEMORY_AND_DISK_SER:
MEMORY_AND_DISK_SER_2 //优先将数据保存到内存中, 当内存不足的时候, 可以将数据保存到磁盘中, 带2的表示保存二份,
//对于保存到内存的数据, 会进行序列化的操作, 从而减少内存占用量 提升内存保存数据体量,对磁盘必须要进行序列化
//上述的缓存级别, 带2表示的保存多个副本, 从而提升数据可靠性
序列化解释:
将数据 从 对象 转换为 二进制的数据, 对于RDD的数据来说, 内部数据都是一个个对象,
如果没有序列化是直接将对象存储到内存中, 如果有序列化会将对象转换为二进制然后存储到内存中.
好处:
减少内存的占用量, 从而让有限内存可以存储更多的数据
弊端:
会增大对CPU的占用量, 因为转换的操作, 需要使用CPU来工作
#cache sample.
from pyspark import SparkContext, SparkConf,StorageLevel
import os
import jieba
import time
if __name__ == '__main__':
conf = SparkConf().setMaster('local[*]').setAppName('cache sample')
sc = SparkContext(conf=conf)
rdd_init = sc.textFile('file:///export/data/workspace/cache.sample')
rdd_filter = rdd_init.filter(lambda line: line.strip() != '' and len(line.split()) == 6)
rdd_map = rdd_filter.map(lambda line: (
line.split()[0],
line.split()[1],
line.split()[2][1:-1],
line.split()[3],
line.split()[4],
line.split()[5]
))
# -----------------设置缓存的代码--------------------
# StorageLevel 这个类需要在前面的from pyspark中加入此对象的导入
# 一般建议, 设置完缓存后, 让其立即触发
rdd_map.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()
# 快速抽取函数: ctrl + alt + m
xuqiu_1()
# ----------手动清理缓存------------
rdd_map.unpersist().count() //如果这里清理缓存了, 则后续的 xuqiu_2()这个函数, 就用不了缓存了, 可以在DAG图中查看"小绿球"
xuqiu_2()
time.sleep(1000)
node1:4040页面的 导航条部分, 选择 Storage 即可查看缓存.
3. 检查点的基本介绍
概述:
1. checkPoint跟缓存类似, 也可以将某一个RDD结果进行存储操作, 一般都是将数据保存到HDFS中, 提供一种更加可靠的存储方案.
2. 所以说采用checkpoint方案, 会将RDD之间的依赖关系给截断掉(因为 数据存储非常的可靠)
3. checkpoint出现, 从某种角度上也可以提升执行效率(没有缓存高), 更多是为了容错能力
4. 对于checkpoint来说, 大家可以将其理解为对整个RDD链条进行设置阶段快照的操作
5. 由于checkpoint这种可靠性, 所以Spark本身只管设置, 不管删除, 所以checkpoint即使程序停止了, checkpoint数据依然存储着, 不会被删除, 需要手动删除
如何设置checkpoint呢?
1. 通过sc对象, 设置checkpoint保存数据的位置: sc.setCheckpointDir('hdfs路径')
2. 通过rdd.checkpoint() 设置开启检查点 (lazy)
3. 通过rdd.count() 触发检查点的执行
# checkpoint sample code
from pyspark import SparkContext, SparkConf
import os
import time
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
conf = SparkConf().setMaster('local[*]').setAppName('sougou')
sc = SparkContext(conf=conf)
//设置检查点位置
sc.setCheckpointDir('/spark/checkpoint/') //默认路径是HDFS的, 所以可以省略 hdfs://node1:8020 如果存Linux本地, 需要写: file:///
# 读取数据
rdd_init = sc.textFile('file:///export/data/workspace/checkpoint.sample')
# 演示代码, 无任何价值
rdd_map1 = rdd_init.map(lambda line:line)
rdd_map2 = rdd_map1.map(lambda line: line)
rdd_3 = rdd_map2.repartition(3)
rdd_map3 = rdd_3.map(lambda line: line)
rdd_map4 = rdd_map3.map(lambda line: line)
rdd_4 = rdd_map4.repartition(2)
rdd_map5 = rdd_4.map(lambda line: line)
# 开启检查点
rdd_map5.checkpoint() //这后边没法直接 接着 .count(), 因为 checkpoint()方法不返回任何的内容.
rdd_map5.count()
print(rdd_map5.count())
time.sleep(1000)
//步骤: 先注释检查点的代码, 然后去node1:4040中把该程序的(runjob)部分的DAG图截下来, 然后在开启检查点的代码, 同样截取DAG图, 然后对比下即可.
5. 缓存和检查点的区别
面试题:
在Spark中 RDD的缓存和检查点有什么区别呢?
答案:
1. 存储位置
缓存: 会将RDD的结果数据缓存到内存或者磁盘, 或者堆外内存
检查点: 会将RDD的结果数据存储到HDFS(默认),当然也支持本地存储(仅在local模式,但如果是local模式, 检查点无所谓)
//因为本地模式主要是测试来用的, 数据量也不会特别的大, 所以, 设置检查点意义不是特别大.
2. 依赖关系
缓存: 由于缓存存储是一种临时存储, 所以缓存不会截断掉依赖关系, 以防止缓存丢失后, 进行回溯计算
检查点: 会截断掉依赖关系, 因为检查点方案认为存储数据是可靠的, 不会丢失
3. 生命周期
缓存: 当整个程序执行完成后(一个程序中是包含多个JOB任务的), 会自动清理掉缓存数据,或者也可以在程序运行中手动清理
检查点: 会将数据保存到HDFS中, 不会自动删除, 即使程序停止了, 检查点数据依然存在, 只能手动删除数据(会永久保存)
6. 缓存和检查点共用操作
问:
在实际使用中, 在Spark程序中, 是使用缓存呢 还是检查点呢?
答:
会将两种方案都作用于程序中, 一般是先设置检查点, 然后设置缓存
# cache and checkpoint sample code
from pyspark import SparkContext, SparkConf,StorageLevel
import os
import jieba
import time
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
def xuqiu_1():
# 获取搜索词
rdd_search = rdd_map.map(lambda line_tup: line_tup[2])
# 对搜索词进行分词操作
rdd_keywords = rdd_search.flatMap(lambda search: jieba.cut(search))
# 将每个关键词转换为 (关键词,1) 进行分组统计
rdd_res = rdd_keywords.map(lambda keyword: (keyword, 1)).reduceByKey(lambda agg, curr: agg + curr)
# 对结果数据进行排序(倒序)
rdd_sort = rdd_res.sortBy(lambda res: res[1], ascending=False)
# 获取结果(前50)
print(rdd_sort.take(50))
def xuqiu_2():
# SQL: select user,搜索词 ,count(1) from 表 group by user,搜索词;
# 提取 用户和搜索词数据
rdd_user_search = rdd_map.map(lambda line_tup: (line_tup[1], line_tup[2]))
# 基于用户和搜索词进行分组统计即可
rdd_res = rdd_user_search.map(lambda user_search: (user_search, 1)).reduceByKey(lambda agg, curr: agg + curr)
rdd_sort = rdd_res.sortBy(lambda res: res[1], ascending=False)
print(rdd_sort.take(30))
if __name__ == '__main__':
conf = SparkConf().setMaster('local[*]').setAppName('sougou')
sc = SparkContext(conf=conf)
//------------设置检查点保存位置------------
sc.setCheckpointDir('/spark/checkpoint')
rdd_init = sc.textFile('file:///export/data/workspace/cache_checkpoint.sample')
# 过滤数据: 保证数据不能为空 并且数据字段数量必须为 6个
rdd_filter = rdd_init.filter(lambda line: line.strip() != '' and len(line.split()) == 6)
# 对数据进行切割, 将数据放置到一个元组中: 一行放置一个元组
rdd_map = rdd_filter.map(lambda line: (
line.split()[0],
line.split()[1],
line.split()[2][1:-1],
line.split()[3],
line.split()[4],
line.split()[5]
))
# ---- 开启检查点 和 缓存 -----
# 设置开启检查点
rdd_map.checkpoint()
rdd_map.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count() //先开启检查点, 后开启缓存(推荐)
#或者
rdd_map.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
rdd_map.checkpoint()
rdd_map.count() //先开启缓存, 后开启检查点, 可能会导致检查点失效, 这种方式理解即可, 一般不推荐.
# 快速抽取函数: ctrl + alt + m
xuqiu_1()
xuqiu_2()
time.sleep(1000)
7. RDD共享变量的引入操作
8. RDD的共享变量_广播变量(Broadcast Variables)
作用:
减少Driver和executor之间网络数据传输数据量, 以及减少内存的使用 从而提升效率
适用于:
多个Task线程需要使用到同一个变量的值的时候
默认做法: //即: 没有广播变量的情况.
各个线程会将这个变量形成一个副本, 然后拷贝到自己的线程中, 进行使用即可, 由于一个executor中有多个线程, 那么意味需要拷贝多次,
导致executor和 Driver之间的传输量增加, 对带宽有一定影响, 同时拷贝了多次, 对内存占用量提升
解决方案: //引入一个广播变量
让executor从Driver中拉取过来一个副本即可, 一个executor只需要拉取一次副本, 让executor中各个线程读取executor中变量即可,
这样减少网络传输量, 同时减少内存使用量
注意:
广播变量是只读的, 各个线程只能读取数据, 不能修改数据.
如何使用广播变量:
通过sc创建一个广播变量: //在Driver设置
广播变量对象 = sc.broadcast(值)
获取变量: 在Task获取
广播变量对象.value
from pyspark import SparkContext, SparkConf
import os
import time
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示广播变量的使用操作")
conf = SparkConf().setMaster('local[*]').setAppName('sougou')
sc = SparkContext(conf=conf)
# 设置广播变量
bc = sc.broadcast(1000)
# 2 读取数据
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 3- 将每个数据都加上指定值 ,此值由广播变量给出:
# 获取广播: bc.value
rdd_res = rdd_init.map(lambda num: num + bc.value)
# 4- 打印结果
rdd_res.foreach(lambda num: print(num))
time.sleep(10000)
9. RDD的共享变量_累加器(Accumulators)的使用操作
解释:
累加器主要提供在多个线程中对同一个变量进行累加的操作, 对于多个线程来说只能对数据进行累加, 不能读取数据, 读取数据的操作只能有Driver来处理
应用场景:
全局累加操作
如何使用呢?
1. 由于Driver设置一个累加器的初始值
累加器对象 = sc.accumulator(初始值)
2. 由rdd(线程)来进行累加操作
累加器对象.add(累加内容)
3. 在Driver中获取值:
累加器.value
累加器错误示范
from pyspark import SparkContext, SparkConf
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示累加器")
# 1- 创建SparkContext对象
conf = SparkConf().setMaster('local[*]').setAppName('sougou')
sc = SparkContext(conf=conf)
# 定义一个变量
a = 10
# 2 读取数据
rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# 3- 处理数据: 为a将列表中变量的值累加上去
def fn1(num):
global a
a += num
//a.value //这里如果直接写 会报错, 因为: 对于多个线程来说只能对数据进行累加, 不能读取数据, 读取数据的操作只能有Driver来处理.
return num
rdd_map = rdd_init.map(fn1)
print(rdd_map.collect())
print(a) //a的值, 还是 10
累加器正确使用
from pyspark import SparkContext, SparkConf
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示累加器")
# 1- 创建SparkContext对象
conf = SparkConf().setMaster('local[*]').setAppName('sougou')
sc = SparkContext(conf=conf)
# 定义一个变量, 引入累加器
a = sc.accumulator(10)
# 2 读取数据
rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# 3- 处理数据: 为a将列表中变量的值累加上去
def fn1(num):
# 对累加器进行进行增加
a.add(num)
return num
rdd_map = rdd_init.map(fn1)
print(rdd_map.collect())
print(a.value) //获取累加器的结果, 65
10. RDD的累加器的小问题说明(重复累加)
问题:
当我们对设置过累加器的RDD, 后续在进行一些其他的操作, 调度多次action算子后, 发现累加器被累加了多次, 本应该只累加一次, 这种情况是如何产生的呢?
原因:
当调度用多次action的时候, 会产生多个JOB(计算任务), 由于RDD值存储计算的规则, 不存储数据, 当第一个action计算完成后, 得到一个结果,
整个任务完成了, 接下来再运行下一个job的任务, 这个任务依然需要重头开始进行计算得到最终结果.
这样就会 累加的操作就会被触发多次,从而被累加了多次
解决方案:
对累加器执行完的RDD 设置为缓存或者检查点, 或者两个都设置, 即可解决.
from pyspark import SparkContext, SparkConf
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示累加器")
# 1- 创建SparkContext对象
conf = SparkConf().setMaster('local[*]').setAppName('sougou')
sc = SparkContext(conf=conf)
# 定义一个变量, 引入累加器
a = sc.accumulator(10)
# 2 读取数据
rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# 3- 处理数据: 为a将列表中变量的值累加上去
def fn1(num):
# 对累加器进行进行增加
a.add(num)
return num
rdd_map = rdd_init.map(fn1)
//rdd_map.cache().count() //设置缓存, 如果不设置, 最终 a.value的值是 120, 如果设置了, 是65(说明读缓存了)
print(rdd_map.collect())
rdd_2 = rdd_map.map(lambda num: num + 1)
print(rdd_2.top(10)) //我们发现, 下述再打印累加器的结果时, 它的值就变成了: 120, 说明累加器又执行了一次.
print(a.value) //获取累加器的结果
11. RDD内核调度_RDD的依赖关系
RDD之间是存在依赖关系, 这也是RDD中非常重要特性, 一般将RDD之间的依赖关系划分为两种依赖关系: 窄依赖 和 宽依赖
窄依赖:
目的:
让各个分区的数据可以并行的计算操作.
指的是:
上一个RDD的某一个分区的数据 被下一个RDD的某一个分区全部都继承处理下来, 我们将这种关系称为窄依赖关系.
//简单理解: 一对一的关系.
宽依赖:
目的:
划分stage(阶段)
指的是:
上一个RDD的分区数据被下一个RDD的多个分区所接收并处理(shuffle), 我们将这种关系称为宽依赖.
结论:
1. 判断两个RDD之间是否存在宽依赖, 主要看两个RDD之间是否存在shuffle, 一旦产生了shuffle, 必须是前面的先计算完成后, 然后才能进行后续的计算操作
2. 在Spark中, 每一个算子是否存在shuffle操作, 在Spark设计的时候就已经确定了, 比如说 map一定不会有shuffle, 比如说reduceByKey一定是存在shuffle.
3. 如何判断这个算子是否会走shuffle呢?
//可以从查看DAG执行流程图, 如果发现一执行到这个算子, 阶段被分为多个, 那么一定是存在shuffle,
//以及可以通过查看每个算子的文档的说明信息, 里面也会有一定的说明
4. 但是: 在实际操作中, 我们一般不会纠结这个事情, 我们要以实现需求为导向, 需要用什么算子的时候, 我们就采用什么算子来计算即可,
虽然说过多的shuffle操作, 会影响我们的执行的效率, 但是依然该用的还是要用的
5. 判断宽窄依赖的关系最重要就是: 看两个RDD之间是否存在shuffle.
12. DAG以及DAG流程图形成说明
DAG解释: //有向无环图.
整个的流程, 有方向, 不能往回走, 不断的往下继续的过程
问:
如何形成一个DAG执行流程图呢?
答:
1. 第一步: 当Driver遇到一个action算子后, 就会将这个算子所对应所有依赖的RDD全部都加载进来形成一个stage阶段
2. 第二步: 对整个阶段进行回溯操作, 从后往前, 判断每一个RDD之间依赖关系, 如果是宽依赖形成一个新的阶段, 如果窄依赖, 放置到一起
3. 当整个回溯全部完成后, 形成了DAG的执行流程图
//详见图解.
13. DAG的阶段划分以及线程的划分操作
//详见图解.
分区数量相关的文档链接: https://spark.apache.org/docs/3.1.2/configuration.html 搜 spark.default.parallelism 这个配置信息.
标签:map,缓存,rdd,累加器,RDD,检查点,line From: https://blog.csdn.net/qq_43428465/article/details/137058040