首页 > 其他分享 >SparkCore系列(四)函数大全

SparkCore系列(四)函数大全

时间:2022-08-26 17:23:10浏览次数:64  
标签:map 函数 SparkCore collect foreach print println data 大全

有了上面三篇的函数,平时开发应该问题不大了。
这篇的主要目的是把所有的函数都过一遍,深入RDD的函数

RDD函数大全

数据准备
        val sparkconf = new SparkConf().setAppName("test_Spark_sql").setMaster("local[2]")
        val spark = SparkSession.builder().config(sparkconf).config("spark.driver.host", "localhost").getOrCreate().sparkContext
        spark.setCheckpointDir("/data/checkpoint/")
        val data = spark.makeRDD(Array(("C",3),("D",4),("A",11), ("B",2), ("B",1))).persist()
了解函数
        println(data.id)//为RDD起一个ID  默认是1开始+
        data.setName("rdd")//为RDD设置name
        println(data.name)//为RDD起一个name 可以设置名称 未设置打印null
        println(data.+("|"))//显示当前类的部分信息  例如:ParallelCollectionRDD[0] at makeRDD at Test_SensorsData.scala:35
        data.barrier()  //将当前阶段标记为障碍阶段  实验性的功能  可以不用关注
        println(data.toDebugString) //调试信息   (2) ParallelCollectionRDD[0] at makeRDD at Test_SensorsData.scala:35 [Memory Deserialized 1x Replicated]
        println(data.isInstanceOf[java.math.BigDecimal]) //判断是否是这个类型的类
        data.context //返回SparkContext
缓存,chechpoint 分区相关方法
        data.cache()//缓存数据  默认是内存缓存
        data.persist()//缓存数据  默认是内存缓存
        data.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
        data.unpersist()   //去掉缓存
        data.checkpoint()  //进行数据checkpint
        println(data.isCheckpointed)//当前RDD是否有Checkpoint
        println(data.getStorageLevel)//当前存储等级
        println(data.getCheckpointFile.mkString("|")) //打印Checkpoint文件名称
        data.localCheckpoint()//在本地文件进行 Checkpoint
        data.coalesce(1) //RDD重分区   直接合并文件
        data.repartition(1)//RDD重分区   有shuffle操作

        data.partitionBy(new org.apache.spark.HashPartitioner(4)).collect().foreach(print)
        data.partitioner.map(print)//是按照什么方式分区的
        print(data.partitions(0))//获取该RDD的第一个分区
        data.preferredLocations(data.partitions(0))//拿到这个RDD分区的最佳物理位置
        data.dependencies.foreach(println)//依赖关系 org.apache.spark.OneToOneDependency@6e1b9411
