首页 > 其他分享 >RDD(弹性分布式数据集)及常用算子

RDD(弹性分布式数据集)及常用算子

时间:2022-10-31 15:45:27浏览次数:49  
标签:String val RDD conf 算子 println 分布式

RDD(弹性分布式数据集)及常用算子

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据

处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行

计算的集合。

弹性

  • 存储的弹性:内存与磁盘的自动切换;

  • 容错的弹性:数据丢失可以自动恢复;

  • 计算的弹性:计算出错重试机制;

  • 分片的弹性:可根据需要重新分片。

分布式:数据存储在大数据集群不同节点上

数据集:RDD 封装了计算逻辑,并不保存数据

数据抽象:RDD 是一个抽象类,需要子类具体实现

不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在

  • 新的 RDD 里面封装计算逻辑

可分区、并行计算

五大特性:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs
Optionally, a list of preferred locations to compute each split on

image-20221030205745742

基础编程

RDD 创建

从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD

val conf = new SparkConf()
.setMaster("local")
.setAppName("spark")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(
 List(1,2,3,4)
)
val rdd2 = sc.makeRDD(
 List(1,2,3,4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sc.stop()

从外部存储(文件)创建 RDD

val conf = new SparkConf()
.setMaster("local")
.setAppName("spark")
val sc = new SparkContext(conf)
val fileRDD: RDD[String] = sc.textFile("input")
fileRDD.collect().foreach(println)
sc.stop()

RDD 转换算子

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value

类型

/**
     * 在Spark所有的操作可以分为两类:
     * 1、Transformation操作(算子)
     * 2、Action操作(算子)
     *
     * 转换算子是懒执行的,需要由Action算子触发执行
     * 每个Action算子会触发一个Job
     *
     * Spark的程序的层级划分:
     * Application --> Job --> Stage --> Task
     *
     * 怎么区分Transformation算子和Action算子?
     * 看算子的返回值是否还是RDD,如果是由一个RDD转换成另一个RDD,则该算子是转换算子
     * 如果由一个RDD得到其他类型(非RDD类型或者没有返回值),则该算子是行为算子
     *
     * 在使用Spark处理数据时可以大体分为三个步骤:
     * 1、加载数据并构建成RDD
     * 2、对RDD进行各种各样的转换操作,即调用转换算子
     * 3、使用Action算子触发Spark任务的执行
     */

map算子

/**
     * map算子:转换算子
     * 需要接受一个函数f
     * 函数f:参数的个数只有一个,类型为RDD中数据的类型 => 返回值类型自己定义
     * 可以将函数f作用在RDD中的每一条数据上,需要函数f必须有返回值,最终会得到一个新的RDD
     * 传入一条数据得到一条数据
     */
	def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo03map")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)


    val linesRDD: RDD[String] = sc.textFile("Spark/data/words.txt")

    linesRDD.map(line => {
      println("执行了map方法")
      line
    }).foreach(println)

    linesRDD.map(line => {
      println("执行了map方法")
      line
    }).foreach(println)

    linesRDD.map(line => {
      println("执行了map方法")
      line
    }).foreach(println)
    linesRDD.map(line => {
      println("执行了map方法")
      line
    }).foreach(println)

    List(1,2,3,4).map(line=>{
      println("List的map方法不需要什么Action算子触发")
      line
    })
    }

flatMap:转换算子

def main(args: Array[String]): Unit = {
    /**
     * flatMap:转换算子
     * 同map算子类似,只不过所接受的函数f需要返回一个可以遍历的类型
     * 最终会将函数f的返回值进行展开(扁平化处理),得到一个新的RDD
     * 传入一条数据 会得到 多条数据
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo04flatMap")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    // 另一种构建RDD的方式:基于Scala本地的集合例如List
    val intRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
    intRDD.foreach(println)

    val strRDD: RDD[String] = sc.parallelize(List("java,java,scala", "scala,scala,python", "python,python,python"))

    strRDD.flatMap(_.split(",")).foreach(println)


  }

filter:转换算子

def main(args: Array[String]): Unit = {
    /**
     * filter:转换算子
     * 用于过滤数据,需要接受一个函数f
     * 函数f:参数只有一个,类型为RDD中每一条数据的类型 => 返回值类型必须为Boolean
     * 最终会基于函数f返回的Boolean值进行过滤,得到一个新的RDD
     * 如果函数f返回的Boolean为true则保留数据
     * 如果函数f返回的Boolean为false则过滤数据
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo05filter")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val seqRDD: RDD[Int] = sc.parallelize(1 to 100, 4)
    println(seqRDD.getNumPartitions) // getNumPartitions并不是算子,它只是RDD的一个属性
    //    seqRDD.foreach(println)

    // 将奇数过滤出来
    seqRDD.filter(i => i % 2 == 1).foreach(println)
    // 将偶数过滤出来
    seqRDD.filter(i => i % 2 == 0).foreach(println)

  }

sample:转换算子

def main(args: Array[String]): Unit = {
    /**
     * sample:转换算子
     * 用于对数据进行取样
     * 总共有三个参数:
     * withReplacement:有无放回
     * fraction:抽样的比例(这个比例并不是精确的,因为抽样是随机的)
     * seed:随机数种子
     */
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo06sample")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    stuRDD.sample(withReplacement = false, 0.1).foreach(println)

    // 如果想让每次抽样的数据都一样,则可以将seed进行固定
    stuRDD.sample(withReplacement = false, 0.01, 10).foreach(println)

  }

mapValues:转换算子

def main(args: Array[String]): Unit = {
    /**
     * mapValues:转换算子
     * 同map类似,只不过mapValues需要对KV格式的RDD的Value进行遍历处理
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo07mapValues")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val kvRDD: RDD[(String, Int)] = sc.parallelize(List("k1" -> 1, "k2" -> 2, "k3" -> 3))
    // 对每个Key对应的Value进行平方
    kvRDD.mapValues(i => i * i).foreach(println)
    // 使用map方法实现
    kvRDD.map(kv => (kv._1, kv._2 * kv._2)).foreach(println)
  }

join:转换算子

def main(args: Array[String]): Unit = {

    /**
     * join:转换算子
     * 需要作用在两个KV格式的RDD上,会将相同的Key的数据关联在一起
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo08join")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    // 加载学生数据,并转换成KV格式,以ID作为Key,其他数据作为Value
    val stuKVRDD: RDD[(String, String)] = sc.textFile("Spark/data/students.txt").map(line => {
      val id: String = line.split(",")(0)
      // split 指定分割符切分字符串得到Array
      // mkString 指定拼接符将Array转换成字符串
      val values: String = line.split(",").tail.mkString("|")
      (id, values)
    })

    // 加载分数数据,并转换成KV格式,以ID作为Key,其他数据作为Value
    val scoKVRDD: RDD[(String, String)] = sc.textFile("Spark/data/score.txt").map(line => {
      val id: String = line.split(",")(0)
      val values: String = line.split(",").tail.mkString("|")
      (id, values)
    })

    // join : 内连接
    val joinRDD1: RDD[(String, (String, String))] = stuKVRDD.join(scoKVRDD)

    //    joinRDD1.foreach(println)

    //    stuKVRDD.leftOuterJoin(scoKVRDD).foreach(println)
    //    stuKVRDD.rightOuterJoin(scoKVRDD).foreach(println)
    stuKVRDD.fullOuterJoin(scoKVRDD).foreach(println)

  }

union:转换算子,用于将两个相类型的RDD进行连接

def main(args: Array[String]): Unit = {
    // union:转换算子,用于将两个相类型的RDD进行连接
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo09union")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    val sample01RDD: RDD[String] = stuRDD.sample(withReplacement = false, 0.01, 1)
    val sample02RDD: RDD[String] = stuRDD.sample(withReplacement = false, 0.01, 1)
    println(s"sample01RDD的分区数:${sample01RDD.getNumPartitions}")
    println(s"sample02RDD的分区数:${sample02RDD.getNumPartitions}")
    // union 操作最终得到的RDD的分区数等于两个RDD分区数之和
    println(s"union后的分区数:${sample01RDD.union(sample02RDD).getNumPartitions}")

    val intRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))

    //    sample01RDD.union(intRDD) // 两个RDD的类型不一致无法进行union

    // union 等同于SQL中的union all
    sample01RDD.union(sample02RDD).foreach(println)

    // 如果要进行去重 即等同于SQL中的union 则可以在 union后再进行distinct
    sample01RDD.union(sample02RDD).distinct().foreach(println)

  }

groupBy:按照某个字段进行分组

def main(args: Array[String]): Unit = {
    /**
     * groupBy:按照某个字段进行分组
     */
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo10groupBy")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

    // 统计班级人数
    stuRDD.groupBy(s => s.split(",")(4)).map(kv => s"${kv._1},${kv._2.size}").foreach(println)
  }

groupByKey:转换算子,需要作用在KV格式的RDD上

 def main(args: Array[String]): Unit = {
    /**
     * groupByKey:转换算子,需要作用在KV格式的RDD上
     */
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo11groupByKey")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    // 使用groupByKey统计班级人数
    // 将学生数据变成KV格式的RDD,以班级作为Key,1作为Value
    val clazzKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))

    val grpRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()

    grpRDD.map(kv => s"${kv._1},${kv._2.size}").foreach(println)
  }

