首页 > 其他分享 >Spark

Spark

时间:2024-08-02 21:50:46浏览次数:11  
标签:String val RDD 算子 spark 数据 Spark

Spark

core

spark作业执行的特点

* spark作业执行的特点:
* 1、只有遇到行动算子的时候,整个spark作业才会被触发执行
* 2、遇到几次,执行几次

算子

RDD

RDD: 弹性分布式数据集
* 弹性:数据量可大可小
* RDD类似于容器,但是本身存储的不是数据,是计算逻辑
* 当遇到行动算子的时候,整个spark作业才会被触发执行,是从第一个RDD开始执行,数据才开始产生流动
* 数据在RDD之间只是流动关系,不会存储
* 流动的数据量可以很大,也可以很小,所以称为弹性
* 分布式:
* spark本质上它是需要从HDFS中读取数据的,HDFS是分布式,数据block块将来可能会在不同的datanode上
* RDD中流动的数据,可能会来自不同的datanode中的block块数据
* 数据集:
* 计算流动过程中,可以短暂地将RDD看成一个容器,容器中有数据,默认情况下在内存中不会进行存储
* 后面会有办法将一个RDD的数据存储到磁盘中

RDD的5大特性

RDD的5大特性:(面试必问!)
* 1、RDD是由一系列分区构成
* 注意:
* 1)读文件时的minPartitions参数只能决定最小分区数,实际读取文件后的RDD分区数,由数据内容本身以及集群的分布来共同决定的
* 2)若设置minPartitions的大小比block块数量还少的话,实际上以block块数量来决定分区数
* 3)产生shuffle的算子调用时,可以传入numPartitions,实际真正改变RDD的分区数,设置多少,最终RDD就有多少分区
*
* 2、算子是作用在每一个分区上的
*
* 3、RDD与RDD之间存在一些依赖关系
* 1)窄依赖 前一个RDD中的某一个分区数据只会到后一个RDD中的某一个分区  一对一的关系
* 2)宽依赖 前一个RDD中的某一个分区数据会进入到后一个RDD中的不同分区中  一对多的关系  也可以通过查看是否产生shuffle来判断
* 3)整个spark作业会被宽依赖的个数划分若干个stage, Num(stage) = Num(宽依赖) + 1
* 4)当遇到产生shuffle的算子的时候,涉及到从前一个RDD写数据到磁盘中,从磁盘中读取数据到后一个RDD的现象,
*    注意:第一次触发执行的时候,磁盘是没有数据的,所以会从第一个RDD产生开始执行
*    当重复触发相同的执行的时候,对于同一个DAG有向无环图而言,会直接从shuffle之后的RDD开始执行,可以直接从磁盘读取数据。
* 5)一个阶段中,RDD有几个分区,就会有几个并行task任务
*
* 4、kv算子只能作用在kv的RDD上
*
* 5、spark会提供最优的任务计算方式,只移动计算,不移动数据。

map

//map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中
//将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑

filter

//filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条

flatMap

flatMap: 将rdd中的数据每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合

Sample

/**
 * sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
 * 这个函数主要在机器学习的时候会用到
 */

GroupBy

/**
 * groupBy算子的使用
 *
 * 1、groupBy的算子,后面的分组条件是我们自己指定的
 * 2、spark中groupBy之后的,所有值会被封装到一个Iterable迭代器中存储
 */

GroupByKey

/**
 * GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上
 * 也就说,只有kv格式的RDD才能调用kv格式的算子
 */

groupBy算子与groupByKey算子的区别

/**
 * 面试题:spark core中 groupBy算子与groupByKey算子的区别?
 * 1、代码格式上:
 * groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD
 * groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式RDD
 *
 * 2、执行shuffle数据量来看
 *  groupBy产生的shuffle数据量在一定程度上要大于groupByKey产生的shuffle数据量
 *  所以groupByKey算子的执行效率要比groupBy算子的执行效率要高
 */

groupByKey与reduceBykey的区别

