Scala迭代器与RDD五大属性
迭代器就是读数据的工具
Scala迭代器的理解
迭代器是读数据的工具,例如Scala的List就提供了它自己的迭代器(读数据工具),而我们也可以自己写迭代器去读List里面的数据,而不使用List的迭代器。
自己编写迭代器读List数据
class MyFileIterable(filePath:String) extends Iterable[String]{
override def iterator: Iterator[String] = new MyFileIterator(filePath)
}
class MyFileIterator(filePath:String) extends Iterator[String]{
private val br = new BufferedReader(new FileReader(filePath))
var line: String = null
override def hasNext: Boolean = {
line = br.readLine()
line != null
}
override def next(): String = {
line
}
}
需要实现hasNext()和next()方法。注:由于后文需要使用groupBy,而Iterator并没有这个方法,因此我们再继承一个Iterable
- 测试WordCount
object MyFileIteratorTest{
def main(args: Array[String]): Unit = {
val iterable = new MyFileIterable("data/wordcount/input/a.txt")
iterable
.flatMap(s=>s.split("\\s+"))
.map(s=>(s,1))
.groupBy(tp => tp._1)
.map(tp => (tp._1,tp._2.size))
.foreach(println)
}
}
迭代器中map方法探究
map方法和hasnext、next方法是不同的,map是对原来迭代器里的数据做一个指定映射,而hasnext,next是一个取数的过程。让我们看看iterator.map的源码。
可以看到,map传入了一个计算逻辑f,返回了一个新的迭代器。而新迭代器的next()是原来迭代器next上套了一层计算逻辑A。
如果我们不进行hasnext next的话,就只是定义了iter1和iter2,并不会有数据计算,这就是lazy特性。
那源头迭代器是怎么样的呢?在末端迭代器调用next后,一直调用到最开始的源头迭代器,那源头迭代器一定是需要真正读数据的了,而读的方式则根据数据源的不同而设计的,就如上面的例子是用BufferedReader;而数据源有很多,例如:
- hdfs
- 本地磁盘文件
- 内存中的数据
- mysql
联系scala迭代器与RDD
经过上文,是不是感觉到迭代器和RDD有巨多的相似之处,RDD的本质就是迭代器!
RDD中的两种计算操作
Transcation转换操作
transcation转换操作的基本思路就和scala迭代器中的map操作基本一致,都是套计算逻辑。上源码。
Action执行操作
action执行操作也和上文一样,就是调用迭代器的next
Compute
transaction和action综合起来就是RDD里面的计算函数compute,而compute就是返回了一个迭代器!
RDD的五大属性
- 一个分区列表
- 作用到每个分区的计算函数
- 依赖RDD列表
- 【可选】对于KV类型的RDD会有一个Partitioner(例如,定义某个RDD是Hash分区的)
- 【可选】每个分区的首选计算执行位置
计算函数(最核心)
抽象理解:计算函数就是作用在每一个partition上的逻辑运算函数;
具象理解:上文已经看到了,compute实质就是返回了一个迭代器,这个迭代器保存了这个RDD的计算逻辑。
依赖RDD列表(重要)
计算函数的核心是迭代器,而迭代器又需要依赖上一个迭代器,因此RDD也需要依赖上一个RDD。
依赖RDD列表存储了当前RDD所依赖的一个或多个前序RDD。
那列表代表什么意思呢?这里的列表并不是说记录了整条RDD链,它仅仅记录上一层RDD,但是上一层RDD并不一定只有一个;如map就确实只有一个父RDD,但如join则有两个父RDD了;所以这里需要使用列表。
分区列表(重要)
在分布式环境中,数据源需要被分成多块进行并行计算,而Spark也是如此的。
RDD中数据集的基本组成单位是分区。
(注:并不一定RDD1的分区一一对应RDD2的分区,可以了解一下宽窄依赖)
对于RDD1来说,它的分区数是3,决定了并行度也是3,而每一个分区都会被一个计算任务处理。
因此,一个RDD一定需要记录数据的分区信息,这就是分区列表。这里再强调一下,RDD只描述数据,而不实际存储数据,这个分区也只是一种描述。
KV类型RDD的分区器【可选】
分区器就是一种分区规则,但分区发生变化的时候,需要有分区规则来对数据进行分区。
那什么时候需要用到分区器呢?
每个分区的首选计算执行位置【可选】
分区放在哪台机器计算是有讲究的,设想,如果分区1的数据存在机器A上,那是不是在机器A执行该分区的执行任务更为合理呢?减少网络传输能有效提高效率。而这个属性就是记录了哪个分区在哪里算最好。
总结
本文从Scala迭代器引出了RDD的本质,并且介绍了RDD源码所说的五大属性的含义。再进一步理解,就需要从整个Spark的运行模式开始了。
标签:map,String,迭代,Scala,分区,next,RDD From: https://www.cnblogs.com/nanguahh/p/17593895.html