首页 > 其他分享 >spark代码优化

spark代码优化

时间:2022-11-23 17:01:41浏览次数:40  
标签:task shuffle RDD 代码优化 key spark 序列化

Spark代码优化

RDD、DataFrame、DataStream、DataSet四者的区别?

image-20221104201950266

image-20221104200754148

1、RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。

2、DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。

3、RDD让我们能够决定怎么做,而DataFrame和DataSet让我们决定做什么,控制的粒度不一样。

4、在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持(import spark.implicits._)

5、DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

6、DataFrame与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值

7、三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出

8、在 DataFrameDataset 中, 数据和数据的 Schema 是分开存储的

RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
RDD:
RDD一般和spark mlib同时使用,RDD不支持sparksql操作
RDD是一个懒执行的不可变的可以支持Lambda表达式的并行数据集合。
RDD的最大好处就是简单,API的人性化程度很高。
RDD的劣势是性能限制,它是一个JVM驻内存对象,这也就决定了存在GC的限制和数据增加时Java序列化成本的升高。

Dataframe
与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验。

DataFrame优势:
1、定制化内存管理
    数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制
2、优化的执行计划
查询计划通过Spark catalyst optimiser进行优化.
Dataframe的劣势:
在于在编译期缺少类型安全检查,导致运行时出错.

Dataset
是Dataframe API的一个扩展,是Spark最新的数据抽象
用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。
Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。
Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。
DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].

什么是spark mllib序列化与反序列化?

序列化:就是将对象转化成字节序列的过程。
反序列化:就是讲字节序列转化成对象的过程。

为什么要序列化与反序列化?
1、持久化:对象是存储在JVM中的堆区的,但是如果JVM停止运行了,对象也不存在了。序列化可以将对象转化成字节序列,可以写进硬盘文件中实现持久化。在新开启的JVM中可以读取字节序列进行反序列化成对象。
2、网络传输:网络直接传输数据,但是无法直接传输对象,可在传输前序列化,传输完成后反序列化成对象。所以所有可在网络上传输的对象都必须是可序列化的。


spark采用序列化的方式保存RDD数据以便减少内存使用。

Java序列化
在默认情况下,Spark采用Java的ObjectOutputStream序列化一个对象。该方式适用于所有实现了java.io.Serializable的类。通过继承java.io.Externalizable,你能进一步控制序列化的性能。Java序列化非常灵活,但是速度较慢,在某些情况下序列化的结果也比较大。

Kryo序列化
Spark也能使用Kryo(版本2)序列化对象。Kryo不但速度极快,而且产生的结果更为紧凑(通常能提高10倍)。Kryo的缺点是不支持所有类型,为了更好的性能,你需要提前注册程序中所使用的类(class)。

在 Spark 中有很多场景需要存储对象, 或者在网络中传输对象
1、Task 分发的时候, 需要将任务序列化, 分发到不同的 Executor 中执行
2、缓存 RDD 的时候, 需要保存 RDD 中的数据
3、广播变量的时候, 需要将变量序列化, 在集群中广播
4、RDD 的 Shuffle 过程中 Map 和 Reducer 之间需要交换数据
5、算子中如果引入了外部的变量, 这个外部的变量也需要被序列化
6、RDD 因为不保留数据的元信息, 所以必须要序列化整个对象, 常见的方式是 Java 的序列化器, 和 Kyro 序列化器
7、Dataset 和 DataFrame 中保留数据的元信息, 所以可以不再使用 Java 的序列化器和 Kyro 序列化器, 使用 8、Spark 特有的序列化协议, 生成 UnsafeInternalRow 用以保存数据, 这样不仅能减少数据量, 也能减少序列化和反序列化的开销, 其速度大概能达到 RDD 的序列化的 20 倍左右

1、spark调优(代码层面)

避免创建重复的RDD
尽可能复用同一个RDD
对多次使用的RDD进行持久化(缓存)
尽量避免使用shuffle类算子
使用map-side预聚合的shuffle操作
使用高性能的算子
广播大变量
使用Kryo优化序列化性能
优化数据结构
使用高性能的库fastutil

(1)、对多次使用的RDD进行缓存
优先进行MEMORY_ONLY(在数据量不是很大的情况下),如果数据量很大的话,反之使用这种方式会导致JVM的OOM内存溢出异常,如果溢出的情况,我们可以使用MEMORY_AND_DISK_SER策略进行持久化;