/**
 * 面试题:
 * groupByKey与reduceBykey的区别?
 * 相同点:
 * 它们都是kv格式的算子,只有kv格式的RDD才能调用
 * 不同点:
 * 1)groupByKey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
 * 2)reduceByKey 相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
 * 3)groupByKey的灵活度要比reduceByKey灵活度要高,reduceBykey无法做一些复杂的操作,比如方差。但是groupByKey可以在分组之后的RDD进行方差操作
 *
 */

Union

//两个RDD要想进行union合并,必须保证元素的格式和数据类型是一致的
//分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定

Join

join算子也要作用在kv格式的RDD上

内连接 两个rdd共同拥有的键才会进行关联
右连接 保证右边rdd键的完整性

MapValues

/**
 * mapValues函数也是作用在kv格式的算子上
 * 将每个元素的值传递给后面的函数,进行处理得到新的值,键不变,这个处理后的组合重新返回到新的RDD中
 */

MapPartitions

/**
 * mapPartitions:一次处理一个分区中的数据
 * 它与map的区别在于,map是每次处理一条数据就返回一条数据到下一个rdd
 * 而mapPartitions一次处理一个分区的数据,处理完再返回
 * 最后的处理效果和map的处理效果是一样的
 *
 * mapPartition可以优化与数据库连接的次数
 */

SortBy

对一个集合中的元素进行排序

foreach

/**
 * 行动算子,就可以触发一次作业执行,有几次行动算子调用,就会触发几次
 *
 * rdd是懒加载的性质
 */

collect

collect将rdd转成合适的scala中的数据结构

Cache

/**
 * 缓存:
 * 缓存的目的是为了spark core作业执行的时候,缩短rdd的执行链,能够更快的得到结果
 * 缓存的实现方式:
 *  1、需要缓存的rdd调用cache函数
 *  2、persist(StorageLevel.MEMORY_ONLY) 修改缓存级别
 *
 */

Checkpoint

/**
 * 永久将执行过程中RDD中流动的数据存储到磁盘(hdfs)中
 * checkpoint
 *
 * 需要设置checkpoint的路径,统一设置的
 *
 * checkpoint也相当于一个行动算子,触发作业执行
 * 第二次DAG有向无环图执行的时候,直接从最后一个有检查点的rdd开始向下执行
 */

checkpoint和cache的区别

/**
 * checkpoint和cache的区别?
 * cache是将一个复杂的RDD做缓存,将来执行的时候,只是这个rdd会从缓存中取
 * checkpoint是永久将rdd数据持久化,将来执行的时候,直接从检查点的rdd往后执行
 */

standalone

/**
 * standalone
 *  - client模式提交命令:
 *    spark-submit --class com.shujia.core.Demo18Standalone --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-1.0.jar 10
 *
 *  - cluster模式提交命令:
 *    spark-submit --class com.shujia.core.Demo18Standalone --master spark://master:7077 --executor-memory 512M --total-executor-cores 1 --deploy-mode cluster spark-1.0.jar 10
 */

累加器Accumulator

必须要有行动算子触发作业执行
    //创建累加器变量
    //    val c1: LongAccumulator = sc.longAccumulator("c1")

    //    linesRDD.foreach((e: String) => {
    //      num += 1
    //      println("-----------------------")
    //      println(num)
    //    })
    //    println(s"num的值为:$num") // 0  1000

    //使用累加器
    //    val c1: LongAccumulator = sc.longAccumulator("c1")
    //    linesRDD.foreach((e:String)=>{
    //      c1.add(1)
    //    })
    //    println(s"累加之后的值为:${c1.value}")

    //使用累加器
    //    val c1: LongAccumulator = sc.longAccumulator("c1")
    //    linesRDD.map((e: String) => {
    //      c1.add(1)
    //    }).collect()
    //    println(s"累加之后的值为:${c1.value}")
* 写spark core程序的注意事项
* 1、RDD中无法嵌套使用RDD
* 2、RDD中无法使用SparkContext

广播大变量

