首页 > 其他分享 >pySpark RDD基本用法

pySpark RDD基本用法

时间:2022-08-26 14:57:35浏览次数:67  
标签:运算 parallelize pySpark 用法 RDD intRDD sc intRDD1

pySpark RDD基本用法

RDD的全称是:Resilient Distributed Dataset (弹性分布式数据集),它有几个关键的特性:

RDD是只读的,表示它的不可变性。
可以并行的操作分区集合上的所有元素。

每个RDD的内部,有5个主要特性:

  • A list of partitions (一个分区列表,可以获取所有的数据分区)
  • A function for computing each split(对给定的分区内的数据进行计算的function)
  • A list of dependencies on other RDDs (一个RDD所依赖的父RDD列表)
  • Optionally, a Partitioner for key-value RDDs (可选:如何进行K-V的RDD分区)
  • Optionally, a list of preferred locations to compute each split on(可选:数据做运算时最优的地址,即数据本地性)

1.RDD的三种基本运算

  • Transformation(转换)
    概念:
    将一个RDD通过一系列操作变为另一个RDD的过程,这个操作可能是简单的加减操作,也可能是某个函数或某一系列函数。

注:所有Transformation函数都是Lazy(惰性的),不会立即执行,需要Action函数来触发
Transformation操作不会触发真正的计算,只会建立RDD的关系图

  • Action(动作)

    概念:
    Action操作代表依次计算的结束,返回值不是RDD,将结果返回到Driver程序或输出到外部(文件或文件夹)。

注:所有Action函数立即执行(Eager),比如reduce、saveAsTextFile、count等。

所以Transformation只是建立计算关系,Action才是实际的执行者。
每个Action操作都会形成一个DAG调用SparkCoutext的runJob方法向集群提交请求,所以每个Action操作都对应一个DAG/Job。

  • Persisitence(持久化)

    概念:
    Persisitence操作对于那些会重复使用的 RDD,可以将RDD"持久化"在内存中以供后续使用,以提高执行性能。

注:持久化算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

2 常用算子使用实例

python中将数据抽象成4大数据结构,分别是tuple,list,dict,set。再粗分下,又可以分成list和k-v两种数据结构。针对这两种数据机构,spark中都有相应的算子。

初始化pyspark代码:

  import time
  import argparse

  from pyspark import SparkContext, SparkConf
  from pyspark.sql.dataframe import DataFrame
  from pyspark.sql.session import SparkSession
  import json

  from pyspark.sql import HiveContext
  from pyspark.sql import SQLContext


  sparkconf = SparkConf().setAppName("Python Spark2").set("spark.ui.showConsoleProgress", "false")\
                    .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")\
                    .set("hive.exec.dynamici.partition",True)\
                    .set("hive.exec.dynamic.partition.mode","nonstrict")

  spark = SparkSession.builder.config(conf=sparkconf).enableHiveSupport().getOrCreate()
  sc = spark.sparkContext

2.1基本RDD运算

2.1.1初始函数

使用parallelize函数初始化一个rdd:

  intRDD = sc.parallelize([1,2,3,4,5,6])
  intRDD.collect()
  :[1, 2, 3, 4, 5, 6]

parallelize,本意是平行化的意思,使数据生成于各个计算节点,用于并行计算的意思。还可以使用直接读txt文件的方式来:

  rdd = sc.textFile(file,3)

2.1.2 map函数

再使用map函数,对各个节点做相应的运算。map函数可以遍历所有的节点,生成另外一个RDD。

  intRDD.map(lambda x:x+1).collect()
  :[2, 3, 4, 5, 6, 7]

2.1.3 filter函数

filter函数可以对RDD内的元素进行筛选,并产生新的RDD。

  intRDD.filter(lambda x:x>4).collect()
  :[5, 6]

2.1.4 distinct函数

distinct函数对RDD内的元素进行去重复操作。并不产生新的RDD。

  intRDD = sc.parallelize([1,2,3,4,5,6,6])
  intRDD.distinct().collect()
  :[1, 2, 3, 4, 5, 6]

2.1.5 groupby运算

groupby运算可以通过匿名函数将数据分为多个List。

  intRDD = sc.parallelize([1,2,3,4,5,6,6])
  gRDD = intRDD.groupBy(lambda x: "big" if(x>4) else "small").collect()
  print(gRDD)
  :[('big', <pyspark.resultiterable.ResultIterable object at 0x7f1970214640>), ('small', <pyspark.resultiterable.ResultIterable object at 0x7f1951f15ee0>)]
  print(list(gRDD[0][1]))
  :[5, 6, 6]

2.2 多个RDD"转换"运算