统计 取样 判断
        println(data.first())//第一个元素
        data.top(3).foreach(println)

        data.lookup("B").foreach(print)//根据Key查找
        print(data.map(_._2).reduce((x,y)=>(x+y)))//sum函数一样的
        println(data.max())//最大值
        println(data.min())//最小值
        println(data.count())//RDD长度    返回Long类型
        println(data.countByKey())//统计key出现的次数    返回Map类型
        println(data.countByValue())//统计key+value出现的次数    返回Map类型
        data.keys.collect().foreach(print)//拿到所有的key  不去重
        data.values.collect().foreach(print)//输出所有的value值

        data.randomSplit(Array(0.2,0.8)).apply(0).collect().map(println)//随即切分RDD  这个感觉没有啥实际意义

        data.collect().map(println) //将RDD的数据收集到Driver端
        val f : PartialFunction[Int,Boolean] = {case 1 => true case 2 => true case _ => false}
        data.map(x=>x._2).collect(f).foreach(x=>print(x+"|"))   //参数变化
        data.collectAsMap().foreach(print)//将数据集收成Map
        data.collectAsync().get().foreach(print)//将数据集收成FutureAction

        data.take(3).foreach(println) //返回多少数据
        data.takeOrdered(3).foreach(println) //先排序  在返回数据
        data.takeSample(true,5).foreach(println)//随机取样
        println(data.sample(true,0.5).collect().length)//是否允许多次采样|采样比例  随机采样函数
        println(data.sampleByKey(false,Map("A"->0.2,"B"->0.4,"C"->0.2,"D"->0.2)).collect().length)//是否允许多次采样|后面的KEY必须枚举  我要知道key枚举,还算个毛
        println(data.sampleByKeyExact(false,Map("A"->0.2,"B"->0.4,"C"->0.2,"D"->0.2)).collect().length)//是否允许多次采样|后面的KEY必须枚举  我要知道key枚举,还算个毛

        println(data.countApprox(1000,0.1d))//近似统计功能 第一个数字是任务统计时长 第二个数字是精度越小精度越高(0-1)之间
        println(data.countByKeyApprox(1000,0.1d))//统计key出现的次数    第一个数字是任务统计时长 第二个数字是精度越小精度越高(0-1)之间
        println(data.countByValueApprox(1000,0.35))//value近似统计功能 第一个数字是任务统计时长 第二个数字是精度越小精度越高(0-1)之间
        println(data.countApproxDistinct(0.2))//近似统计功能  默认精度0.5
        println(data.countApproxDistinct(31,54))//近似统计功能  第一个参数:4<=第二个参数<=32之间    第二个参数:0<=第二个参数<=32之间
        println(data.countApproxDistinctByKey(0.3))//近似统计功能  默认精度0.5  必须大于0.000017
        println(data.countApproxDistinctByKey(0.3,null))//近似统计功能  默认精度0.5  必须大于0.000017  第二个参数partitioner
        println(data.countApproxDistinctByKey(4,7,null))//近似统计功能  第一个参数:4<=第二个参数<=32之间    第二个参数:0<=第二个参数<=32之间  第三个参数partitioner
        println(data.countApproxDistinctByKey(4,7))//近似统计功能  第一个参数:精度    第二个参数:返回RDD的分区数

        println(data.isEmpty())     //当前RDD是否为空
        print(data.toJavaRDD().collect().toString)//转换成javaRDD
        println(data.toLocalIterator.take(1))//转换成迭代器
排序
        data.sortBy(x=>x._1).collect().foreach(println)//按照给定的字段排序
        data.sortByKey().collect().foreach(println)//按照key排序  数据必须是key value格式的数据 不然会报错
转换操作
        data.glom().take(1).map(_.map(print))//把一个分区的所有的元素放到一个RDD[Array]
        data.map(x=>x).collect().foreach(println)  //map循环
        def mapValue (y:Int) = {y+1}
        data.mapValues(mapValue).collect().foreach(print)// value 转换函数
        data.mapPartitions(x=>x.map(x=>(x._1,x._2+1))).collect().foreach(println)  //每一次循环处理一个Partition
        data.mapPartitionsWithIndex((index, data)=>data.map(x=>(index,x._1,x._2+1)).toIterator).collect().foreach(println)//每一次循环处理一个Partition带下标
        data.pipe("cat").collect().foreach(println)  //可以执行外部命令的  暂时没有看懂  方法先记下
        data.filter(_._2>5).collect().map(println)//条件过滤

        data.flatMap(x=>Array(x._1,x._2)).collect().map(println)//一行变多行
        data.flatMapValues(x=>Array(x,x,x)).collect().foreach(print)//把value 展开
        println(data.map(x=>x._2).fold(0)((c1, c2) => {c1 + c2}))//聚合运算
        data.foreachPartition(x=>x.take(100).foreach(print))//对每个分区都进行这个操作  可以写数据库
