本次学习学习了RDD的编程概述
RDD创建
1、从文件系统中加载数据创建RDD
Spark采用textFile()方法来从文件系统中加载数据创建RDD该方法把文件的URI作为参数,这个URI可以是:本地文件系统的地址或者是分布式文件系统HDFS的地址或者是Amazon S3的地址等等
本地进行加载
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt") lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/wordcount/word.txt MapPartitionsRDD[1] at textFile at <console>:23
2、通过并行集合(数组)创建RDD
可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。
scala> val array = Array(1,2,3,4,5) array: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
RDD操作
转换操作
计算轨迹,不进行计算
操作 |
含义 |
filter(func) |
筛选出满足函数func的元素,并返回一个新的数据集 |
map(func) |
将每个元素传递到函数func中,并将结果返回为一个新的数据集 |
flatMap(func) |
与map()相似,但每个输入元素都可以映射到0或多个输出结果 |
groupByKey() |
应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集 |
reduceByKey(func) |
应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果 |
行动操作
进行计算,从头开始
操作 |
含义 |
count() |
返回数据集中的元素个数 |
collect() |
以数组的形式返回数据集中的所有元素 |
first() |
返回数据集中的第一个元素 |
take(n) |
以数组的形式返回数据集中的前n个元素 |
reduce(func) |
通过函数func(输入两个参数并返回一个值)聚合数据集中的元素 |
foreach(func) |
将数据集中的每个元素传递到函数func中运行 |
惰性机制:不遇到行动操作之前不进行计算,前面转换操作记录要干的事情,遇到行动操作就从头开始做
持久化
可以通过持久化(缓存)机制避免这种重复计算的开销
可以使用persist()方法对一个RDD标记为持久化,之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化持久化后的
RDD将会被保留在计算节点的内存中被后面的行动操作重复使用
persist()的圆括号中包含的是持久化级别参数:
1、persist(MEMORY_ONLY):表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容
2、persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上
3、一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)
4、可以使用unpersist()方法手动地把持久化的RDD从缓存中移除
scala> val list = List("Hadoop","Spark","Hive") list: List[String] = List(Hadoop, Spark, Hive) //转换为RDD scala> val rdd = sc.parallelize(list) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24 //标记为持久化 scala> rdd.cache() res0: rdd.type = ParallelCollectionRDD[5] at parallelize at <console>:24 //进行操作,计入内存 scala> println(rdd.count()) 3 //利用上面缓存进行计算 scala> println(rdd.collect().mkString(",")) Hadoop,Spark,Hive
RDD分区
提高并行度,减小通信开销
分区的原则是使分区的个数尽量等于集群中CPU核心的数目
本地模式下默认为local[N]:N个
手动设置分区:
在调用textFile()和parallelize()方法的时候手动指定分区个数即可,语法格式如下:sc.textFile(path, partitionNum)/sc.parallelize(array, partitionNum)其中,path参数用于指定要加载的文件的地址,partitionNum参数用于指定分区个数。
强制分区:通过转换操作得到新 RDD 时,直接调用 repartition 方法即可
scala> val data = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2) data: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt MapPartitionsRDD[12] at textFile at <console>:24 scala> data.partitions.size //显示data这个RDD的分区数量 res2: Int=2 scala> val rdd = data.repartition(1) //对data这个RDD进行重新分区 rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at :26 scala> rdd.partitions.size res4: Int = 1
实例:
import org.apache.spark.{Partitioner, SparkContext, SparkConf} //自定义分区类,需要继承org.apache.spark.Partitioner类 class MyPartitioner(numParts:Int) extends Partitioner{ //覆盖分区数 override def numPartitions: Int = numParts //覆盖分区号获取函数 override def getPartition(key: Any): Int = { key.toString.toInt%10 } } object TestPartitioner { def main(args: Array[String]) { val conf=new SparkConf() val sc=new SparkContext(conf) //模拟5个分区的数据 val data=sc.parallelize(1 to 10,5) //根据尾号转变为10个分区,分别写到10个文件 data.map((_,1)).partitionBy(new MyPartitioner(10)).map(_._1).saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner") } }
打印元素
rdd.foreach(println) rdd.map(println)
注:在集群上的master主机上查看,可以使用collect()将worker数据抓取到master上查看,如果数据太多导致内存溢出,可以采用rdd.take(100).foreach(println)进行查看部分元素
Pair RDD的创建
创建
第一种创建方式:从文件中加载
val lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
第二种创建方式:通过并行集合(数组)创建RDD
scala> val list = List("Hadoop","Spark","Hive","Spark") list: List[String] = List(Hadoop, Spark, Hive, Spark)标签:10,parallelize,val,RDD,假期,记录,scala,rdd,spark From: https://www.cnblogs.com/JIANGzihao0222/p/17983259
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:23
scala> pairRDD.foreach(println)
(Hive,1)
(Spark,1)
(Hadoop,1)
(Spark,1)