RDD支持求并集,交集,差集和笛卡尔积运算。

  intRDD1 = sc.parallelize([1,2])
  intRDD2 = sc.parallelize([3,4,5])
  intRDD3 = sc.parallelize([5,6,6])
  intRDD4 = intRDD1.union(intRDD2).union(intRDD3).collect() #并集
  print(intRDD4)
  :[1, 2, 3, 4, 5, 5, 6, 6]


  intRDD5 = intRDD2.intersection(intRDD3).collect()#交集
  print(intRDD5)
  :[5]

  intRDD6 = intRDD2.subtract(intRDD3).collect()#差集
  print(intRDD6)
  :[3, 4]

  intRDD7 = intRDD1.cartesian(intRDD3).collect()#笛卡尔积
  print(intRDD7)
  :[(1, 5), (1, 6), (1, 6), (2, 5), (2, 6), (2, 6)]

2.3 基本的action运算

2.3.1 读取运算

  intRDD = sc.parallelize([1,2,3,4,5,6,6])
  print(intRDD.take(1))
  print(intRDD.first())
  print(intRDD.takeOrdered(3))
  print(intRDD.takeOrdered(3,lambda x:-x))

  :[1]
  :1
  :[1, 2, 3]
  :[6, 6, 5]

2.3.2 统计功能

  print(intRDD.stats())
  :(count: 7, mean: 3.857142857142857, stdev: 1.8070158058105026, max: 6.0, min: 1.0)

2.4基本K-V RDD运算

Spark RDD 支持键值(K-V)运算,K-V运算也是Map/Reduce的基础。

2.4.1 同样的初始化方法

  intRDD = sc.parallelize([(1,2),(3,2),(4,5)])
  intRDD.collect()
  :[(1,2),(3,2),(4,5)]

2.4.2 filter函数

filter算子的入参是tuple,可以用x[0]和x[1]区分两个值。

  intRDD = sc.parallelize([(1,2),(3,2),(4,5)])
  intRDD.filter(lambda x:x[0] > 3).collect()
  :[(4, 5)]

2.4.3 mapValues函数

需要注意的是mapValues的入参是value,返回值也是value。

  intRDD = sc.parallelize([(1,2),(3,2),(4,5)])
  intRDD.mapValues(lambda x:x+1).collect()
  :[(1, 3), (3, 3), (4, 6)]

2.4.4 sortByKey函数

通过key值来做排序

  intRDD = sc.parallelize([(1,2),(3,2),(4,5)])
  intRDD.sortByKey().collect()
  intRDD.sortByKey(ascending = False).collect()

  :[(1, 3), (3, 3), (4, 6)]
  :[(4, 5), (3, 2), (1, 2)]

2.4.5 reduceByKey函数

通过key来归纳数据

  intRDD = sc.parallelize([(1,2),(3,2),(4,5),(1,6)])
  intRDD.reduceByKey(lambda x,y:x+y)
  intRDD.collect()
  :[(1,8),(3,2),(4,5)]

2.5 多个 K-V RDD运算

2.5.1 join运算

创建两个RDD通过join做运算拼接

  intRDD1 = sc.parallelize([(1,2),(3,2),(4,5),(1,6)])
  intRDD2 = sc.parallelize([(1,2)])
  intRDD1.join(intRDD2).collect()
  :[(1, (2, 2)), (1, (6, 2))]

默认的join运算是通过intRDD1.key=intRDD2.key做运算的,生成一个新的RDD,key值不变,value值为tuple类型,做值得聚合。

2.5.2 leftOuterJoin运算

创建两个RDD通过leftOuterJoin做运算拼接

  intRDD1 = sc.parallelize([(1,2),(3,2),(4,5),(1,6)])
  intRDD2 = sc.parallelize([(1,2)])
  intRDD1.leftOuterJoin(intRDD2).collect()
  :[(1, (2, 2)), (1, (6, 2)), (3, (2, None)), (4, (5, None))]

leftOuterJoin运算是通过intRDD1的key做left join运算的,生成一个新的RDD,key值为intRDD1的key,value值为tuple类型,做值的聚合。

2.5.3 rightOuterJoin运算

创建两个RDD通过rightOuterJoin做运算拼接

  intRDD1 = sc.parallelize([(1,2),(3,2),(4,5),(1,6)])
  intRDD2 = sc.parallelize([(1,2)])
  intRDD1.rightOuterJoin(intRDD2).collect()
  [(1, (2, 2)), (1, (6, 2))]

rightOuterJoin运算是通过intRDD2的key做right join运算的,生成一个新的RDD,key值为intRDD2的key,value值为tuple类型,做值的聚合。

2.5.4 subtractByKey运算

subtractByKey会清理掉key相同的值:

  intRDD1 = sc.parallelize([(1,2),(3,2),(4,5),(1,6)])
  intRDD2 = sc.parallelize([(1,9)])
  intRDD1.subtractByKey(intRDD2).collect()
  :[(3, 2), (4, 5)]

2.6 K-V 的 action 运算

K-V 的RDD同样支持first等运算,但是也支持一些只有K-V情况下的值。

2.6.1 collectAsMap 运算

可以将输出的值转换成map,但是需要注意的是会将key重复的值抹掉。

  intRDD1 = sc.parallelize([(1,2),(3,2),(4,5),(1,6)])
  print(intRDD1.collectAsMap())
  :{1: 6, 3: 2, 4: 5}