join
        println(data.++(data))//sql中的unionAll操作   不去重
        data.union(data).collect().map(println)//union all
        println(data.cartesian(data))//两个RDD的笛卡尔积
        data.intersection(data.filter(_._2==2)).collect().map(print)//两个RDD交集
        data.intersection(data.filter(_._2==2),2).collect().map(print)//两个RDD交集
        data.subtract(data.filter(_._2.equals(1))).collect().map(println) //left anti
        data.subtract(data.filter(_._2.equals(1)),4).collect().map(println) //left anti  第二个参数分区数
        import org.apache.spark.HashPartitioner
        data.fullOuterJoin(data.filter(_._2 ==1)).collect().foreach(print)// sqlboy 应该能理解 full join
        data.fullOuterJoin(data.filter(_._2 ==1),4).collect().foreach(print)// 上同 分区
        data.fullOuterJoin(data.filter(_._2 ==1),new HashPartitioner(4)).collect().foreach(print)// 上同 分区
        data.leftOuterJoin(data).collect().foreach(print)// left join  第二个RDD的解过用Some包含起来 (B,(2,Some(2)))
        data.leftOuterJoin(data,4).collect().foreach(print)// 上同 分区
        data.leftOuterJoin(data,new HashPartitioner(4)).collect().foreach(print)// 上同 分区
        data.join(data).collect().foreach(print)//join
        data.join(data,4).collect().foreach(print)//上同 分区
        data.join(data,new HashPartitioner(4)).collect().foreach(print)//上同 分区
        data.rightOuterJoin(data).collect().foreach(print) // right join  第一个RDD的解过用Some包含起来 (B,(2,Some(2)))
        data.rightOuterJoin(data,4).collect().foreach(print) // 上同 分区
        data.rightOuterJoin(data,new HashPartitioner(4)).collect().foreach(print) // 上同 分区
        data.subtractByKey(data.filter(_._2==1)).collect().foreach(print)//left anti
        data.subtractByKey(data.filter(_._2==1),4).collect().foreach(print)//上同 分区
        data.subtractByKey(data.filter(_._2==1),new HashPartitioner(4)).collect().foreach(print)//上同 分区
        //------------------非正常join----------
        data.zip(data).collect().map(println)//两个RDD进行压缩  第一个和第一个合并
        data.zipWithIndex().collect().map(print)//带下标的压缩
        data.zipWithUniqueId().collect().map(print)//带下标的压缩
        data.cogroup(data).collect().map(print)//根据key 同一个rdd放到一个CompactBuffer  不同rdd元组
        data.cogroup(data,2).collect().map(print)//上同  分区个数
        data.cogroup(data,new HashPartitioner(2)).collect().map(print)//上同  分区个数
        data.cogroup(data.filter(x=>(x._2==1 || x._2==2)),data.filter(x=>(x._2==1 ))).collect().map(print)//三个RDD
        data.cogroup(data.filter(x=>(x._2==1 || x._2==2)),data.filter(x=>(x._2==1 )),4).collect().map(print)//上同  分区个数
        data.cogroup(data.filter(x=>(x._2==1 || x._2==2)),data.filter(x=>(x._2==1 )),new HashPartitioner(4)).collect().map(print)//上同  分区个数
        data.cogroup(data,data,data).collect().map(print)//四个RDD
        data.cogroup(data,data,data,4).collect().map(print)//上同  分区个数
        data.cogroup(data,data,data,new HashPartitioner(4)).collect().map(print)//上同  分区个数
