首页 > 其他分享 >Spark+实例解读

Spark+实例解读

时间:2024-07-26 14:55:29浏览次数:19  
标签:map val RDD label 解读 rdd 实例 算子 Spark

第一部分 Spark入门

第二部分 SparkCore

2 RDD

2.1 转换算子-map

map是将RDD的数据一条条处理,返回新的RDD

# 定义方法
def add(data):
    return data*10
print(rdd.map(add).collect)
# 定义lamabda表达式
rdd.map(lambda data:data*10)
2.2 转换算子-flatMap

flatMap对RDD执行map操作,然后执行解除嵌套操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)
    }.reduceByKey(_ + _).map { case ((feature, label), num) =>
      (feature, List((label, num))) //feature,label,cnt
    }.reduceByKey(_ ::: _).mapValues { x =>
      val size_entro = x.map(_._2).sum
      val res = x.map(_._2.toDouble / size_entro).map { t =>
        -t * (Math.log(t) / Math.log(2))
      }.sum
      size_entro * res
    }.mapValues { x => x / size }.map(_._2).sum

 

2.3转换算子-reduceByKey

针对KV型RRDD自动按照key进行分组,然后按照提供的聚合逻辑,对组内数据value完成聚合操作

rdd.reduceByKey(func)

      val clickStat = joinDf
        .where(F.col("active_type")==="click")
        .rdd
        .map(row => {
          val mapInfo = Option(row.getMap[String,Double](row.fieldIndex(feat)))
          mapInfo match {
            case Some(x) => x
            case _ => null
          }
        })
        .filter(_!=null)
        .flatMap(x=>x)
        .reduceByKey(_+_)
2.4 转换算子-mapValues

针对二元元组RDD,对其内部的二元元组的value进行map操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)
    }.reduceByKey(_ + _).map { case ((feature, label), num) =>
      (feature, List((label, num))) //feature,label,cnt
    }.reduceByKey(_ ::: _).mapValues { x =>
      val size_entro = x.map(_._2).sum
      val res = x.map(_._2.toDouble / size_entro).map { t =>
        -t * (Math.log(t) / Math.log(2))
      }.sum
      size_entro * res
    }.mapValues { x => x / size }.map(_._2).sum
2.5 转换算子-groupBy

将RDD的数据进行分组

rdd.groupBy(func)

rdd = sc.parallelize([('a',1),('a',11),('b',1)])
# 通过这个函数确认按照谁来分组(返回谁即可)
print(rdd.groupBy(lambda x:x[0]).collect())
print(rdd.groupBy(lambda x:x[0]).collect())
# 结果为:
​
    val userContentListHis = spark.thriftSequenceFile(inpath_his, classOf[LongVideoUserContentStat])
      .map(l=>{
        (l.getUid,l.getContent_properties.get(0).getId)
      }).toDF("uid", "docid")
      .groupBy($"uid")
2.6 转换算子-filter

过滤想要的数据进行保存

rdd = sc.parallelize([1,2,3,4,5,6])
rdd.filter(lamdba x:x%2 == 1) # 只保留奇数
    val treatmentUser = spark.read.option("header", false).option("sep", "\t").csv(inpath)
      .select("_c0")
      .withColumnRenamed("_c0", "userid")
      .withColumn("flow", getexpId($"userid"))
      .filter($"flow" >= start and $"flow" <= end)
      .select("userid")
      .dropDuplicates()
2.7 转换算子-其他算子
distinct算子
rdd.distinct() 一般不写去重分区
    val userContentHis = hisPathList.map(path =>{
      val hisData = spark.thriftSequenceFile(path, classOf[LongVideoUserContentStat])
      println(s"hisData ==>${hisData.count()}")
      hisData
    }).reduce(_ union _).distinct().repartition(partition)
union算子 
2个rdd合并成一个rdd:rdd.union(other_rdd)
只合并不去重  rdd的类型不同也是可以合并的
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([1,2,3,4])
rdd3 = rdd1.union(rdd2)
2.8 算子面试题
1.groupByKey和reduceByKey的区别:
groupByKey仅仅有分组功能而已,reduceByKey除了分组还有聚合作用,是一个分组+聚合一体化的算子. 分组前先聚合再shuffle,预聚合,被shuffle的数据极大的减少,提升了性能.数据量越大,reduceByKey的性能优势也就越大.
​
2.rdd的分区数怎么查看? 
通过getNumPartitions API查看,返回int
​
3.Transformation和Action的区别:
转换算子的返回值100%是rdd,而Action算子不一定.转换算子是懒加载的,只有遇到Action才会执行
​
4.哪两个算子不经过Driver直接输出?
foreach 和 saveAsTextFile

3 RDD的持久化

