首页 > 其他分享 >03-RDD算子

03-RDD算子

时间:2024-08-21 17:26:15浏览次数:5  
标签:03 parallelize val RDD println 算子 sc new SparkConf

1. 转换类算子

1.1 基本转换类算子

1、Map

新的RDD中的每个元素与旧的RDD的每个元素是一对一的关系。

object CH_0201_RDDAPI_Map {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Map")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val res = sc.parallelize(List("zhangsan", "lisi", "wangwu"))

    res.map(name => (name,Random.nextInt(10))).foreach(println)
  }
}

2、flatMap

新的RDD中每个元素与旧的RDD中的每个元素是一对多的关系。

object CH_0202_RDDAPI_FlatMap {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("FlatMap")
    val sparkContext = new SparkContext(sparkConf)
    sparkContext.setLogLevel("ERROR")

    val resource = sparkContext.parallelize(1 to 10)
    resource.flatMap(e => 1 to e).foreach(println)
  }
}

3、fiter

对原有的RDD的每个元素过滤产生新的RDD,新RDD的中的元素与旧的RDD的元素是一对一的关系。

object CH_0201_RDDAPI_Filter {
  def main(args: Array[String]): Unit = {
    val spConf = new SparkConf().setMaster("local").setAppName("filter")
    val sparkContext = new SparkContext(spConf)

    val rdd = sparkContext.parallelize(List(2, 9, 10, 6, 2, 3, 4))
    //    val filterRDD = rdd.filter((x: Int) => {
    //      x > 2
    //    }).collect()
    val filterRDD = rdd.filter((_ > 2)).collect()

    filterRDD.foreach(println)
  }
}

4、mapPartitions

把每个分区当作整体,对分区中数据进行操作。

object CH_0201_RDDAPI_MapPartitions {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("MapPartitions")
    val sparkContext = new SparkContext(sparkConf)
    sparkContext.setLogLevel("ERROR")

    val resource = sparkContext.parallelize(1 to 10)

    resource.mapPartitions((e) => {
      e.map(_ * 2)
    }).foreach(println)
  }
}

5、mapPartitionsWithIndex

把每个分区当作整体,对分区中数据进行操作,并带有分区索引的信息

object CH_0201_RDDAPI_MapPartitionsWithIndex {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("MapPartitionsWithIndex")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val data: RDD[Int] = sc.parallelize(1 to 10,2)

    val res03: RDD[String] = data.mapPartitionsWithIndex(
      (pindex, piter) => {
        new Iterator[String] {
          println(s"---$pindex--conn--mysql------")
          override def hasNext = if (piter.hasNext == false) {
            println(s"---$pindex---close--mysql"); false
          } else true
          override def next() = {
            val value: Int = piter.next()
            println(s"---$pindex--select $value-----")
            value + "selected"
          }
        }
      }
    )
    res03.foreach(println)
  }
}

6、coalesce与repartition

coalesce与repartition表示重分区,coalesce接收两个参数,第一个参数表示重分区数,第二个参数表示是否需要shuffle。当分区数又少变多时,把shuffle设置为false,则原有的数据不会被发送的新的分区,此时需要将shuffle设置为true,当分区数又多变少时,把shuffle设置为false,则不会产生shuffle,数据通过IO移动到对应的分区。而Repartition就是coalesce的把参数设置为true的变形。

object CH_0201_RDDAPI_Repartition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val data: RDD[Int] = sc.parallelize(1 to 10,5)

    println(s"data:${data.getNumPartitions}")

    //观察分区后,每个元素去到了哪个分区
    val data1: RDD[(Int, Int)] = data.mapPartitionsWithIndex(
      (pi, pt) => {
        pt.map(e => (pi, e))
      }
    )

    //repartition与coleace
//    val value = data1.repartition(8)
//    val value = data1.coalesce(8, true)
//    val value = data1.coalesce(8, false) //分区数由少变多,把shuffle设置为false,则不会把数据发送到新的分区
    val value = data1.coalesce(3, false) //分区数由多变少,把shuffle设置为false,则不会产生shuffle,数据通过IO移动到对应的分区
    val res: RDD[(Int, (Int, Int))] = value.mapPartitionsWithIndex(
      (pi, pt) => {
        pt.map(e => (pi, e))
      }
    )

    println(s"data:${res.getNumPartitions}")

    data1.foreach(println)
    println("---------------")
    res.foreach(println)

  }
}