文件操作
        data.saveAsTextFile("/data/dd")//保存文件 文本格式 在里面还是(A,11)
        data.saveAsTextFile("/data", classOf[org.apache.hadoop.io.compress.CompressionCodec])//保存文件  压缩
        data.saveAsObjectFile("")//保存文件格式  SequenceFile格式
        //--------NewAPI------
        import org.apache.hadoop.mapred.JobConf
        val jobConf = new JobConf()
        jobConf.setOutputKeyClass(classOf[java.lang.String])
        jobConf.setOutputValueClass(classOf[java.lang.Integer])
        jobConf.set("mapred.output.dir", "/data/dd/")
        data.saveAsNewAPIHadoopDataset(jobConf)
        //必须这个类下面的包才行  org.apache.hadoop.mapred.TextOutputFormat 这个不行
        data.map(x=>(java.lang.String.valueOf(x._1),java.lang.Integer.valueOf(x._2))).saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.TextOutputFormat[String,Integer]]("/data/dd/")
        data.saveAsNewAPIHadoopFile("/data/dd/",classOf[java.lang.String],classOf[java.lang.Integer],classOf[org.apache.hadoop.mapreduce.lib.output.TextOutputFormat[java.lang.String,java.lang.Integer]])
        //--------OldAPI------
        val conf = new JobConf()
        conf.set("mapreduce.output.fileoutputformat.outputdir","/data/dd/")
        conf.setOutputKeyClass(classOf[java.lang.String])
        conf.setOutputValueClass(classOf[java.lang.Integer])
        conf.setOutputFormat(classOf[org.apache.hadoop.mapred.TextOutputFormat[java.lang.String,java.lang.Integer]])
        conf.setOutputCommitter(classOf[org.apache.hadoop.mapred.FileOutputCommitter])
        data.saveAsHadoopDataset(conf)
        data.saveAsHadoopFile("/data/dd/",classOf[java.lang.String],classOf[java.lang.Integer],classOf[org.apache.hadoop.mapred.TextOutputFormat[java.lang.String,java.lang.Integer]])
        data.map(x=>(java.lang.String.valueOf(x._1),java.lang.Integer.valueOf(x._2))).saveAsHadoopFile[org.apache.hadoop.mapred.TextOutputFormat[java.lang.String,java.lang.Integer]]("/data/dd/")
RDD 聚合函数
        //--------聚合后还是RDD------
        data.distinct()//所有字段去重 方法  背后是reduceBykey
        data.distinct(4)//reduceBykey的时候 使用第二个参数
        data.groupBy(_._1).collect().map(print)//把这一列作为新的列,然后groupby (D,CompactBuffer((D,4))) (A,CompactBuffer((A,11)))
        def get2(tuple2:Tuple2[String,Int]): String = tuple2._1
        data.groupBy(get2 _,4).collect().map(print)//把这一列作为新的列,然后groupby 按照四个分区
        data.keyBy(get2).collect().map(print)//新建一列key
        data.aggregateByKey(0)(_+_,_+_).collect().map(print)//根据key聚合 第一个函数partition内聚合  第二个函数partition之间聚合
        data.aggregateByKey(0,4)(_+_,_+_).collect().map(print)//上同  分区个数
        data.combineByKey(x=>List[Int](x),(x:List[Int],y:Int)=>x.::(y),(x:List[Int],y:List[Int])=>x.:::(y)).collect().foreach(println)//聚合函数 分区形式的总是报错
        data.combineByKey(x=>x,(x:Int,y:Int)=>x+y,(x:Int,y:Int)=>x+y).collect().foreach(println)
        data.combineByKeyWithClassTag(x=>List[Int](x),(x:List[Int],y:Int)=>x.::(y),(x:List[Int],y:List[Int])=>x.:::(y)).collect().foreach(println)//聚合函数 分区形式的总是报错
        data.combineByKeyWithClassTag(x=>x,(x:Int,y:Int)=>x+y,(x:Int,y:Int)=>x+y).collect().foreach(println)
        data.foldByKey(0)(_+_).collect().foreach(print)//分区内和分区间的都是用这个聚合方式
        data.foldByKey(0,new HashPartitioner(4))(_+_).collect().foreach(print)//上同  分区个数
        data.foldByKey(0,4)(_+_).collect().foreach(print)//上同  分区个数
        data.groupByKey.collect().foreach(print) //group by key  value放到一个CompactBuffer里面
        data.groupByKey(4).collect().foreach(print) //上同 分区
        data.groupByKey(new HashPartitioner(4)).collect().foreach(print) //上同 分区
        data.groupWith(data).collect().foreach(print)//groupByKey + join (B,(CompactBuffer(2, 1),CompactBuffer(2, 1)))
        data.groupWith(data,data).collect().foreach(print)//上同 三个RDD
        data.groupWith(data,data,data).collect().foreach(print)//上同 四个RDD
        data.reduceByKey(_+_).collect().foreach(print)//把value聚合
        data.reduceByKey(_+_,4).collect().foreach(print)//上同 分区
        data.reduceByKey(new HashPartitioner(4),_+_).collect().foreach(print)//上同 分区
        //--------聚合后是非RDD------
        def combOp(c1: Int, c2: Int): Int = {c1 + c2}
        println(data.map(x=>x._2).treeAggregate(0)(combOp,combOp))  //以树的方式合并这个RDD   先根据树的深度预聚合,后面在聚合 第一个参数初始化数字
        println(data.map(x=>x._2).treeReduce(combOp,2))//以树的方式合并这个RDD   先根据树的深度预聚合,后面在聚合
        println(data.map(_._2).aggregate(0)((x,init)=>(x+init),(par1,par2)=>(par1+par2)))
        data.reduceByKeyLocally(_+_).foreach(print)//客户端聚合后   直接把map结果发送给master  这个一般还是不用的好