3.1 RDD的持久化

rdd是过程数据 rdd进行相互迭代计算,执行开启时,新的RDD生成,老的RDD消失

3.2 RDD的缓存

val rawLog = profilePushLogReader(spark, date, span).persist()
3.3 RDD的checkPoint

也是将RDD的数据保存起来,仅支持磁盘存储,被认为是安全的, 不保留血缘关系

3.4 缓存面试题

4 案例

4.1 搜素引擎日志分析案例
4.2
4.3 ....
4.4 计算资源面试题
1.如何尽量提升任务计算的资源?
计算cpu核心和内存量,通过--executor-memory指定executor内存,通过--executor-cores指定executor的核心数
​

5 广播变量 累加器

标签:map,val,RDD,label,解读,rdd,实例,算子,Spark
From: https://blog.csdn.net/wangning0714/article/details/140715644

相关文章

  • Spring Boot 对接文心一言,实现ai抠图实例
    上篇文章:SpringBoot对接文心一言讲述了在springboot项目中如何集成文心一言。现在我们来做个实例,实现AI抠图。文心一言的抠图功能通常需要通过调用文心一言的API来实现。在SpringBoot项目中,你可以通过RestTemplate或者WebClient来发起HTTP请求调用文心一言的API。实......
  • 运行 Spark-Shell 程序时出现错误
    我正在尝试创建SparkShell程序,但在运行时出现错误。下面是我正在执行的代码。frompyspark.sqlimport*frompysparkimportSparkConffromlib.loggerimportLog4j#conf=SparkConf()#conf.set("spark.executor.extraJavaOptions","-Dlog4j.configuration=f......
  • 将多个文件并行读取到 Pyspark 中的单独数据帧中
    我正在尝试将大型txt文件读入数据帧。每个文件大小为10-15GB,因为IO需要很长时间。我想并行读取多个文件并将它们放入单独的数据帧中。我尝试了下面的代码frommultiprocessing.poolimportThreadPooldefread_file(file_path):returnspark.read.csv(file......
  • 尝试使用 PySpark show 函数显示结果时出错
    我正在尝试在PySpark中显示我的结果。我正在使用Spark3.5.1和安装了Java8的PySpark3.5.1,一切都设置良好。建议添加此内容的一些答案:importfindsparkfindspark.init()或添加此内容到配置:.config("spark.memory.offHeap.enabled","true")\.config("s......
  • stable diffusion文生图代码解读
    使用diffusers运行stablediffusion,文生图过程代码解读。只按照下面这种最简单的运行代码,省略了一些参数的处理步骤。fromdiffusersimportDiffusionPipelinepipeline=DiffusionPipeline.from_pretrained(MODEL_PATH,torch_dtype=torch.float16)pipeline.to("cuda......
  • Springboot 的Bean生命周期五步、七步、十步详解以及框架源码解读生命周期-面试热点-x
    文章目录Springboot的Bean生命周期五步、七步、十步详解以及框架源码解读生命周期为什么要知道Bean的生命周期Bean的生命周期之五步堆栈信息:代码验证执行结果为Bean生命周期之七步执行结果Bean生命周期之十步增加的三步测试代码如下:执行结果:Bean的生命周期总结其他......
  • 变量的定义、分类和使用的实例代码
    目录什么是变量变量的定义格式:变量的分类类变量和成员变量的区别什么是变量定义:在程序执行的过程中,有可能发生改变的值(可以简单理解为用来存储数据的盒子)变量的定义格式:数据类型变量名=数据值;inta=2;//这就是变量a的定义语句,赋初始值2变量的分类java中主要有:局......
  • 黑暗之魂2缺失steam_api.dll该如何处理?全面解读《黑暗之魂2》steam_api.dll丢失及高效
    在游戏的世界中,《黑暗之魂2》凭借其独特的魅力吸引了众多玩家。然而,有时玩家会遭遇steam_api.dll文件丢失的困扰,这无疑给游戏体验带来了极大的阻碍。在这篇文章中,我们将为您进行全面解读。首先深入剖析steam_api.dll文件丢失的原因。可能是由于游戏安装过程中的错误、系统更新......
  • 将 Pandas 数据帧转换为 Spark 数据帧错误
    我正在尝试将PandasDF转换为Sparkone。DFhead:10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,54310000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,61110000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,......
  • Makefile知识点总结(Linux下开发Risc-V单片机实例)
    Makefile会不会写makefile,从一个侧面决定一个人是否具备完成大型工程的能力。Makefile和make命令一起配合使用,为什么要使用makefile,原因以及优点在下文解释。简单辨析一下建立工程的三种方式Makefile使用非常广泛,通用性强,可跨平台但是语法比较严格,写一个通用,便于管理......