首页 > 其他分享 >SparkCore(二)

SparkCore(二)

时间:2022-10-19 16:17:37浏览次数:41  
标签:rdd2 rdd1 rdd SparkCore RDD sc os

RDD的API操作/方法/算子

  • 比如有一个100M的csv文件,需要对它的每个元素操作,比如先+1,再平方,结果保存另一个csv文件。
  • 如下图,如果用传统python思维,不仅每个中间容器占用内存,消耗更多资源,而且每步都耗时。
  • 如果用RDD思维,则每个中间容器只是记住了要做什么,逻辑上该有什么数据,但不立即做,等最后一步再做,合并计算。实际并不占用过多内存,耗时短。

Python思维:每一步都用对应的函数进行操作,基于操作的结果数据,继续进行下一步操作,会产生较多的内存占用.

RDD思维:先预走一遍,记住每一步的操作,知道遇到action算子,表示,所有的处理到头了,然后,看看之前的操作函数能不能合并操作,如图先+1再平方,可以合并成(x+1)^2,这样,直接对RDD一开始的数据,用合并后的操作函数直接操作并且直接输出到结果中,省去了中间的步骤,比如中间的读取很存储,都省去了,不仅节省了内存空间,还使得计算也更加快.

分类

Transformation算子

  • 返回一个新的【RDD】,所有的transformation函数(算子)都是【延迟计算】的,不会立即执行,比如wordcount中的【flatMap】,【map】,【reduceByKey】

  • 下面了解用得较多的即可,不用背记。

  • 转换 含义
    map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
    filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
    flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
    mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
    mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
    sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
    union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
    intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
    distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
    groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
    reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
    sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
    sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
    join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
    cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
    cartesian(otherDataset) 笛卡尔积
    pipe(command, [envVars]) 对rdd进行管道操作
    coalesce(numPartitions) 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作
    repartition(numPartitions) 重新给 RDD 分区
  • 练习下面3类transformation算子

    • glom函数可以看每个分区内有哪些元素。

    • 值类型valueType

      • mapValues:RDD元素是键值对,只对value转换

      • map:将RDD的每个元素进行转换,整体形成新的RDD

      • groupBy:将RDD的每个元素,映射到一个key,再按照相同的key,分组到一起,整体形成新的RDD

      • filter:将RDD的每个元素进行过滤,过滤后的元素,进入新的RDD

      • flatMap:将RDD的每个元素,进行拆解成更小的元素,进入新的RDD,一般元素个数会增加

      • distinct:将RDD的相同的元素,进行去重复,进入新的RDD

      • 下面的交互窗口方便测试

      • 把wordcount头部的一些代码,复制到下面的输入框中,眼睛瞧仔细是哪几句。手动创建上下文对象。

        把下面的>>>后的一句句代码,敲入上图Python Console输入框。

        >>> rdd1=sc.parallelize([5,6,4,7,3,8,2,9,1,10])
        >>> obj=rdd1.collect()   
        >>> type(obj)                                                                   
        <class 'list'>
        >>> rdd1.glom().collect()
        [[5, 6], [4, 7], [3, 8], [2, 9, 1, 10]]                                         
        >>> len(rdd1.glom().collect())
        4
        >>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
        >>> rdd1.getNumPartitions()
        3
        #关于distinct
        rdd1=sc.parallelize([1,2,2,2,3,3])
        rdd2=rdd1.distinct()
        rdd2.collect()
        Out[22]: [3, 1, 2]
        
        
        #map的用法1
        >>> rdd2=rdd1.map(lambda x:x+1)
        >>> rdd2.collect()
        [2, 3, 4, 5, 6, 7, 8, 9, 10]
        >>> rdd2.getNumPartitions()
        3
        >>> def add(x):
        ...     return x+1
        ... 
        #map的用法2
        >>> rdd2=rdd1.map(add)
        >>> rdd2.collect()
        [2, 3, 4, 5, 6, 7, 8, 9, 10]
        #map的用法3
        >>> rdd2=rdd1.map(lambda x:add(x))
        >>> rdd2.collect()
        [2, 3, 4, 5, 6, 7, 8, 9, 10]
        >>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
        >>> rdd3=rdd1.filter(lambda x:x>4)
        >>> rdd3.collect()
        [5, 6, 7, 8, 9]
        >>> rdd1 = sc.parallelize(["a b c","d e f","h i j"])
        >>> rdd1.getNumPartitions()
        4
        >>> rdd1.count()
        3
        >>> rdd2=rdd1.flatMap( lambda x : x.split(" ") )
        >>> rdd2.collect()
        ['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j']
        >>> rdd2.count()
        9
        
        >>> rdd1=sc.parallelize([("a", ["apple","banana","lemon"]), ("b", ["grapes"])])
        >>> def f(x):return len(x)
        ... 
        >>> rdd1.mapValues(f).collect()
        [('a', 3), ('b', 1)]
        
        >>> rdd1=sc.parallelize([1,2,3])
        >>> rdd2=rdd1.groupBy(lambda x : 'A' if(x%2==1) else 'B' )
        >>> for obj in rdd2.collect():print(obj)
        ... 
        ('A', <pyspark.resultiterable.ResultIterable object at 0x7f8911d1d640>)         
        ('B', <pyspark.resultiterable.ResultIterable object at 0x7f8911126af0>)
        >>> rdd3=rdd2.map( lambda obj : (obj[0] , list(obj[1]) ))
        >>> rdd3.collect()
        [('A', [1, 3]), ('B', [2])]
        >>> rdd3=rdd2.mapValues(lambda v:list(v) )
        #可以简化写为
        >>> rdd3=rdd2.mapValues(list)
        >>> rdd3.collect()
        [('A', [1, 3]), ('B', [2])]
        
        
        
    • 双值类型DoubleValueType

      • union

      • intersection

        >>> rdd1 = sc.parallelize([("a",1),("b",2)])
        >>> rdd2 = sc.parallelize([("c",1),("b",3)])
        >>> rdd3=rdd1.union(rdd2)
        >>> rdd3.collect()
        [('a', 1), ('b', 2), ('c', 1), ('b', 3)]
        >>> rdd4=rdd1.intersection(rdd2)
        >>> rdd4.collect()
        []
        
        
    • Key-Value值类型

      • groupByKey

      • reduceByKey

      • sortByKey

        >>> rdd1 = sc.parallelize([("a",1),("b",2)])
        >>> rdd2 = sc.parallelize([("c",1),("b",3)])
        >>> rdd3=rdd1.union(rdd2)
        >>> rdd5=rdd3.groupByKey()
        
        >>> for x in  rdd5.collect():print(x)
        ... 
        ('a', <pyspark.resultiterable.ResultIterable object at 0x7f89111396d0>)
        ('b', <pyspark.resultiterable.ResultIterable object at 0x7f89111397c0>)
        ('c', <pyspark.resultiterable.ResultIterable object at 0x7f8911139850>)
        >>> for x in  rdd5.mapValues(list).collect() : print(x)
        ... 
        ('a', [1])
        ('b', [2, 3])
        ('c', [1])
        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> rdd1=sc.parallelize(tmp)
        >>> rdd3=rdd1.sortByKey()
        >>> for x in rdd3.collect():print(x)
        ... 
        ('1', 3)
        ('2', 5)
        ('a', 1)
        ('b', 2)
        ('d', 4)
        >>> rdd3.first()
        ('1', 3)
        >>> x = sc.parallelize([1,3,1,2,3])
        >>> y=x.countByValue()
        >>> y
        defaultdict(<class 'int'>, {1: 2, 3: 2, 2: 1})
        >>> 
        
        
        

