首页 > 其他分享 >Spark从零开始

Spark从零开始

时间:2022-12-06 10:04:11浏览次数:51  
标签:RDDs 函数 val RDD rdd 从零开始 Spark


Spark简介

Spark是一个快速且通用的集群计算机平台。

快速

1.Spark扩充了流行的Mapreduce计算机模型

2.Spark是基于内存的计算

通用

Spark的设计容纳了其他分布式系统拥有的功能批处理,迭代式计算,交互查询和流处理等。

Spark是高度开放的

Spark提供了Python,Java,Scala,SQL的API和丰富的内置库。

Spark和其他的大数据工具整合得很好,包括hadoop,kafka等。

Spark历史

1.诞生于2009年,加州大学伯克利分校RAD实验室的一个研究项目,最初是基于Hadoop Mapreduce的。发现Mapreduce在迭代式计算和交互式低效,引入了内存存储

2.2010年3月份Spark开源

3.2011年AMP实验室在Spark上开发高级组件,像Spark Streaming

4.2013年转移到了Apache下,不就便成顶级项目了

Spark组件

Spark包括多个紧密集成的组件

Spark从零开始_数据集

Spark Core

包含Spark的基本功能,包含任务调度,内存管理,容错机制等。

内部定义了RDDs(弹性分布式数据集).

提供了很多APIs来创建和操作这些RDDs。

应用场景,为其他的组件提供底层的服务。

Spark SQL

是Spark处理结构化数据的库,就像Hive SQL,Mysql一样。

应用场景,企业中用来做报表统计。

Spark Streaming

是实时数据流处理组件,类似Storm。

Spark Streaming提供了API来操作实时流数据。

应用场景,企业中用来从Kafka接收数据做实时统计。

Mlib

一个包含通用机器学习功能的包,Machine learning lib。

包含分类,聚类,回归等,还包括模型评估和数据导入。

Mlib提供的上面这些方法,都支持集群上的横向扩展。

应用场景,机器学习

Graphx

是处理图的库(例如社交网络图),并进行图的并行计算。

像Spark Streaming,Spark SQL一样,它也继承了RDD API。

它提供了各种图的操作,和常用的图算法,例如PangeRank算法。

应用场景,图计算。

Cluster Managers

就是集群管理,Spark自带一个集群管理是单独调度器。

常见集群管理包括Hadoop YARN,Apache Mesos

紧密集成的优点

Spark底层优化了,基于Spark底层的组件也得到了相应的优化。

紧密集成,节省了各个组件组合使用时的部署,测试等时间。

向Spark增加新的组件时,其他组件可立刻享用新组件的功能。

Spark和Hadoop比较

Hadoop应用场景

1.离线处理

2.对时效性要求不高


Spark应用场景

1.时效性要求高的场景

2.机器学习等领域

Spark下载和安装

Spark是Scala写的,运行在JVM上,所以运行环境Java7+

如果使用Python API,需要安装Python 2.6+或者Python 3.4+。

Spark 1.6.2 - Scala 2.10  Spark 2.0.0 - Scala 2.11

下载地址:​​http://spark.apache.org/downloads.html​​,

搭Spark不需要Hadoop,如有hadoop集群,可下载相应的版本。

下载完成后,解压目录如下:

Spark从零开始_Spark_02


bin包含用来和Spark交互的可执行文件,如Spark shell。

core,streaming,python...包含主要组件的源代码

examples包含一些单机Spark job,你可以研究和运行这些例子。

Spark Shell

Spark shell使你能够处理分布在集群上的数据。

Spark把数据加载到节点的内存中,因此分布式处理可在秒级完成。

快速迭代式计算,实时查询、分析一般能够在shells完成。

Spark提供了Python shells和Scala shells。

RDDs介绍

Driver program

包含程序的main()方法,RDDs的定义和操作。

它管理很多节点,我们称作executors。

Spark从零开始_数据集_03


SparkContext

Driver programs通过SparkContext对象访问Spark。SparkContext对象代表和一个集群的连接。

在Shell中SparkContext自动创建好了,就是sc。

RDDs

Resilient distributed datasets(弹性分布式数据集,简写RDDs)。

这些RDDs,并行的分布在整个集群中。

RDDs是Spark分发数据和计算的基础抽象类。