通常不建议使用DISK_ONLY和后缀为2的级别:因为完全基于磁盘文件进行数据的读写 ,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为2的级别,必须将 所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性 能开销,除非是要求作业的高可用性,否则不建议使用。
(2)、使用高性能的算子
使用reduceByKey/aggregateByKey替代groupByKey 

使用mapPartitions替代普通map Transformation算子

使用foreachPartition替代foreach Action算子 

使用filter之后进行coalesce操作
(增加分区必须产生shuffle,减少分区可以不产生shuffle)
(减少分区数或者重定义分区数用coalesce,增加分区数用reparation,会产生shuffle)
(shuffle之后的rdd的分区数) 
1、手动指定的优先级最高
2、设置参数spark.default.parallelism
3、如果前面两个都没设置默认设置使用前一个RDD的分区数

使用repartitionAndSortWithinPartitions替代repartition与sort类操作代码

repartition:coalesce(numPartitions,true) 增多分区使用这个 

coalesce(numPartitions,false) 减少分区 没有shuffle只是合并 partition 
(3)、广播大变量
开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如 100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提 升性能 

函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络 传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如 100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节 点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能 

如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播 后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的 task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本 的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低 GC的频率 

广播大变量发送方式:Executor一开始并没有广播变量,而是task运行需要用到广播变量,会找executor的blockManager要,bloackManager找Driver里面的 blockManagerMaster要。
(4)、使用Kryo优化序列化性能
在Spark中,主要有三个地方涉及到了序列化:

在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输 
将自定义的类型作为RDD的泛型类型时,所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现 Serializable接口。 
使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个 partition都序列化成一个大的字节数组。

Kryo序列化器介绍: 

Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快 ,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可 以让网络传输的数据变少;在集群中耗费的内存资源大大减少。 

对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和 反序列化的性能。Spark默认使用的是Java的序列化机制,也就是 ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同 时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。 官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有 使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦 

(5)、优化数据结构
Java中,有三种类型比较耗费内存: 

对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。 

字符串,每个字符串内部都有一个字符数组以及长度等额外信息。 

集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来 封装集合元素,比如Map.Entry。 

因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用 ,从而降低GC频率,提升性能。
(6)、使用高性能的库fastutil
fastutil介绍: 
fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、 HashSet)的类库,提供了特殊类型的map、set、list和queue; 
fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来 替代自己平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,可以减 小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素 的值的时候,提供更快的存取速度; 
fastutil最新版本要求Java 7以及以上版本; 
fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实 现了Java的Map接口),因此可以直接放入已有系统的任何代码中。 
fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的 map,实现了Java的Map接口),因此可以直接放入已有系统的任何代码中。
使用? 

image-20221103210547170

2、spark调优(参数层面)

参数调优模板

spark-submit 
--class com.shujia.*.* 
--master yarn-cluster 
--num-executors 100   // 资源大小,数据量
--executor-memory 8G // cache数据量
--executor-cores 4  
--driver-memory 2G // 广播变量
--conf spark.default.parallelism=1000 // shuffle之后rdd分区数
--conf spark.storage.memoryFraction=0.4 // rdd持久化内存占比
--conf spark.shuffle.memoryFraction=0.4 //rdd shuffle内存占比
--conf spark.locality.wait=10 //task在Executor中的等待的超时时间
--conf spark.shuffle.file.buffer=64k
--conf spark.yarn.executor.memoryOverhead=2048M
--conf spark.network.timeout=600s
*.jar


任务使用的资源
400C   800G

数据本地性
Application任务执行流程: 
• 在Spark Application提交后,Driver会根据action算子划分成一个个的job,然后对每一 个job划分成一个个的stage,stage内部实际上是由一系列并行计算的task组成的,然后 以TaskSet的形式提交给你TaskScheduler,TaskScheduler在进行分配之前都会计算出 每一个task最优计算位置。Spark的task的分配算法优先将task发布到数据所在的节点上 ,从而达到数据最优计算位置。 

数据本地化级别
PROCESS_LOCAL 
NODE_LOCA 
NO_PREF
RACK_LOCAL 
ANY 

image-20221104133802772

image-20221104133828500

image-20221104133855167

JVM调优
概述:
Spark task执行算子函数,可能会创建很多对象,这些对象,都是要放入JVM年轻代中 
RDD的缓存数据也会放入到堆内存中 

配置
spark.storage.memoryFraction 默认是0.6 

image-20221104133457373

