首页 > 其他分享 >Spark 优化

Spark 优化

时间:2024-10-23 21:14:22浏览次数:5  
标签:缓存 变量 val -- RDD 广播 Spark 优化

Spark 优化

  • 定义和目标
    定义: Spark 优化是指通过调整 Spark 应用程序的配置参数、代码结构和数据处理方式,以提高 Spark 作业的性能和效率。
    目标: 优化的目标包括减少作业的执行时间、降低资源消耗、提高吞吐量等。优化可以涉及到多个方面,如内存管理、数据分区、任务调度、代码优化等。

一、参数优化

1、num-executors: executor的数量
2、executor-memory:每个executor的内存
3、executor-cores:每个executor的核数
4、driver-memory:driver的内存
5、spark.storage.memoryFraction:用于缓存的内存占比默认0.6
6、spark.shuffle.memoryFraction:spark shuffle 使用内存占比,默认0.2
7、spark.locality.wait :task执行时,等待时间,默认3秒
50G的数据需要多少资源
1、资源充足:每一个task由一个core处理,效率最高的(会浪费资源)
总的资源:400core,800G内存
--num-executors 50
--executor-cores 8
--executor-memory 16G

2、资源充足,合理利用资源,核的数在task数量的1/3-1/2之间就可以(充分利用资源)
--num-executors 25
--executor-cores 8
--executor-memory 16G

3、资源不足:看剩余资源,总的核数在剩余核的1/3-1/2之间
5台服务器(48核,128G, 10TB硬盘)= 总资源大概(200核,500G)
executor的数量最好小于等于服务器数量
--num-executors 5
--executor-cores 20
--executor-memory 40G

二、代码优化