2.6.2 lookup 运算

通过key值查找对应的value值:

  intRDD1 = sc.parallelize([(1,2),(3,2),(4,5),(1,6)])
  print(intRDD1.lookup(1))
  :[2, 6]

2.7 RDD持久化

如果我们相对一个RDD进行复用操作的时候,基于RDD的特性,当以rdd通过transformation转化为另外一个rdd的时候,前面的rdd就会被自动释放,此时还想在原来的rdd身上进行其它操作,需要从源头进行数据计算,这样效率自然会降低。为了能够在rdd重用的时候,直接从内存中加载相关数据,所以我们需要缓存算子(persist/cache)将rdd数据持久化到内存等等其它地方。

  • MEMORY_ONLY RDD中所有的数据都会以未经序列化的java对象的格式优先存储在内存中,如果内存不够,剩下的数据不会进行持久化。很容易出OOM=OutOfMemoryException异常。java的gc频率和对象个数成正比。gc的时候会stop-the-world。

  • MEMORY_ONLY_SER 和MEMORY_ONLY的操作几乎一致,唯一的区别是在内存中存储的不在是未经序列化的java对象,是序列化之后的数据,rdd经过序列化之后,每一个partition就只有一个字节数组,也就是说一个partition就是一个java对象。

  • MEMORY_AND_DISK 和MEMORY_ONLY的唯一区别在于,MEMORY_ONLY不会持久化哪些在内存中持久化的数据,MEMORY_AND_DISK会将哪些在内存中保存不下的数据保存到磁盘中。

  • MEMORY_AND_DISK_SER 就比MEMORY_AND_DISK多了一点,存储的是序列化的java对象

  • DISK_ONLY 磁盘存储,效率太低,一般不用XXXXX_2(MEMORY_ONLY_2等等) 是在上述所有操作的基础之上进行了一个备份。从安全、高可用的角度上考虑,如果备份所消耗的时间,比数据丢失之后从源头重新计算一遍的代价小,我们才考虑使用Xxxx_2。

  • OFF_HEAP 非堆。上述所有的操作都会使用Spark自身的内存资源,所以为了给计算提供足够的资源,可以将持久化的数据保存到非executor中。常见的OFF_HEAP:Tachyon/Alluxio

    from pyspark import StorageLevel
    intRDD1.persist(storageLevel=StorageLevel.MEMORY_ONLY)
    intRDD1.unpersist()

标签:运算,parallelize,pySpark,用法,RDD,intRDD,sc,intRDD1
From: https://www.cnblogs.com/chenjie0949/p/16627523.html

相关文章

  • RDD,DataFrame,DataSet
    RDD:以Person为类型参数,但是Spark框架本身不了解Person类的内部结构。DataFrame:DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个......
  • 10.Java中Map的entrySet() 详解以及用法
    一、Map.entry是什么?Map是java中的接口,Map.Entry是Map的一个内部接口。此接口为泛型,定义为Entry<K,V>。它表示Map中的一个实体(一个key-value对)接口中有get......
  • 使用mybatis的Criteria 查询、条件过滤用法
     借鉴博客:https://cloud.tencent.com/developer/article/1979972 1、如果业务查询中,有的条件要用括号()括起来达到想要的效果,如:第2个and后面的条件要括起来【此业......
  • python json用法 dump和dumps的区别;loads()和load()的区别
    json常用方法方法作用json.dumps()将python对象编码成Json字符串json.loads()将Json字符串解码成python对象json.dump()将python中的对象转化成json储存到......
  • verilog中的task用法
    任务就是一段封装在“task-endtask”之间的程序。任务是通过调用来执行的,而且只有在调用时才执行,如果定义了任务,但是在整个过程中都没有调用它,那么这个任务是不会执行的。......
  • es6 findIndex,find用法
    letarr=[{name:'test1',age:1},{name:'test2',age:2},{name:'test3',age:3}]lettemp=arr.findIndex(function(item){console.log(item.name......
  • MySQL六种窗口函数用法案例
     文章目录一、创建一个案例表二、序号函数-ROW_NUMBER、RANK、DENSE_RANK三、开窗聚合函数-SUM,AVG,MIN,MAX四、分布函数-CUME_DIST五、前后函数-LAG和LEAD六......
  • shell script中的括号用法
    目录大括号变量分界中括号条件判断获取数组元素小括号运行命令并捕获结果给数组赋值大括号变量分界NAME="Tom"echo"Hi,${Tom}"普通变量可以不用{}定界,但获取数组元......
  • oracle中pivot函数的用法
    pivot函数:对查询结果行转列进行统计示例:比如我想查每个用户投资的各种类型基金的分别有多少份额平常的写法:selectuserID,fundtype,sum(shares)fromuserassetgroup......
  • git高级用法
    前言使用Git作为代码版本管理,早已是现在开发工程师必备的技能。可大多数工程师还是只会最基本的保存、拉取、推送,遇到一些commit管理的问题就束手无策,或者用一些不优雅......