使用SparkContext中的一个功能,将Driver端的变量广播到executor执行的节点上的blockManager中
    val bc: Broadcast[mutable.Map[String, String]] = sc.broadcast(map1)


    val scoreRDD: RDD[String] = sc.textFile("spark/data/score.txt")

    //未使用广播变量
//    val resRDD: RDD[(String, String, String)] = scoreRDD.map((line: String) => {
//      val array1: Array[String] = line.split(",")
//      val id: String = array1(0)
//      //      通过map1的变量,通过键获取值
//      val info: String = map1.getOrElse(id, "查无此人")
//      val score: String = array1(2)
//      (id, info, score)
//    })

    //使用广播变量
    val resRDD: RDD[(String, String, String)] = scoreRDD.map((line: String) => {
      val array1: Array[String] = line.split(",")
      val id: String = array1(0)
        //通过广播过来的大变量,进行关联数据
      val map2: mutable.Map[String, String] = bc.value
      val info: String = map2.getOrElse(id, "查无此人")
      val score: String = array1(2)
      (id, info, score)
    })

SQL

/**
 * spark sql处理数据的步骤
 * 1、读取数据源
 * 2、将读取到的DF注册成一个临时视图
 * 3、使用sparkSession的sql函数,编写sql语句操作临时视图,返回的依旧是一个DataFrame
 * 4、将结果写出到hdfs上
 */
/**
 * spark sql是spark core的上层api,如果要想使用rdd的编程
 * 可以直接通过sparkSession获取SparkContext对象
 */
 spark sql的核心数据类型是DataFrame
/**
 * sql语句是无法直接作用在DataFrame上面的
 * 需要提前将要使用sql分析的DataFrame注册成一张表(临时视图)
 */
/**
 * 如果要想使用DSL语法编写spark sql的话,需要导入两个隐式转换
 */
/**
 * 读取json数据文件,转成DF
 * 读取json数据的时候,是不需要指定表结构,可以自动根据json的键值来构建DataFrame
 */
/**
 * spark core的核心数据结构是:RDD
 * spark sql的核心数据结构是DataFrame
 */
/**
 * 开窗:over
 *  聚合开窗函数:sum  count  lag(取上一条)  lead(取后一条)
 *  排序开窗函数:row_number rank dense_rank
 *

Streaming

/**
 * Spark core: SparkContext 核心数据结构:RDD
 * Spark sql: SparkSession 核心数据结构:DataFrame
 * Spark streaming: StreamingContext  核心数据结构:DStream(底层封装了RDD)
 */
val conf = new SparkConf()
    conf.setMaster("local[2]") // 给定核数
    conf.setAppName("spark Streaming 单词统计")
    val sparkContext = new SparkContext(conf)

    //创建Spark Streaming的运行环境,和前两个模块是不一样的
    //Spark Streaming是依赖于Spark core的环境的
    //this(sparkContext: SparkContext, batchDuration: Duration)
    //Spark Streaming处理之前,是有一个接收数据的过程
    //batchDuration,表示接收多少时间段内的数据
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    //Spark Streaming程序理论上是一旦启动,就不会停止,除非报错,人为停止,停电等其他突然场景导致程序终止
    //监控一个端口号中的数据,手动向端口号中打数据
    val rids: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
    //hello world

    val wordsDS: DStream[String] = rids.flatMap(_.split(" "))
    val kvDS: DStream[(String, Int)] = wordsDS.map((_, 1))
    val resDS: DStream[(String, Int)] = kvDS.reduceByKey(_ + _)

//    val resDS: DStream[(String, Int)] = rids.flatMap(_.split(" "))
//      .map((_, 1))
//      .reduceByKey(_ + _)

    println("--------------------------------------")
    resDS.print()
    println("--------------------------------------")

    /**
     * sparkStreaming启动的方式和前两个模块启动方式不一样
     */
    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()

foreachRDD

/**
 * foreachRDD:在DS中使用rdd的语法操作数据
 * 缺点:该函数是没有返回值的
 * 需求:我们在想使用DS中的RDD的同时,想要使用结束后,会得到一个新的DS
 */

