接上文
内存优化
用以下三张表,做性能测试
RDD
1.1.1cache
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
object MemoryTuning {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setAppName("test")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
ssc.hadoopConfiguration.set("fs.defaultFS", "hdfs://mycluster")
ssc.hadoopConfiguration.set("dfs.nameservices", "mycluster")
useRddCache(sparkSession)
}
def useRddCache(sparkSession: SparkSession): Unit = {
val result = sparkSession.sql("select * from dwd.dwd_course_pay ").rdd
result.cache()
result.foreachPartition((p: Iterator[Row]) => p.foreach(item => println(item.get(0))))
while (true) {
//因为历史服务器上看不到,storage内存占用,所以这里加个死循环 不让sparkcontext立马结束
}
}
}
打成jar,上传到集群并去跑yarn任务,并在yarn界面查看spark ui
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g --queue spark --class com.atguigu.sparksqltuning.MemoryTuning spark-sql-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
通过spark ui看到,rdd使用默认cache缓存级别,占用内存4.3GB,并且storage内存还不够,只缓存了75%
1.1.2kryo+序列化缓存
停止任务,使用kryo序列化并且使用rdd序列化缓存级别。使用kryo序列化需要修改spark的序列化模式,并且需要进程注册类操作。
import java.sql.Timestamp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.storage.StorageLevel
object MemoryTuning {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setAppName("test")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[CoursePay]))
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
ssc.hadoopConfiguration.set("fs.defaultFS", "hdfs://mycluster")
ssc.hadoopConfiguration.set("dfs.nameservices", "mycluster")
// useRddCache(sparkSession)
useRddKryo(sparkSession)
}
case class CoursePay(orderid: String, discount: BigDecimal, paymoney: BigDecimal, createtime: Timestamp, dt: String, dn: String)
def useRddKryo(sparkSession: SparkSession): Unit = {
import sparkSession.implicits._
val result = sparkSession.sql("select * from dwd.dwd_course_pay ").as[CoursePay].rdd
result.persist(StorageLevel.MEMORY_ONLY_SER)
result.foreachPartition((p: Iterator[CoursePay]) => p.foreach(item => println(item.orderid)))
while (true) {
//因为历史服务器上看不到,storage内存占用,所以这里加个死循环 不让sparkcontext立马结束
}
}
}
打成jar包在yarn上运行,查看storage所占内存,内存占用减少了1445.8mb并且缓存了100%。使用序列化缓存配合kryo序列化,可以优化存储内存占用
1.1.3选择
根据官网的描述,那么可以推断出,如果yarn内存资源充足情况下,使用默认级别MEMORY_ONLY是对CPU的支持最好的。但是序列化缓存可以让体积更小,那么当yarn内存资源不充足情况下可以考虑使用MEMORY_ONLY_SER配合kryo使用序列化缓存。
DataFrame、DataSet
根据官网描述,DataSet类似RDD,但是并不使用JAVA序列化也不使用Kryo序列化,而是使用一种特有的编码器进行序列化对象。那么来使用DataSet进行缓存
1.1.1cache
package com.atguigu.sparksqltuning
import java.sql.Timestamp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.storage.StorageLevel
object MemoryTuning {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setAppName("test")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
ssc.hadoopConfiguration.set("fs.defaultFS", "hdfs://mycluster")
ssc.hadoopConfiguration.set("dfs.nameservices", "mycluster")
// useRddCache(sparkSession)
// useRddKryo(sparkSession)
userDataSet(sparkSession)
}
case class CoursePay(orderid: String, discount: BigDecimal, paymoney: BigDecimal, createtime: Timestamp, dt: String, dn: String)
def userDataSet(sparkSession: SparkSession): Unit = {
import sparkSession.implicits._
val result = sparkSession.sql("select * from dwd.dwd_course_pay ").as[CoursePay]
result.cache()
result.foreachPartition((p: Iterator[CoursePay]) => p.foreach(item => println(item.orderid)))
while (true) {
}
}
}
提交任务,在yarn上查看spark ui,查看storage内存占用。内存使用612.3mb。
并且DataSet的cache默认缓存级别与RDD不一样,是MEMORY_AND_DISK
1.1.1序列化缓存
import java.sql.Timestamp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.storage.StorageLevel
object MemoryTuning {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setAppName("test")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
ssc.hadoopConfiguration.set("fs.defaultFS", "hdfs://mycluster")
ssc.hadoopConfiguration.set("dfs.nameservices", "mycluster")
// useRddCache(sparkSession)
// useRddKryo(sparkSession)
userDataSet(sparkSession)
}
case class CoursePay(orderid: String, discount: BigDecimal, paymoney: BigDecimal, createtime: Timestamp, dt: String, dn: String)
def userDataSet(sparkSession: SparkSession): Unit = {
import sparkSession.implicits._
val result = sparkSession.sql("select * from dwd.dwd_course_pay ").as[CoursePay]
result.persist(StorageLevel.MEMORY_AND_DISK_SER)
result.foreachPartition((p: Iterator[CoursePay]) => p.foreach(item => println(item.orderid)))
while (true) {
}
}
}
打成jar包,提交yarn。查看spark ui,storage占用内存646.2mb。和默认cache缓存级别差别不大。所以Dataframe可以直接使用cache。
所以从性能上来讲,DataSet,DataFrame是大于RDD的建议开发中使用DataSet、DataFrame
分区和参数控制
Spark sql默认shuffle分区个数为200,参数由spark.sql.shuffle.partitions控制,此参数只能控制Spark sql、DataFrame、DataSet分区个数。不能控制RDD分区个数
所以如果两表进行join形成一张新表,如果新表的分区不进行缩小分区操作,那么就会有200份文件插入到hdfs上,这样就有可能导致小文件过多的问题。那么一般在插入表数据前都会进行缩小分区操作来解决小文件过多问题。
2.1小文件过多场景
还是由上面视图三张表为例,进行join,先不进行缩小分区操作。查看效果。为了演示效果,先禁用了广播join。广播join下面会进行说明。
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object PartitionTuning {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setAppName("test")
.set("spark.sql.autoBroadcastJoinThreshold","-1")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
ssc.hadoopConfiguration.set("fs.defaultFS", "hdfs://mycluster")
ssc.hadoopConfiguration.set("dfs.nameservices", "mycluster")
testJoin(sparkSession)
}
def testJoin(sparkSession: SparkSession) = {
//查询出三张表 并进行join 插入到最终表中
val saleCourse = sparkSession.sql("select *from dwd.dwd_sale_course")
val coursePay = sparkSession.sql("select * from dwd.dwd_course_pay")
.withColumnRenamed("discount", "pay_discount")
.withColumnRenamed("createtime", "pay_createtime")
val courseShoppingCart = sparkSession.sql("select *from dwd.dwd_course_shopping_cart")
.drop("coursename")
.withColumnRenamed("discount", "cart_discount")
.withColumnRenamed("createtime", "cart_createtime")
saleCourse.join(courseShoppingCart, Seq("courseid", "dt", "dn"), "right")
.join(coursePay, Seq("orderid", "dt", "dn"), "left")
.select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
, "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
"cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
.write.mode(SaveMode.Overwrite).insertInto("dws.dws_salecourse_detail")
}
}
提交yarn任务查看spark ui
可以看到有2个stage的分区个数是200,这两个stage分别对应代码两处join。再往里面点击查看task的运行情况。
可以看到cpu申请到的task任务数也不平均,那么就会造成cpu空转的情况,就像当前hadoop102,hadoop103的运行情况一样。那么这也是需要解决的问题。先来看小文件过多问题,通过浏览器访问NameNode查看对应路径产生的文件个数
一共产生了200份小文件,那么先解决小文件过多的问题.
2.2解决小文件过多问题
解决小文件过多问题也非常简单,在spark当中一个分区最终落盘形成一个文件,那么解决小文件过多问题只需将分区缩小即可。在插入表前,添加coalesce算子指定缩小后的分区个数。那么使用此算子需要注意,coalesce算子缩小分区后那么实际处理插入数据的任务只有一个,可能会导致oom,所以需要适当控制,并且coalesce算子里的参数只能填写比原有数据分区小的值,比如当前表的分区是200,那么填写参数必须小于200,否则无效。当然缩小分区后任务的耗时肯定会变久。
修改参数后,打成jar包重新在yarn上运行此任务。最后write阶段分区个数是20,再来看对应hdfs路径下产生的文件个数。
最终产生的文件个数为20个
2.3合理利用CPU资源
根据提交命令spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g --queue spark --class com.atguigu.sparksqltuning.PartitionTuning spark-sql-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
去向yarn申请的executor vcore资源个数为12个(num-executors*executor-cores),如果不修改spark sql分区个数,那么就会像上图所展示存在cpu空转的情况。这个时候需要合理控制shuffle分区个数。如果想要让任务运行的最快当然是一个task对应一个vcore,但是数仓一般不会这样设置,为了合理利用资源,一般会将分区(也就是task)设置成vcore的2倍到3倍。
修改参数spark.sql.shuffle.partitions,此参数默认值为200。
那么根据我们当前任务的提交参数,将此参数设置为24或36为最优效果。
设置完参数,yarn上提交任务,再次运行
查看spark ui,点击相应stage,查看task详情
这张图就很明显了,分别hadoop101,hadop102,hadoop103各自申请到4个vcore,然后每个vcore都分配到了3个任务,也都是差不多时间点结束。充分利用了cpu的资源。
那么spark sql当中修改分区的方式就有3种了,分别是算子coalesce、repartition和参数spark.sql.shuffle.partitions
广播join
Spark join策略中,如果当一张小表足够小并且可以先缓存到内存中,那么可以使用Broadcast Hash Join,其原理就是先将小表聚合到driver端,再由driver端广播分发到各个大表分区中,那么再次进行join的时候,就相当于大表的各自分区的数据与小表进行本地join,从而规避了shuffle。
广播join默认值为10MB,由spark.sql.autoBroadcastJoinThreshold参数控制
那么观察历史任务,代码中我以将广播join禁用了
有两处stage,shuffle分区为36.分贝对应两个join。
再来看物理执行计划图
两张表都走了SortMerge Join。那么根据表大小可以看出哪些是小表,哪些是大表
可以针对小表join大表进行优化。
3.1通过参数进行优化
.set("spark.sql.autoBroadcastJoinThreshold", "10485760 ")
修改完参数后,再次打成jar包,在yarn上运行此任务查看stage,和执行计划图。当然默认值为10mb,当表小于10mb也可以不设置。
可以看到多出了一个broadcast exchange的stage,此操作就是广播小表的操作。这个stage在spark3.0中会显示,2.x系列版本中不会显示。join产生shffle task的个数为36的stage只剩下一个了,说明一个join的stage已经被优化掉了。再来查看执行计划图。
第一个小表join大表的stage从sortmerge join转换为了broadcast hashjoin.
整个任务的耗时也从2.5分钟优化到了2.2分钟
3.2通过api进行优化
通过api进行广播join的优化,为了避免参数自动生效,先将参数禁用。
再通过api对小表进行广播,广播join方法来自org.apache.spark.sql.functions类下,所以使用api时需要先导包,然后对小表进行广播,再去做join
再次打成jar包,在yarn上运行查看效果
同样生效,相比来说api更加灵活,因为参数总有临界值,使用api可以自己来控制。此处优化效果1.9分钟。sql任务每次跑耗时都会有差异,但针对小表join大表,广播join一定是起到了优化的效果。
标签:join,val,sparkSession,分区,SQL,sql,Spark,优化,spark From: https://www.cnblogs.com/kkk247843405/p/16842587.html