RDD定义
RDD是弹性分布式数据集,是spark中的最基本的数据抽象,里面的元素可以并行计算
RDD的五大特性
RDD是有分区的,它的分区是数据存储的最小单位
RDD的方法会作用在所有分区上
RDD之间是有依赖关系的
KV型的RDD可以有分区器
RDD的分区会尽量靠近数据所在的服务器,尽量保证本地读取
from pyspark import SparkContext,SparkConf
conf = Sparkconf().setAppName('Wordcount').setMaster('local[*]')
sc = SparkContext(conf = conf)
rdd1 = sc.textFile("./test.txt")
rdd2 = rdd1.flatMap(lambda x:x.split(' '))
rdd3 = rdd2.map(lambda x: (x,1))
rdd4 = rdd3.reduceBykey(lambda a,b : a+b)
print(rdd4.collect())
#pyspark实现word count
将本地集合转化成分布式RDD
from pyspark import SparkContext,SparkConf
conf = Sparkconf().setAppName('Wordcount').setMaster('local[*]')
sc = SparkContext(conf = conf)
# 本地数据
data = [1,2,3,4,5,6]
rdd = sc.parallesize(data, numSlices=3)
# 转化成RDD数据,并且分了三个区
读取RDD分区数
读取文件
rdd = sc.textFile('../data/words.txt',1000)
# 限制读取的最大分区数为1000
rdd.getNumPartitions()
# 适用于读取小文件
rdd = sc.wholeTextFiles('../data/tiny_file/',1000)
RDD算子
分布式集合对象上的API成为算子
对应本地对象的API即为方法、函数
其中RDD算子分为两类:transformation转换算子和action动作算子
其中转换算子的返回值仍是RDD,返回值不是RDD的即为动作算子
在上面的代码片中,textfile,flatmap,mao,reducebykey均为转换算子,collect为动作算子
常用转换算子
rdd.map()将数据按照处理函数一条条处理并返回新的RDD
这个函数通常是lambda x:f(x)的形式
rdd.flatMap()执行map操作,然后接触嵌套
rdd.reduceByKey()实现自动按照key分组,然后按照提供的聚合逻辑实现组内数据的聚合操作
rdd.groupNy()
#对数字按照偶数奇数分组
rdd = sc.parallesize([1,2,3,4,5])
rdd2 = rdd.groupBy(lambda num: 'even' if (num%2==0) else 'odd')
#分组完成后每个组是一个二元元组
rdd3 = rdd2.map(lambda(x : (x[0],list(x[1]))))
print(rdd3.collect())
rdd.filter() 根据函数过滤想要的数据进行保留
rdd.distinct()对RDD数据进行去重,并返回新的RDD
rdd.union()合并两个RDD并返回一个RDD,只合并不去重,并且两个RDD的类型可以不同
rdd.join()实现内连接
rdd.leftOuterJoin()左外连接
rdd.rightOuterJoin()右外连接
rdd.intersection()返回两个RDD的交集
rdd.glom()按照数据分区进行嵌套并返回新的RDD
rdd.groupByKey()按照键值进行分组,返回一个二元元组
rdd.sortBy()
按照函数进行排序,ascending确定升序还是降序
numpartitions确定按照多少分区进行排序
rdd.sortByKey()按照键值进行排序,ascending确定升序和降序,numPartition确定分区大小,keyfunc确定排序之前最键值的处理方法
常用动作算子
rdd.countByKey()统计键值出现的次数
rdd.collect()将各个分区的数据统一收集到driver中,形成一个list对象,结果数据集不能过大,否则会造成内存爆炸
rdd.reduce(func)按照传入的函数逻辑进行聚合
rdd.fold(default value,func) 按照逻辑使用初始值进行聚合操作
rdd.first()取出第一个元素
rdd.take(n)取出前n个元素转化成list返回
rdd.top()取出降序排序的前n个元素
rdd.count()取出RDD中的数据条数,是一个数字返回值
rdd.takeSample()随机抽样数据
rdd.takeOrdered()排序后取前n个数据
rdd.foreach()按照逻辑对每一个元素执行指定操作,但是没有返回值
rdd.saveAsTextFile()将RDD数据写入到文本文件中,支持写出到本地或者hdfs文件系统中
foreach和saveAsTextFile()是由分区直接执行的,不经过driver,其余action算子均会经过driver
分区操作算子
rdd.mapPartitions()传递整个分区的数据
rdd.foreachPartition()一次处理一整个分区的数据
rdd.partitionBy()按照新的分区对原数据按照逻辑进行处理
rdd.repartition()按照分区执行重新分区
rdd.coalesce()对分区进行数量增减,通过第二个参数确定是否,比上一个算子更有安全性
rdd.mapValues()对与二元元组RDD进行操作,对其value值进行map操作
rdd.join()实现内连接
rdd.leftOuterJoin()左外连接
rdd.rightOuterJoin()右外连接
面试题
groupbykey和reducebykey的区别
前者只有分组功能,但是后者除了分组功能还有聚合功能,后者在分区内会进行预聚合,在进行分组流程,被分组的数据量大大减少,提高了整体性能
RDD持久化
RDD数据是过程数据,当新数据生成时,旧数据消失
因此RDD提供缓存技术,避免多次重复运算,
rdd.persist(StorageLevel.MEMORY_AND_DISK)
缓存是有可能丢失的
RDD check point也是将数据保存起来,但是只支持硬盘存储,它被设计认为是安全的,但是不保留血缘关系