tranformat


面试题:foreachRDD与transform的区别

SaveFile

//将结果存储到磁盘中
//只能设置文件夹的名字和文件的后缀
//每一批次运行,都会产生新的小文件夹,文件夹中有结果数据文件

优化

Cache

    /**
     * 缓存的目的是避免每一次job作业执行的时候,都需要从第一个rdd算起
     * 对重复使用RDD进行缓存
     * cache 设置不了缓存级别
     * persist 可以设置缓存级别
     */
//    stuRDD.cache() // 默认的缓存级别是MEMORY_ONLY
    stuRDD.persist(StorageLevel.MEMORY_ONLY_SER)

AggregateByKey

//aggregateByKey
//aggregateByKey(zeroValue: U)(seqOp: (U, V) => U,  combOp: (U, U) => U)
//zeroValue: 初始值,这个参数只会被后面第一个参数函数所使用
//seqOp: 相当于map端预聚合的逻辑
//combOp: 相当于reduce端的聚合逻辑
val resRDD3: RDD[(String, Int)] = clazzKVRDD.aggregateByKey(0)(
  //相当于map端预聚合的逻辑
  (a1: Int, a2: Int) => a1 + a2,
  //相当于reduce端的聚合逻辑
  (b1: Int, b2: Int) => b1 + b2
)

MapPartitions

/**
 * 实际上针对上面的案例,我们可以针对rdd中的每一个分区创建一个工具对象,在每条数据上使用
 * mapPartitions,将每一个分区中的数据封装成了一个迭代器
 */
val resRDD: RDD[(String, String, String)] = lineRDD.mapPartitions((itr: Iterator[String]) => {
  println("===================创建一次对象=============================")
  val sdf = new SimpleDateFormat("yyyy/MM/dd")
  val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  itr.map((line: String) => {
    val info: Array[String] = line.split("\t")
    val t1: String = info(1)
    val date: Date = sdf.parse(t1)
    val t2: String = sdf2.format(date)
    (info(0), t2, info(2))
  })
})

Coalese

/**
 * coalesce
 *
 * 1、默认增大分区是不会产生shuffle的
 * 2、合并分区直接给分区数,不会产生shuffle
 */
    val resRDD1: RDD[String] = lineRDD.coalesce(10, shuffle = true)
    println(s"resRDD1的分区数:${resRDD1.getNumPartitions}")
//    resRDD1.foreach(println)

    val resRDD2: RDD[String] = resRDD1.coalesce(1,shuffle = true)
    println(s"resRDD2的分区数:${resRDD2.getNumPartitions}")
    resRDD2.foreach(println)
/**
 * Coalesce算子通常是用在合并小文件时候使用
 * 对应的spark core中的话,通常使用该算子进行合并分区
 */

client模式提交作业资源调度流程

1,首先,由客户端上的Driver提交作业命令

2,向ResourceManger申请资源

3,ResourceManger会检查一些权限,资源空间,在一个相对空闲的子结点上开启一个ApplicationMaster进程

4,然后ApplicationMaster向ResourceMaster申请资源,启动Executer

5,最后由Execter反向注册给Driver,告诉Driver资源申请完毕,可以发送任务

client模式提交作业资源调度以及任务调度流程

6,当任务遇到action算子时,触发整个作业调度

7,首先,将任务提交给DAG有向无环图(DAG Scheduler)

8,在DAG Scheduler里根据rdd之间的血统,找出宽窄依赖

9,通过宽窄依赖划分stage阶段

10,根据stage阶段,将stage中的task任务封装成一个taskSet对象

11,将这些taskSet对象发送给后续的Task Scheduler

12,从DAG Scheduler中发送过来的taskSet对象中取出task任务

13,根据RDD5大特性的最后一个特性,只移动计算不移动数据,将task任务发送到对应的executor的线程池中执行。

cluster模式提交作业资源调度

cluster模式提交作业不会在客户端产生Driver进程,提交之后,提交进程就可以关闭