Action算子

  • 返回的【不是】RDD,可以将结果保存输出,所有的Action算子【立即】执行,比如wordcount中的【foreach和saveAsTextfile】。

  • 下面了解用得较多的即可,不用背记。

  • 动作 含义
    reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
    collect() 在驱动程序中,以数组的形式返回数据集的所有元素
    count() 返回RDD的元素个数
    first() 返回RDD的第一个元素(类似于take(1))
    take(n) 返回一个由数据集的前n个元素组成的数组
    takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
    takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前 n 个元素
    saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
    saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
    saveAsObjectFile(path) 将数据集的元素,以 Java 序列化的方式保存到指定的目录下
    countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
    foreach(func) 在数据集的每一个元素上,运行函数func进行更新。通常用foreach(print)打印数据.
    foreachPartition(func) 在数据集的每一个分区上,运行函数func
    • first:返回第一个元素
    • take:返回前几个元素组成的list,
    • top:倒排序并返回前几个元素组成的list
    • count:返回元素的个数
    • 上面Executor都会将执行的结果统一发送回Driver
    • 唯独foreach和saveAsTextFile是不会统一发送回Driver的.
    • foreach:对每个元素,遍历输出到某种介质
    • saveAsTextFile:保存为文件
  • 练习

    • collect:将各Executor节点的数据拉取到Driver节点,形成单机python列表。注意如果RDD数据量太大,可能会让driver进程内存溢出崩溃。所以要慎用。

    • reduce:对所有的元素,聚合成一个值

      • 使用函数【lambda x,y:x+y】对列表[1,2,3,4,5]的元素聚合时,
        
        1-没有初始值,过程是:
        第1轮函数计算:x是元素1,y是元素2,结果是3
        第2轮函数计算:x是上轮结果3,y是元素3,结果是6
        第3轮函数计算:x是上轮结果6,y是元素4,结果是10
        。。。
        后续每轮:     x是上轮结果 ,y是下一个元素
        
        
        2-有初始值时
        初始值=0
        第1轮函数计算:x是初始值0,y是元素1,结果是1
        第2轮函数计算:x是上轮结果1,y是元素2,结果是3
        后续每轮:     x是上轮结果 ,y是下一个元素
        
        2-有初始值时
        初始值=1
        第1轮函数计算:x是初始值1,y是元素1,结果是2
        第2轮函数计算:x是上轮结果2,y是元素2,结果是4
        后续每轮:     x是上轮结果 ,y是下一个元素
        
        

  from pyspark import SparkConf, SparkContext
  import os
  # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
  os.environ['SPARK_HOME'] = '/export/server/spark'
  PYSPARK_PYTHON = "/root/anaconda3/bin/python"
  # 当存在多个版本时,不指定很可能会导致出错
  os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
  os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
  conf = SparkConf().setAppName('wordcount').setMaster('local[*]')
  sc = SparkContext(conf=conf)
  22/01/22 09:52:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  sc.defaultParallelism
  Out[4]: 2
  rdd1=sc.parallelize(range(1,10),3)
  rdd2=rdd1.map(lambda x:x+1)
  list1=rdd2.collect()
                                                                                  In[8]: print(list1)
  [2, 3, 4, 5, 6, 7, 8, 9, 10]
  type(list1)
  Out[9]: list
  rdd1=sc.parallelize([1,2,3,4,5])
  rdd1.reduce(lambda x,y:x+y)
  Out[11]: 15
  a=rdd1.reduce(lambda x,y:x+y)
  print(a)
  15
  sc.parallelize([2,3,4]).first()
  Out[14]: 2
  sc.parallelize([2,3,4,5,6]).take(2)
  Out[15]: [2, 3]
  sc.parallelize([2,3,4,5,6]).take(10)
  Out[16]: [2, 3, 4, 5, 6]
  
  x = sc.parallelize([1,3,1,2,3])
  y=x.top(num=3)
  print(y)
  [3, 3, 2]
  sc.parallelize([2,3,4]).count()
  Out[24]: 3
  words = sc.parallelize (
      ["scala",
      "java",
      "hadoop",
      "spark",
      "akka",
      "spark vs hadoop",
      "pyspark",
      "pyspark and spark"]
   )
  def f(x):print(x)
  words.foreach(f)
  scala
  java
  hadoop
  spark
  akka
  spark vs hadoop
  pyspark
  pyspark and spark