reduceByKey:转换算子,需要作用在KV格式的RDD上,不仅能实现分组,还能实现聚合

def main(args: Array[String]): Unit = {
    /**
     * reduceByKey:转换算子,需要作用在KV格式的RDD上,不仅能实现分组,还能实现聚合
     * 需要接受一个函数f
     * 函数f:两个参数,参数的类型同RDD的Value的类型一致,最终需要返回同RDD的Value的类型一致值
     * 实际上函数f可以看成一个聚合函数
     * 常见的聚合函数(操作):max、min、sum、count、avg
     * reduceByKey可以实现Map端的预聚合,类似MR中的Combiner
     * 并不是所有的操作都能使用预聚合,例如avg就无法实现
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo11groupByKey")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    // 使用reduceByKey统计班级人数
    // 将学生数据变成KV格式的RDD,以班级作为Key,1作为Value
    val clazzKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))

    clazzKVRDD.reduceByKey((i1: Int, i2: Int) => i1 + i2).foreach(println)
    // 简写形式
    clazzKVRDD.reduceByKey((i1, i2) => i1 + i2).foreach(println)
    clazzKVRDD.reduceByKey(_ + _).foreach(println)

  }

aggregateByKey:转换算子,可以实现将多个聚合方式放在一起实现,并且也能对Map进行预聚合

def main(args: Array[String]): Unit = {
    /**
     * aggregateByKey:转换算子,可以实现将多个聚合方式放在一起实现,并且也能对Map进行预聚合
     * 可以弥补reduceByKey无法实现avg操作
     *
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo13aggregateByKey")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    val ageKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), s.split(",")(2).toInt))
    val clazzCntKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))

    // 统计每个班级年龄之和
    val ageSumRDD: RDD[(String, Int)] = ageKVRDD.reduceByKey(_ + _)

    // 统计每个班级人数
    val clazzCntRDD: RDD[(String, Int)] = clazzCntKVRDD.reduceByKey(_ + _)

    // 统计每个班级的平均年龄
    ageSumRDD.join(clazzCntRDD).map {
      case (clazz: String, (ageSum: Int, cnt: Int)) =>
        (clazz, ageSum.toDouble / cnt)
    }.foreach(println)


    /**
     * zeroValue:初始化的值,类型自定义,可以是数据容器
     * seqOp:在组内(每个分区内部即每个Map任务)进行的操作,相当是Map端的预聚合操作
     * combOp:在组之间(每个Reduce任务之间)进行的操作,相当于就是最终每个Reduce的操作
     */

    // 使用aggregateByKey统计班级年龄之和
    ageKVRDD.aggregateByKey(0)((age1: Int, age2: Int) => {
      age1 + age2 // 预聚合
    }, (map1AgeSum: Int, map2AgeSum: Int) => {
      map1AgeSum + map2AgeSum // 聚合
    }).foreach(println)

    // 使用aggregateByKey统计班级人数
    clazzCntKVRDD.aggregateByKey(0)((c1: Int, c2: Int) => {
      c1 + 1 // 预聚合
    }, (map1Cnt: Int, map2Cnt: Int) => {
      map1Cnt + map2Cnt // 聚合
    }).foreach(println)

    // 使用aggregateByKey统计班级的平均年龄
    ageKVRDD.aggregateByKey((0, 0))((t2: (Int, Int), age: Int) => {
      val mapAgeSum: Int = t2._1 + age
      val mapCnt: Int = t2._2 + 1
      (mapAgeSum, mapCnt)
    }, (map1U: (Int, Int), map2U: (Int, Int)) => {
      val ageSum: Int = map1U._1 + map2U._1
      val cnt: Int = map1U._2 + map2U._2
      (ageSum, cnt)
    }).map {
      case (clazz: String, (sumAge: Int, cnt: Int)) =>
        (clazz, sumAge.toDouble / cnt)
    }.foreach(println)



  }

