首页 > 编程语言 >Spark编程

Spark编程

时间:2023-08-13 09:23:06浏览次数:37  
标签:返回 元素 读取 分区 编程 RDD 操作 Spark

Spark编程

image-20200424092626485

image-20200424092719582

Transformation算子-Value型

函数 作用
map(f, preservesPartitioning=False) 通过对这个RDD的每个元素应用一个函数来返回一个新的RDD
flatMap(f,preservesPartitioning=False) 将函数应用于该RDD的所有元素,然后将结果平坦化(压扁),从而返回新的RDD
mapPartitions(f,preservesPartitioning=False) 它的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的
mapPartitionsWithIndex (f, preservesPartitioning=False) 区别在于mapPartitionsWithIndex中传入的函数要求可接收两个参数,第一个参数为分区编号,第二个为对应分区的元素组成的迭代器
filter(f) 对每个 元 素 应 用 f 函 数, 返 回 值 为 true 的 元 素 在RDD 中保留,返回值为 false 的元素将被过滤掉
distinct(numPartitions=None) 将RDD中的元素进行去重操作
union(other) 合并两个RDD,结果中包含两个RDD中的所有元素
intersection(other) 返回这个RDD和另一个RDD的交集,输出将不包含任何重复的元素
subtract(other) 返回在RDD1中出现,但是不在RDD2中出现的元素,不去重
sortBy(ascending=True, numPartitions=None) 根据指定的Key进行排序

Transformation算子-Key-Value型(PairRDD)

函数 作用
mapValues(f) 针对(Key, Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理
flatMapValues(f) 完成mapValues处理后,再对结果进行扁平化处理
reduceByKey(func, numPartitions=None, partitionFunc=portable_hash) 相同key值的value值进行对应函数运算,类似于hdp的combiner操作
groupByKey(numPartitions=None, partitionFunc=portable_hash) 将Pair RDD中的相同Key的值放一个序列中
sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: x) 根据key值进行排序,默认升序
keys() 返回一个仅包含键的RDD
values() 返回一个仅包含值的RDD
join(rdd) 可以将两个RDD按照相同的Key值join起来
leftOuterJoin(rdd) 左外连接,与SQL中的左外连接一致
rightOuterJoin(rdd) 右外连接,与SQL中的右外连接一致

Action算子

函数 作用
collect() 返回RDD中的所有元素
count() 返回RDD中的所有元素个数
reduce(f) 通过指定的聚合方法来对RDD中元素进行聚合
take(num) 从RDD中返回前num个元素的列表
takeOrdered(num) 从RDD中返回前num个最小的元素的列表,结果默认升序排列
first() 从RDD中返回第一个元素
top(num,key=None) 从RDD中返回最大的前num个元素列表,结果默认降序排列。如果key参数有值,则先对各元素进行对应处理。注:会把所以数据都加载到内存,所以该方法只有在数据很小时使用
foreach(f) 遍历RDD的每个元素,并执行f函数操作,无返回值
foreachPartiton(f) 对每个分区执行f函数操作,无返回值
saveAsTextFile (path, compressionCodecClass=None) 将RDD中的元素以字符串的格式存储在文件系统中
collectAsMap () 以字典形式,返回PairRDD中的键值对。如果key重复,则后面的value覆盖前面的
countByKey () 以字典形式,返回PairRDD中key值出现的次数

1.1创建RDD

RDD(弹性分布式数据集)是一个容错的、只读的、可进行斌行操作的数据结构,是一个分布在集群各个节点的存放元素的集合

1.1.1 从内存中已有数据创建RDD

有两种常用的方法,一种是转化Seq集合为RDD,另一种是已有RDD转化成新的RDD。

SparkContext类中有两个方法:parallelize和makeRDD将单机数据创建为分布式RDD。

1、parallelize

参数一是要转化的seq集合,参数二是分区数,如果不设分区数默认分区个数为2

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data,3)

2、makeRDD