RDD的API操作/方法/算子

分区操作函数

  • mapPartitions

    • map与mapPartitions,过程不一样。map是对RDD的每个【元素】转换,颗粒度更细,mapPartitions是对每个【分区】整体进行转换,颗粒度粗。

  • foreachPartition:

    • foreach和foreachPartition的【结果效果】是一样的,但是【过程】不一样。foreach是对每个【元素】输出,颗粒度更细,foreachPartition是对每个【分区】整体进行输出,颗粒度粗。
  • 代码

    def f(ite):
        for x in ite :
            print(x)
    #f函数被调用了2次        
    sc.parallelize([1,2,3,4,5],2).foreachPartition(f)
    1
    2
    3
    4
    5
    ------------------------------------
    
    def f2(x):
        print(x)
        
    #f2函数被调用了5次     
    sc.parallelize([1,2,3,4,5]).foreach(f2)
    
    1
    2
    3
    4
    5
    
    #结果一样,调用次数不同
    

重分区函数

repartition(n)底层源码就是coalesce(n,shuffle=True)

coalesce(n)底层就是coalesce(n,shuffle=False)

  • repartition(n) :通常用来【增大】分区,默认底层调用coalesce(n, shuffle = true),会产生【shuffle 】,其实不管是增大还是减小,都会产生【shuffle 】。

  • coalesce:一般用来【减少】分区,coalesce(n, shuffle = false),【默认不会】产生shuffle,如果强行增大分区数,那么不起作用,除非你允许使用shuffle ,即coalesce(n, shuffle = True)。

  • 【了解】rdd.partitionBy(分区数):要求被调用的RDD的元素是【键值对形式】。

  • 何时增加分区数目?:当处理的数据很多的时候,可以考虑增加RDD的分区数目

  • 何时减少分区数目?:当对RDD数据进行过滤操作(filter函数)后,考虑是否降低RDD分区数目,不然可能形成很多个小文件.

    • 当对结果RDD存储到外部系统,比如mysql和HDFS
  • 代码

    rdd1=sc.parallelize(range(1,8),4)
    rdd1.getNumPartitions()
    Out[39]: 4
    rdd1.glom().collect()
    Out[40]: [[1], [2, 3], [4, 5], [6, 7]]
    
    rdd1.repartition(2).getNumPartitions()
    Out[42]: 2
    rdd1.repartition(10).getNumPartitions()
    Out[43]: 10
        
    rdd1=sc.parallelize([1, 2, 3, 4, 5], 3)
    rdd1.coalesce(1).getNumPartitions()
    Out[46]: 1
    rdd1.coalesce(4).getNumPartitions()
    Out[47]: 3
    rdd1.coalesce(4,True).getNumPartitions()
    Out[48]: 4
        
    rdd1=sc.parallelize([1,2,3,4,2,4,1],1)
    pairs_rdd=rdd1.map(lambda x:(x,x))
    pairs_rdd.foreach(lambda x:print(x))
    (1, 1)
    (2, 2)
    (3, 3)
    (4, 4)
    (2, 2)
    (4, 4)
    (1, 1)
    rdd2=pairs_rdd.partitionBy(3)
    
    rdd2.glom().foreach(lambda x:print(x))
    [(1, 1), (4, 4), (4, 4), (1, 1)]
    [(3, 3)]
    [(2, 2), (2, 2)]
    