cartesian:转换算子,可以对两个RDD做笛卡尔积

def main(args: Array[String]): Unit = {
    /**
     * cartesian:转换算子,可以对两个RDD做笛卡尔积
     *
     * 当数据重复时 很容易触发笛卡尔积 造成数据的膨胀
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo14cartesian")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val idNameKVRDD: RDD[(String, String)] = sc.parallelize(List(("001", "zs"), ("002", "ls"), ("003", "ww")))
    val genderAgeKVRDD: RDD[(String, Int)] = sc.parallelize(List(("男", 25), ("女", 20), ("男", 22)))

    idNameKVRDD.cartesian(genderAgeKVRDD).foreach(println)

  }

sortBy:转换算子 可以指定一个字段进行排序 默认升序

def main(args: Array[String]): Unit = {
    /**
     * sortBy:转换算子 可以指定一个字段进行排序 默认升序
     */
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo15sortBy")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val intRDD: RDD[Int] = sc.parallelize(List(1, 3, 6, 5, 2, 4, 6, 8, 9, 7))

    intRDD.sortBy(i => i).foreach(println) // 升序
    intRDD.sortBy(i => -i).foreach(println) // 降序
    intRDD.sortBy(i => i, ascending = false).foreach(println) // 降序

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

    // 按照年龄进行降序
    stuRDD.sortBy(s => -s.split(",")(2).toInt).foreach(println)

  }

