一、任务占用资源计算
executor占用CPU = executor_instances * executor_cores * 10 * 0.8(0.1核)
executor占用内存 = executor_instances * (executor.memory + max(executor.memoryOverhead, OffHeap.size) + executor.pyspark.memory)(GB)
其中,若参数未手动设置,会分配默认值。
也就是说,使用默认参数,每个executor就会分配4g + max(5g, 3.7g) + 6g = 15g的内存,对于一般任务已经足够使用。
-- driver
spark.driver.cores 1
spark.driver.memory 4g
-- executor
spark.executor.cores 2
spark.executor.memory 4g
spark.executor.memoryOverhead 5g
spark.executor.pyspark.memory 6g
-- Bytes,约为3.7G
spark.memory.offHeap.size 4000000000
二、用户需要关注的参数
如上,在使用默认配置时,每个executor就会分配15g内存,已经足够一般任务使用。
所以用户一般只需配置spark.executor.instances,spark.sql.shuffle.partitions,spark.default.parallelism即可。
如果配置后发现还是报OOM错误,可适当提高内存参数,重要参数含义见下方。
推荐配置:
spark.executor.instances 50
spark.sql.shuffle.partitions 300
spark.default.parallelism 300
三、重要参数含义
1、driver相关参数
driver实际申请内存大小计算公式:driver.memory + driver.memoryOverhead
(1)spark.driver.memory
driver进程(JVM使用)的内存数,一般(memory/cores >= 2g)
通用配置:4g
df.collect()会返回所有数据的list,但是这个方法会将所有数据pull到driver,所以在遇到driver爆内存时,可以注意这一点。参数driver.memory调高。
参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。
(2)spark.driver.cores
默认1,driver程序使用的CPU内核数,若无过多driver单机处理操作,一般不需要配置
通用配置:2
(3)spark.driver.memoryOverhead
driver JVM堆外内存的大小,默认为max(384, 0.1 * spark.driver.memory)
此配置存在默认单位MB,因此直接配置数字或带具体单位,最少1g
通用配置:1g
2、executor相关参数
(1)spark.executor.instances
设置spark作业executor的个数executor.instances * executor.cores为当前application内并行运行task数,需要根据spark.sql.shuffle.partitions判断,一般保证executor.instances * executor.cores <= partitions / 2
通用配置:10
参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
(2)spark.executor.memory
每个executor进程(JVM使用)的内存大小
默认配置:4g
参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/4~1/3,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行
(3)spark.executor.cores
每个executor的core数目。每个core同一时间只能执行一个Task线程,cores的数目也就意味着每个executor并行task的数目。
每个task分配的内存大小是executor-memory/executor-cores,可以按照这个分析每个task所占用的内存大小,一般(memory/cores >= 2g)。
每个executor为1个进程,分配一个JVM,考虑到JVM加载task信息的数量,cores个数不要超过5,超出后会容易出现大量加载任务信息导致OOM的情况。
默认配置:2
(4)spark.executor.memoryOverhead
executor JVM堆外内存大小,一般运行非JVM的逻辑
此部分内存主要用于JVM自身,字符串, NIO Buffer(Driect Buffer)等开销。此部分为用户代码及Spark 不可操作的内存,不足时可通过调整参数解决。
此配置存在默认单位MB,因此直接配置数字或带具体单位,最少1g
默认配置:5g
(5)spark.executor.pyspark.memory
python的worker内存,仅在使用pyspark时生效
默认配置:6g
(6)spark.shuffle.spill.numElementsForceSpillThreshold
默认256000000,即256M。
shuffle超过该数据会强行落盘此配置存在单位B,因此直接配置数字即可
通用配置:256000000
(7)spark.sql.shuffle.partitions
对Spark SQL专用的设置
默认200,用于设置shuffle时partition的数目,只作用于SQL、DataSet的join/aggregations,无法对纯map操作生效。该参数代表了shuffle read task的并行度。
在用户shuffle OOM时,可考虑增大数目
通用配置:200
(8)spark.default.parallelism
在处理RDD时才会起作用,对Spark SQL的无效
默认200,与上面作用相同,只作用于RDD的join/reduceByKey等,无法对纯map操作生效。
通用配置:200
通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!
Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。
(9)spark.memory.fraction
默认0.75,用于存放缓存数据和运行数据,剩余0.25为User Memory,存放用户定义的数据结构和Spark元数据信息。
在用户persist大量数据或者shuffle聚合数据量比较大时可以考虑增加该值
缓存持久化(persist) + 运行(shuffle+执行编写的代码) = memory.fraction,默认为0.75,persist用memory.storageFraction参数指定,默认0.5
(10)spark.memory.storageFraction(spark.memory.useLegacyMode(代表启用spark1.6前的版本)时,spark.storage.memoryFraction)
默认0.5, storage内存大小,用于存储缓存数据,剩余空间用于execute。
在Unified Memory Manage模式下,内存会自动调整,分配storage和execute使用,但是在storage内存不足时,会要回所有分配的内存。
在用户shuffle处理数据比较大时可减小该参数
(11)spark.memory.offHeap.size
设置JVM堆外内存大小,可以运行executor JVM相关计算,默认为5000000000,即5G。
一般任务中只有部分shuffle需要大量操作,内存可能OOM时启用,在spark.memory.offHeap.enabled设为true时启用。
此配置存在单位B,因此直接配置数字即可
默认配置:4000000000
标签:executor,配置,driver,默认,参数,内存,memory,Spark,spark From: https://www.cnblogs.com/yeyuzhuanjia/p/18076058