1、查看执行计划
1、直接sql查看: explain select ... from ...
2、ds.explain()
2、执行计划的处理流程
sql代码 -> 未决断的逻辑执行计划 -> 根据元数据生成已决断的逻辑执行计划 -> 生成物理执行计划 -> 模型评估 -> 选择物理执行计划 -> 生成执行代码
3、cbo优化
注意:要想使用cbo优化必须先收集表、列的信息,否则cbo优化不起作用
收集信息:
收集表的信息: ANALYZE TABLE 表名 COMPUTE STATISTICS
收集列的信息: ANALYZE TABLE 表名 COMPUTE STATISTICS FOR COLUMNS 列1,列2,列3
使用cbo:
cbo优化可以优化多表join的顺序、调整表的join的策略
spark.sql.cbo.enabled: 是否开启cbo优化
spark.sql.cbo.joinReorder.enabled: 是否调整多表Join的顺序
spark.sql.cbo.joinReorder.dp.threshold: 设置多表jion的表数量的阈值,一旦join的表数量超过该阈值则不优化多表join的顺序
4、广播join
场景: 大表join小表,可以将小表数据广播出去避免shuffle操作
使用广播join
1、设置小表的大小阈值:
在sparksession创建的时候使用config方法配置spark.sql.autoBroadcastJoinThreshold,后续join的时候一旦表的大小小于该阈值,则认为是小表,会广播小表数据
2、使用hint语法强制广播
select /*+ BROADCASTJOIN(别名1) */ .... from 表1 别名1 join 表2 别名2 on ...
select /*+ BROADCAST(别名1) */ .... from 表1 别名1 join 表2 别名2 on ...
select /*+ MAPJOIN(别名1) */ .... from 表1 别名1 join 表2 别名2 on ...
5、smb join
场景: 大表 join 大表可以使用smb join,可以减少join的时候扫描的表的数据量,提高效率
使用前提:
1、join的两个大表必须是分桶表,两个分桶表的桶的个数必须一样或者是倍数关系
2、join的时候,join的列 = 分桶的的列
6、AQE自适应优化【spark3.x版本才有】
1、动态合并分区: spark会根据分区的数据量将小数据量的多个分区合并成一个分区,可以提高资源的利用率
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.sql.adaptive.coalescePartitions.enabled: 是否开启动态合并分区
spark.sql.adaptive.coalescePartitions.initialPartitionNum: 初始分区数
spark.sql.adaptive.coalescePartitions.minPartitionNum: 合并之后的最小分区数
当RDD的分区数处于spark.sql.adaptive.coalescePartitions.initialPartitionNum与spark.sql.adaptive.coalescePartitions.minPartitionNum范围内才会合并
spark.sql.adaptive.advisoryPartitionSizeInBytes: 合并分区之后,分区的数据量的预期大小
2、动态切换join策略: 在join的时候,会动态选择性能最高的join策略,提高效率
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.sql.adaptive.localShuffleReader.enabled:在不需要进行shuffle重分区时,尝试使用本地shuffle读取器。将sort-meger join 转换为广播join
3、动态申请资源: 当计算过程中资源不足会自动申请资源
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.dynamicAllocation.enabled: 是否开启动态资源申请
spark.dynamicAllocation.shuffleTracking.enabled: 是否开启shuffle状态跟踪
4、动态join数据倾斜: join的时候如果出现了数据倾斜,会动态调整分区的数据量,优化数据倾斜导致的性能问题。
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.sql.adaptive.skewJoin.enable: 是否开启join数据倾斜的优化
spark.sql.adaptive.skewJoin.skewedPartitionFactor: N
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes: M
spark.sql.adaptive.advisoryPartitionSizeInBytes: G [代表优化之后,分区数数据的预期大小]
sparksql判断出现数据倾斜的依据[需要两个条件同时满足]:
当某个分区处理的数据量>= N * 所有task处理数据量的中位数
当某个分区处理的数据量>= M
7、手动处理数据倾斜
1、如何判断有数据倾斜
可以从4040webui界面查看所有task的执行时间与处理的数据量
如果某个task处理的数据量与执行时间相比其他task多很多,此时可以判定该task存在数据倾斜
2、数据倾斜的出现的阶段
数据倾斜一般在shuffle阶段才会出现
因为在shuffle的时候需要根据key.hashCode%分区数得到分区号,如果某个key的数据量很多,此时全部聚在一个分区导致数据倾斜
3、数据倾斜出现的场景
1、key为null的数据很多
解决方案
1、如果null值数据对业务不影响,可以直接过滤掉
2、如果null值数据对业务影响,此时先可以给key弄一个随机数,将数据打散
2、groupBy的时候,某个key的数据很多导致数据倾斜
解决方案:
1、局部聚合
1、给分组的字段加上随机数
2、按照加上随机数的key分组聚合
2、全局聚合
1、去掉key中的随机数
2、按照key再次分区聚合
3、join的时候,某个key的数据很多导致数据倾斜
1、大表join小表出现数据倾斜
解决方案: 在sparksession中配置spark.sql.autoBroadcastJoinThreshold,将小表广播出去,避免shuffle
2、大表 join 大表,个别key数据量多导致的数据倾斜
解决方案:
1、将倾斜表中倾斜key与非倾斜key数据单独过滤出来
2、将非倾斜表中倾斜key与非倾斜key数据单独过滤出来
3、将倾斜表中非倾斜key的数据与非倾斜表中非倾斜key的数据正常join
4、将倾斜表中倾斜key的数据的join的列加上N以内的随机数
5、将非倾斜表中倾斜key的数据扩容N倍
6、将第四步与第五步的数据join
7、将第六步的结果与第三步的结果union all
3、大表 join 大表,大量key数据量都比较多导致的数据倾斜
解决方案:
1、将倾斜表中join的列加上N[N最多只能是10以内]以内的随机数
2、将非倾斜表中的数据扩容N[N最多只能是10以内]倍
3、将第一步与第二步的数据join