7、union

取两个RDD的并集

object CH_0202_RDDAPI_Union {

  def main(args: Array[String]): Unit = {
    val spConf = new SparkConf().setMaster("local").setAppName("filter")
    val sparkContext = new SparkContext(spConf)

    val rdd1 = sparkContext.parallelize(List(2, 9, 10, 6))
    val rdd2 = sparkContext.parallelize(List(2, 3, 4))
    println(rdd1.partitions.size)
    println(rdd2.partitions.size)

    val res = rdd1.union(rdd2)
    println(res.partitions.size)

    res.foreach(println)

    while (true) {}
  }

}

8、intersection

取两个RDD的交集

object CH_0201_RDDAPI_Intersection {
  def main(args: Array[String]): Unit = {
    val spConf = new SparkConf().setMaster("local").setAppName("filter")
    val sparkContext = new SparkContext(spConf)

    val rdd1 = sparkContext.parallelize(List(2, 9, 10, 6))
    val rdd2 = sparkContext.parallelize(List(2, 3, 4))

    //取交集
    val value = rdd1.intersection(rdd2)
    value.foreach(println)

  }
}

9、substract

取两个RDD的差集

object CH_0201_RDDAPI_Subtract {

  def main(args: Array[String]): Unit = {
    val spConf = new SparkConf().setMaster("local").setAppName("filter")
    val sparkContext = new SparkContext(spConf)

    val rdd1 = sparkContext.parallelize(List(2, 9, 10, 6))
    val rdd2 = sparkContext.parallelize(List(2, 3, 4))

    //取差集
    val value = rdd1.subtract(rdd2)
    value.foreach(println)
  }

}

10、distinct

去重

object CH_0201_RDDAPI_Distinct {
  def main(args: Array[String]): Unit = {
    val spConf = new SparkConf().setMaster("local").setAppName("filter")
    val sparkContext = new SparkContext(spConf)

    val rdd = sparkContext.parallelize(List(2, 9, 10, 6, 2, 9, 4))
    //    val distinctRDD = rdd.map((_, 1)).reduceByKey(_ + _).map(_._1)
    val distinctRDD = rdd.distinct() //等价于上面的的写法
    distinctRDD.foreach(println)
  }
}

11、cartesian

笛卡尔积,cartesian是窄依赖,分布在不同机器上的数据集汇集到同一台机器有两种方式,一种是shuffle,shuffle将数据集的每条记录通过分区器发送到不同的分区中去,另外一种则是通过I/O的形式,将数据集全部拷贝到同一台机器。cartesian属于第二种,中间只有IO的过程,没有分区,所以是窄依赖。

object CH_0201_RDDAPI_Cartesian {

  def main(args: Array[String]): Unit = {
    val spConf = new SparkConf().setMaster("local").setAppName("filter")
    val sparkContext = new SparkContext(spConf)

    val rdd1 = sparkContext.parallelize(List(2, 9, 10, 6))
    val rdd2 = sparkContext.parallelize(List(2, 3, 4))

    //cartesian是窄依赖,分布在不同机器上的数据集汇集到同一台机器有两种方式,一种是shuffle
    //shuffle将数据集的每条记录通过分区器发送到不同的分区中去,另外一种则是通过I/O的形式
    //将数据集全部拷贝到同一台机器。cartesian属于第二种,中间只有IO的过程,没有分区,所以
    //是窄依赖。
    val res = rdd1.cartesian(rdd2)

    res.foreach(println)
  }

}

1.2 键值转换类算子

1、groupByKey

相同的key分组聚合

object CH_0201_RDDAPI_GroupByKey {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val data: RDD[(String, Int)] = sc.parallelize(List(
      ("zhangsan", 234),
      ("zhangsan", 5667),
      ("zhangsan", 343),
      ("lisi", 212),
      ("lisi", 44),
      ("lisi", 33),
      ("wangwu", 535),
      ("wangwu", 22)
    ))

    val groupByKeyRDD = data.groupByKey()
    groupByKeyRDD.foreach(println)
  }

}

2、reduceByKey