shuffle调优
概述: 
reduceByKey:要把分布在集群各个节点上的数据中的同一个key,对应的values,都给 集中到一个节点的一个executor的一个task中,对集合起来的value执行传入的函数进行 reduce操作,最后变成一个value 

配置 
spark.shuffle.manager, 默认是sort 
spark.shuffle.consolidateFiles,默认是false 
spark.shuffle.file.buffer,默认是32k 
spark.shuffle.memoryFraction,默认是0.2 


SparkConf.set("spark.shuffle.file.buffer","64k")
spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。


spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
错误:reduce oom
reduce task去map拉数据,reduce 一边拉数据一边聚合   reduce段有一块聚合内存(executor memory * 0.2)
解决办法:	1、增加reduce 聚合的内存的比例  设置spark.shuffle.memoryFraction
			2、 增加executor memory的大小  --executor-memory 5G
			3、减少reduce task每次拉取的数据量  设置spark.reducer.maxSizeInFlight  24m

spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
shuffle file not find    taskScheduler不负责重试task,由DAGScheduler负责重试stage
、

spark.shuffle.io.retryWait
默认值:5s
参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。


spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。


spark.shuffle.manager
默认值:sort
参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。


spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。


spark.shuffle.consolidateFiles
默认值:false
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
调节Executor堆外内存
概述: 
Spark底层shuffle的传输方式是使用netty传输,netty在进行网络传输的过程会申请堆外 内存(netty是零拷贝),所以使用了堆外内存。 
什么时候需要调节Executor的堆外内存大小? 
shuffle file cannot find (DAGScheduler,resubmitting task) 
executor lost 
task lost 
out of memory 


问题原因: 
Executor由于内存不足或者对外内存不足了,挂掉了,对应的Executor上面的block manager也挂掉了,找不到对应的shuffle map output文件,Reducer端不能够拉取数 据 
Executor并没有挂掉,而是在拉取数据的过程出现了问题

上述情况下,就可以去考虑调节一下executor的堆外内存。也许就可以避免报错; 

解决办法: 
yarn下:--conf spark.yarn.executor.memoryOverhead=2048 单位M 
standlone下:--conf spark.executor.memoryOverhead=2048单位M 

默认情况下,这个堆外内存上限默认是每一个executor的内存大小的10%;真正处理大数据的时候, 这里都会出现问题,导致spark作业反复崩溃,无法运行;此时就会去调节这个参数,到至少1G (1024M),甚至说2G、4G 

调节等待时长 
executor在进行shuffle write,优先从自己本地关联的BlockManager中获取某份数据如果本地 block manager没有的话,那么会通过TransferService,去远程连接其他节点上executor的block manager去获取,尝试建立远程的网络连接,并且去拉取数据 

频繁的让JVM堆内存满溢,进行垃圾回收。正好碰到那个exeuctor的JVM在垃圾回收。处于垃圾回 收过程中,所有的工作线程全部停止;相当于只要一旦进行垃圾回收,spark / executor停止工作, 无法提供响应,spark默认的网络连接的超时时长,是60s;如果卡住60s都无法建立连接的话,那 么这个task就失败了。 

解决?--conf spark.core.connection.ack.wait.timeout=300 


3、数据倾斜七种解决方案

使用Hive ETL预处理数据 
过滤少数导致倾斜的key 
提高shuffle操作的并行度 
双重聚合 
将reduce join转为map join 
采样倾斜key并分拆join操作 
使用随机前缀和扩容RDD进行join 
1、使用Hive ETL预处理数据
方案适用场景:如果导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个 key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表 执行某个分析操作,那么比较适合使用这种技术方案。 

方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对 数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么 在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。 

方案实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子 ,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。 因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作 时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。 

2、过滤少数导致倾斜的key
方案适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很 适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导 致了数据倾斜。 

方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别 重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤 掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时, 动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后 计算出每个key的数量,取数据量最多的key过滤掉即可。 

方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生 数据倾斜。 

3、提高shuffle操作的并行度
方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如 reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于 Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很 多场景来说都有点过小。 

方案实现原理:增加shuffle read task的数量,可以让//原本分配给一个task的多个key分配给多个 task//
,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条 数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时 间都会变短了。 

4、双重聚合
局部聚合(加前缀)-->全局聚合(删前缀)
方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by 语句进行分组聚合时,比较适用这种方案。 

方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key 都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着 对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会 变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次 进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。 

方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被 一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。 接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果 

image-20221104133703525

