标签:shuffle enabled sparksql sql false spark 优化 speculation
最近把spark文档里面配置那一页看了一下,在这记录一些可用的配置,免得后续再去查
文档地址:https://spark.apache.org/docs/3.0.1/configuration.html
Spark文档
运行环境
名称 | 默认值 | 配置解释 |
spark.executor.extraJavaOptions |
|
不能设置堆大小,可以设置jc日志:-verbose:gc -Xloggc:/tmp/-.gc |
spark.executor.memoryOverhead |
executorMemory * 0.10,最小为 384 |
堆外内存大小 |
spark.files |
|
需要放置的文件逗号分割列表 |
spark.jars |
|
需要放置的jar逗号分割列表 |
spark.jars.packages |
|
Maven 坐标的逗号分隔列表 |
spark.jars.excludes |
|
Maven 坐标的逗号分隔列表需要排除的列表 |
shuffle
名称 | 默认值 | 配置解释 |
spark.reducer.maxSizeInFlight |
48M |
每个reduce获取48M文件 |
spark.shuffle.compress |
true |
shuffle输出是否压缩 |
spark.shuffle.spill.compress |
true |
压缩 |
spark.shuffle.file.buffer |
32K |
shuffle内存缓冲区大小 |
spark.shuffle.io.maxRetries |
3 |
shuffle失败重试次数 |
spark.shuffle.io.retryWait |
5s |
shuffle失败等待时长 |
spark.shuffle.sort.bypassMergeThreshold |
200 |
如果没有map端聚合且分区小于这个数字,不进行排序 |
压缩
名称 | 默认值 | 配置解释 |
spark.broadcast.compress |
true |
广播是否压缩 |
spark.broadcast.compress |
false |
checkpoint是否压缩 |
spark.checkpoint.compress |
false |
压缩格式(spark.io.compression.codec) |
spark.io.compression.codec |
lz4 |
压缩格式 |
spark.rdd.compress |
false |
序列化RDD是否压缩 一般不需要压缩 后续用还需要解压缩麻烦 |
spark.serializer |
org.apache.spark.serializer.KryoSerializer |
序列化类 |
广播、网络
名称 | 默认值 | 配置解释 |
spark.broadcast.blockSize |
4 |
broadcast发送的时候,数据块大小 |
spark.default.parallelism |
200 |
RDD默认并行度 |
spark.sql.shuffle.partitions |
200 |
SparkSql默认并行度 |
spark.files.maxPartitionBytes |
读取文件最大量 |
128M |
spark.speculation |
true |
spark.speculation.interval=30000,spark.speculation.quantile=0.8,spark.speculation.multiplier=1.5 |
spark.locality.wait.process |
读取数据进程等待时间 |
spark.locality.wait |
spark.locality.wait.node |
读取节点等待时间 |
spark.locality.wait |
spark.locality.wait.rack |
读取相同机架等待时间 |
spark.locality.wait |
动态资源、动态执行计划
名称 | 默认值 | 配置解释 |
spark.dynamicAllocation.enabled |
false |
动态资源是否开启 |
spark.dynamicAllocation.minExecutors |
动态资源最小Executor |
0 |
spark.dynamicAllocation.maxExecutors |
动态资源最大Executor |
无穷 |
spark.dynamicAllocation.executorIdleTimeout |
60s |
Task执行完成等待这个时间就关掉 |
spark.sql.adaptive.enabled |
false |
查询执行过程中重新优化查询计划 |
spark.sql.adaptive.advisoryPartitionSizeInBytes |
128M |
自适应优化期间随机分区的建议大小 |
spark.sql.autoBroadcastJoinThreshold |
10M |
广播阈值大小 |
spark.sql.files.maxPartitionBytes |
128M |
单个分区读取文件最大字节数。仅在Parquet、JSON和ORC时有效。 |
spark.sql.parquet.compression.codec |
snappy |
parquet压缩方式 |
spark.sql.parquet.filterPushdown |
true |
parquet过滤器本地文件下推 |
spark.sql.parquet.recordLevelFilter.enabled |
false |
使用下推过滤器启用 Parquet 的本机记录级过滤&spark.sql.parquet.filterPushdown |
spark.sql.cbo.enabled |
false |
false |
spark.sql.cbo.joinReorder.dp.star.filter |
false |
星型连接过滤器启发式应用于基于成本的连接枚举 |
spark.sql.cbo.starSchemaDetection |
false |
启用基于星型模式检测的连接重新排序 |
spark.sql.cbo.joinReorder.enabled |
false |
CBO 中启用加入重新排序 |
spark.sql.cbo.planStats.enabled |
false |
逻辑计划将从目录中获取行数和列统计信息 |
其他配置
名称 | 默认值 | 配置解释 |
spark.sql.hive.convertInsertingPartitionedTable |
true |
内置的 ORC/Parquet 写入器插入数据到Hive分区 ORC/Parquet 表中 |
spark.sql.session.timeZone |
|
时间分区字段 |
spark.sql.sources.partitionOverwriteMode |
nonstrict |
动态分区,静态分区 |
可优化的配置
名称 | 默认值 | 配置解释 |
yarn.scheduler.maximum-allocation-mb |
64G |
yarn Container能够使用的最大资源 |
yarn.scheduler.minimum-allocation-mb |
512M |
yarn Container能够使用的最小资源 |
spark.shuffle.sort.bypassMergeThreshold |
200 |
如果没有map端聚合且分区小于这个数字,不进行排序 |
spark.default.parallelism |
200 |
RDD默认并行度 |
spark.sql.shuffle.partitions |
200 |
SparkSql默认并行度 |
spark.serializer |
org.apache.spark.serializer.KryoSerializer |
序列化类 |
spark.sql.files.maxPartitionBytes |
128M |
单个分区读取文件最大字节数。仅在Parquet、JSON和ORC时有效 |
spark.files.maxPartitionBytes |
128M |
读取文件最大量 |
spark.sql.adaptive.advisoryPartitionSizeInBytes |
128M |
自适应优化期间随机分区的建议大小 |
spark.speculation |
true |
spark.speculation.interval=30000,spark.speculation.quantile=0.8,spark.speculation.multiplier=1.5 |
spark.dynamicAllocation.enabled |
false |
动态资源是否开启 |
spark.dynamicAllocation.minExecutors |
动态资源最小Executor |
0 |
spark.dynamicAllocation.maxExecutors |
动态资源最大Executor |
无穷 |
spark.dynamicAllocation.executorIdleTimeout |
30s |
Task执行完成等待这个时间就关掉 |
spark.sql.adaptive.enabled |
false |
查询执行过程中重新优化查询计划 |
spark.sql.autoBroadcastJoinThreshold |
10M |
广播阈值大小 |
spark.sql.parquet.recordLevelFilter.enabled |
false |
使用下推过滤器启用 Parquet 的本机记录级过滤&spark.sql.parquet.filterPushdown |
根据文档参数,测试得到的优化方案:
1.根据机器的资源预留1g1c给系统,其他的默认可以作为资源分配的CPU和内存的比例:executor-memory,executor-cores,driver-memory(1或者512M都行) 例如 1:7
2.根据分配的资源数量确定并发数:spark.default.parallelism,spark.sql.shuffle.partitions = num-executors*executor-cores*(2-3)
3.根据文件格式及资源分配比例,等比例扩充spark.sql.files.maxPartitionBytes,spark.files.maxPartitionBytes,spark.reducer.maxSizeInFlight,spark.shuffle.file.buffer,spark.shuffle.sort.bypassMergeThreshold
先确定spark.sql.files.maxPartitionBytes大小,确定大小以后1024M/(当前128M)=8倍 然后剩余参数*8
4.增加其他配置spark.serializer,spark.sql.parquet.recordLevelFilter.enabled
5.增加慢任务检测机制:spark.speculation|true|spark.speculation.interval=30000,spark.speculation.quantile=0.8,spark.speculation.multiplier=1.5|
上面的参数就能确认spark的常用参数了,下面再增加一些sql的优化
1.主动进行分区和广播
有一些表经过数据处理后,数据量会很小,这时候可以进行主动的重分区。
2.SQL过长,多表join的时候,可以进行插入进临时表,会很大提升效率
3.对于数据增长过快的任务,可以使用动态资源,但是限定最大executor
标签:shuffle,
enabled,
sparksql,
sql,
false,
spark,
优化,
speculation
From: https://www.cnblogs.com/wuxiaolong4/p/16643990.html