对于(k,v)键值对的数据,相同的key的值进行reduce得到新的值,与原来的key组合行成新的(k,v)键值对。

object CH_0201_RDDAPI_ReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
    //求并集
    val rdd3 = rdd1 union rdd2
    //按key进行分组
    val rdd5 = rdd3.reduceByKey(_ + _)
    rdd5.foreach(println)
  }
}

3、sortByKey

按照相同的key进行排序

object CH_0201_RDDAPI_sortByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
    //求并集
    val rdd3 = rdd1 union rdd2
    //按key进行分组
    val rdd4 = rdd3.reduceByKey(_ + _)
    rdd4.sortByKey(false).foreach(println)
  }
}

4、sortByKey

按照指定的值进行排序

object CH_0201_RDDAPI_SortBy {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
    //求并集
    val rdd3 = rdd1 union rdd2
    //按key进行分组
    val rdd4 = rdd3.reduceByKey(_ + _)
    rdd4.sortBy(e => e._2,false).foreach(println)
  }

}

5、mapValues

对K,V键值对的RDD中的V进行操作,生成新的RDD

object CH_0201_RDDAPI_MapValues {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val sourceRDD = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"))
    sourceRDD.map(e=>(e,e.length)).mapValues(num => num * num).foreach(println)
  }
}

6、flatMapValues

对K,V键值对的RDD中的V摊平,并与原有的key组合生成新的RDD

object CH_0201_RDDAPI_FlatMapValues {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val data: RDD[(String, Int)] = sc.parallelize(List(
      ("zhangsan", 234),
      ("zhangsan", 5667),
      ("zhangsan", 343),
      ("lisi", 212),
      ("lisi", 44),
      ("lisi", 33),
      ("wangwu", 535),
      ("wangwu", 22)
    ))

    //1.按照key分组合并
    val groupByKeyRDD = data.groupByKey()

    //2.将groupByKeyRDD用flatMapValues将分组后的数据摊平,并于原来的key组合成新的RDD
    val value1 = groupByKeyRDD.flatMapValues(e => e.iterator)
    value1.foreach(println)
  }

}

7、combineByKey

按照key对指定的值进行组合,合并

object CH_0201_RDDAPI_CombineByKey {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val data: RDD[(String, Int)] = sc.parallelize(List(
      ("zhangsan", 234),
      ("zhangsan", 5667),
      ("zhangsan", 343),
      ("lisi", 212),
      ("lisi", 44),
      ("lisi", 33),
      ("wangwu", 535),
      ("wangwu", 22)
    ))


    val res = data.combineByKey(
      //createCombiner
      (value: Int) => (value, 1),
      (oldVal: (Int, Int), newVal: Int) => (oldVal._1 + newVal, oldVal._2 + 1),
      (v1: (Int, Int), v2: (Int, Int)) => (v1._1 + v2._1, v1._2 + v2._2)
    )

    res.mapValues(e => e._1 / e._2).foreach(println)

  }

}

8、cogroup

对RDD中相同的key进行分组,与groupByKey不同的是,cogroup可以将多个RDD相同的key进行分组,而groupByKey只能将单个的RDD相同的key进行分组

object CH_0201_RDDAPI_Cogroup {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
    //cogroup
    val rdd3 = rdd1.cogroup(rdd2)

    rdd3.foreach(println)

    rdd1.union(rdd2).groupByKey.foreach(println)

    while (true){}
  }
}

9、join、leftOuterJoin、rightOuterJoin、fullOuterJoin

内关联、左外关联、右外关联、全外关联

object CH_0201_RDDAPI_Join {
  def main(args: Array[String]): Unit = {
    val spConf = new SparkConf().setMaster("local").setAppName("filter")
    val sparkContext = new SparkContext(spConf)

    sparkContext.setLogLevel("INFO")

    val rdd1 = sparkContext.parallelize(List(("shanghai", 1900), ("jiangsu", 8000), ("zhejiang", 2000), ("hunan", 3000)))
    val rdd2 = sparkContext.parallelize(List(("shanghai", 2000), ("jiangsu", 30000), ("zhejiang", 1900), ("changde", 9999)))

    //join
    val joinRDD = rdd1.join(rdd2)
    joinRDD.foreach(println)

    //leftOuterJoin
    val leftOuterJoinRDD = rdd1.leftOuterJoin(rdd2)
    leftOuterJoinRDD.foreach(println)

    //rightOuterJoin
    val rightOuterJoinRDD = rdd1.rightOuterJoin(rdd2)
    rightOuterJoinRDD.foreach(println)

    //fullOuterJoin
    val fullOuterJoinRDD = rdd1.fullOuterJoin(rdd2)
    fullOuterJoinRDD.foreach(println)
  }
}

