首页 > 其他分享 >Scala迭代器与RDD五大属性

Scala迭代器与RDD五大属性

时间:2023-07-31 17:11:38浏览次数:29  
标签:map String 迭代 Scala 分区 next RDD

Scala迭代器与RDD五大属性

迭代器就是读数据的工具

Scala迭代器的理解

​ 迭代器是读数据的工具,例如Scala的List就提供了它自己的迭代器(读数据工具),而我们也可以自己写迭代器去读List里面的数据,而不使用List的迭代器。

自己编写迭代器读List数据

class MyFileIterable(filePath:String) extends Iterable[String]{
  override def iterator: Iterator[String] = new MyFileIterator(filePath)
}

class MyFileIterator(filePath:String) extends Iterator[String]{

  private val br = new BufferedReader(new FileReader(filePath))
  var line: String = null
  override def hasNext: Boolean = {
    line = br.readLine()
    line != null
  }
  override def next(): String = {
    line
  }
}

需要实现hasNext()和next()方法。注:由于后文需要使用groupBy,而Iterator并没有这个方法,因此我们再继承一个Iterable

  • 测试WordCount
object MyFileIteratorTest{
  def main(args: Array[String]): Unit = {
    val iterable = new MyFileIterable("data/wordcount/input/a.txt")
    iterable
      .flatMap(s=>s.split("\\s+"))
      .map(s=>(s,1))
      .groupBy(tp => tp._1)
      .map(tp => (tp._1,tp._2.size))
      .foreach(println)
  }
}

迭代器中map方法探究

map方法和hasnext、next方法是不同的,map是对原来迭代器里的数据做一个指定映射,而hasnext,next是一个取数的过程。让我们看看iterator.map的源码。

可以看到,map传入了一个计算逻辑f,返回了一个新的迭代器。而新迭代器的next()是原来迭代器next上套了一层计算逻辑A。

如果我们不进行hasnext next的话,就只是定义了iter1和iter2,并不会有数据计算,这就是lazy特性。

那源头迭代器是怎么样的呢?在末端迭代器调用next后,一直调用到最开始的源头迭代器,那源头迭代器一定是需要真正读数据的了,而读的方式则根据数据源的不同而设计的,就如上面的例子是用BufferedReader;而数据源有很多,例如:

  • hdfs
  • 本地磁盘文件
  • 内存中的数据
  • mysql

联系scala迭代器与RDD

经过上文,是不是感觉到迭代器和RDD有巨多的相似之处,RDD的本质就是迭代器!

RDD中的两种计算操作

Transcation转换操作

​ transcation转换操作的基本思路就和scala迭代器中的map操作基本一致,都是套计算逻辑。上源码。

Action执行操作

​ action执行操作也和上文一样,就是调用迭代器的next

Compute

​ transaction和action综合起来就是RDD里面的计算函数compute,而compute就是返回了一个迭代器!

RDD的五大属性

  • 一个分区列表
  • 作用到每个分区的计算函数
  • 依赖RDD列表
  • 【可选】对于KV类型的RDD会有一个Partitioner(例如,定义某个RDD是Hash分区的)
  • 【可选】每个分区的首选计算执行位置

计算函数(最核心)

抽象理解:计算函数就是作用在每一个partition上的逻辑运算函数;

具象理解:上文已经看到了,compute实质就是返回了一个迭代器,这个迭代器保存了这个RDD的计算逻辑。

依赖RDD列表(重要)

计算函数的核心是迭代器,而迭代器又需要依赖上一个迭代器,因此RDD也需要依赖上一个RDD。

依赖RDD列表存储了当前RDD所依赖的一个或多个前序RDD。

那列表代表什么意思呢?这里的列表并不是说记录了整条RDD链,它仅仅记录上一层RDD,但是上一层RDD并不一定只有一个;如map就确实只有一个父RDD,但如join则有两个父RDD了;所以这里需要使用列表。

分区列表(重要)

在分布式环境中,数据源需要被分成多块进行并行计算,而Spark也是如此的。

RDD中数据集的基本组成单位是分区

(注:并不一定RDD1的分区一一对应RDD2的分区,可以了解一下宽窄依赖)

对于RDD1来说,它的分区数是3,决定了并行度也是3,而每一个分区都会被一个计算任务处理。

因此,一个RDD一定需要记录数据的分区信息,这就是分区列表。这里再强调一下,RDD只描述数据,而不实际存储数据,这个分区也只是一种描述。

