Spark概述
1.1认识Spark
背景:现有的计算框架有:批处理:MapReduce、Hive、Pig…,流式计算:Storm,交互式计算:Impala,Presto,但没有一种框架兼容以上所有的计算框架,spark应运而生
1.1.1 Spark的发展
2009年由Berkeley‘s AMPLab开始编写最初的源代码。
2013年加入Apache 孵化器项目,很快成为最重要的三大开源项目之一(Hadoop、Spark、Storm)
目前已经有30+公司100+开发者在提交代码
Hadoop最大的厂商Cloudera宣称加大Spark框架的投入来取代MapReduce
Hortonworks的HDP也集成了Spark。
Hadoop厂商MapR投入Spark阵营
Spark2.2.0与2017年7月发布
1.1.2spark特点
速度(比MapReduce快10~100倍)
内存计算引擎,提供了Cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销。
DAG引擎,减少多次计算之间中间结果写入到HDFS的开销。
使用多线程池来减少task启动开销,shuffle过程中避免了不必要的sort操作和减少磁盘IO操作。
易用
丰富的API。支持多种语言(JAVA、Scala、Python、R)。
通用
Spark涵盖了批处理、流式计算、交互式计算、图计算、机器学习。这些计算都可以用一个框架(Spark)解决。
运行环境多样化
Spark于Hadoop有了很好的集成,支持多种数据源,可以读写HDFS/HBase/Hive等的数据。
Spark和YARN集成,可以运行在YARN上。
Spark可以运行在Mesos上。
Spark也可以脱离Hadoop等独立运行。
代码简洁
Spark支持使用Scala、Python等语言编写,两者都比java简洁
1.1.3 Spark生态圈
Spark Core:
(1) Spark Core: Spark 核心,提供底层框架及核心支持。
(2) BlinkDB:一个用于在海量数据上运行交互式SQL查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。
(3) Spark SQL:可以执行SQL查询,包括基本的SQL语法和HiveQL语法。读取的数据源包括Hive表、Parquent 文件、JSON数据、关系数据库(如MySQL)等。
(4) Spark Streaming: 流式计算。比如,一个网站的流量是每时每刻都在发生的,如果需要知道过去15分钟或一-个小时的流量,则可以使用Spark Streaming来解决这个问题。
(5) MLBase: MLBase 是Spark生态圈的一部分,专注于机器学习,让机器学习的门槛更低,让-一些可能并不了解机器学习的用户也能方便地使用MLBase。MLBase分为4部分: MLlib、 MLI、ML Optimizer和MLRuntime。
(6) MLlib: MLBase 的一部分, MLlib 是Spark的数据挖掘算法库,实现了- -些常见的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维以及底层优化。
(7) GraphX:图计算的应用在很多情况下处理的数据都是很庞大的,比如在移动社交上面的关系等都可以用图相关算法来进行处理和挖掘,但是如果用户要自行编写相关的图计算算法,并且要在集群中应用,那么难度是非常大的。而使用Spark GraphX就可以解决
1.2搭建环境
1.2.1 搭建单机版环境
单机版的Spark可以满足对Spark的应用程序测试工作,对于初学者而言是非常有益的。安装单机版Spark环境步骤如下。
1、下载Spark安装包到Windows本地
2、将Spark安装包上传到Linux的/opt目录下。
3、将Spark安装包解压到/usr/local目录下
4、使用SparkPi来计算Pi的值,进入到Spark安装包的bin目录下,运行如下所示的命令,其中参数2是指两个并行度
1.2.2 搭建单机伪分布式环境
Spark单机伪分布式是在一台机器上既有Master,又有Worker进程。搭建Spark单机伪分布式环境可在Hadoop伪分布式的基础上进行搭建。
1、将Spark安装包解压到/usr/local目录下。
2、进入到Spark安装包的conf目录下,将spark-env.sh.template复制为spark-env.sh
3、打开spark-env.sh文件,在文件末尾添加如下所示的内容
4、目录切换到sbin目录下启动集群
5、jps查看进程
6、使用计算SparkPi来计算Pi的值
1.2.3 搭建完全分布式环境
完全分布式环境是master/slave模式,即其中一台机器作为主节点master,其他的作为子节点slave。
搭建步骤如下:
1、将Spark安包解压到/usr/local目录下
2、切换到Spark安装包下的/conf目录下,进行配置。
3、s将spark-env.sh.template改名为spark-env.sh,并配置该文件
4、配置slaves文件
5、将spark-defaults.conf.template改名为spark-
defaults.conf,并配置该文件
6、将master主节点中配置好的Spark目录复制到子节点中
7、启动Hadoop集群,创建spark-logs目录
8、在Spark的/sbin目录下启动Spark独立集群
9、浏览器访问http:master:18080
1.3Spark运行架构与原理
1.3.1Spark架构
(1)客户端程序:用户提交作业的客户端。
(2) Driver: 运行Application的main函数并创建SparkContext。Application指的是用户编写的Spark应用程序,包含-一个Driver功能的代码和分布在集群中多个节点上的Executor代码。
(3) SparkContext:应用上下文,控制整个生命周期。
(4) Cluster Manager:指的是在集群上获取资源的外部服务,即资源管理器。目前主要有Standalone和Hadoop YARN, Standalone 是Spark原生的资源管理器,由Master负责资源的分配,也可以理解为使用Standalone时Cluster Manager是Master主节点。若是使用YARN模式,则是由ResourceManager负责资源的分配。
(5 ) Spark Worker:集群中任何可以运行Application的节点,运行一个或多个Executor进程。
(6) Executor:运行在Worker的Task执行器。Executor 启动线程池运行Task,并且负责将数据存在内存或磁盘上,每个Application都会申请各自的Executor来处理任务。
(7) Task:被送到某个Executor的具体工作任务。
1.3.2Spark作业运行流程
Spark有3种运行模式,包括Standalone、YARN和Mesos模式
1、Standalone模式是Spark实现的资源调度框架。其主要节点有Client、Master、和Worker节点
(1)首先,SparkContext连接到Master, 向Master注册并申请资源。
(2 ) Worker定期发送心跳信息给Master并报告Executor状态。
(3) Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker.上获取资源,启动StandaloneExecutorBackend。
(4) StandaloneExecutorBackend向SparkContext注册。
(5) SparkContext 将Application 代码发送给StandaloneExecutorBackend, 并且SparkContext解析Application代码,构建DAG图,并提交给DAG Scheduler,分解成Stage (当碰到Action操作时,就会催生Job,每个Job中含有一个或多个Stage),然后将Stage (或者称为TaskSet )提交给Task Scheduler, Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行。
(6) StandaloneExecutorBackend会建立Executor 线程池,开始执行Task,并向SparkContext报告,直至Task完成。
(7)所有Task完成后,SparkContext 向Master注销,释放资源。
2、YARN运行模式
YARN模式可根据Driver在集群中的位置可分为YARN-Client模式和YARN-Cluster模式。
(1 )客户端生成作业信息提交给ResourceManager。
(2) ResourceManager在某-一个NodeManager (由YARN决定)启动Container, 并将Application Master分配给NodeManager。
(3 ) NodeManager接收到ResourceManager的分配,启动Application Master并初始化作业,此时NodeManager就称为Driver。
(4) Application向ResourceManager申请资源,ResourceManager 分配资源的同时通知其他NodeManager启动相应的Exccutor。
(5) Executor向NodeManager上的Application Master注册汇报并完成相应的任务。图1-19是YARN客户端模式的作业运行流程。Application Master仅仅从YARN中申请资源给Executor,之后Client会与Container通信进行作业的调度。
(1)客户端生成作业信息提交给ResourceManager。
(2 ) ResourceManager在本地NodeManager启动Container,并将Application Master分配给该NodeManager。
(3) NodeManager接收到ResourceManager的分配,启动Application Master 并初始化作业,此时这个NodeManager就称为Driver。
(4) Application向ResourceManager申请资源,ResourceManager 分配资源同时通知其他NodeManager启动相应的Executor。
(5) Executor向本地启动的Application Master注册汇报并完成相应的任务。
1.3.3Spark核心数据集RDD
RDD(Resilient Distributed Datasets弹性分布式数据集),是Spark中最重要的概念,可以简单的把RDD理解成一个提供了许多操作接口的数据集合,和一般数据集不同的是,其实际数据分布存储于一批机器中(内存或磁盘中)
RDD的特点:
它是集群节点上的不可改变的、已分区的集合对象;
通过并行转换的方式来创建如(map、filter、join等);
失败自动重建;
可以控制存储级别(内存、磁盘等)来进行重用;
必须是可序列化的;
RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需要根据它的lineage就可重新计算出来,而不需要做特定的checkpoint;
RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的;
RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能有大的下降但不会差于现在的MapReduce;
RDD主要有两大类操作,分别为转换( Transformations)和操作( Actions)。转换主要是指把原始数据集加载到RDD以及把-一个RDD转换为另外-一个RDD,而操作主要指把RDD存储到硬盘或触发转换执行。
所有的转换都是懒惰(Lazy)操作,他只会记住这种转换动作,只有当Action操作时才会执行
1.4宽窄依赖
RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。
能从其他RDD通过确定操作创建新的RDD的原因是RDD含有从其他RDD衍生(即计算)出本RDD的相关信息(即血统,Lineage)
Dependency代表了RDD之间的依赖关系,分为窄依赖和宽依赖。
窄依赖指的是子RDD( Child RDD)的一个分区只依赖于某个父RDD( Parent RDD(S) )中的一个分区( Partition )。
宽依赖指的是子RDD ( Child RDD)的每-一个分区( Partition )都依赖于某个父RDD(Parent RDD(s)) 的一个以上分区 ( Partition )。
Shuffle操作是区分窄依赖和宽依赖的根据,Shuffle过程不仅会产生大量网络传输开销,也会带来大量的磁盘IO开销。
1.5阶段划分
DAG(有向无环图,Directed Acyclic Graph),原始的RDD通过一系列的转换就形成了DAG,在spark里每一个操作生成一个RDD,RDD之间连一条边,最后这些RDD和他们之间的边组成一个有向无环图。
Spark根据DAG图中的RDD依赖关系,把一个作业分成多个阶段。对于宽依赖和窄依赖而言,窄依赖对于作业的优化很有利。只有窄依赖可以实现流水线优化,宽依赖包含Shuffle过程,无法实现流水线方式处理。
Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分Stage,具体划分方法是:
在DAG中进行反向解析,遇到宽依赖就断开
遇到窄依赖就把当前的RDD加入到Stage中
将窄依赖尽量划分在同一个Stage中,可以实现流水线计算
1.6RDD运行过程
通过上述对RDD概念、依赖关系和Stage划分的介绍,结合之前介绍的Spark运行基本流程,再总结一下RDD在Spark架构中的运行过程:
(1)创建RDD对象;
(2)SparkContext负责计算RDD之间的依赖关系,构建DAG;
(3)DAGScheduler负责把DAG图分解成多个Stage,每个Stage中包含了多个Task,每个Task会被TaskScheduler分发给各个WorkerNode上的Executor去执行。
1.7共享变量
1.7.1共享变量-累加器
accumulator:一个全局共享变量,可以完成对信息进行聚合操作。
counter = sc.accumulator(0)
rdd = sc.parallelize(range(10))
def increment(x):
global counter
counter += x
rdd.foreach(increment)
print("Counter value: ", counter.value)
注:1、 累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。
2、累加器不是一个调优的操作,因为如果不这样做,结果是错的
1.7.2共享变量-广播变量
Broadcast:一个全局共享变量,可以广播只读变量。
TorrentBroadcast:点到点的传输,有效避免单点故障,提高网络利用率,减少节点压力
Broadcast :
一般用于处理共享配置文件,通用的数据子,常用的数据结构等等;不适合存放太大的数据
不会内存溢出,因为其数据的保存的 Storage Level 是 MEMORY_AND_DISK 的方式
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()
注:
1、能不能将一个RDD使用广播变量广播出去?
不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。
2、 广播变量只能在Driver端定义,不能在Executor端定义。
3、 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。
5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
1.8RDD持久化
1.8.1持续化方法
persist方法:
•支持StorageLevel
•可以持久化一个RDD在内存或磁盘中
cache方法:
•仅缓存到内存
•本质上是persist(MEMORY_ONLY)的别名
1.8.2持续化存储等级
StorageLevel类型 | 类型描述 |
---|---|
MEMORY_ONLY (默认级别) | 将RDD以JAVA对象的形式保存到JVM内存 如果分片太大,内存缓存不下,就不缓存 |
MEMORY_ONLY_SER | 将RDD以序列化的JAVA对象形式保存到内存 |
DISK_ONLY | 将RDD持久化到硬盘 |
MEMORY_AND_DISK | 将RDD数据集以JAVA对象的形式保存到JVM内存中, 如果分片太大不能保存到内存中,则保存到磁盘上, 并在下次用时重新从磁盘读取。 |
MEMORY_AND_DISK_SER | 与MEMORY_ONLY_SER类似,但当分片太大, 不能保存到内存中,会将其保存到磁盘中 |
XXX_2 | 上述5中level后缀添加2代表两副本 |
OFF_HEAP | RDD实际被保存到Tachyon |
RDD根据useDisk、useMemory、deserialized、off_heap、replication五个参数的组合提供了11种存储级别
from pyspark import StorageLevel
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> rdd.persist()
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
>>> rdd.getStorageLevel()
StorageLevel(False, True, False, False, 1)
>>> rdd. unpersist()
>>> rdd.persist(StorageLevel. DISK_ONLY)
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala: 195
>>> rdd.getStorageLevel()
StorageLevel(True, False, False, False, 1)
标签:RDD,Driver,Application,概述,Executor,Master,Spark
From: https://www.cnblogs.com/simpleness/p/17626147.html