常见的Action算子

def main(args: Array[String]): Unit = {
    /**
     * 常见的Action算子:foreach、take、collect、count、reduce、save相关
     * 每个Action算子都会触发一个job
     *
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo16Action")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

    /**
     * foreach:对每条数据进行处理,跟map算子的区别在于,foreach算子没有返回值
     */

    stuRDD.foreach(println)

    // 将stuRDD中的每条数据保存到MySQL中
    /**
     * 建表语句:
     * CREATE TABLE `stu_rdd` (
     * `id` int(10) NOT NULL AUTO_INCREMENT,
     * `name` char(5) DEFAULT NULL,
     * `age` int(11) DEFAULT NULL,
     * `gender` char(2) DEFAULT NULL,
     * `clazz` char(4) DEFAULT NULL,
     * PRIMARY KEY (`id`)
     * ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
     */

    // 每一条数据都会创建一次连接,频繁地创建销毁连接效率太低,不合适
    //    stuRDD.foreach(line => {
    //      val splits: Array[String] = line.split(",")
    //      // 1、建立连接
    //      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?useSSL=false", "root", "123456")
    //      println("建立了一次连接")
    //      // 2、创建prepareStatement
    //      val pSt: PreparedStatement = conn.prepareStatement("insert into stu_rdd(id,name,age,gender,clazz) values(?,?,?,?,?)")
    //
    //      // 3、传入参数
    //      pSt.setInt(1, splits(0).toInt)
    //      pSt.setString(2, splits(1))
    //      pSt.setInt(3, splits(2).toInt)
    //      pSt.setString(4, splits(3))
    //      pSt.setString(5, splits(4))
    //
    //      // 4、执行SQL
    //      pSt.execute()
    //
    //      // 5、关闭连接
    //      conn.close()
    //
    //    })

    /**
     * take : Action算子,可以将指定条数的数据转换成Scala中的Array
     *
     */
    // 这里的foreach是Array的方法,不是算子
    stuRDD.take(5).foreach(println)

    /**
     * collect : Action算子,可以将RDD中所有的数据转换成Scala中的Array
     */
    // 这里的foreach是Array的方法,不是算子
    stuRDD.collect().foreach(println)

    /**
     * count : Action算子,统计RDD中数据的条数
     */
    println(stuRDD.count())

    /**
     * reduce : Action算子,将所有的数据作为一组进行聚合操作
     */
    // 统计所有学生的年龄之和
    println(stuRDD.map(_.split(",")(2).toInt).reduce(_ + _))

    /**
     * save相关:
     * saveAsTextFile、saveAsObjectFile
     */
  }