一个RDDs是一个不可改变的分布式集合对象

Spark中,所有计算都是通过RDDs的创建、转换操作完成的。

一个RDDs内部由许多partitions(分片)组成。

分片

每个分片包括一部分数据,partitions可在集群不同节点上计算。

分片是Spark并行处理的单元,Spark顺序的,并行地处理分片

RDDs的创建方法

1. 把一个存在的集合传给SparkContext的parallelize()方法(测试的时候使用)

val rdd = sc.parallelize(Array(1, 2, 3, 4), 4)

第1个参数:待并行化处理的集合

第2个参数:分区个数

2. 加载外部数据集

val rdd = sc.textFile("helloSpark.txt") 

Scala的基础知识

Scala的变量声明

在Scala中创建变量的时候,必须使用val或者var

val,变量值不可修改,一旦分配不能重新指向别的值

var,分配后,可以指向类型相同的值。

Scala的匿名函数和类型推断

lines.filter(line => line.contains("world"))

定义一个匿名函数,接收一个参数line

使用line这个String类型变量上的contains方法,并且返回结果。

line的类型不需指定,能够推断出来。

RDD基本操作之Transformation

Transformation

转换,从之前的RDD构建一个新的RDD,像map()和filter()。

1.逐元素Transformation

map():接收函数,把函数应用到RDD的每一个元素,返回新RDD。

filter():接收函数,返回只包含满足filter()函数的元素的新RDD。

flatMap():对每个输入元素,输出多个输出元素。flat压扁的意思,将RDD中元素压扁后返回一个新的RDD。

集合运算:RDDs支持数学集合的计算,例如并集,交集计算。

RDD基本操作之Action
Action介绍

在RDD上计算出来一个结果,把结果返回给driver program或保存在文件系统,count(),save

一些Action说明如下:

1.reduce(func):通过函数func先聚集各分区的数据集,再聚集分区之间的数据,func接收两个参数,返回一个新值,新值再做为参数继续传递给函数func,直到最后一个元素

 

2.collect():以数据的形式返回数据集中的所有元素给Driver程序,为防止Driver程序内存溢出,一般要控制返回的数据集大小

 

3.count():返回数据集元素个数

 

4.first():返回数据集的第一个元素

 

5.take(n):以数组的形式返回数据集上的前n个元素

 

6.top(n):按默认或者指定的排序规则返回前n个元素,默认按降序输出

 

7.takeOrdered(n,[ordering]):

例1:

def main(args: Array[String]) {            
val conf = new SparkConf().setMaster("local").setAppName("reduce")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 10,2)
val reduceRDD = rdd.reduce(_ + _)
val reduceRDD1 = rdd.reduce(_ - _) //如果分区数据为1结果为 -53
val countRDD = rdd.count()
val firstRDD = rdd.first()
val takeRDD = rdd.take(5) //输出前个元素
val topRDD = rdd.top(3) //从高到底输出前三个元素
val takeOrderedRDD = rdd.takeOrdered(3) //按自然顺序从底到高输出前三个元素

println("func +: "+reduceRDD)
println("func -: "+reduceRDD1)
println("count: "+countRDD)
println("first: "+firstRDD)
println("take:")
takeRDD.foreach(x => print(x +" "))
println("\ntop:")
topRDD.foreach(x => print(x +" "))
println("\ntakeOrdered:")
takeOrderedRDD.foreach(x => print(x +" "))
sc.stop
}

输出:

func +: 55
func -: 15 //如果分区数据为1结果为 -53
count: 10
first: 1
take:
1 2 3 4 5
top:
10 9 8
takeOrdered:
1 2 3


(RDD依赖图:红色块表示一个RDD区,黑色块表示该分区集合,下同)


Spark从零开始_数据_04


 


         (RDD依赖图)


 

8.countByKey():作用于K-V类型的RDD上,统计每个key的个数,返回(K,K的个数)

 

9.collectAsMap():作用于K-V类型的RDD上,作用与collect不同的是collectAsMap函数不包含重复的key,对于重复的key。后面的元素覆盖前面的元素

 

10.lookup(k):作用于K-V类型的RDD上,返回指定K的所有V值

例2:

