首页 > 其他分享 >假期学习记录10

假期学习记录10

时间:2024-01-23 19:33:28浏览次数:14  
标签:10 parallelize val RDD 假期 记录 scala rdd spark

  本次学习学习了RDD的编程概述

RDD创建

1、从文件系统中加载数据创建RDD

Spark采用textFile()方法来从文件系统中加载数据创建RDD该方法把文件的URI作为参数,这个URI可以是:本地文件系统的地址或者是分布式文件系统HDFS的地址或者是Amazon S3的地址等等

本地进行加载

scala> val lines = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/wordcount/word.txt MapPartitionsRDD[1] at textFile at <console>:23

2、通过并行集合(数组)创建RDD

可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。

scala> val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

RDD操作

转换操作

计算轨迹,不进行计算

操作

含义

filter(func)

筛选出满足函数func的元素,并返回一个新的数据集

map(func)

将每个元素传递到函数func中,并将结果返回为一个新的数据集

flatMap(func)

与map()相似,但每个输入元素都可以映射到0或多个输出结果

groupByKey()

应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集

reduceByKey(func)

应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果

行动操作

进行计算,从头开始

操作

含义

count()

返回数据集中的元素个数

collect()

以数组的形式返回数据集中的所有元素

first()

返回数据集中的第一个元素

take(n)

以数组的形式返回数据集中的前n个元素

reduce(func)

通过函数func(输入两个参数并返回一个值)聚合数据集中的元素

foreach(func)

将数据集中的每个元素传递到函数func中运行

惰性机制:不遇到行动操作之前不进行计算,前面转换操作记录要干的事情,遇到行动操作就从头开始做

持久化

可以通过持久化(缓存)机制避免这种重复计算的开销

可以使用persist()方法对一个RDD标记为持久化,之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化持久化后的

RDD将会被保留在计算节点的内存中被后面的行动操作重复使用

persist()的圆括号中包含的是持久化级别参数:

1、persist(MEMORY_ONLY):表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容

2、persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上

3、一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)

4、可以使用unpersist()方法手动地把持久化的RDD从缓存中移除

scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
//转换为RDD
scala> val rdd = sc.parallelize(list)  
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
//标记为持久化
scala> rdd.cache()
res0: rdd.type = ParallelCollectionRDD[5] at parallelize at <console>:24
//进行操作,计入内存
scala> println(rdd.count())
3
//利用上面缓存进行计算
scala> println(rdd.collect().mkString(","))
Hadoop,Spark,Hive

RDD分区

提高并行度,减小通信开销

分区的原则是使分区的个数尽量等于集群中CPU核心的数目

本地模式下默认为local[N]:N个

手动设置分区:

在调用textFile()和parallelize()方法的时候手动指定分区个数即可,语法格式如下:sc.textFile(path, partitionNum)/sc.parallelize(array, partitionNum)其中,path参数用于指定要加载的文件的地址,partitionNum参数用于指定分区个数。

强制分区:通过转换操作得到新 RDD 时,直接调用 repartition 方法即可

scala> val  data = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2)
data: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> data.partitions.size  //显示data这个RDD的分区数量
res2: Int=2
scala> val  rdd = data.repartition(1)  //对data这个RDD进行重新分区
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at :26
scala> rdd.partitions.size
res4: Int = 1

实例:

import org.apache.spark.{Partitioner, SparkContext, SparkConf}
//自定义分区类,需要继承org.apache.spark.Partitioner类
class MyPartitioner(numParts:Int) extends Partitioner{
  //覆盖分区数
  override def numPartitions: Int = numParts
  //覆盖分区号获取函数
  override def getPartition(key: Any): Int = {
    key.toString.toInt%10
  }
}
object TestPartitioner {
  def main(args: Array[String]) {
    val conf=new SparkConf()
    val sc=new SparkContext(conf)
    //模拟5个分区的数据
    val data=sc.parallelize(1 to 10,5)
    //根据尾号转变为10个分区,分别写到10个文件
    data.map((_,1)).partitionBy(new MyPartitioner(10)).map(_._1).saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner")
  }
}

打印元素

rdd.foreach(println)
rdd.map(println)

注:在集群上的master主机上查看,可以使用collect()将worker数据抓取到master上查看,如果数据太多导致内存溢出,可以采用rdd.take(100).foreach(println)进行查看部分元素

Pair RDD的创建

创建

第一种创建方式:从文件中加载

val lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")

第二种创建方式:通过并行集合(数组)创建RDD

scala> val list = List("Hadoop","Spark","Hive","Spark")
list: List[String] = List(Hadoop, Spark, Hive, Spark)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:23