2. 执行类算子

标签:03,parallelize,val,RDD,println,算子,sc,new,SparkConf
From: https://www.cnblogs.com/crispy-bro/p/18365747

相关文章

  • 全球创新药商业化服务平台市场展望:2030年预计达到113660百万美元
    随着全球医药行业的快速发展和创新药物研发的不断涌现,创新药商业化服务平台行业作为支持新药上市和商业化的关键服务领域,其市场前景受到业界广泛关注。据恒州恒思(YHresearch)团队研究的数据显示,2023年全球创新药商业化服务平台市场规模已达到33210百万美元,并预计在未来六年内,该......
  • 深度调研全球催产药品市场:2030年市场规模达到220.8百万美元
    在全球范围内,催产药品市场作为医疗行业的重要组成部分,正日益受到关注。据恒州恒思(YHresearch)团队研究,本报告全面分析了催产药品的发展趋势、主要竞争者、供应链结构、研发进展及法规政策环境,并预测了该行业的未来投资机会与增长点。2023年全球催产药品市场规模大约为148.1百万......
  • No qualifying bean of type 'feign' available: expected at least 1 bean which qua
    问题:刚用低代码平台引入的一个module,但是启动报错Exceptionencounteredduringcontextinitialization-cancellingrefreshattempt:org.springframework.beans.factory.UnsatisfiedDependencyException:Errorcreatingbeanwithname'ServiceImpl'definedinfile[Ser......
  • 038、Vue3+TypeScript基础,使用router.push进行路由跳转并传参
    01、main.js//引入createApp用于创建Vue实例import{createApp}from'vue'//引入App.vue根组件importAppfrom'./App.vue'//引入路由importrouterfrom'./router'constapp=createApp(App);//使用路由app.use(router);//App.vue的根元素id为appapp......
  • 037、Vue3+TypeScript基础,使用router.push进行导航式路由跳转
    01、main.js代码如下://引入createApp用于创建Vue实例import{createApp}from'vue'//引入App.vue根组件importAppfrom'./App.vue'//引入路由importrouterfrom'./router'constapp=createApp(App);//使用路由app.use(router);//App.vue的根元素id为ap......
  • 如何用纯CSS绘制三角形--03
    下拉菜单中的箭头通常用于提示用户点击以展开菜单。CSS三角形实现这个箭头: <!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metaname="viewport"content="width=device-width,initial-scale=1.0">......
  • 036、Vue3+TypeScript基础,路由中使用replace不让前进后退
    01、代码如下:<template><divclass="app"><h2class="title">App.Vue路由测试</h2><!--导航区--><divclass="navigate"><router-linkreplaceto="/Home"class="nav......
  • NOI2003 逃学的小孩 题解
    NOI2003逃学的小孩题解传送门。题目简述给定一棵树\(T\),需要选择三个点\(A,B,C\),需要从\(C\)走到\(A,B\)​​的最远距离。(第一段题目是在讲剧情吗。。)前置知识图树树的直径思路简述这题在蓝题(提高+/省选-)中还是比较水的^_^来看看样例吧用瞪眼法(——数学......
  • Android T don't abort background activity starts
    log:2024-08-2015:45:12.457581-1128ActivityTaskManagersystem_processISTARTu0{act=android.intent.action.MAINcat=[android.intent.category.LAUNCHER]flg=0x10000000pkg=acr.browser.lightningcmp=acr.browser.lightning/.Ma......
  • 题解:CF1032B Personalized Cup
    本题题意给一个字符串,将其分成等长度的字符串,但是分的行数不能超过\(5\)行,每行的长度不得超过\(20\)。如果无法等分的,则用*来补足长度。输出在行数最小的前提下,列数最少的一种方案。思路由于字符串范围最多也就\(20\times5\),直接分类讨论即可。ACcode#include<bits/st......