首页 > 其他分享 >假期学习记录11

假期学习记录11

时间:2024-01-23 19:45:57浏览次数:27  
标签:11 String scala 假期 记录 RDD spark Spark rdd

  本次学习学习了常用键值对rdd的操作

常用的键值对RDD转换操作

reduceByKey(func)

reduceByKey(func)的功能是,使用func函数合并具有相同键的值

(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)

scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)

groupByKey()

groupByKey()的功能是,对具有相同键的值进行分组

比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),采用groupByKey()后得到的结果是:("spark",(1,2))和("hadoop",(3,5))

reduceByKey和groupByKey的区别

reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义

groupByKey也是对每个key进行操作,但只生成一个sequence,groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作

scala> val words = Array("one","two","two","three","three","three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val wordPairRDD = sc.parallelize(words).map(word => (word,1))
wordPairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:24

scala> val wordCountsWithReduce = wordPairRDD.reduceByKey(_+_)
wordCountsWithReduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:23

scala> val wordCountsWithGroup = wordPairRDD.groupByKey().map(t => (t._1,t._2.sum))
wordCountsWithReduce: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:23

这样进行后结果一样

scala> wordCountsWithReduce.foreach(println)
(two,2)
(one,1)
(three,3)

scala> wordCountsWithGroup.foreach(println)
(two,2)
(one,1)
(three,3)

groupByKey通信开销比reduceByKey大

常用的键值对RDD转换操作

keys

keys只会把Pair RDD中的key返回形成一个新的RDD

scala> val list = List("Hadoop","Spark","Hive","Spark")
list: List[String] = List(Hadoop, Spark, Hive, Spark)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:23

scala> pairRDD.foreach(println)
(Spark,1)
(Hadoop,1)
(Hive,1)
(Spark,1)

scala> pairRDD.keys
res4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at keys at <console>:24

scala> pairRDD.keys.foreach(println)
Hive
Spark
Spark
Hadoop

values

values只会把Pair RDD中的value返回形成一个新的RDD

scala> pairRDD.values
res6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at values at <console>:24

scala> pairRDD.values.foreach(println)
1
1
1
1

sortByKey()

前提:键值对

sortByKey()的功能是返回一个根据键排序的RDD,默认升序/降序:pairRDD.sortByKey(false)

scala> pairRDD.sortByKey().foreach(println)
(Hive,1)
(Spark,1)
(Spark,1)
(Hadoop,1)

sortByKey()和sortBy()

scala> val d1 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
d1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[20] at parallelize at <console>:23

scala> d1.reduceByKey(_+_).sortByKey(false).collect
res10: Array[(String, Int)] = Array((g,21), (f,29), (e,17), (d,9), (c,27), (b,38), (a,42))

scala> val d2 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
d2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:23

scala> d2.reduceByKey(_+_).sortBy(_._2,false).collect
res11: Array[(String, Int)] = Array((a,42), (b,38), (f,29), (c,27), (g,21), (e,17), (d,9))

sortBy(_._2,false):通过每一个键值对赋值到_,根据_.2进行排序

mapValues(func)

对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化

scala> pairRDD.mapValues(x => x+1)
res12: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[32] at mapValues at <console>:24

scala> pairRDD.mapValues(x => x+1).foreach(println)
(Hive,2)
(Spark,2)
(Spark,2)
(Hadoop,2)
join

join就表示内连接。对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
pairRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:23

scala> val pairRDD2 = sc.parallelize(Array(("spark","fast")))
pairRDD2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[35] at parallelize at <console>:23

scala> pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast)) 

综合实例

