Spark
Spark 作为分布式计算框架,基于 MapReduce 框架开发,但是也有以下区别:
- Spark 基于 Scala 语言开发,MR 基于 Java 语言开发;Scala 是函数式编程语言,对于函数间相互调用效率更高;而 Java 是面向对象语言,函数间调用必须依赖于对象,效率低。
- MapReduce 核心是一次性计算,不适合迭代计算,因为中间结果必须存盘,后续步骤依赖于当前中间结果,会造成大量的磁盘IO,导致效率低;Spark 优化了执行逻辑,前一步执行的结果保存到内存而不是磁盘,所以可以提取出重复的步骤,封装成函数。
- Spark 的 RDD(弹性分布式数据集)支持数据的自动缓存,使得重复的数据可以快速访问,无需重新从磁盘读取。
- Spark 和 MapReduce 各有利弊,MapReduce 因为依赖于磁盘所以能够保证程序一定运行完毕,Spark 因为依赖于内存,可能会出现内存溢出等问题。
- 如果旧项目是基于 Hadoop 的,现在需要迁移到 Spark,那么就可以使用
On Yarn
模型,基于 Yarn 调度。
Spark 内置模块:
Spark SQL
:提供 HQL 与 Spark 进行交互处理的 API,每个数据表当作一个 RDD,Spark SQL 查询被转化为 Spark 操作;Spark Streaming
:对实时数据流进行处理和控制;MLib
:机器学习算法库;Graphix
:控制图、并行图操作和计算的一组算法和工具的集合;Spark Core
:底层 RDD API基础库,其他模块都会依赖;
RDD
RDD(Resilient Distributed Datasets,弹性分布式数据集)在 Spark 计算过程中扮演重要的角色。Spark 的计算任务始于 SparkContext
上下文对象,它负责资源的申请、任务的调度以及 RDD 的管理和创建。
RDD 特点
RDD 具有以下特点:
- 每个 RDD 都是只读的,是分布在集群中的只读对象的集合;
- 一个 RDD 由多个 Partition 分区组成,每个分区可能分布在不同节点的内存中;
- RDD 之间可以执行转换操作,转换但是不计算,每次计算后的结果都需要保存为新的 RDD;
RDD 操作
RDD 主要包含了以下两种方法,通过两种方法不断对数据处理保存,最终实现高效、可靠的大数据处理任务:
- 转换算子:
- 将 Scala 集合或者 Hadoop 输入数据构造一个新的 RDD;
- 通过已有的 RDD 产生新的 RDD;
- 惰性执行,只记录转换关系不执行计算;
- 常见操作包括
map、filter、flatmap、union、distinct、sortbykey
;
- 动作算子:
- 通过 RDD 计算得到新的一组值;
- 触发真正的计算;
- 常见操作包括
first、count、collect、foreach、saveAsTextFile
等;
比如 rdd.map(_+1).saveAsTextFile("hdfs://node01:9000")
操作,map
对应转换,saveAsTextFile
对应计算。
RDD 依赖
RDD 中区分宽窄依赖,因为宽依赖回复起来速度很慢。
窄依赖:
- 父 RDD 中分区最多被一个子 RDD 分区使用;
- 子 RDD 如果部分分区数据丢失或损坏,只需要从父 RDD 重新计算恢复;
- 窄依赖可以并行地生成子 RDD 的分区,使得任务可以在多个节点上并行执行;
- 常见操作如
map、filter、union、sample
宽依赖:
- 主要发生在需要跨分区的数据重组或者聚合操作;
- 子 RDD 分区依赖 父RDD 的所有分区;
- 子 RDD 如果部分或者全部分区数据丢失或损坏,必须从所有父 RDD 分区重新计算;
- 宽依赖会导致数据在不同的分区间进行洗牌,数据需要重新分配到不同的分区,涉及到大量的数据移动和 IO 操作,执行宽依赖时性能可能会受到影响;
- 例如:
groupByKey、reduceByKey、sortByKey、join
;
举个例子
- 首先通过
SparkContext
上下文对象加载 HDFS 某个目录下的文件; - 针对每一行数据按照分表符切分成多个单词,得到新的单词集合;
- 对每个单词执行 map 处理,具体就是标记次数为 1;
- 按照 Key 值执行 Reduce 处理,将 Key 相同的所有 Value 累加求和;(代码中的
_
是 Scala 语法中的占位符) - 将最终结果保存到 HDFS 下的文件目录。
由于每个阶段都会保存新的 RDD,所以如果某阶段计算失败,就可以利用上一个步骤的 RDD 结果重新执行计算,而不需要从源头开始计算。
Spark 程序
程序架构
Spark 也是主从架构,包含管理节点 Master、工作节点 Worker。
- 构建运行环境,Driver 创建
SparkContext
上下文; - SparkContext 向资源管理器(standalone、Mesos、Yarn)申请
Executor
资源,资源管理器启动StandaloneExecutorBackend
; - Executor 向
SparkContext
申请 Task; - RDD 对象构建成 DAG 图,
DAG Scheduler
将 DAG图解析为 Stage,每个 Stage 对应一个 TaskSet,然后将其发送给TaskScheduler
,由TaskScheduler
调度发送给 Executor 运行; - Task 在
Executor
运行完毕后释放资源;
Spark 作业运行模式
Local 模式:
Spark 应用以多线程方式运行在本地,方便调试。
- local:启动一个 Executor;
- local[k]:启动 k 个 Executor;
- local[*]:启动 CPU 核数个 Executor;
standalone 模式:
分布式集群运行,不依赖于第三方资源管理系统(Yarn、Mesos等)。
- 采用 Master-Slave 结构;
- Driver 运行于 worker,Master 只负责集群管理;
- Driver 向 Master 申请作业运行所需资源,Master 分配 Executor 给 Driver;
- Driver 将解析后的 Task 调度到 Executor 上运行,并且不间断监听运行情况,汇报给 Master;
- 整体架构类似于 Yarn,其中
Master——ResourceManager
、Driver——Container
、Executor——ApplicationMaster
; - 为了避免 Master 单点故障,由 Zookeeper 负责 Master 节点集群高可用。
Spark On Yarn:
分布式部署集群、资源、任务监控由 Yarn 管理,目前仅支持粗粒度资源分配方式。
这种模式的工作流程:
Driver
解析 Spark 数据生成一定量的 Task,然后需要开始申请Hadoop
资源;- 但是 Dirver 无法申请,需要借助
ApplicationMaster
申请,所以在 Driver 外部套用一个 ApplicationMaster 用作资源申请; - ApplicationMaster 通过向 ResourceManager 申请 Container 资源,到手后通知 Driver;
- Driver 将解析好的作业分发到 Container 节点上运行。
Yarn 包括 Yarn-Client
(适用于交互和调试)、Yarn-Cluster
(适用于生产环境)两种模式。
程序执行及底层逻辑
生成逻辑执行计划
通过 Scala 语言编写 Spark 逻辑查询计划:
sc.textFile(inputArg)
.flatMap(_.split("\t"))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile(outArg)
上述是一个 WordCount 程序的逻辑执行计划,每个阶段会生成一个单独的 RDD。
生成物理执行计划
物理查询计划关注于底层数据状态:
- 如图一个 RDD 有 4个 Partition,那么前三个 RDD 步骤作用于 RDD 迭代变换;这三个步骤可以划分到一个 Stage,因为可以多个分区并行执行(生成 4 个任务分别调度到不同计算节点);
- 接着第四个 RDD 步骤需要进行 shuffle,所以需要单独划分 Stage(可以生成 3个任务调度到不同计算节点上运行);
任务调度与执行
将各个阶段生成的 Task 调度到 Executor
上运行,执行过程中会不停向 Driver 汇报进度。