RDD的API操作/方法/算子
- 比如有一个100M的csv文件,需要对它的每个元素操作,比如先+1,再平方,结果保存另一个csv文件。
- 如下图,如果用传统python思维,不仅每个中间容器占用内存,消耗更多资源,而且每步都耗时。
- 如果用RDD思维,则每个中间容器只是记住了要做什么,逻辑上该有什么数据,但不立即做,等最后一步再做,合并计算。实际并不占用过多内存,耗时短。
Python思维:每一步都用对应的函数进行操作,基于操作的结果数据,继续进行下一步操作,会产生较多的内存占用.
RDD思维:先预走一遍,记住每一步的操作,知道遇到action算子,表示,所有的处理到头了,然后,看看之前的操作函数能不能合并操作,如图先+1再平方,可以合并成(x+1)^2,这样,直接对RDD一开始的数据,用合并后的操作函数直接操作并且直接输出到结果中,省去了中间的步骤,比如中间的读取很存储,都省去了,不仅节省了内存空间,还使得计算也更加快.
分类
Transformation算子
-
返回一个新的【RDD】,所有的transformation函数(算子)都是【延迟计算】的,不会立即执行,比如wordcount中的【flatMap】,【map】,【reduceByKey】
-
下面了解用得较多的即可,不用背记。
-
转换 含义 map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活 join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable ,Iterable ))类型的RDD cartesian(otherDataset) 笛卡尔积 pipe(command, [envVars]) 对rdd进行管道操作 coalesce(numPartitions) 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作 repartition(numPartitions) 重新给 RDD 分区 -
练习下面3类transformation算子
-
glom函数可以看每个分区内有哪些元素。
-
值类型valueType
-
mapValues:RDD元素是键值对,只对value转换
-
map:将RDD的每个元素进行转换,整体形成新的RDD
-
groupBy:将RDD的每个元素,映射到一个key,再按照相同的key,分组到一起,整体形成新的RDD
-
filter:将RDD的每个元素进行过滤,过滤后的元素,进入新的RDD
-
flatMap:将RDD的每个元素,进行拆解成更小的元素,进入新的RDD,一般元素个数会增加
-
distinct:将RDD的相同的元素,进行去重复,进入新的RDD
-
下面的交互窗口方便测试
-
把wordcount头部的一些代码,复制到下面的输入框中,眼睛瞧仔细是哪几句。手动创建上下文对象。
把下面的>>>后的一句句代码,敲入上图Python Console输入框。
>>> rdd1=sc.parallelize([5,6,4,7,3,8,2,9,1,10]) >>> obj=rdd1.collect() >>> type(obj) <class 'list'> >>> rdd1.glom().collect() [[5, 6], [4, 7], [3, 8], [2, 9, 1, 10]] >>> len(rdd1.glom().collect()) 4 >>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3) >>> rdd1.getNumPartitions() 3 #关于distinct rdd1=sc.parallelize([1,2,2,2,3,3]) rdd2=rdd1.distinct() rdd2.collect() Out[22]: [3, 1, 2] #map的用法1 >>> rdd2=rdd1.map(lambda x:x+1) >>> rdd2.collect() [2, 3, 4, 5, 6, 7, 8, 9, 10] >>> rdd2.getNumPartitions() 3 >>> def add(x): ... return x+1 ... #map的用法2 >>> rdd2=rdd1.map(add) >>> rdd2.collect() [2, 3, 4, 5, 6, 7, 8, 9, 10] #map的用法3 >>> rdd2=rdd1.map(lambda x:add(x)) >>> rdd2.collect() [2, 3, 4, 5, 6, 7, 8, 9, 10] >>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3) >>> rdd3=rdd1.filter(lambda x:x>4) >>> rdd3.collect() [5, 6, 7, 8, 9] >>> rdd1 = sc.parallelize(["a b c","d e f","h i j"]) >>> rdd1.getNumPartitions() 4 >>> rdd1.count() 3 >>> rdd2=rdd1.flatMap( lambda x : x.split(" ") ) >>> rdd2.collect() ['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j'] >>> rdd2.count() 9 >>> rdd1=sc.parallelize([("a", ["apple","banana","lemon"]), ("b", ["grapes"])]) >>> def f(x):return len(x) ... >>> rdd1.mapValues(f).collect() [('a', 3), ('b', 1)] >>> rdd1=sc.parallelize([1,2,3]) >>> rdd2=rdd1.groupBy(lambda x : 'A' if(x%2==1) else 'B' ) >>> for obj in rdd2.collect():print(obj) ... ('A', <pyspark.resultiterable.ResultIterable object at 0x7f8911d1d640>) ('B', <pyspark.resultiterable.ResultIterable object at 0x7f8911126af0>) >>> rdd3=rdd2.map( lambda obj : (obj[0] , list(obj[1]) )) >>> rdd3.collect() [('A', [1, 3]), ('B', [2])] >>> rdd3=rdd2.mapValues(lambda v:list(v) ) #可以简化写为 >>> rdd3=rdd2.mapValues(list) >>> rdd3.collect() [('A', [1, 3]), ('B', [2])]
-
-
双值类型DoubleValueType
-
union
-
intersection
>>> rdd1 = sc.parallelize([("a",1),("b",2)]) >>> rdd2 = sc.parallelize([("c",1),("b",3)]) >>> rdd3=rdd1.union(rdd2) >>> rdd3.collect() [('a', 1), ('b', 2), ('c', 1), ('b', 3)] >>> rdd4=rdd1.intersection(rdd2) >>> rdd4.collect() []
-
-
Key-Value值类型
-
groupByKey
-
reduceByKey
-
sortByKey
>>> rdd1 = sc.parallelize([("a",1),("b",2)]) >>> rdd2 = sc.parallelize([("c",1),("b",3)]) >>> rdd3=rdd1.union(rdd2) >>> rdd5=rdd3.groupByKey() >>> for x in rdd5.collect():print(x) ... ('a', <pyspark.resultiterable.ResultIterable object at 0x7f89111396d0>) ('b', <pyspark.resultiterable.ResultIterable object at 0x7f89111397c0>) ('c', <pyspark.resultiterable.ResultIterable object at 0x7f8911139850>) >>> for x in rdd5.mapValues(list).collect() : print(x) ... ('a', [1]) ('b', [2, 3]) ('c', [1]) >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] >>> rdd1=sc.parallelize(tmp) >>> rdd3=rdd1.sortByKey() >>> for x in rdd3.collect():print(x) ... ('1', 3) ('2', 5) ('a', 1) ('b', 2) ('d', 4) >>> rdd3.first() ('1', 3) >>> x = sc.parallelize([1,3,1,2,3]) >>> y=x.countByValue() >>> y defaultdict(<class 'int'>, {1: 2, 3: 2, 2: 1}) >>>
-
-
Action算子
-
返回的【不是】RDD,可以将结果保存输出,所有的Action算子【立即】执行,比如wordcount中的【foreach和saveAsTextfile】。
-
下面了解用得较多的即可,不用背记。
-
动作 含义 reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的 collect() 在驱动程序中,以数组的形式返回数据集的所有元素 count() 返回RDD的元素个数 first() 返回RDD的第一个元素(类似于take(1)) take(n) 返回一个由数据集的前n个元素组成的数组 takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前 n 个元素 saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 saveAsObjectFile(path) 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 foreach(func) 在数据集的每一个元素上,运行函数func进行更新。通常用foreach(print)打印数据. foreachPartition(func) 在数据集的每一个分区上,运行函数func - first:返回第一个元素
- take:返回前几个元素组成的list,
- top:倒排序并返回前几个元素组成的list
- count:返回元素的个数
- 上面Executor都会将执行的结果统一发送回Driver
- 唯独foreach和saveAsTextFile是不会统一发送回Driver的.
- foreach:对每个元素,遍历输出到某种介质
- saveAsTextFile:保存为文件
-
练习
-
collect:将各Executor节点的数据拉取到Driver节点,形成单机python列表。注意如果RDD数据量太大,可能会让driver进程内存溢出崩溃。所以要慎用。
-
reduce:对所有的元素,聚合成一个值
-
使用函数【lambda x,y:x+y】对列表[1,2,3,4,5]的元素聚合时, 1-没有初始值,过程是: 第1轮函数计算:x是元素1,y是元素2,结果是3 第2轮函数计算:x是上轮结果3,y是元素3,结果是6 第3轮函数计算:x是上轮结果6,y是元素4,结果是10 。。。 后续每轮: x是上轮结果 ,y是下一个元素 2-有初始值时 初始值=0 第1轮函数计算:x是初始值0,y是元素1,结果是1 第2轮函数计算:x是上轮结果1,y是元素2,结果是3 后续每轮: x是上轮结果 ,y是下一个元素 2-有初始值时 初始值=1 第1轮函数计算:x是初始值1,y是元素1,结果是2 第2轮函数计算:x是上轮结果2,y是元素2,结果是4 后续每轮: x是上轮结果 ,y是下一个元素
-
-
from pyspark import SparkConf, SparkContext
import os
# 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
conf = SparkConf().setAppName('wordcount').setMaster('local[*]')
sc = SparkContext(conf=conf)
22/01/22 09:52:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
sc.defaultParallelism
Out[4]: 2
rdd1=sc.parallelize(range(1,10),3)
rdd2=rdd1.map(lambda x:x+1)
list1=rdd2.collect()
In[8]: print(list1)
[2, 3, 4, 5, 6, 7, 8, 9, 10]
type(list1)
Out[9]: list
rdd1=sc.parallelize([1,2,3,4,5])
rdd1.reduce(lambda x,y:x+y)
Out[11]: 15
a=rdd1.reduce(lambda x,y:x+y)
print(a)
15
sc.parallelize([2,3,4]).first()
Out[14]: 2
sc.parallelize([2,3,4,5,6]).take(2)
Out[15]: [2, 3]
sc.parallelize([2,3,4,5,6]).take(10)
Out[16]: [2, 3, 4, 5, 6]
x = sc.parallelize([1,3,1,2,3])
y=x.top(num=3)
print(y)
[3, 3, 2]
sc.parallelize([2,3,4]).count()
Out[24]: 3
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
def f(x):print(x)
words.foreach(f)
scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark
RDD的API操作/方法/算子
分区操作函数
-
mapPartitions
-
map与mapPartitions,过程不一样。map是对RDD的每个【元素】转换,颗粒度更细,mapPartitions是对每个【分区】整体进行转换,颗粒度粗。
-
-
foreachPartition:
- foreach和foreachPartition的【结果效果】是一样的,但是【过程】不一样。foreach是对每个【元素】输出,颗粒度更细,foreachPartition是对每个【分区】整体进行输出,颗粒度粗。
-
代码
def f(ite): for x in ite : print(x) #f函数被调用了2次 sc.parallelize([1,2,3,4,5],2).foreachPartition(f) 1 2 3 4 5 ------------------------------------ def f2(x): print(x) #f2函数被调用了5次 sc.parallelize([1,2,3,4,5]).foreach(f2) 1 2 3 4 5 #结果一样,调用次数不同
重分区函数
repartition(n)底层源码就是coalesce(n,shuffle=True)
coalesce(n)底层就是coalesce(n,shuffle=False)
-
repartition(n) :通常用来【增大】分区,默认底层调用coalesce(n, shuffle = true),会产生【shuffle 】,其实不管是增大还是减小,都会产生【shuffle 】。
-
coalesce:一般用来【减少】分区,coalesce(n, shuffle = false),【默认不会】产生shuffle,如果强行增大分区数,那么不起作用,除非你允许使用shuffle ,即coalesce(n, shuffle = True)。
-
【了解】rdd.partitionBy(分区数):要求被调用的RDD的元素是【键值对形式】。
-
何时增加分区数目?:当处理的数据很多的时候,可以考虑增加RDD的分区数目
-
何时减少分区数目?:当对RDD数据进行过滤操作(filter函数)后,考虑是否降低RDD分区数目,不然可能形成很多个小文件.
- 当对结果RDD存储到外部系统,比如mysql和HDFS
-
代码
rdd1=sc.parallelize(range(1,8),4) rdd1.getNumPartitions() Out[39]: 4 rdd1.glom().collect() Out[40]: [[1], [2, 3], [4, 5], [6, 7]] rdd1.repartition(2).getNumPartitions() Out[42]: 2 rdd1.repartition(10).getNumPartitions() Out[43]: 10 rdd1=sc.parallelize([1, 2, 3, 4, 5], 3) rdd1.coalesce(1).getNumPartitions() Out[46]: 1 rdd1.coalesce(4).getNumPartitions() Out[47]: 3 rdd1.coalesce(4,True).getNumPartitions() Out[48]: 4 rdd1=sc.parallelize([1,2,3,4,2,4,1],1) pairs_rdd=rdd1.map(lambda x:(x,x)) pairs_rdd.foreach(lambda x:print(x)) (1, 1) (2, 2) (3, 3) (4, 4) (2, 2) (4, 4) (1, 1) rdd2=pairs_rdd.partitionBy(3) rdd2.glom().foreach(lambda x:print(x)) [(1, 1), (4, 4), (4, 4), (1, 1)] [(3, 3)] [(2, 2), (2, 2)]
没有key的聚合函数
回顾python中的reduce
from functools import reduce def add(x,y): return x+y sum1=reduce(add,[1,2,3,4,5]) sum1 Out[59]: 15 -------------------------------------- sum2=reduce(lambda x,y:x+y,[1,2,3,4,5]) sum2 Out[61]: 15
- aggregate
- 值=rdd.aggregate( 初始值 , 分区内聚合函数 , 分区间聚合函数)
)
注意:上图中可以看到,分区间的聚合,初始值依然参与运算.
- fold ,如果上面的aggregate的分区内和分区间的聚合函数,逻辑一样,则简写为一次
- 值=rdd.fold( 初始值 , 聚合函数 )
- reduce,如果上面的fold的初始值,没有意义时,比如加法时初始值是0,或乘法初始值是1,可以省略。
- 值=rdd.reduce(聚合函数)
rdd=sc.parallelize([1, 2, 3, 4],2)
rdd.aggregate(0,lambda x,y:x+y,lambda x,y:x+y)
10
rdd.aggregate(1,lambda x,y:x+y,lambda x,y:x+y)
13
rdd.fold(0,lambda x,y:x+y)
10
rdd.fold(1,lambda x,y:x+y)
13
rdd.reduce(lambda x,y:x+y)
10
综合案例
from pyspark import SparkConf, SparkContext
import os
# 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
#创建上下文对象
conf=SparkConf().setAppName('test').setMaster('local[*]')
sc=SparkContext(conf=conf)
#定义RDD,有6个数字,分区数是3
rdd=sc.parallelize([1,2,3,4,5,6],3)
#用aggregate求和
from operator import add
r1=rdd.aggregate(0,lambda x,y:x+y , lambda x,y:x+y)
#也可以写成
r2=rdd.aggregate(0,add , add)
print('aggregate结果,',r1,r2)
#用fold求和,如果上面的分区内和分区间的聚合算法逻辑一样,则可以简化成fold
r3=rdd.fold(0,lambda x,y:x+y)
print('fold结果,',r3)
#用reduce求和,如果上面的fold的初始值没有意义,则可以简化成reduce
r4=rdd.reduce(lambda x,y:x+y)
print('reduce结果',r4)
sc.stop()
有key的聚合函数
-
RDD的每个元素是【键值对形式】才能调用下列函数。
-
groupByKey
- rdd2=rdd1.groupByKey() ,rdd1的元素必须是键值对,rdd2的每个元素结构也是键值对,键是key,value是一个迭代器(可看做是容器),groupByKey用于按key聚合.
-
aggregateByKey( 初始值,分区内如何聚合,分区间如何聚合 )
-
下面分区内聚合时,初始值【会】参与计算,而分区间聚合时,初始值【不会】参与计算。具体见下图
-
图解如下当初始值分别=0和1时
-
-
foldByKey
- foldByKey是由【aggregateByKey】简化而来
- 当aggregateByKey的分区内和分区间的聚合函数逻辑是【一样】的,可以省略为一个,就变成了foldByKey。
-
reduceByKey
- reduceByKey是由【foldByKey】简化而来
- 当foldByKey的初始值没有意义时,可以省略不写,就变成了reduceByKey。
from pyspark import SparkConf, SparkContext import os # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置 os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON if __name__ == '__main__': #创建上下文对象 conf=SparkConf().setAppName('test').setMaster('local[*]') sc=SparkContext(conf=conf) rdd=sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) #用groupByKey,对相同的key,的value聚合 rdd2=rdd.groupByKey() rdd2.mapValues(lambda v: sum(v)).foreach(lambda x: print(x)) #可以简化写成 rdd2.mapValues(sum).foreach(lambda x: print(x)) #用aggregateByKey,对相同的key,的value聚合 rdd.aggregateByKey(0,lambda x,y:x+y,lambda x,y:x+y).foreach(print) from operator import add #可以简化写为 rdd.aggregateByKey(0,add,add).foreach(print) rdd.aggregateByKey(1,add,add).foreach(print) #用foldByKey,对相同的key,的value聚合 #当aggregateByKey,的分区内和分区间的聚合逻辑一样时,可以省略为1次 rdd.foldByKey(0, add).foreach(print) #用reduceByKey,对相同的key,的value聚合 #当foldByKey的初始值没有意义时,可以省略 rdd.reduceByKey( add).foreach(print)
综合案例
from pyspark import SparkConf, SparkContext
import os
# 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
#创建上下文对象
conf=SparkConf().setAppName('test').setMaster('local[*]')
sc=SparkContext(conf=conf)
lineseq = ["hadoop scala hive spark scala sql sql",
"hadoop scala spark hdfs hive spark",
"spark hdfs spark hdfs scala hive spark"]
rdd=sc.parallelize(lineseq)
import re
rdd2=rdd.flatMap(lambda x: re.split('\s+' , x)).map(lambda x:(x,1))
#用groupByKey统计词频
rdd2.groupByKey().mapValues(sum).foreach(print)
#用aggregateByKey统计词频
from operator import add
rdd2.aggregateByKey(0,add,add).foreach(print)
#用foldByKey统计词频
rdd2.foldByKey(0,add).foreach(print)
#用reduceByKey统计词频
rdd2.reduceByKey(add).foreach(print)
面试题:groupByKey和reduceByKey的区别?
-
groupByKey的shuffle的数据量【多】,容易造成子RDD的分区的内存【溢出】。如果做wordcount词频统计,那么需要继续手动mapValues才能得到结果。
-
groupByKey原理:
-
reduceByKey有【2个】阶段的聚合,性能快。在父RDD分区内做了【预聚合】,在子RDD的分区内再次聚合。
-
reduceByKey原理:
关联函数
-
RDD的每个元素是【键值对】才能调用关联函数。
-
有【join】、【leftOuterJoin】等算子,与SQL语句中的inner join、left outer join作用一样。
-
案例
from pyspark import SparkConf, SparkContext import os # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置 os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON if __name__ == '__main__': #创建上下文对象 conf=SparkConf().setAppName('test').setMaster('local[*]') sc=SparkContext(conf=conf) rdd1 = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")]) rdd2 = sc.parallelize([(1001, "sales"), (1002, "tech")]) #调用用join rdd3=rdd1.join(rdd2) rdd3.foreach(print) #调用用leftOuterJoin rdd4=rdd1.leftOuterJoin(rdd2) rdd4.foreach(print)
标红框的是重点算子
RDD的持久化/缓存
-
对RDD的转换过程中,如果要对中间某RDD复用多次,比如对RDD进行多次输出,那么默认情况下每次Action都会触发一个job,每个job都会从头开始加载数据并计算,浪费时间。如果将逻辑上的RDD4的数据持久化到具体的存储介质上比如【内存】、【磁盘】,那么只用计算一次该RDD,提高程序性能。
- 高清图见资料截图
-
RDD调用cache/persist都是【lazy:延迟计算】算子,需要一个【action】算子触发后。RDD的数据才会持久化到内存或磁盘。之后的操作,才会从内存或磁盘中直接拿取数据。
-
下面3个都是仅持久化到【内存】
- rdd.cache() --将数据持久化到内存中
- 等价于 rdd.persist()
- 等价于 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY)
-
【了解】更多的存储级别
-
rdd.persist(level : StorageLevel)
-
StorageLevel
StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False) StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2) StorageLevel.DISK_ONLY_3 = StorageLevel(True, False, False, False, 3) StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False) #此为默认值 StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2) StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False) StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2) StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1) StorageLevel.MEMORY_AND_DISK_DESER = StorageLevel(True, True, False, True) --以上是python版本的源码,java/scale版本还不太一样,在源码上有一段解释,大意为: python的存储总是在python端序列化好了,所以没有看到_SER后缀,因为不用强调,默认都是序列化,反而非序列化需要强调,故有_DESER后缀,表示不要序列化.序列化本质其实就是减少数据体积而已. # DISK:磁盘 MEMORY:内存 ONLY:仅仅 _2:保存两份,并且是在不同的机器上 _SER:序列化 # off_heap:JVM堆外内存 DESER:
-
上面带有后缀的表示:
-
_ONLY:仅仅将数据保存到【内存】或【磁盘】
-
_2:数据持久化时备份【2】份
-
_SER:将RDD的元素进行【序列化】,压缩,方便网络传输。
-
MEMORY_AND_DISK_2 :将数据保存到内存,如果【内存】不够,继续溢写到【磁盘】,并备份2次。
-
-
【了解】释放缓存/持久化
-
当缓存的RDD数据不再被使用时,考虑释放资源
-
rdd.unpersist()
-
此函数属于【eager:立即计算】,立即执行。
-
-
案例代码
# -*- coding:utf-8 -*- # Desc:This is Code Desc import os # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置 import time from pyspark import SparkConf, SparkContext, StorageLevel os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON if __name__ == '__main__': # 1-首先创建SparkContext上下文环境 #setAppName给当前应用程序起个名字 #setMaster,指定委托谁来提供CPU和内存资源的算力,当这个程序运行在node1上时,local指的是node1机器 conf=SparkConf().setAppName('wordcount')\ .setMaster('local[*]') sc=SparkContext(conf=conf) #2-加载一个文本内容形成RDD rdd=sc.textFile('file:///etc/profile') #3-将RDD的数据缓存到内存 #注意persist或cache是懒加载,延迟计算,需要action触发才行。 rdd.persist(StorageLevel.MEMORY_ONLY) rdd.persist() #上句等价于下面的 #rdd.cache() #4-第一次action操作count,打印结果,查看耗时,慢 #下面的count有2个作用, #作用1:计算RDD的元素个数 #作用2:触发持久化到内存的动作 cnt1=rdd.count() print('第一次action操作count结果',cnt1) #5-第2次action操作count,打印结果,查看耗时,快 #下面的count直接从内存拿数据,会快 cnt2=rdd.count() print('第2次action操作count结果', cnt2) #休眠 time.sleep(10*60) #6-练习手动释放内存 rdd.unpersist() sc.stop()
-
webui界面上可以看到缓存状态
-
没用持久化时:耗时6秒。
-
使用持久化后,第二个job耗时只用了0.6s
-
-
什么时候使用cache/persist ?
- 当RDD被【多】次复用时
- 当RDD之前的计算过程很【 复】,并且他又被多次使用。
什么是序列化和反序列化
标签:rdd2,rdd1,rdd,SparkCore,RDD,sc,os From: https://www.cnblogs.com/nanguyhz/p/16806612.html2个作用,持久化存储到磁盘, 方便网络传输