5、将reduce join转为map join
方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中 的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。 

方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作, 进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过 collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD 执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每 一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式 连接起来。 

方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉 取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的, 则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不 会发生shuffle操作,也就不会发生数据倾斜 


1、sparkcore中:RDD进行广播,不能直接进行广播,要将数据拉到divers端转成map集合进行广播,然后用小表进行广播
2、sparkSQL中:(当两个dataframe进行关联时,他会自己进行广播,不区分左右表,手动广播方式有:)
(1)、一种是sparkSQL进行SQL语句的书写使用/*+broadcast(a) */* 进行广播
(2)、在使用DSL时,用一个DateFrame去广播可以设置使用.hint("broadcast"),相同字段名进行关联来广播

6、采样倾斜key并分拆join操作
方案适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五 ”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一 个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均 匀,那么采用这个解决方案是比较合适的。

方案实现思路: 
对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个 key的数量,计算出来数据量最大的是哪几个key。 
然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以 内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。 
接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数 据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个 RDD。(或者直接进行MapJoin)
再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打 散成n份,分散到多个task中去进行join了。 
而另外两个普通的RDD就照常join即可。 
最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。 

7、使用随机前缀和扩容RDD进行join
方案适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。 

方案实现思路: 
该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。 
然后将该RDD的每条数据都打上一个n以内的随机前缀。 
同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。 
最后将两个处理后的RDD进行join即可。 

方案实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的 “不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方 案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处 理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大 量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对 内存资源要求很高。 


标签:task,shuffle,RDD,代码优化,key,spark,序列化
From: https://www.cnblogs.com/tanggf/p/16918939.html

相关文章

  • spark概述与搭建
    Spark概述与搭建1、离线计算,基于内存,所以比MapReduce(基于磁盘)快(Flink真正实时型框架)2、spark处理量级在GB量级3、spark构成:BDAS,将数据变成DataFrame(DF基于pandas框架,表......
  • Spark+ClickHouse实战企业级数据仓库,进军大厂必备(19章全)
    分享一套Spark+ClickHouse大数据课程——《Spark+ClickHouse实战企业级数据仓库,进军大厂必备》,课程一共19章,提供源码+软件下载!《Spark+ClickHouse实战企业级数据仓库,进军大......
  • 对‘将输入框内的值传输给后台’的代码优化/简化
    原代码是将输入框的值在点击确认或者下一步的按钮事件中添加以下代码:来确保将绑定输入框的数组转化成字符串传给后台this.base.Number="";for(leti=0;i......
  • spark中生成时间序列数据的函数stack和sequence
    Sequence函数用Sequence函数生成时间序列函数,真的是非常简便易用,之前因为没找到,所以走了不少弯路。println("指定开始和结束数字,生成对应的数字序列,通过第三个参数来......
  • spark (六) RDD算子(operator)
    目录1转换算子(transformer)(将旧的RDD包装成新RDD)1.1单值类型1.1.1map1.1.2mapPartition1.1.3mapPartitionsWithIndex1.1.4flatMap1.1.5glom1.1.6groupBy1.1.7f......
  • JAVA 代码优化
    1基本类型使用优化1.1尽量重用对象特别是对于String对象的使用,如需拼接字符串,使用如下例子://拼接字符串,不重视效率的写法Stringstr1="aaa";str1=str1+"bbb"......
  • spark (五) RDD的创建 & 分区
    目录1.RDD的创建方式1.1从内存创建RDD1.2从外部存储(文件)创建RDD1.3从其他的RDD创建1.4直接newRDD2.分区(partition)2.1makeRDD的分区2.2读取文件的分区例子2.2.1......
  • IDEA提交任务到spark standalone集群
    参考文章:在idea里面怎么远程提交spark任务到yarn集群代码注意setJars,提交的代码,要提前打好包。否则会报找不到类的错误个人理解就相当于运行的main方法是起了一个spar......
  • Flink/Spark中ETL的简单模版
    我们往往可以忽略外界的干扰因素,避免焦虑,专心做自己想做的事情,反正焦虑又解决不了问题引言使用flink或者spark的时候,写好固定的模版很重要,对于一下etl的实时任务,只需要执行......
  • 63:循环代码优化技巧(极其重要)
    ###循环代码优化虽然计算机越来越快,空间也越来越大,我们仍然要在性能问题上“斤斤计较”。编写循环时,遵守下面三个原则可以大大提高运行效率,避免不必要的低效计算:1.尽量......