def main(args: Array[String]) {            
val conf = new SparkConf().setMaster("local").setAppName("KVFunc")
val sc = new SparkContext(conf)
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val rdd = sc.parallelize(arr,2)
val countByKeyRDD = rdd.countByKey()
val collectAsMapRDD = rdd.collectAsMap()

println("countByKey:")
countByKeyRDD.foreach(print)

println("\ncollectAsMap:")
collectAsMapRDD.foreach(print)
sc.stop
}

输出:

countByKey:
(B,2)(A,2)
collectAsMap:
(A,2)(B,3)

 

Spark从零开始_数据_05

        (RDD依赖图)

 

11.aggregate(zeroValue:U)(seqOp:(U,T) => U,comOp(U,U) => U):

seqOp函数将每个分区的数据聚合成类型为U的值,comOp函数将各分区的U类型数据聚合起来得到类型为U的值

def main(args: Array[String]) {            
val conf = new SparkConf().setMaster("local").setAppName("Fold")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1,2,3,4),2)
val aggregateRDD = rdd.aggregate(2)(_+_,_ * _)
println(aggregateRDD)
sc.stop
}


输出:

90


步骤1:分区1:zeroValue+1+2=5   分区2:zeroValue+3+4=9

 

步骤2:zeroValue*分区1的结果*分区2的结果=90

 

Spark从零开始_Scala_06

            (RDD依赖图)

 

12.fold(zeroValue:T)(op:(T,T) => T):通过op函数聚合各分区中的元素及合并各分区的元素,op函数需要两个参数,在开始时第一个传入的参数为zeroValue,T为RDD数据集的数据类型,,其作用相当于SeqOp和comOp函数都相同的aggregate函数


例3

def main(args: Array[String]) {             
val conf = new SparkConf().setMaster("local").setAppName("Fold")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)), 2)
val foldRDD = rdd.fold(("d", 0))((val1, val2) => { if (val1._2 >= val2._2) val1 else val2
})
println(foldRDD)
}


输出:

c,5


其过程如下:


1.开始时将(“d”,0)作为op函数的第一个参数传入,将Array中和第一个元素("a",1)作为op函数的第二个参数传入,并比较value的值,返回value值较大的元素


 


2.将上一步返回的元素又作为op函数的第一个参数传入,Array的下一个元素作为op函数的第二个参数传入,比较大小


 


3.重复第2步骤


 


每个分区的数据集都会经过以上三步后汇聚后再重复以上三步得出最大值的那个元素,对于其他op函数也类似,只不过函数里的处理数据的方式不同而已

 

Spark从零开始_Scala_07

             (RDD依赖图)

 

13.saveAsFile(path:String):将最终的结果数据保存到指定的HDFS目录中

 

14.saveAsSequenceFile(path:String):将最终的结果数据以sequence的格式保存到指定的HDFS目录中

RDDs的特性


RDDs的血统关系图

Spark维护着RDDs之间的依赖关系和创建关系,叫做血统关系图。

Spark使用血统关系图来计算每个RDD的需求和恢复丢失的数据。

延迟计算(Lazy Evaluation)

Spark对RDDs的计算是,他们第一次使用action操作的时候

这种方式在处理大数据的时候特别有用,可以减少数据的传输

Spark内部记录metadata表明transformations操作已经被响应了

加载数据也是延迟计算,数据只有在必要的时候,才会被加载进去

RDD.persist()持久化

默认每次在RDDs上面进行action操作时,Spark都重新计算RDDs

如果想重复利用一个RDD,可以使用RDD.persist()

unpersist()从缓存中移除

KeyValue对RDDs

创建KeyValue对RDDs

使用map()函数,返回key/value对

例如,包含数行数据的RDD,把每行数据的第一个单词作为keys

combinByKey()

(createCombiner, mergeValue, mergeCombiners, partitioner)

最常用的基于key的聚合函数,返回的类型可以与输入类型不一样

许多基于key的聚合函数都用到了它,像groupByKey()

遍历partition中的元素,元素的key,要么之前见过的,要么不是。

如果是新元素,使用我们提供的createCombiner()函数

如果是这个partition中已经存在的key,就会使用mergeValue()函数

合计每个partition的结果的时候,使用mergeCombiners()函数



标签:RDDs,函数,val,RDD,rdd,从零开始,Spark
From: https://blog.51cto.com/u_11407799/5914576

相关文章