标签:map,函数,SparkCore,collect,foreach,print,println,data,大全
From: https://www.cnblogs.com/wuxiaolong4/p/16628274.html

相关文章

  • python3 函数 定义函数与切片
     如果我们要计算一个圆的面积,就是3.14*r*r,如果每次就算,则每次都要写一遍,就很麻烦,所以有了函数,我们就可以通过调用函数的方法,直接使用就行了。 这里我们可以访问 ......
  • MySQL数据库工具-SQLYog快捷键大全
    Ctrl+M  创建一个新的连接Ctrl+N  使用当前设置新建连接Ctrl+F4  断开当前连接对象浏览器F5  刷新对象浏览器(默认)Ctrl+B  设置焦点于对象浏览器SQL窗......
  • 如何把thinkphp5的项目迁移到阿里云函数计算来应对流量洪峰?
    原文链接:https://developer.aliyun.com/article/9827461.为什么要迁移到阿里云函数?我的项目是一个节日礼品领取项目,过节的时候会有短时间的流量洪峰。平时访问量很低。......
  • Lazars常用函数
    var i: Integer; Row: String; Parts: TStringArray; S1, S2, S3, S4: String;begin Row :=  '51,40,45,44,44,40,'; Parts......
  • 使用函数计算自定义运行时快速部署一个 SpringBoot 项目 | 文末有礼
    作者:谱一段风华笔墨什么是函数计算阿里云函数计算FC是事件驱动的全托管计算服务。使用函数计算,您无需采购与管理服务器等基础设施,只需编写并上传代码。函数计算为您准......
  • 【C标准库】详解strerror函数
    创作不易,感谢支持strerror头文件:string.h描述:strerror()函数接受一个参数:errnum,它是一个表示错误代码的整数值。此函数将错误代码转换为说明错误的合适字符串指针并返......
  • 向量距离与相似度函数
    假设当前有两个nn维向量xx和yy (除非特别说明,本文默认依此写法表示向量),可以通过两个向量之间的距离或者相似度来判定这两个向量的相近程度,显然两个向量之间距离越小,相似......
  • python中常见的几个函数
    functionuselen()用来求元组利润表或者字符串等的长度str()将数据转化成字符串类型......
  • vue3 基础-生命周期函数
    在vue中,生命周期函数可理解为"在某个时刻,会自动执行的函数".先直观感受一下图示.一共就八个:<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-......
  • js-IIFE-即使调用的函数表达式
    将函数矮化成表达式,一次性函数varfoo=function(){ console.log(2)}();+function(){ console.log(2)}();-function(){ console.log(2)}();!function(){......