没有key的聚合函数

回顾python中的reduce

from functools import reduce

def add(x,y):
return x+y

sum1=reduce(add,[1,2,3,4,5])

sum1
Out[59]: 15
 
--------------------------------------
sum2=reduce(lambda x,y:x+y,[1,2,3,4,5])

sum2
Out[61]: 15
 
  • aggregate
    • 值=rdd.aggregate( 初始值 , 分区内聚合函数 , 分区间聚合函数)

    • )

注意:上图中可以看到,分区间的聚合,初始值依然参与运算.

  • fold ,如果上面的aggregate的分区内和分区间的聚合函数,逻辑一样,则简写为一次
    • 值=rdd.fold( 初始值 , 聚合函数 )
  • reduce,如果上面的fold的初始值,没有意义时,比如加法时初始值是0,或乘法初始值是1,可以省略。
    • 值=rdd.reduce(聚合函数)
rdd=sc.parallelize([1, 2, 3, 4],2)

rdd.aggregate(0,lambda x,y:x+y,lambda x,y:x+y)
 10
    
rdd.aggregate(1,lambda x,y:x+y,lambda x,y:x+y)
 13
    
rdd.fold(0,lambda x,y:x+y)
 10
    
rdd.fold(1,lambda x,y:x+y)
 13
    