标签:String,val,RDD,conf,算子,println,分布式
From: https://www.cnblogs.com/bfy0221/p/16842384.html

相关文章

  • RDD的方法
    方法介绍简单使用 flatmap对RDD中的每一个元素进行先map再压扁,最后返回操作的结果scala>sc.parallelize(Array("abc","def","hij")).collectres31:Array[Strin......
  • 浅谈Redis与分布式锁
    后续进行补充,先放个链接在这,哈哈 https://zhuanlan.zhihu.com/p/378797329#:~:text=%E6%83%B3%E8%A6%81%E5%AE%9E%E7%8E%B0%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%EF%BC......
  • Spark有状态算子
    Spark有状态算子不仅可以计算当前批次的结果,还可以结合上一次的结果,并对两次结果进行汇总packagecom.streamingimportorg.apache.spark.sql.SparkSessionimportor......
  • 集群和分布式系统
    左:中心化中:去中心化右:分布式参考​​​中心化和去中心化,集群和分布式之间的区别和联系​​​​分布式系统的经典基础理论——中心化与去中心化​​​​分布式与集群的区......
  • 分布式下获取单号的一个方式
    分布式下,获取单号有多种方式。1.UUID乱序,且很长,不利于数据库做索引查询和空间浪费2.数据库自增序列,每次都要访问数据库,IO开销大,高并发时几乎不可用3.Redis自增,优点......
  • Redis实现分布式缓存
    单机的Redis存在四大问题:数据丢失、并发能力弱、故障恢复问题、存储能力1、Redis持久化(解决数据丢失问题)有两种持久化方案:RDB/AOF★RDB(数据备份文件):把内存......
  • 【分布式技术专题】「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分
    分布式事务分布式事务的场景什么场景下会出现分布式事务?TX协议⼀种分布式事务协议,包含⼆阶段提交(2PC),三阶段提交(3PC)两种实现。二阶段提交方案:强一致性事务的发起者称协调者,事......
  • redisson分布式限流[RRateLimiter]源码分析
    接下来在讲一讲平时用的比较多的限流模块--RRateLimiter1.简单使用publicstaticvoidmain(String[]args)throwsInterruptedException{RRateLimiterrateLimit......
  • 分布式ID生成方案总结整理
    目录1、为什么需要分布式ID?2、业务系统对分布式ID有什么要求?3、分布式ID生成方案3.1UUID3.2、数据库自增3.3、号段模式3.4、Redis实现3.4、雪花算法(SnowFlake)3.5、百度......
  • 记录一次redis分布式锁的坑
    redis分布式锁的实现方式是:lock(){sync(this){//无法获取自旋setnx(key,UUID)setex(60s)returnUUID}}unlock......