- 学习:知识的初次邂逅
- 复习:知识的温故知新
- 练习:知识的实践应用
目录
一,MR的shuffle回顾
1,Map 阶段:
- 在 MapReduce 中,Map 阶段的任务是处理输入数据,并将数据转换为键值对的形式。每个 Map 任务会处理一部分输入数据,并生成一系列中间键值对。
- 例如,对于文本数据的处理,Map 任务可能会将每行文本拆分为单词,并以单词为键,出现次数为值生成键值对。
2,Shuffle 阶段:
- Shuffle 阶段是 MapReduce 中连接 Map 阶段和 Reduce 阶段的重要环节。在这个阶段,中间键值对会根据键进行分区、排序和分组,然后被发送到相应的 Reduce 任务进行处理。
- 分区:Map 任务生成的中间键值对会根据键的哈希值被分配到不同的分区中。每个分区对应一个 Reduce 任务。
- 排序:在每个分区内,中间键值对会按照键进行排序。这样可以确保相同键的键值对被分配到同一个 Reduce 任务中,并且在 Reduce 任务中可以按照键的顺序进行处理。
- 分组:排序后的中间键值对会被分组,相同键的键值对会被放在一起。这样可以方便 Reduce 任务对相同键的键值对进行聚合操作。
- 数据传输:经过分区、排序和分组后的中间键值对会被发送到相应的 Reduce 任务所在的节点上。这个过程通常涉及网络传输,因此可能会成为性能瓶颈。
3,Reduce 阶段:
- Reduce 阶段的任务是处理 Shuffle 阶段发送过来的中间键值对,并生成最终的输出结果。
- Reduce 任务会对相同键的键值对进行聚合操作,例如求和、求平均值等。然后,将结果写入到输出文件中。
二,spark的shuffle介绍
spark中也有shuffle
当执行宽依赖的算子,就会进行shuffle洗牌阶段
也就是把RDD的数据传递给下一个RDD,进行数据交换
无论是MR还是spark,shuffle的本质都是传递交换数据
1,两种洗牌的方式
- shark中的shuffle计算分为两个部分
- shuffle 写的部分 wirte
- shuffle 读的部分 read
- 会进行文件的读写,影响spark的计算速度
- spark中的shuffle方法类
- 他是spark封装好的处理shuffle的方法大全;
- hashshuffle类
- 进行哈希 hash的计算
- spark1.2版本之前主要使用,之后引入了排序洗牌(sortshuffle)
- spark2.0之后删除了哈希计算内,开始使用sortshuffle内
- 优化了hashshufulle类
- sortshuffle类
- 排序的方式是把相同的key的数据放在一起
- sortshuffle使用的时候,有两个方法实现洗牌
- bypass模式版本和普通模式版本
- bypass模式版本不会排序,是会进行hash操作
- 普通模式版本会进行排序,并shuffle
- 他们两种模式的区别就是task的数量,task数量大于200这进入hash洗牌模式,小于200,这用普通模式进行洗牌计算
- 一个分区对应一个task,所以task的数量由于分区数量决定的
普通模式和bypass模式的主要区别在于如何将
相同key值的数据放在一起
?
排序 普通模式采用的策略
哈希取余 bypass模式采用的策略
2,spark的计算是要尽量避免进入shuffle计算
因为shuffle计算必然会调度更多的任务,造成更多的内存消耗,以及他频繁的磁盘I/O和网络I/O等等都会造成性能瓶颈,严重影响任务的执行速度,所以,我们在执行计算的时候,尽量避免shuffle,需要进行宽依赖计算的时候,尽量使用窄依赖算子进行计算
比如:求不同性别的年龄和,
常规来说,可以使用分组+聚合计算;
由于groupby是宽依赖算子,而宽依赖算子在执行的时候就会触发洗牌阶段,
所以我们避免使用宽依赖算子,巧妙的利用自定义函数,实现了避开shuffle也完成了分组聚合的功能
# 优化计算,减少shuffle
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.parallelize([('男',20),('男',22),('女',19),('女',18)])
# 求不同性别的年龄和
# reduceByKey 是宽依赖算子
rdd2 = rdd.reduceByKey(lambda x,y:x+y)
# 避免shuffle,需要将宽依赖算子计算的过程换成窄依赖
boy = sc.accumulator(0)
girl = sc.accumulator(0)
def func(x):
if x[0] == '男':
boy.add(x[1])
else:
girl.add(x[1])
return None
rdd3 = rdd.map(func)
# res = rdd2.collect()
# print(res)
# 触发rdd3计算
rdd3.collect()
print(boy.value)
print(girl.value)
这个是在做表关联的时候,常规肯定是join内连接两个表
但是由于内连接是宽依赖算子,会触发洗牌阶段,所以我们巧妙的用字典的方法
用小表取过滤大表,成功的避免了洗牌阶段,也成功的拿到了最终的结果
from pyspark import SparkContext
sc = SparkContext()
rdd_kv1 =sc.parallelize([('a',1),('b',2),('c',2),('d',2),('f',2),('w',2)])
rdd_kv2 =sc.parallelize([('a',1),('c',2),('q',2),('o',2)])
# join关联
rdd_join = rdd_kv1.join(rdd_kv2)
# 将rdd_kv数据量较少转为字典数据,然后用多的rdd数据匹配字典
rdd_dict = rdd_kv2.collectAsMap()
print(rdd_dict)
# 匹配字典
def func(x):
return (x[0],rdd_dict.get(x[0]))
rdd6 = rdd_kv1.map(func).filter(lambda x:x[1] is not None)
# res = rdd_join.collect()
# print(res)
res = rdd6.collect()
print(res)
三,并行度
1,资源并行度
- task在指定任务能够使用到的cpu核心数量
- 所谓资源并行度,其实就是所是多任务,多进程或者多个线程的执行任务
- 并行,多个任务同时执行,比如cpu是4核心,有两个线程任务,两个线程任务可以同时执行,也就并行
- 并发,有八个线程任务,交替执行,这个就是并发;
- spark中cpu的核心数量设置
- num-excutors = 3 设置executors数量 和服务器数量尽量保持一致
- executor-cores =2 设置每个excutores中的cpu核心数量,每个服务器的cpu核心数量一致
最大支持的task并行数量是 num-executors* executor-cores =6 需要按照服务器实际的cpu核心数指定
spark-submit --master yarn --num-executors=3 --executor-cores=2
2,数据并行度
- 就是task的数量,task由分区决定
- 为了保证task能充分利用cpu资源,实现并行计算,需要设置的分区数应该和资源并行度一致
- 在实际公司中根据公司资源并行度设置分区数
- 有的场景下公司会要数据并行度大于资源并行度,因为:
- 不要让cpu闲下来,最大程度利用集群的资源,
- cpu的一个核心同一时间支了个干一件事情,所以在100个核心的情况下,设置100个并行,就能让cpu100%出力,但是这种设置下的前提是task压力均衡;
- 如果task压力不均衡,有的执行的块,有的执行的慢,执行的块的在闲置等待
- 比如我们它task的分配的数量变多,那么有500个task并行,同一时间之后100个在运行,400个在等待中,这样就可以保证某个task执行完毕,后续有新的task补上;
- 从而实现性能的最佳利用
标签:task,shuffle,rdd,并行度,RDD,键值,原理,spark From: https://blog.csdn.net/qq_55006020/article/details/142832297
- 学习:知识的初次邂逅
- 复习:知识的温故知新
- 练习:知识的实践应用