缓存
缓存会将表的数据加载到Executo`r的内存或者磁盘上,如果表的数据量太大了,超过内存的上线,就没有必要使用缓存了,所以在使用缓存时需要注意以下几点:

1、理解缓存的作用和优势

(1)提高性能
当一个 RDD(弹性分布式数据集)被缓存后,Spark 会将其数据存储在内存或磁盘中,以便后续的操作可以更快地访问这些数据,避免重复计算。
例如,在一个迭代算法中,如果中间结果被缓存,那么每次迭代都可以直接从缓存中读取数据,而不需要重新计算,从而大大提高了算法的执行速度。
(2)减少网络传输
如果一个 RDD 是从远程数据源(如 HDFS 或 S3)读取的,缓存可以避免每次操作都从远程数据源读取数据,减少网络传输开销。
例如,在一个数据仓库应用中,如果需要多次查询同一个数据源,将该数据源对应的 RDD 缓存起来可以显著减少网络传输时间。

2、选择合适的存储级别

Spark 提供了多种存储级别,包括 MEMORY_ONLY、MEMORY_AND_DISK、DISK_ONLY 等。在选择存储级别时,需要注意各自的特点:

MEMORY_ONLY:内存充足,最快的访问速度,适用内存充足数据量不是很大时,建议优先选择这个

MEMORY_AND_DISK_SER:压缩之后放内存,放不下在放磁盘

MEMORY_ONLY_SER:压缩之后放内存,CPU的时间换内存空间

DISK_ONLY:如果数据可靠性要求较高,可以选择将数据存储在磁盘上,但尽量不选择

3、代码实现

-- 在sql中使用 
cache table students;
uncache table students;

-- 在DSL中使用
 studentDF.cache()
 studentDF.unpersist()

 -- 在RDD中使用
 rdd.cache()
 rdd.unpersist()

使用高性能的算子

使用reduceByKey/aggregateByKey替代groupByKey

reduceByKey

1、对相同key的value进行聚合计算
2、会在每一个map task中对相同key的value预聚合,可以减少shuffle过程中传输的数据量,提高效率
3、只能处理相对简单的逻辑

studentsRDD
      .reduceByKey(_+_)
      .foreach(println)

aggregateByKey是一个对(K,V)类型的 RDD(弹性分布式数据集)进行聚合操作的函数。它允许你在每个分区内进行部分聚合,然后在跨分区进行全局聚合,从而有效地处理大规模数据集。

val countRDD: RDD[(String, Int)] = kvRD
      .aggregateByKey(0)(
        seqOp = (u, v) => u + v, //map端聚合逻辑
        combOp = (u1, u2) => u1 + u2 // reduce端聚合逻辑
      )

使用mapPartitions替代普通map Transformation算子

mapPartitionsforeachPartition一样,当与需要读取外部数据时使用

比如创建数据库连接,使用MapPartitions时,只会在分区内创建一次

使用foreachPartitions替代foreach Action算子

 object DemoToMysql {
  def main(args: Array[String]): Unit = {
    //1、创建spark的执行环境
    val conf = new SparkConf()
    //设置运行模式
    conf.setMaster("local")
    conf.setAppName("wc")
    val sc = new SparkContext(conf)
    //2、读取数据
    //RDD:弹性的分布式数据集(相当于List)
    val linesRDD: RDD[String] = sc.textFile("data/words.txt")
    
    //一行转换多行
    val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(","))
    val kvRD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))

    //统计单词的数量
    val countRDD: RDD[(String, Int)] = kvRD.reduceByKey((x, y) => x + y)

    val start: Long = System.currentTimeMillis()
    //foreachPartition: 训练分区
    countRDD.foreachPartition(iter => {
      //1 创建数据库连接
      //每一个分区创建一个数据库连接
      val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata", "root", "123456")
      val end: Long = System.currentTimeMillis()
      println(end - start)

      //在分区内循环
      iter.foreach {
        case (word, count) =>
          //2 编写sql插入数据
          val stat: PreparedStatement = con.prepareStatement("insert into word_count values(?,?)")
          stat.setString(1, word)
          stat.setInt(2, count)
          stat.execute()
      }
      con.close()
    })

coalesce(numPartitions,false) 减少分区 没有shuffle只是合并 partition

object DemoRePartition {
  def main(args: Array[String]): Unit = {
    //1、创建spark的执行环境
    val conf = new SparkConf()
    //设置运行模式
    conf.setMaster("local")
    conf.setAppName("wc")
    val sc = new SparkContext(conf)

    //可以在读取数据时指定最小分区数设定RDD的分区数,需要保证每一个分区被等分
    val studentsRDD: RDD[String] = sc.textFile("data/test3", 3)
    println(s"studentsRDD分区数:${studentsRDD.getNumPartitions}")

    //读取数据时,如果有很多小文件,可以合并小文件
    val mergeRDD: RDD[String] = studentsRDD.coalesce(2, shuffle = false)
    println(s"mergeRDD分区数:${mergeRDD.getNumPartitions}")
    mergeRDD.foreach(println)


//    //repartition: 重分区,会产生shuffle
//    val rePartitionRDD: RDD[String] = studentsRDD.repartition(100)
//    println(s"rePartitionRDD分区数:${rePartitionRDD.getNumPartitions}")
//    rePartitionRDD.saveAsTextFile("data/test3")
//
//    //coalesce(1, shuffle = false): 合并分区,不会产生shuffle
//    //一班用于最后合并小文件
//    val coalesceRDD: RDD[String] = rePartitionRDD.coalesce(1, shuffle = false)
//    println(s"coalesceRDD分区数:${rePartitionRDD.getNumPartitions}")
//
//    coalesceRDD.saveAsTextFile("data/test4")
  }
}

广播大变量(Map join)
概念
在 Spark 中,广播变量是一种共享变量。当在分布式计算环境中,多个任务(Task)可能需要访问相同的数据时,如果每个任务都去获取原始数据副本,会导致数据的重复存储和传输,浪费大量的网络资源和内存。广播变量就是为了解决这个问题而设计的。它允许程序员将一个只读的数据变量(如一个大的查找表或者配置参数)在每个节点(Node)上缓存一份副本,这样各个任务就可以在本地访问这个数据,而不需要通过网络反复传输。
例如,假设你有一个机器学习任务,需要使用一个预训练好的模型参数文件,这个文件比较大。如果没有广播变量,每次在节点上执行的任务都要从存储系统中读取这个文件,这会导致大量的 I/O 开销。通过使用广播变量,这个文件可以被广播到各个节点并缓存起来,任务在本地就可以直接使用,大大提高了效率。
创建和使用
创建广播变量:在 Spark 中,可以使用SparkContext.broadcast()方法来创建广播变量。

//1、将变量广播到Executor端
    val broIds: Broadcast[Array[String]] = sc.broadcast(ids)

使用广播变量:在 Spark 的转换(Transform)和动作(Action)操作中,可以使用广播变量。在 RDD(弹性分布式数据集)的操作函数(如map、filter等)中,可以通过value属性来访问广播变量的值

//2、获取广播变量
        broIds.value.contains(id)

广播变量的优势和注意事项
优势
减少网络传输:通过在每个节点上缓存数据,避免了数据的重复网络传输,特别是对于大数据集或者频繁使用的数据,这可以显著减少网络 I/O 开销。
提高性能:由于数据在本地节点缓存,任务可以更快地访问数据,从而提高了整体计算速度。对于一些对性能敏感的应用场景,如实时数据处理或者迭代计算,广播变量的使用可以带来明显的性能提升。
注意事项
数据不可变性:广播变量是只读的。这是因为在分布式环境中,如果多个任务可以修改广播变量的值,会导致数据的不一致性和难以预测的结果。所以在使用广播变量时,要确保广播的数据在整个计算过程中不需要修改。
变量生命周期:广播变量的生命周期是与 SparkContext 相关联的。当 SparkContext 被销毁时,广播变量也会被销毁。因此,在使用广播变量时,要注意 SparkContext 的生命周期管理,避免在不适当的时候引用已经销毁的广播变量。
大小:一般来说,广播变量的变量大小不超过100M
今天的分享就到这里了,期待下次给你们分享更多干货!!!

标签:缓存,变量,val,--,RDD,广播,Spark,优化
From: https://www.cnblogs.com/zhuzhuya/p/18498114

相关文章

  • 质量流量计优化燃油滤清器泄漏测试的效率
    组件的质量部分取决于其压力或泄漏完整性。组件的一些示例包括燃料箱、机油和燃料过滤器、散热器、空调和加热系统组件、歧管、齿轮箱、软管等。在向汽车制造商交付零部件之前,零部件供应商通常需要测试和记录零部件的质量。一家领先的燃油滤清器制造商希望提高其质量控制测......
  • 反弹shell优化
    做个记录https://www.bilibili.com/video/BV1qp4y1Z7Pv?t=385.6有时反弹得到的shell是这样的:这种shell只能进行ls、cat这样的简单的查询动作,而对于进入mysql-uroot-p这种进入MySQL终端的交互行为就无能为力,这时就要在此shell上进行优化优化方式1:python3-c'importpty;p......
  • 图论优化
    图论优化三元环计数首先给所有边定向,从度数小的点指向度数大的点,如果度数一样,则从编号小的指向编号大的,最终形成一张DAG。枚举\(u\)以及\(u\)指向的点\(v\)以及\(v\)指向的点\(w\),如果\(u\)也指向\(w\)则成三元环。如果要一开始是有向图计数则最后判断一下\(u,v,w\)的方向即可......
  • 【PowerShell】如何优化脚本性能?
    优化PowerShell脚本性能可以从多个方面着手,以下是一些常见的策略和具体的例子来说明如何实现这些优化:1.减少不必要的循环描述:在处理大量数据时,避免使用过多的循环。可以考虑使用管道和内置cmdlet来替代。示例:低效代码:$files=Get-ChildItem-PathC:\Tempforeach(......
  • 内存优化的秘密:深入理解 Linux 中的 madvise
    madvise是一个在Linux和其他类Unix操作系统中使用的系统调用,用于向内核提供关于内存映射区域的建议。它可以帮助操作系统优化内存使用,以提高性能。使用场景madvise函数通常用于以下几种情况:预取数据:如果应用程序知道将来会使用某些数据,可以建议操作系统提前加载这些数据到内......
  • CATIA软件许可优化策略探讨
    在当今的工程设计领域,CATIA软件已成为众多企业和设计师的首选工具。然而,随着软件使用的普及和复杂度的提升,如何优化许可策略以提高使用效率并降低成本,成为了一个亟待探讨的课题。本文将围绕CATIA软件许可优化策略展开讨论,旨在帮助企业实现更高效、更经济的软件应用。一、了解许可......
  • 字符串优化
    字符串问题\(\mathcalO(nm)-\mathcalO(1)\)比较字符串子串大小令\(lcp_{x,y}=\operatorname{lcp}(s[x\simn],s[y\simn])\),有\[lcp_{x,y}=\left\{\begin{aligned}&lcp_{x+1,y+1}+1&&s_x=s_y\\&0&&s_x\not=s_y\end{aligned}\right.\]......
  • Vite 优化配置方案
    前言Vite是一个快速的前端构建工具,特别适用于现代前端框架如Vue和React。为了进一步提升项目的性能和开发体验,我们可以对Vite进行一些优化配置。本文将介绍一些常见的优化策略,并提供详细的配置示例和注释。1.安装必要的插件首先,我们需要安装一些常用的Vite插件来帮......
  • unity资源自动优化
    #ifUNITY_EDITORusingSystem.Collections;usingSystem.Collections.Generic;usingSystem.IO;usingSystem.Text;usingUnityEditor;usingUnityEngine;publicclassAutoOptimizeAssetes:UnityEditor.AssetPostprocessor{///<summary>///音频资......
  • 【Unity】发布微信小游戏-资源优化
    资源优化方向记录:1、首包场景里面使用的字体重新生成一个,只包含首包可能使用到的字符,可以将几M的字体缩到几时KB 2、减少大尺寸贴图使用,合理压缩图片格式3、使用AssetStudio等工具检查首包资源,查看包含了那部分资源,是否引用,是否过大 这里查到了一部分无使用的资源贴图......