Spark 优化
- 定义和目标
定义: Spark 优化是指通过调整 Spark 应用程序的配置参数、代码结构和数据处理方式,以提高 Spark 作业的性能和效率。
目标: 优化的目标包括减少作业的执行时间、降低资源消耗、提高吞吐量等。优化可以涉及到多个方面,如内存管理、数据分区、任务调度、代码优化等。
一、参数优化
1、num-executors: executor的数量
2、executor-memory:每个executor的内存
3、executor-cores:每个executor的核数
4、driver-memory:driver的内存
5、spark.storage.memoryFraction:用于缓存的内存占比默认0.6
6、spark.shuffle.memoryFraction:spark shuffle 使用内存占比,默认0.2
7、spark.locality.wait :task执行时,等待时间,默认3秒
50G的数据需要多少资源
1、资源充足:每一个task由一个core处理,效率最高的(会浪费资源)
总的资源:400core,800G内存
--num-executors 50
--executor-cores 8
--executor-memory 16G
2、资源充足,合理利用资源,核的数在task数量的1/3-1/2之间就可以(充分利用资源)
--num-executors 25
--executor-cores 8
--executor-memory 16G
3、资源不足:看剩余资源,总的核数在剩余核的1/3-1/2之间
5台服务器(48核,128G, 10TB硬盘)= 总资源大概(200核,500G)
executor的数量最好小于等于服务器数量
--num-executors 5
--executor-cores 20
--executor-memory 40G
二、代码优化
缓存
缓存会将表的数据加载到Executo`r的内存或者磁盘上,如果表的数据量太大了,超过内存的上线,就没有必要使用缓存了,所以在使用缓存时需要注意以下几点:
1、理解缓存的作用和优势
(1)提高性能
当一个 RDD(弹性分布式数据集)被缓存后,Spark 会将其数据存储在内存或磁盘中,以便后续的操作可以更快地访问这些数据,避免重复计算。
例如,在一个迭代算法中,如果中间结果被缓存,那么每次迭代都可以直接从缓存中读取数据,而不需要重新计算,从而大大提高了算法的执行速度。
(2)减少网络传输
如果一个 RDD 是从远程数据源(如 HDFS 或 S3)读取的,缓存可以避免每次操作都从远程数据源读取数据,减少网络传输开销。
例如,在一个数据仓库应用中,如果需要多次查询同一个数据源,将该数据源对应的 RDD 缓存起来可以显著减少网络传输时间。
2、选择合适的存储级别
Spark 提供了多种存储级别,包括 MEMORY_ONLY、MEMORY_AND_DISK、DISK_ONLY 等。在选择存储级别时,需要注意各自的特点:
MEMORY_ONLY:内存充足,最快的访问速度,适用内存充足数据量不是很大时,建议优先选择这个
MEMORY_AND_DISK_SER:压缩之后放内存,放不下在放磁盘
MEMORY_ONLY_SER:压缩之后放内存,CPU的时间换内存空间
DISK_ONLY:如果数据可靠性要求较高,可以选择将数据存储在磁盘上,但尽量不选择
3、代码实现
-- 在sql中使用
cache table students;
uncache table students;
-- 在DSL中使用
studentDF.cache()
studentDF.unpersist()
-- 在RDD中使用
rdd.cache()
rdd.unpersist()
使用高性能的算子
使用reduceByKey/aggregateByKey替代groupByKey
reduceByKey
1、对相同key的value进行聚合计算
2、会在每一个map task中对相同key的value预聚合,可以减少shuffle过程中传输的数据量,提高效率
3、只能处理相对简单的逻辑
studentsRDD
.reduceByKey(_+_)
.foreach(println)
aggregateByKey
是一个对(K,V)类型的 RDD(弹性分布式数据集)进行聚合操作的函数。它允许你在每个分区内进行部分聚合,然后在跨分区进行全局聚合,从而有效地处理大规模数据集。
val countRDD: RDD[(String, Int)] = kvRD
.aggregateByKey(0)(
seqOp = (u, v) => u + v, //map端聚合逻辑
combOp = (u1, u2) => u1 + u2 // reduce端聚合逻辑
)
使用mapPartitions
替代普通map Transformation
算子
mapPartitions
和foreachPartition
一样,当与需要读取外部数据时使用
比如创建数据库连接,使用MapPartitions
时,只会在分区内创建一次
使用foreachPartitions
替代foreach Action
算子
object DemoToMysql {
def main(args: Array[String]): Unit = {
//1、创建spark的执行环境
val conf = new SparkConf()
//设置运行模式
conf.setMaster("local")
conf.setAppName("wc")
val sc = new SparkContext(conf)
//2、读取数据
//RDD:弹性的分布式数据集(相当于List)
val linesRDD: RDD[String] = sc.textFile("data/words.txt")
//一行转换多行
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(","))
val kvRD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
//统计单词的数量
val countRDD: RDD[(String, Int)] = kvRD.reduceByKey((x, y) => x + y)
val start: Long = System.currentTimeMillis()
//foreachPartition: 训练分区
countRDD.foreachPartition(iter => {
//1 创建数据库连接
//每一个分区创建一个数据库连接
val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata", "root", "123456")
val end: Long = System.currentTimeMillis()
println(end - start)
//在分区内循环
iter.foreach {
case (word, count) =>
//2 编写sql插入数据
val stat: PreparedStatement = con.prepareStatement("insert into word_count values(?,?)")
stat.setString(1, word)
stat.setInt(2, count)
stat.execute()
}
con.close()
})
coalesce(numPartitions,false)
减少分区 没有shuffle只是合并 partition
object DemoRePartition {
def main(args: Array[String]): Unit = {
//1、创建spark的执行环境
val conf = new SparkConf()
//设置运行模式
conf.setMaster("local")
conf.setAppName("wc")
val sc = new SparkContext(conf)
//可以在读取数据时指定最小分区数设定RDD的分区数,需要保证每一个分区被等分
val studentsRDD: RDD[String] = sc.textFile("data/test3", 3)
println(s"studentsRDD分区数:${studentsRDD.getNumPartitions}")
//读取数据时,如果有很多小文件,可以合并小文件
val mergeRDD: RDD[String] = studentsRDD.coalesce(2, shuffle = false)
println(s"mergeRDD分区数:${mergeRDD.getNumPartitions}")
mergeRDD.foreach(println)
// //repartition: 重分区,会产生shuffle
// val rePartitionRDD: RDD[String] = studentsRDD.repartition(100)
// println(s"rePartitionRDD分区数:${rePartitionRDD.getNumPartitions}")
// rePartitionRDD.saveAsTextFile("data/test3")
//
// //coalesce(1, shuffle = false): 合并分区,不会产生shuffle
// //一班用于最后合并小文件
// val coalesceRDD: RDD[String] = rePartitionRDD.coalesce(1, shuffle = false)
// println(s"coalesceRDD分区数:${rePartitionRDD.getNumPartitions}")
//
// coalesceRDD.saveAsTextFile("data/test4")
}
}
广播大变量(Map join)
概念
在 Spark 中,广播变量是一种共享变量。当在分布式计算环境中,多个任务(Task)可能需要访问相同的数据时,如果每个任务都去获取原始数据副本,会导致数据的重复存储和传输,浪费大量的网络资源和内存。广播变量就是为了解决这个问题而设计的。它允许程序员将一个只读的数据变量(如一个大的查找表或者配置参数)在每个节点(Node)上缓存一份副本,这样各个任务就可以在本地访问这个数据,而不需要通过网络反复传输。
例如,假设你有一个机器学习任务,需要使用一个预训练好的模型参数文件,这个文件比较大。如果没有广播变量,每次在节点上执行的任务都要从存储系统中读取这个文件,这会导致大量的 I/O 开销。通过使用广播变量,这个文件可以被广播到各个节点并缓存起来,任务在本地就可以直接使用,大大提高了效率。
创建和使用
创建广播变量
:在 Spark 中,可以使用SparkContext.broadcast()方法来创建广播变量。
//1、将变量广播到Executor端
val broIds: Broadcast[Array[String]] = sc.broadcast(ids)
使用广播变量
:在 Spark 的转换(Transform)和动作(Action)操作中,可以使用广播变量。在 RDD(弹性分布式数据集)的操作函数(如map、filter等)中,可以通过value属性来访问广播变量的值
//2、获取广播变量
broIds.value.contains(id)
广播变量的优势和注意事项
优势
:
减少网络传输
:通过在每个节点上缓存数据,避免了数据的重复网络传输,特别是对于大数据集或者频繁使用的数据,这可以显著减少网络 I/O 开销。
提高性能
:由于数据在本地节点缓存,任务可以更快地访问数据,从而提高了整体计算速度。对于一些对性能敏感的应用场景,如实时数据处理或者迭代计算,广播变量的使用可以带来明显的性能提升。
注意事项
:
数据不可变性
:广播变量是只读的。这是因为在分布式环境中,如果多个任务可以修改广播变量的值,会导致数据的不一致性和难以预测的结果。所以在使用广播变量时,要确保广播的数据在整个计算过程中不需要修改。
变量生命周期
:广播变量的生命周期是与 SparkContext 相关联的。当 SparkContext 被销毁时,广播变量也会被销毁。因此,在使用广播变量时,要注意 SparkContext 的生命周期管理,避免在不适当的时候引用已经销毁的广播变量。
大小
:一般来说,广播变量的变量大小不超过100M
今天的分享就到这里了,期待下次给你们分享更多干货!!!