rdd.reduce(lambda x,y:x+y)
 10

综合案例

from pyspark import SparkConf, SparkContext
import os
# 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    #创建上下文对象
    conf=SparkConf().setAppName('test').setMaster('local[*]')
    sc=SparkContext(conf=conf)
    #定义RDD,有6个数字,分区数是3
    rdd=sc.parallelize([1,2,3,4,5,6],3)
    #用aggregate求和
    from operator import add
    r1=rdd.aggregate(0,lambda x,y:x+y , lambda x,y:x+y)
    #也可以写成
    r2=rdd.aggregate(0,add , add)
    print('aggregate结果,',r1,r2)
    #用fold求和,如果上面的分区内和分区间的聚合算法逻辑一样,则可以简化成fold
    r3=rdd.fold(0,lambda x,y:x+y)
    print('fold结果,',r3)

    #用reduce求和,如果上面的fold的初始值没有意义,则可以简化成reduce
    r4=rdd.reduce(lambda x,y:x+y)
    print('reduce结果',r4)

    sc.stop()

有key的聚合函数

  • RDD的每个元素是【键值对形式】才能调用下列函数。

  • groupByKey

    • rdd2=rdd1.groupByKey() ,rdd1的元素必须是键值对,rdd2的每个元素结构也是键值对,键是key,value是一个迭代器(可看做是容器),groupByKey用于按key聚合.
  • aggregateByKey( 初始值,分区内如何聚合,分区间如何聚合 )

    • 下面分区内聚合时,初始值【会】参与计算,而分区间聚合时,初始值【不会】参与计算。具体见下图

    • 图解如下当初始值分别=0和1时

  • foldByKey

    • foldByKey是由【aggregateByKey】简化而来
    • 当aggregateByKey的分区内和分区间的聚合函数逻辑是【一样】的,可以省略为一个,就变成了foldByKey。
  • reduceByKey

    • reduceByKey是由【foldByKey】简化而来
    • 当foldByKey的初始值没有意义时,可以省略不写,就变成了reduceByKey。
    from pyspark import SparkConf, SparkContext
    import os
    # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
    os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
    os.environ['SPARK_HOME'] = '/export/server/spark'
    PYSPARK_PYTHON = "/root/anaconda3/bin/python"
    # 当存在多个版本时,不指定很可能会导致出错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
    
    if __name__ == '__main__':
        #创建上下文对象
        conf=SparkConf().setAppName('test').setMaster('local[*]')
        sc=SparkContext(conf=conf)
        rdd=sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        #用groupByKey,对相同的key,的value聚合
        rdd2=rdd.groupByKey()
        rdd2.mapValues(lambda v: sum(v)).foreach(lambda x: print(x))
        #可以简化写成
        rdd2.mapValues(sum).foreach(lambda x: print(x))
        #用aggregateByKey,对相同的key,的value聚合
        rdd.aggregateByKey(0,lambda x,y:x+y,lambda x,y:x+y).foreach(print)
        from operator import add
        #可以简化写为
        rdd.aggregateByKey(0,add,add).foreach(print)
        rdd.aggregateByKey(1,add,add).foreach(print)
        #用foldByKey,对相同的key,的value聚合
        #当aggregateByKey,的分区内和分区间的聚合逻辑一样时,可以省略为1次
        rdd.foldByKey(0, add).foreach(print)
        #用reduceByKey,对相同的key,的value聚合
        #当foldByKey的初始值没有意义时,可以省略
        rdd.reduceByKey( add).foreach(print)
    