题目:给定一组键值对("spark",2),("hadoop",6),("hadoop",4),("spark",6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。

scala> val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[39] at parallelize at <console>:23

scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
res15: Array[(String, Int)] = Array((spark,4), (hadoop,5))

标签:11,String,scala,假期,记录,RDD,spark,Spark,rdd
From: https://www.cnblogs.com/JIANGzihao0222/p/17983271

相关文章

  • 金句记录
      Ournewtrailsavingtechniqueoffersdifferenttradeoffsincomparisonwithchronologicalbacktrackingandoftenyieldssuperiorperformance.与按时间顺序回溯相比,我们新的跟踪保存技术提供了不同的权衡,并且通常会产生卓越的性能。   UsingC-......
  • 假期学习记录10
    本次学习学习了RDD的编程概述RDD创建1、从文件系统中加载数据创建RDDSpark采用textFile()方法来从文件系统中加载数据创建RDD该方法把文件的URI作为参数,这个URI可以是:本地文件系统的地址或者是分布式文件系统HDFS的地址或者是AmazonS3的地址等等本地进行加载scala>val......
  • XShell利用X11转发图形化界面教程
    本文仅包含MobaXterm、XShell和PuTTY进行X11转发的教程,其他工具请自行摸索。以下所有步骤均为必要步骤!请确保你都有,主要就是一下几步。ssh免密登录安装xmingXShell/PuTTY设置如果你使用的是无图形化界面的服务器,那么你需要先安装X-windowsudoapt-getinstallxserver-xorgsudo......
  • linux CentOS MobaXterm 通过X11 Forwarding 在本地开启图形可视化窗口
    第一步操作系统安装图形界面X11Forwardingdnfinstallxorg-x11-xauthxorg-x11-fonts-*xorg-x11-font-utilsxorg-x11-fonts-Type1xclock1第二步修改参数,启用X11Forwardingvim/etc/ssh/sshd_config1修改参数X11Forwardingyes和X11UseLocalhostno#AllowAgentForwarding......
  • Android 11+ auto connect wifi issue
    FromAndroid11andabove,whenwegetaandroiddevice.wemayhaveissueswhenweconnectthewifi,fromandroiddevice,afterconnecttotheinternetwifi,itmayshows"Connected,limitedconnection",thisisbecausetheandroidOSwilltry......
  • 记录一下跑flink官方案例 table Api 进行实时报告
     按照官方文档下载https://github.com/apache/flink-playgrounds  flink-playgrounds代码并在idea里面打开 按照官方案例在spendReport上面加上相关代码 dockfile  echo"taskmanager.numberOfTaskSlots:30">>/opt/flink/conf/flink-conf.yaml;不然会报资......
  • uniapp打包h5在Android的webview中打开出现localStorage.setitem为null的记录
    使用android直接打开h5的链接,报错localStorage.setItem为null原因是要打开Android的webview的存储设置valwebView=findViewById<WebView>(R.id.webview)valsettings=webView.settingssettings.domStorageEnabled=truesettings.datab......
  • KnightCTF 2024 WEB做题记录
    WEBLeviAckerman题目信息LeviAckermanisarobot! N:B:Thereisnoneedtodobruteforce. Author:saifTarget:http://66.228.53.87:5000/我的解答:签到题,题目提示了robot!直接访问robots.txt得到路径Disallow:/l3v1_4ck3rm4n.html再次访问路径得到flagK......
  • git 查看某一文件的修改记录
    要查看某一文件的修改记录,可以使用以下命令: ```gitlog<文件路径>``` 例如,要查看文件`index.html`的修改记录,可以使用以下命令: ```gitlogindex.html```  这将显示该文件的所有提交记录,包括提交的作者、日期和提交消息。你可以使用上下箭头浏览记录,并按......
  • 吊钩电子秤方案主控芯片CSU8RP1185
    在生活中,买菜时常常出现缺斤少两的情况,这种情况多是商家秤有很大问题,往往消费者是最吃亏的,这种情况下,我们最好是带个吊钩电子秤,测量菜的重量,有问题直接拨打举报电话举报商家,使商家得到应有的惩罚。话说回来,重点在于吊钩电子秤,这种小型的电子秤,市面上有的生产商有很多,知名......