KV类型RDD的分区器【可选】

分区器就是一种分区规则,但分区发生变化的时候,需要有分区规则来对数据进行分区。

那什么时候需要用到分区器呢?

每个分区的首选计算执行位置【可选】

分区放在哪台机器计算是有讲究的,设想,如果分区1的数据存在机器A上,那是不是在机器A执行该分区的执行任务更为合理呢?减少网络传输能有效提高效率。而这个属性就是记录了哪个分区在哪里算最好。

总结

​ 本文从Scala迭代器引出了RDD的本质,并且介绍了RDD源码所说的五大属性的含义。再进一步理解,就需要从整个Spark的运行模式开始了。

标签:map,String,迭代,Scala,分区,next,RDD
From: https://www.cnblogs.com/nanguahh/p/17593895.html

相关文章

  • mysql udf mof escalate privilege
    原理udf=‘userdefinedfunction‘,即‘用户自定义函数’。文件后缀为‘.dll’,常用c语言编写。通过在udf文件中定义新函数,对MYSQL的功能进行扩充,可以执行系统任意命令。将MYSQL账号root转化为系统system权限。思路获取udf文件上传udf到指定位置sqlmap有现成的udf文件,分为32......
  • 设计模式-迭代器模式在Java中使用示例
    场景为开发一套销售管理系统,在对该系统进行分析和设计时,发现经常需要对系统中的商品数据、客户数据等进行遍历,为了复用这些遍历代码,开发人员设计了一个抽象的数据集合类AbstractObjectList,而将存储商品和客户等数据的类作为其子类AbstractObjectList类的子类ProductList和Custo......
  • 16迭代器/for循环本质/异常处理
    常见内置函数(补充)1.help()查看注释信息help(len)2.id()返回一串相当于内存地址的数字print(id('jason'))3.int()类型转换、机制转换4.isinstance()判断数据类型print(type('jason')isstr)#类型判断别扭的很不推荐print(isinstance('jason',str......
  • scrum工具管理敏捷产品迭代
    ​什么是Sprint Backlog? Sprint Backlog是Scrum的主要工件之一。在Scrum中,团队按照迭代的方式工作,每个迭代称为一个Sprint。在Sprint开始之前,PO会准备好产品Backlog,准备好的产品Backlog应该是经过梳理、估算和优先级排列的。在Sprint开始时,第一件事情是Sprint计划会议,在Sprin......
  • Python【22】 __iter__, __next__, iterable, iterator, 可迭代对象, 迭代
    参考:https://www.jianshu.com/p/1b0686bc166d......
  • python迭代
    Python迭代Python是一种高级编程语言,它提供了许多强大的功能和工具,其中之一就是迭代。迭代是Python中一个非常重要的概念,它允许我们对数据进行逐个访问和处理,而不需要显式地编写循环。什么是迭代?迭代是指重复执行一系列操作的过程。在编程中,迭代通常用于遍历数据集合,例如列表、......
  • openpyxl模块-按行,列迭代数据
    #!/usr/bin/envpythonfromopenpyxlimportWorkbookbook=Workbook()sheet=book.activerows=((88,46,57),(89,38,12),(23,59,78),(56,21,98),(24,18,43),(34,15,67))forrowinrows:sheet.append(row)for......
  • Abaqus 中的步进、增量、迭代和尝试概念 硕迪科技
    Abaqus中的步进、增量、迭代和尝试等可能会在概念上让Abaqus初学者感到困惑。清楚地了解分析步骤、荷载增量和迭代之间的区别非常重要。在这篇文章中快速了解Abaqus步骤和增量迭代。在ABAQUS中,步进增量迭代是解决非线性问题的一种数值计算方法。这种方法通常用于模拟材料的非......
  • Test Parameter-Scalar Parameter创建
    TestParameter-ScalarParameter创建_哔哩哔哩_bilibili1.背景:例:编写一个传感器输入值的测试,测试环境包括Tester(测试方),ECU(被测对象),Tester发送传感器变量SensorA和SensorB,ECU反馈Force值的过程,根据不同传感器输入值测试Force反馈值,即便有自定义的TestCase模板,每次还是需要输入......
  • Scala语言
    Scala前言 基础语法    基本数据类型     变量   修饰限定符    循环    方法和函数         闭包 字符串    数组       ......