综合案例

from pyspark import SparkConf, SparkContext
import os
# 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    #创建上下文对象
    conf=SparkConf().setAppName('test').setMaster('local[*]')
    sc=SparkContext(conf=conf)
    lineseq = ["hadoop scala   hive spark scala sql sql",
               "hadoop scala spark hdfs hive    spark",
               "spark hdfs   spark hdfs scala hive spark"]
    rdd=sc.parallelize(lineseq)
    import re
    rdd2=rdd.flatMap(lambda x: re.split('\s+' , x)).map(lambda x:(x,1))
    #用groupByKey统计词频
    rdd2.groupByKey().mapValues(sum).foreach(print)
    #用aggregateByKey统计词频
    from operator import add
    rdd2.aggregateByKey(0,add,add).foreach(print)
    #用foldByKey统计词频
    rdd2.foldByKey(0,add).foreach(print)
    #用reduceByKey统计词频
    rdd2.reduceByKey(add).foreach(print)

面试题:groupByKey和reduceByKey的区别?

  • groupByKey的shuffle的数据量【多】,容易造成子RDD的分区的内存【溢出】。如果做wordcount词频统计,那么需要继续手动mapValues才能得到结果。

  • groupByKey原理:

  • reduceByKey有【2个】阶段的聚合,性能快。在父RDD分区内做了【预聚合】,在子RDD的分区内再次聚合。

  • reduceByKey原理:

关联函数

  • RDD的每个元素是【键值对】才能调用关联函数。

  • 有【join】、【leftOuterJoin】等算子,与SQL语句中的inner join、left outer join作用一样。

  • 案例

    from pyspark import SparkConf, SparkContext
    import os
    # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
    os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
    os.environ['SPARK_HOME'] = '/export/server/spark'
    PYSPARK_PYTHON = "/root/anaconda3/bin/python"
    # 当存在多个版本时,不指定很可能会导致出错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
    
    if __name__ == '__main__':
        #创建上下文对象
        conf=SparkConf().setAppName('test').setMaster('local[*]')
        sc=SparkContext(conf=conf)
        rdd1 = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
        rdd2 = sc.parallelize([(1001, "sales"), (1002, "tech")])
        #调用用join
        rdd3=rdd1.join(rdd2)
        rdd3.foreach(print)
        #调用用leftOuterJoin
        rdd4=rdd1.leftOuterJoin(rdd2)
        rdd4.foreach(print)
        
    
    
    
    

标红框的是重点算子