makeRDD有两种实现方法:一种和parallelize一样,另一种接受的参数类型是Seq[(T,Seq[String])]。第二种方法生成的RDD中保存的是T的值,但是Seq[String]部分的数据会按Seq[(T,Set[String]]的顺序存放到各个分区中,一个Seq[String]对应存到-个分区,为数据提供位置信息。通过preferredLocations可以根据位置信息查看每一个分区的值。这种makeRDD的实现不可以自己指定RDD的分区个数,而是固定为Seq[String]参数的个数。

1.1.2从外部存储创建RDD

从外部存储创建RDD是指直接读取一一个存放在文件系统的数据文件创建RDD,第一种创建RDD的方式常用于测试,这种方式才是用于实践操作的常用方法。

从外部读取数据创建RDD可以有很多种数据来源,通过SparkContext对象的textFile方法读取数据集,支持多种类型数据集,如目录、文本文件、压缩文件和通配符匹配的文件等,并且允许设定分区个数。这里演示的输入数据的路径包括HDFS和Linux本地。

(1)从HDFS文件创建RDD

这种方式最为简单也最为常用,直接通过textFile命令读取HDFS文件的位置即可。如图3-3所示,在HDFS上有一个文件“serlootest.txt",读取该文件创建一个RDD。

val test = sc.textFile("/user/root/text.txt")

(2)从Linux本地文件创建RDD

本地文件读取也是通过sc.txtFile(“路径")的方法,在路径前面加上“il://" 表示从本地Linux文件系统读取。在Intelij IDEA开发环境中可以直接读取本地文件,但在spark-shell模式下,要求在所有节点的相同位置保存该文件才可以被读取。例如,在/opt 目录下创建一个文件test,随意输人4行数据,将test文件传到所有节点的/opt目录下。如图3-4所示,读取test文件,并且统计test的数据行数。

val test = sc.textFile("file:///etc/hadoop/conf/core-site.xml")
test.count

1.2查询

Spark RDD提供了丰富的操作方法来用于操作分布式的数据集合。其中包含了转换操作(Transformations)以及行动操作( Actions)两部分。

RDD 有两种类型的操作:转换操作和行动操作。转换操作通过某种函数将一一个RDD转换为一-个新的RDD,但是转换操作是懶操作,不会立刻执行计算。行动操作是用于触发转换操作的操作,这个时候才会真正开始进行计算。

1.2.1使用map()转换数据

map(f,  preservesPartitioning=False)

map是一种基础的RDD转换操作,用于将RDD中的每一个数据元素通过某种雨数进行转换并返回新的RDD,由于是转换操作,不会立即进行计算。转换操作是RDD的第二种创建方法,通过转换已有RDD来生成新的RDD。因为RDD是一个不可变的集合,所以如果对其中的数据进行了某种转换,那么势必会生成一个新的RDD。

1.2.2使用mapPartitions()将每个分区中的内容作为整体来处理的

mapPartitions(f,  preservesPartitioning=False)

它的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的

1.2.3用mapPartitionsWithIndex将每个分区中的内容作为整体来处理的

mapPartitionsWithIndex (f,  preservesPartitioning=False)

区别在于mapPartitionsWithIndex中传入的函数要求可接收两个参数,第一个参数为分区编号,第二个为对应分区的元素组成的迭代器

1.2.4使用sortBy()排序

sortBy(ascending=True, numPartitions=None)

sortBy0是对标准RDD进行排序的方法,有3个如下可输入参数。

(1)第一个参数是一个函数f:(T)=> K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。

(2 )第二个参数是ascending,决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序,如果要降序则需要写人参数false。

(3)第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。

第一个参数是必须输入,而后面的两个参数可以不输入。

1.2.5使用collect()查询

collect函数是一个行动操作,把RDD所有元素转换成数组并返回到Driver端,适用于小数据处理后的返回。因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,如果数据量大的话,会给网络造成很大的压力。所以数据量比较大的时候,尽量不要使用collect函数,因为这可能导致Driver端内存溢出问题。collect 有两种如下操作方式。

(1) cllect(: Array[T])

直接返回RDD中的所有元素,返回类型是一个数组,是最常用的一种方法。

(2) collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
这种用法非常少用,需要提供- 个标准的偏函数,然后保存符合的元素到MappedRDD中。首先定义一-个函数“one”, 用于将collect得到的数组中数值为1的值替换为“: one",将其他值替代为“other”。

1.2.6使用flatMap()转换数据

flatMap(f,  preservesPartitioning=False)

flatMap的操作是将函数应用于RDD之中的每一个元索, 将返回的迭代器(数组、列表等)中的所有元素构成新的RDD。简单来讲,使用flatMap,就是先map再flat,数据会先完成跟map一样的功能,为每一条输 入返回一个迭代器(可迭代的数据类型)。然后将所得到的不同级别的迭代器中的元素全部当成同级别的元索,返回一个元素级别全部相同的RDD。这个转换通常用来切分单词。

1.2.7使用take()方式查询某几个值

take(N)方法用于获取RDD的前N个元素,返回类型为数组。take 与collect 的原理相似,collect 用于获取全部数据,take 获取指定个数的数据。

1.3输出

1.3.1使用union()合并多个RDD

union是一种转换操作,用于将两个RDD的元素合并成一个,不进行去重操作,而且两个RDD中每个元素中的值的个数和类型需要保持一致。

1.3.2使用filter()进行过滤

filter是一种转换操作,用于过滤RDD中的元素。filter需要一个参数, 参数是一个用于过滤的函数,该函数的返回值为Boolean类型,返回值为true的元素保留,返回值为false的元索过滤,最后结果是返回一个存储符合过滤条件的所有元素的新RDD。

1.3.3使用distinct()进行去重

distinct(numPartitions=None)

distinct()是一个转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。

1.3.4简单的集合操作

RDD是一个分布式的数据集合,所以也有一些与数学中的集合类似的操作,如求交集、并集、补集、笛卡儿积。

1.3.4.1intersection()

intersection()方法用于求出两个RDD的共同元素,也就是找出两个RDD的交集,参数是另一个RDD,顺序先后与结果无关。

1.3.4.2subtract()

subtract()的参数是一个RDD,用于将前一个RDD中在后-一个RDD出现的元索删除,可以看作是求补集的操作,返回值为前一一个RDD去除与后-一个RDD相同元索后的剩余值所组成的新的RDD,所以RDD的顺序会影响结果。

1.3.4.3cartesian()

参数是RDD,笛卡儿积就是将两个集合的元素两两组合成一组。

1.4键值对RDD

由一组组的键值对组成的RDD,这些RDD被称为PairRDD。

1.4.1创建键值对RDD

有很多种创建键值对RDD的方式,很多存储键值对类型数据的数据格式会在读取时直接返回由其键值对组成的PairRDD。当需要将一个普通的RDD转化为一个PairRDD时可以使用map函数来进行操作,传递的函数需要返回键值对。

1.4.2获取RDD的键和值

作为键值对类型的RDD,包含了键和值两部分。Spark 提供了两种方法,分别获取键值对RDD的键和值。keys 返回一个仅包含键的RDD, values 返回一个仅包含值的RDD。

1.4.3合并有相同键的值

reduceByKey()的功能是合并具有相同键的值,作用域是Key/Value类型的键值对,并且是只对每个Key的Value进行处理,当RDD中有多个键相同的键值对时,那么就会对这个Key对应的Values进行处理。

在进行处理时,reduceByKey 将一个Key中的前两个Value传给输人函数,产生一个新的retur值,新产生的returm值与RDD中同一个Key的下一个Value组成两个元索,再被传给输人函数,直到最后只有一个值为止。reduceByKey()不是一个行动操作,而是一个转换操作,返回一个新的RDD。

1.4.4对具有相同键的值进行分组

groupByKey()是对具有相同键的值进行分组,对于一个由类型K的键和类型V的值组成的RDD,通过groupByKey()得到的RDD类型是[K,lterablel[v]]。

1.5关联操作

1.5.1使用join()连接两个RDD

image-20200424080615617

1.5.1.1join

join()是根据键对两个RDD进行内连接,将两个RDD中键相同的数据的值存放在一个元组中,最后只返回两个RDD都存在的键的连接结果。

1.5.1.2rightOuterJoin

rightOuterJoin()是根据键对两个RDD进行右外连接,连接结果返回第二个RDD的所有键的连接结果,不管在第一个RDD中是否存在。

1.5.1.3leftOuterJoin

leftOuterJoin()是对两个RDD的键进行左外连接的方法,与rightOuterJoin相反,返回结果保留第一个RDD的所有键。

1.5.1.4fullOuterJoin

fullOuterJoin是一种全外连接,会保留两个连接的RDD中所有键的连接结果

1.5.2使用zip组合两个RDD

zip函数用于将两个RDD组合成Key/Value形式的RDD,这里要求两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

1.5.3使用combineByKey合并相同键的值

combineByKey用于将相同键的数据聚合,并且允许返回类型与输人数据类型不同的返回值,combineByKey函数的定义为:

combineByKey (createCombiner, mergeValue, me rgeCombiners, numPartitions=None)

有以下3个重要的参数:

( 1 )createCombiner:V=>C, V是键值对RDD中的值部分,将该值转换为另- -种类型C,C会作为每一个键的累加器的初始值。

(2 )mergeValue:(C,V)=>C,该丽数把元素V合并到之前的元素C(createCombiner)上(这个操作在每个分区内进行)。

(3) mergeCombiners:(C,C)=>C.该函数把两个元素C合并(这个操作在不同分区间进行)。

由于聚合操作会遍历分区中所有的元素,因此每个元素(这里指的是键值对)的键只有两种情况:以前没出现过或以前出现过。

(1)如果以前没出现过,则执行的是createCombiner方法,createCombiner()会在新遇到的键对应的累加器中赋予初始值,否则执行mergeValue方法。

(2)对于已经出现过的键(key),调用mergeValue来进行聚合操作,对该键的累加器对应的当前值(C格式)与新值(V格式)进行合并。

(3)由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果(全是C格式)进行合并。

1.5.4使用lookup查找指定键的值

lookup(k)作用于Key/Value类型的RDD上,返回指定Key的所有Value值。

1.6数据的存储和读取

image-20200424082924033

1.6.1JSON文件的读取与存储

JSON是一种使用较广的半结构化数据格式。JSON指的是JavaScript 对象表示法( JavaScript Object Notation) ,是轻量级的文本数据交换格式。JSON解析器和JSON库支持许多不同的编程语言。

JSON数据的书写格式是名称/值对。

名称/值对包括字段名称(在双引号中),后面写一个冒号, 然后是值。数据由逗号分隔,花括号保存对象,方括号保存数组。

1.6.1.1JSON文件读取

读取JSON文件,将数据作为文本文件,对JSON数据进行解析,这样的方法可以在所有支持的编程语言中使用。这种方法要求文件每行是一条JSON记录,如果记录跨行,就需要读取整个文件,对每个文件进行解析。

1.6.1.2JSON文件存储

写入JSON文件比读取它要简单得多,因为不需要考虑格式错误问题,也知道写人的数据类型。可以使用之前的包,把由结构化数据解析成的RDD转化为字符串RDD.然后使用Spark的文本文件API写入。

1.6.2CSV文件的读取与存储

逗号分隔值(CSV)文件每行都有固定数目的字段,字段间用逗号隔开[在制表符分隔值文件(TSV )中用制表符隔开]。在CSV的所有数据字段均没有包含换行符的情况下,可以使用textFile()读取并解析数据。

1.6.2.1CSV文件读取

读取CSV文件需要先把文件当成文本文件读取,再对数据进行处理。

如果在字段中嵌有换行符,就需要完整读人每个文件,然后解析各段。

1.6.2.2CSV文件存储

写人CSV/TSV数据相当简单,可以通过重用输出编码器加速。由于CSV不会对每条记录都输出字段名,所以需要创建一种映射关系使输出保持一致。 其中一种方法就是通过函数把各字段都转换为指定顺序的数组。

1.6.3SquenceFile的读取与存储

1.6.3.1SquenceFile的存储

SequenceFile文件的存储非常简单,首先要保证有一个键值对类型的RDD,然后直接调用saveSequenceFile(path)保存数据,自动将键和值转化为Writable类型。

1.6.3.2SquenceFile的读取

Spark有专门读取SequenceFile的接口,可以调用SparkContext中的sequenceFile(path,keyClass,valueClass,minartitions)实现。SequenceFile使用的是Hadoop的Writable类型,所以keyClass和valueClass参数必须定义为正确的Writable类。

1.6.4文本文件的读取与存储

1.6.4.1文本文件的读取

文本文件是最常用的数据文件,通过textFile方法就可以直接读取,一行记录作为一一个元素。

1.6.4.2文本文件的存储

文本文件的存储也是最常用的存储格式之一,当对数据进行处理之后,通常需要将结果保存用于分析或存储。RDD类型的数据可以直接调用saveAsTextFile(path)将数据存储为文本文件。

标签:返回,元素,读取,分区,编程,RDD,操作,Spark
From: https://www.cnblogs.com/simpleness/p/17626148.html

相关文章

  • Spark概述
    Spark概述1.1认识Spark背景:现有的计算框架有:批处理:MapReduce、Hive、Pig…,流式计算:Storm,交互式计算:Impala,Presto,但没有一种框架兼容以上所有的计算框架,spark应运而生1.1.1Spark的发展2009年由Berkeley‘sAMPLab开始编写最初的源代码。2013年加入Apache孵化器项目,很快成为......
  • Spark SQL
    SparkSQL1.1SparkSQL简介SparkSQL是一个用来处理结构化数据的Spark组件。它可被视为一个分布式的SQL查询引擎,并且提供了一个叫作DataFrame的可编程抽象数据模型。SparkSQL的前身是Shark,由于Shark需要依赖于Hive而制约了Spark各个组件的相互集成,因此Spark团队提出了Spark......
  • 学习go语言编程之函数
    函数定义函数的基本组成:关键字func,函数名,参数列表,返回值,函数体,返回语句。示例如下:funcAdd(aint,bint)(retint,errerror){ ifa<0||b<0{ err=errors.New("shouldbenon-negativenumbers") return } returna+b,nil//支持多重返回值}如果参......
  • 学习go语言编程之常量
    什么在常量在Golang中,常量是指在编译期就已知且不可改变的值。字面常量在程序中硬编码的常量值被称为字面常量,如:-12//整数类型常量3.1415926//浮点类型常量3.2+12i//复数类型常量true//布尔类型常量"foo"//字符串常量常量定义使用关键字con......
  • 学习go语言编程之数据类型
    数据类型概述Golang语言内置了如下基础数据类型:布尔类型:bool整型:int8,unit8,int16,uint16,int32,uint32,int64,uint64,int,uint,uintptr浮点类型:float32,float64复数类型:complex64,complex128字符串:string字符类型:rune错误类型:error同时,Golang还支持如下复合类型:指针:pointer数组......
  • 学习go语言编程之流程控制
    Golang支持如下4种流程控制语句:条件语句:if,else和elseif选择语句:switch,case和select循环语句:for,range跳转语句:goto条件语句示例代码:a:=3ifa<5{fmt.Println(a,"litterthan5")}else{fmt.Println(a,"notlitterthan5")}关于条件语句,要注意以下......
  • Spark提交程序到Yarn任务状态一直为Accepted
    正在学习《Spark快速大数据分析》第七章-在集群上运行Spark,写了一个单词数量统计的Spark程序提及到Yarn,但是状态一直是Accepted,等待运行。1、排查了Yarn资源调度器配置,配置的是公平配置,确认无问题<property> <name>yarn.scheduler.fair.allocation.file</name> <value>/opt/ha......
  • SV 第五章 面向对象编程基础
    SystemVerilog验证5面向对象编程基础5.1概述对于Verilog和C语言来说,由于他们不是面向对象变成语言,数据的存储往往是分布式的,例如把数据、地址、指令分别保存在不同的数组里面,不利于程序的解读。面向对象变成使得用户可以创建复杂的数据类型,将数据类型紧密地结合在一起,可以在......
  • pyspark小案例
    ##py_pyspark_demo.py#py_learn##CreatedbyZ.Steveon2023/8/1215:33.##统计文件中各个单词出现的次数#1.导入库frompysparkimportSparkConf,SparkContext#2.创建SparkConf对象和SparkContext对象conf=SparkConf().setMaster("local......
  • 《Rust编程之道》学习笔记一
    《Rust编程之道》学习笔记一序Rust语言的主要特点系统级语言无GC基于LLVM内存安全强类型+静态类型混合编程范式零成本抽象线程安全程序员的快乐何谓快乐?真正的快乐不仅仅是写代码时的“酸爽”,更应该是代码部署到生产环境之后的“安稳”。程序的三大定律程序必须......