scala> pairRDD.foreach(println)
(Hive,1)
(Spark,1)
(Hadoop,1)
(Spark,1)

标签:10,parallelize,val,RDD,假期,记录,scala,rdd,spark
From: https://www.cnblogs.com/JIANGzihao0222/p/17983259

相关文章

  • 看起不起眼,却能一天加100人的引流方法
    如果正在创业的你因为缺客源而导致生意停滞不前那么接下来,我分享的你要认真听了,这五种引流方法一定能帮到你。——❶截流法就是去别的博主下面截取他的流量,从而将他的粉丝吸引到你的私域。这个方法看似不起眼,做的人却很多不仅能吸引大量人群,还很精-准。比如你吸引创业粉,就去搜索创......
  • 问题:10、为了保证通信的可靠性,按照国际标准,AIS应______频道收发AIS信息。
    问题:10、为了保证通信的可靠性,按照国际标准,AIS应______频道收发AIS信息。A.使用87BB.使用88BC.交替使用87B和88BD.同时使用87B和88B参考答案如图所示......
  • 30V、单 N 沟道NTMFS4C908NAT3G、NTMFS4C910NAT3G功率 MOSFET
    NTMFS4C908N、NTMFS4C910NN沟道MOSFET是30V、单N沟道功率MOSFET,具有低RDS(on)值和低电容,可最大限度地降低导通和驱动器损耗。这些MOSFET采用8-SOFL封装尺寸,设计紧凑。MOSFET符合AEC-Q101标准并具有PPAP功能典型应用包括电池保护、电机控制、电源开关、开关电源、负载开关和电磁......
  • 记录一下跑flink官方案例 table Api 进行实时报告
     按照官方文档下载https://github.com/apache/flink-playgrounds  flink-playgrounds代码并在idea里面打开 按照官方案例在spendReport上面加上相关代码 dockfile  echo"taskmanager.numberOfTaskSlots:30">>/opt/flink/conf/flink-conf.yaml;不然会报资......
  • uniapp打包h5在Android的webview中打开出现localStorage.setitem为null的记录
    使用android直接打开h5的链接,报错localStorage.setItem为null原因是要打开Android的webview的存储设置valwebView=findViewById<WebView>(R.id.webview)valsettings=webView.settingssettings.domStorageEnabled=truesettings.datab......
  • KnightCTF 2024 WEB做题记录
    WEBLeviAckerman题目信息LeviAckermanisarobot! N:B:Thereisnoneedtodobruteforce. Author:saifTarget:http://66.228.53.87:5000/我的解答:签到题,题目提示了robot!直接访问robots.txt得到路径Disallow:/l3v1_4ck3rm4n.html再次访问路径得到flagK......
  • macOS Monterey 12.7.3 (21H1015) 正式版发布,ISO、IPSW、PKG 下载 (安全更新)
    macOSMonterey12.7.3(21H1015)正式版发布,ISO、IPSW、PKG下载1月22日,北京时间今日凌晨,macOSSonoma14.3发布,同时带来了macOSMonterey12.7.3和macOSVentru13.6.4安全更新。本站下载的macOS软件包,既可以拖拽到Applications(应用程序)下直接安装,也可以制作启动U......
  • macOS Monterey 12.7.3 (21H1015) Boot ISO 原版可引导镜像下载
    macOSMonterey12.7.3(21H1015)BootISO原版可引导镜像下载1月22日,北京时间今日凌晨,macOSSonoma14.3发布,同时带来了macOSMonterey12.7.3和macOSVentru13.6.4安全更新。本站下载的macOS软件包,既可以拖拽到Applications(应用程序)下直接安装,也可以制作启动U盘......
  • git 查看某一文件的修改记录
    要查看某一文件的修改记录,可以使用以下命令: ```gitlog<文件路径>``` 例如,要查看文件`index.html`的修改记录,可以使用以下命令: ```gitlogindex.html```  这将显示该文件的所有提交记录,包括提交的作者、日期和提交消息。你可以使用上下箭头浏览记录,并按......
  • P2569 [SCOI2010] 股票交易 题解
    P2569[SCOI2010]股票交易搬运工稍微复杂一点的单调队列优化DP直接设\(f_{i\j}\)表示在第\(i\)天,手上还剩\(j\)个股票时的最大收入。容易写出状态转移方程:\(f_{i\j}=max\{f_{k\t}+(t-j)\cdotw\}\),这样不好看,我们可以拆成这样的形式:\[f_{i\j}=max\{f_{k\t}+t\cdo......