RDD的持久化/缓存

  • 对RDD的转换过程中,如果要对中间某RDD复用多次,比如对RDD进行多次输出,那么默认情况下每次Action都会触发一个job,每个job都会从头开始加载数据并计算,浪费时间。如果将逻辑上的RDD4的数据持久化到具体的存储介质上比如【内存】、【磁盘】,那么只用计算一次该RDD,提高程序性能。

    • 高清图见资料截图
  • RDD调用cache/persist都是【lazy:延迟计算】算子,需要一个【action】算子触发后。RDD的数据才会持久化到内存或磁盘。之后的操作,才会从内存或磁盘中直接拿取数据。

  • 下面3个都是仅持久化到【内存】

    • rdd.cache() --将数据持久化到内存中
    • 等价于 rdd.persist()
    • 等价于 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY)
  • 【了解】更多的存储级别

    • rdd.persist(level : StorageLevel)

    • StorageLevel

      StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
      StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
      StorageLevel.DISK_ONLY_3 = StorageLevel(True, False, False, False, 3)
      StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False) #此为默认值
      StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
      StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
      StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
      StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1)
      StorageLevel.MEMORY_AND_DISK_DESER = StorageLevel(True, True, False, True)
      --以上是python版本的源码,java/scale版本还不太一样,在源码上有一段解释,大意为:
          python的存储总是在python端序列化好了,所以没有看到_SER后缀,因为不用强调,默认都是序列化,反而非序列化需要强调,故有_DESER后缀,表示不要序列化.序列化本质其实就是减少数据体积而已.
      # DISK:磁盘  MEMORY:内存  ONLY:仅仅   _2:保存两份,并且是在不同的机器上  _SER:序列化 
      # off_heap:JVM堆外内存  DESER:
      
    • 上面带有后缀的表示:

    • _ONLY:仅仅将数据保存到【内存】或【磁盘】

    • _2:数据持久化时备份【2】份

    • _SER:将RDD的元素进行【序列化】,压缩,方便网络传输。

    • MEMORY_AND_DISK_2 :将数据保存到内存,如果【内存】不够,继续溢写到【磁盘】,并备份2次。

  • 【了解】释放缓存/持久化

    • 当缓存的RDD数据不再被使用时,考虑释放资源

    • rdd.unpersist()
      
    • 此函数属于【eager:立即计算】,立即执行。

  • 案例代码

    # -*- coding:utf-8 -*-
    # Desc:This is Code Desc
    import os
    # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
    import time
    
    from pyspark import SparkConf, SparkContext, StorageLevel
    
    os.environ['SPARK_HOME'] = '/export/server/spark'
    PYSPARK_PYTHON = "/root/anaconda3/bin/python"
    # 当存在多个版本时,不指定很可能会导致出错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
    
    if __name__ == '__main__':
        # 1-首先创建SparkContext上下文环境
        #setAppName给当前应用程序起个名字
        #setMaster,指定委托谁来提供CPU和内存资源的算力,当这个程序运行在node1上时,local指的是node1机器
        conf=SparkConf().setAppName('wordcount')\
            .setMaster('local[*]')
        sc=SparkContext(conf=conf)
        #2-加载一个文本内容形成RDD
        rdd=sc.textFile('file:///etc/profile')
        #3-将RDD的数据缓存到内存
        #注意persist或cache是懒加载,延迟计算,需要action触发才行。
        rdd.persist(StorageLevel.MEMORY_ONLY)
        rdd.persist()
        #上句等价于下面的
        #rdd.cache()
        #4-第一次action操作count,打印结果,查看耗时,慢
        #下面的count有2个作用,
        #作用1:计算RDD的元素个数
        #作用2:触发持久化到内存的动作
        cnt1=rdd.count()
        print('第一次action操作count结果',cnt1)
    
        #5-第2次action操作count,打印结果,查看耗时,快
        #下面的count直接从内存拿数据,会快
        cnt2=rdd.count()
        print('第2次action操作count结果', cnt2)
        #休眠
        time.sleep(10*60)
        #6-练习手动释放内存
        rdd.unpersist()
        sc.stop()
    
    • webui界面上可以看到缓存状态

    • 没用持久化时:耗时6秒。

    • 使用持久化后,第二个job耗时只用了0.6s

  • 什么时候使用cache/persist ?

    • 当RDD被【多】次复用时
    • 当RDD之前的计算过程很【 复】,并且他又被多次使用。

什么是序列化和反序列化

2个作用,持久化存储到磁盘方便网络传输

标签:rdd2,rdd1,rdd,SparkCore,RDD,sc,os
From: https://www.cnblogs.com/nanguyhz/p/16806612.html

相关文章

  • SparkCore:累加器和广播变量
    累加器累加器(分布式共享只写变量):用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每......
  • sparkcore案例四:统计每个省份的用户访问量
    题目:/***统计每个省份的用户访问量,最终要求将不同省份用户访问量存放到不同的分区中分区存放规则如下*省份是以包含山0*如果省份包含海1*其他......
  • sparkcore案例三:获取每一种状态码对应的访问量
    题目描述:/***清洗完成的数据中包含一个用户的响应状态码,获取每一种状态码对应的访问量*1、读取清洗完成的数据成为RDD[String]*2、可以把上一步得到的RDD......
  • SparkCore系列(四)函数大全
    有了上面三篇的函数,平时开发应该问题不大了。这篇的主要目的是把所有的函数都过一遍,深入RDD的函数RDD函数大全数据准备        val sparkconf = new Spa......