1、客户端向ResourceManger申请资源

2、ResourceManger会检查一些权限,资源空间,在一个i相对空闲的子结点上开启一个ApplicationMaster进程

3、这里的ApplicationMaster相当于一个Driver,在启动完之后会申请Executor资源

4、然后将task任务发送到Executor中执行

spark yarn cluster模式相较于client模式而言,

不会在提交作业的节点上产生Driver进程,而是选择一个相对空闲的子节点ApplicationMaster,这里的ApplicationMaster相当于一个Driver这样的话,就减轻了客户端的压力,可以提交更多的spark作业

提交作业的节点与Driver的启动节点不是同一个的,就会导致我们无法在提交作业的节点上看到日志,如果是yarn cluster模式提交的话,我们只能够通过yarn logs -applicationld的命令查看yarn的作业日志来看到运行结里

提高数据本地化级别

PROCESS_LOACL进程本地化

task要计算的数据在同一个Executor的进程中

NODE_LOCAL节点本地化

task要计算的数据是在同一个Worker的不同Executor进程中
task计算的数据在同一个Worker的磁盘上
Spark计算的数据来源于HDFS,那么最好的数据本地化级别就是NODE_LOCAL

NODE_PREF

比如说sparkSQL读取MySql中的数据

RACK_LOCAL机架本地化

(1)、TASK计算的数据在Worker 2的Executor进程中
(2)、TASK计算的数据在Worker 2的磁盘上

ANY

跨机架

Spark数据的本地化:计算移动,数据不移动
Spark里面的数据本地化分为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY
Spark里面数据本地化有谁负责计算?
DAG Scheduler和Task Scheduler

Spqrk调优

避免创建重复的RDD
尽可能复用同一个RDD
对多次使用的RDD进行持久化 <===
尽量避免使用shuffle类算子
使用map-side预聚合的shuffle操作
使用高性能的算子 <===
广播大变量<===
使用Kryo优化序列化性能
优化数据结构
使用高性能的库fastutil

数据倾斜的七种解决方案(面试)

使用Hive ETL预处理数据===》

在Spark作业进行之前,通过Hive来对数据进行预处理,将Spark需要做的join或者聚合操作提前到Hive中就可以避免数据倾斜了,不过这种情况适用于Hive表中数据分布不均,且对表频繁的执行分析操作。

过滤少数导致倾斜的key===》

如果发现导致倾斜的key就少数几个,对计算本身影响不大,可以通过Spark SQL中的Where对其进行过滤或者在Spark Core中对RDD执行filter算子过滤掉这些key

提高shuffle操作的并行度

通过设置spark.sql.shuffle.partitions,增加shuffle read task 的数量,让每个task处理更少的数据,从而缩短task的执行时间

双重聚合

这个方案就是实现两阶段聚合,先进行局部聚合,先给key打上一个随机数,然后对其先进行一次reduceByKey操作,再将得到的结果进行全局聚合,就可以得到全局聚合了

将reduce join转为map join

不使用join算子进行连接操作,而通过Broadcast变量与map类算子实现join操作, 进而规避掉shuffle类的操作,避免数据倾斜的发生

采样倾斜key并分拆join操作==》

1、对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个 key的数量,计算出来数据量最大的是哪几个key。
2、然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以 内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
3、接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数 据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个 RDD。
4、再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打 散成n份,分散到多个task中去进行join了。
5、而另外两个普通的RDD就照常join即可。
6、最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

使用随机前缀和扩容RDD进行join

通过查看RDD/Hive表中的数据分布情况,找到那个造成 数据倾斜的RDD/Hive表,
然后将该RDD的每条数据都打上一个n以内的随机前缀,同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一 个0~n的前缀。
最后将两个处理后的RDD进行join即可

标签:String,val,RDD,算子,spark,数据,Spark
From: https://www.cnblogs.com/justice-pro/p/18339669

