This study session covered the common operations on key-value pair (Pair) RDDs.
Common Pair RDD transformation operations
reduceByKey(func)
reduceByKey(func) merges the values that share the same key, using the function func.
For example, given the pairs (Hadoop,1), (Spark,1), (Hive,1), (Spark,1):
scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)
groupByKey()
groupByKey() groups together the values that share the same key.
For example, applying groupByKey() to the four pairs ("spark",1), ("spark",2), ("hadoop",3) and ("hadoop",5) yields ("spark",(1,2)) and ("hadoop",(3,5)).
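As a minimal sketch of this in spark-shell (the RDD name bookRDD is just an illustrative choice, and the printed order is not guaranteed), the grouped values come back as an Iterable, which Spark typically prints as a CompactBuffer:

// Build a small pair RDD and group the values by key
val bookRDD = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
val grouped = bookRDD.groupByKey()   // RDD[(String, Iterable[Int])]
grouped.foreach(println)             // e.g. (spark,CompactBuffer(1, 2)) and (hadoop,CompactBuffer(3, 5))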
The difference between reduceByKey and groupByKey
reduceByKey merges the multiple values associated with each key; most importantly, it can perform this merge locally within each partition first (map-side combine), and the merge logic is supplied as a user-defined function.
groupByKey also operates per key, but it only gathers the values into a sequence; groupByKey itself cannot take a user-defined function, so you first call groupByKey to produce the grouped RDD and then apply your own function to it with map.
scala> val words = Array("one","two","two","three","three","three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val wordPairRDD = sc.parallelize(words).map(word => (word,1))
wordPairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:24

scala> val wordCountsWithReduce = wordPairRDD.reduceByKey(_+_)
wordCountsWithReduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:23

scala> val wordCountsWithGroup = wordPairRDD.groupByKey().map(t => (t._1,t._2.sum))
wordCountsWithGroup: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:23
Both approaches produce the same result:
scala> wordCountsWithReduce.foreach(println)
(two,2)
(one,1)
(three,3)

scala> wordCountsWithGroup.foreach(println)
(two,2)
(one,1)
(three,3)
However, groupByKey incurs more shuffle (communication) overhead than reduceByKey, because it sends every key-value pair across the network, while reduceByKey combines values within each partition before shuffling.
Common Pair RDD transformation operations (continued)
keys
keys returns just the keys of a Pair RDD as a new RDD.
scala> val list = List("Hadoop","Spark","Hive","Spark")
list: List[String] = List(Hadoop, Spark, Hive, Spark)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:23

scala> pairRDD.foreach(println)
(Spark,1)
(Hadoop,1)
(Hive,1)
(Spark,1)

scala> pairRDD.keys
res4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at keys at <console>:24

scala> pairRDD.keys.foreach(println)
Hive
Spark
Spark
Hadoop
values
values returns just the values of a Pair RDD as a new RDD.
scala> pairRDD.values
res6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at values at <console>:24

scala> pairRDD.values.foreach(println)
1
1
1
1
sortByKey()
Prerequisite: the RDD must consist of key-value pairs.
sortByKey() returns an RDD sorted by key. The sort is ascending by default; pass false for descending order: pairRDD.sortByKey(false)
scala> pairRDD.sortByKey().foreach(println)
(Hive,1)
(Spark,1)
(Spark,1)
(Hadoop,1)
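Note that foreach(println) runs on the executors, so the printed order above need not look sorted. A minimal sketch of a descending sort that collects the sorted result back to the driver (reusing the same pairRDD; the exact output depends on its contents):

// Sort by key in descending order and gather the result to the driver
val sortedDesc = pairRDD.sortByKey(false).collect()   // Array[(String, Int)], keys in descending order
sortedDesc.foreach(println)                           // e.g. (Spark,1), (Spark,1), (Hive,1), (Hadoop,1)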
sortByKey() vs. sortBy()
scala> val d1 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
d1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[20] at parallelize at <console>:23

scala> d1.reduceByKey(_+_).sortByKey(false).collect
res10: Array[(String, Int)] = Array((g,21), (f,29), (e,17), (d,9), (c,27), (b,38), (a,42))

scala> val d2 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
d2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:23

scala> d2.reduceByKey(_+_).sortBy(_._2,false).collect
res11: Array[(String, Int)] = Array((a,42), (b,38), (f,29), (c,27), (g,21), (e,17), (d,9))
sortBy(_._2,false): each key-value pair is bound to the placeholder _, and the pairs are sorted by _._2, i.e. by the value, in descending order because of false.
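For comparison, sortBy can also reproduce a plain key sort by selecting the first element of each pair; a minimal sketch reusing d1 from above:

// sortBy on the key field gives the same ordering as sortByKey(false)
val byKeyViaSortBy = d1.reduceByKey(_+_).sortBy(_._1, false).collect()
// should match the sortByKey(false) result above: Array((g,21), (f,29), (e,17), (d,9), (c,27), (b,38), (a,42))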
mapValues(func)
mapValues(func) applies a function to every value in a Pair RDD, while leaving the keys unchanged.
scala> pairRDD.mapValues(x => x+1)
res12: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[32] at mapValues at <console>:24

scala> pairRDD.mapValues(x => x+1).foreach(println)
(Hive,2)
(Spark,2)
(Spark,2)
(Hadoop,2)
join
join denotes an inner join. For two input datasets of types (K,V1) and (K,V2), only keys that appear in both datasets are output, producing a dataset of type (K,(V1,V2)).
scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
pairRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:23

scala> val pairRDD2 = sc.parallelize(Array(("spark","fast")))
pairRDD2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[35] at parallelize at <console>:23

scala> pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))
A complete example
Problem: given the key-value pairs ("spark",2), ("hadoop",6), ("hadoop",4), ("spark",6), where the key is a book title and the value is that book's sales on a given day, compute the average value per key, i.e. the average daily sales of each book.
scala> val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[39] at parallelize at <console>:23

scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
res15: Array[(String, Int)] = Array((spark,4), (hadoop,5))
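If a fractional average is wanted, the final mapValues can divide as a Double instead of relying on integer division; a minimal sketch, reusing the same rdd:

// Per-key average as a Double, to avoid integer truncation
val avg = rdd.mapValues(x => (x, 1))
             .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
             .mapValues { case (sum, count) => sum.toDouble / count }
avg.collect()   // e.g. Array((spark,4.0), (hadoop,5.0))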