RDD操作
对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用,转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作;
常用的RDD转换操作,总结如下
filter(func)操作:筛选出满足函数func的元素,并返回一个新的数据集
scala> val lines =sc.textFile(file:///usr/local/spark/mycode/rdd/word.txt)
scala> val linesWithSpark=lines.filter(line => line.contains("Spark"))
map(func)操作:map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集
scala> data=Array(1,2,3,4,5)
scala> val rdd1= sc.parallelize(data)
scala> val rdd2=rdd1.map(x=>x+10)
另一个实例:
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val words=lines.map(line => line.split(" "))
flatMap(func)操作:拍扁操作
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val words=lines.flatMap(line => line.split(" "))
groupByKey()操作:应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集;
reduceByKey(func)操作:应用于(K,V)键值对的数据集返回新(K, V)形式数据集,其中每个值是将每个key传递到函数func中进行聚合后得到的结果
RDD持久
Spark RDD采用惰性求值的机制,但是每次遇到行动操作都会从头开始执行计算,每次调用行动操作都会触发一次从头开始的计算,这对于迭代计算而言代价是很大的,迭代计算经常需要多次重复使用同一组数据:
scala> val list = List("Hadoop","Spark","Hive")
scala> val rdd = sc.parallelize(list)
scala> println(rdd.count()) //行动操作,触发一次真正从头到尾的计算
scala> println(rdd.collect().mkString(",")) //行动操作,触发一次真正从头到尾的计算
可以通过持久化(缓存)机制避免这种重复计算的开销,可以使用persist()方法对一个RDD标记为持久化,之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化,持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用;
persist()的圆括号中包含的是持久化级别参数,persist(MEMORY_ONLY)表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容;persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上;一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY),同时可以使用unpersist()方法手动地把持久化的RDD从缓存中移除。
针对上面的实例,增加持久化语句以后的执行过程如下:
scala> val list = List("Hadoop","Spark","Hive")
scala> val rdd = sc.parallelize(list)
scala> rdd.cache() //会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
scala> println(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
scala> println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用