相关文章

  • Spark内存计算引擎原理与代码实例讲解
    Spark内存计算引擎原理与代码实例讲解关键词:Spark,内存计算,RDD,DAG,Shuffle,容错,分布式计算1.背景介绍1.1问题的由来随着大数据时代的到来,传统的基于磁盘的MapReduce计算框架已经无法满足实时计算、迭代计算等场景对计算性能的要求。Spark应运而生,其基于内......
  • 【Spark高级应用】使用Spark进行高级数据处理与分析
    Spark高级应用使用Spark进行高级数据处理与分析引言在大数据时代,快速处理和分析海量数据是每个企业面临的重大挑战。ApacheSpark作为一种高效的分布式计算框架,凭借其高速、易用、通用和灵活的特点,已经成为大数据处理和分析的首选工具。本文将深入探讨如何使用Spark进行......
  • 图书《数据资产管理核心技术与应用》核心章节节选-3.1.2. 从Spark 执行计划中获取数据
    本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著。从Spark执行计划中获取数据血缘因为数据处理任务会涉及到数据的转换和处理,所以从数据任务中解析血缘也是获取数据血缘的渠道之一,Spark是大数据中数据处理最常用的一个技术组件,既可以做实......
  • 如何在 PySpark 中将二进制图像数据转换为 RGB 数组?
    我有一个具有以下架构的pysparkdf:root|--array_bytes:binary(nullable=true)我希望能够将其转换为图像数组。我可以使用以下代码在Pandas中完成此操作:df_pandas=df.toPandas()defbytes_to_array(byte_data):arr=np.frombuffer(byte_data,dtype=np......
  • 无法过滤掉 PySpark 中巨大数据集中的数据帧
    我有一个巨大的PySpark数据框,其中包含1.5B行,包括列fieldA我有一个8.8M唯一fieldA值的列表,我想从1.5B行中过滤掉。但是,我认为由于数据量较大,我不断收到类似StackOverflowError或OutOfMemoryError的错误。我尝试将8.8M列表拆分......
  • AI创作商业系统软件源码(SparkAi系统) AI换脸/智能体GPTs应用/AI视频生成AI绘画/文档分
    AI创作商业系统软件源码(SparkAi系统)AI换脸/智能体GPTs应用/AI视频生成AI绘画/文档分析/GPT4.0模型支持目录一、人工智能SparkAi创作系统二、功能模块介绍系统快速体验三、系统功能模块3.1AI全模型支持/插件系统AI模型提问AI智能体文档分析多模态识图理解TTS&......
  • 《Milvus Cloud向量数据库》——Spark Connector 工作原理及使用场景
    SparkConnector工作原理及使用场景深度解析在大数据处理与机器学习领域,ApacheSpark和Databricks已成为处理海量数据的首选工具。它们不仅能够高效地处理结构化数据,还擅长以批量的方式处理非结构化数据,进行数据清洗,并调用模型生成Embedding向量。然而,在处理完这些数据......
  • Catalyst优化器:让你的Spark SQL查询提速10倍
    目录1逻辑优化阶段2.1逻辑计划解析2.2逻辑计划优化2.2.1Catalys的优化过程2.2.2CacheManager优化2物理优化阶段2.1优化SparkPlan2.1.1Catalyst的 Join策略2.1.2 如何决定选择哪一种Join策略2.2PhysicalPlan2.2.1EnsureRequirements规则Spar......
  • 需要在 Windows 10 上安装 Pyspark 的帮助
    我正在尝试在我的笔记本电脑上安装Pyspark并按照https://medium.com/@deepaksrawat1906/a-step-by-step-guide-to-installing-pyspark-on-windows完成所有步骤-3589f0139a30https://phoenixnap.com/kb/install-spark-on-windows-10当我去设置我的Spark......
  • 01-从WordCount程序理解Spark术语及术语间的关系
    1.应用程序(Application)通过下面的代码设置应用程序名称,设置后再UI中可以看到相应的名称。//1.设置Application的名称valconf=newSparkConf()conf.setAppName("WordCount")conf.setMaster("local")2.作业(Job)Job由scala的执行算子生成,每个执行的算子会调起runjob,从而......