spark初识
什么是spark?
- Apache Spark 是一个开源集群计算系统,旨在快速进行数据分析。
- 既好写运行时的也快
BDAS
BDAS 是由加利福尼亚大学伯克利分校的AMPLab开发的一套开源大数据分析工具集。其目的是为数据分析和机器学习提供高效、易用的工具。
-
Spark
- Spark 是 BDAS 的核心组件,是一个快速的、通用的大数据处理引擎。与 Hadoop MapReduce 相比,Spark 提供了一个更加灵活且高效的处理模型,支持批处理、交互式查询、流处理以及图计算等多种数据处理模式。
- Spark的核心是 RDD(分布式弹性数据集),它允许用户以容错的方式在集群上进行并行操作。
-
Spark SQL
Spark SQL 是 Spark 中用于结构化数据处理的模块。它提供了一个名为 DataFrame 的编程抽象,类似于关系数据库中的表,可以使用 SQL 语句进行查询和操作。
- 统一的数据访问接口:Spark SQL 支持从多种数据源(如 Hive、Parquet、JSON、JDBC、ORC)读取和处理数据,提供统一的接口来访问不同格式的数据。
- 与SQL的集成:用户可以在 Spark SQL 中直接使用 SQL 语句进行数据查询和操作,同时也可以通过 DataFrame API 进行编程,灵活结合两者的优势。
- 性能优化:Spark SQL 使用 Catalyst 优化器进行查询优化,通过一系列的规则和策略来生成高效的执行计划。
- Hive兼容性:Spark SQL 与 Apache Hive 高度兼容,支持 Hive 的元数据存储、序列化格式和 UDF(用户自定义函数),可以在 Spark 上无缝运行 Hive 查询。
-
Spark Streaming
Spark Streaming 是 Spark 的实时数据流处理扩展。它允许用户从各种数据源(如 Kafka、Flume、Twitter、HDFS)接收和处理实时数据流。
- 微批处理架构:Spark Streaming 将实时数据流分成小批次(微批次),然后使用 Spark 核心 API 进行批处理。每个微批次的数据被视为一个 RDD,进行并行处理。
- 易于编程:Spark Streaming 提供与 Spark 核心 API 类似的高层次 API,用户可以使用熟悉的编程模式来开发实时数据处理应用。
- 容错性和弹性:Spark Streaming 内置容错机制,可以自动从失败中恢复,并支持动态扩展以处理变化的工作负载。
- 与其他Spark组件集成:Spark Streaming 可以与 Spark SQL、MLlib 和 GraphX 无缝集成,允许用户在流数据上执行复杂的数据处理、查询和分析。
-
MLlib
MLlib 是 Spark 的机器学习库,旨在提供可扩展的机器学习算法和实用工具
- 丰富的算法:MLlib 包含各种常用的机器学习算法。
- 高效的分布式计算:MLlib 的算法设计为在分布式环境中高效运行,能够处理大规模数据集。
- 易于使用:MLlib 提供简单易用的 API,支持多种编程语言
- 与其他Spark组件集成:MLlib 可以与 Spark SQL、Spark Streaming 和 GraphX 结合使用
-
GraphX
GraphX 是 Spark 的图计算库,支持图并行计算和图算法。
为什么Hadoop的共享数据慢,spark共享数据快
- Hadoop
- 批处理模型:Hadoop 主要基于 MapReduce 批处理模型,每个任务都要从磁盘读取数据并将中间结果写回磁盘。由于频繁的磁盘 I/O 操作,数据处理速度较慢。
- 数据存储在 HDFS:Hadoop 的数据存储在分布式文件系统(HDFS)上,数据读取和写入需要经过网络传输和磁盘 I/O,进一步增加了延迟。
- Spark
- 内存计算模型:Spark 使用内存作为主要的数据处理和存储介质,采用内存中的 RDD(Resilient Distributed Dataset)来进行数据操作。大部分计算都在内存中进行,减少了磁盘 I/O 的频率。
- 数据持久化选项:Spark 允许用户将中间结果持久化在内存中,并在多个操作之间共享数据,从而大幅提升数据处理速度。
spark的四种运行模式
-
本地模式(Local Mode):以一个独立的进程通过配合其内部的多个线程来模拟整个spark运行时的环境(开发与测试)
-
Kubernetes模式(容器集群):Spark 中的各个角色运行在Kubernetes的容器内部,并组成spark集群环境。
-
集群模式(Standalone Mode):spark中的各个角色以独立进程的形式存在,并组成spark集群环境。
-
集成模式(Cluster Mode):描述:Spark 与外部集群管理器集成,运行在真正的分布式环境中。
- YARN 模式:Spark中的各个角色运行在YARN的容器内部,并组成Spark集群环境
- Mesos 模式:Spark 与 Apache Mesos 集成,提供高效的资源隔离和共享,适合在多租户环境中运行。
初识小结
Spark解决了什么问题?
海量数据的计算,可以进行离线批处理以及实时流计算
Spark有哪些模块?
核心SparkCore、SQL计算(Spark SQL)、流计算(SparkStreming)、图计算(GraphX)、机器学习(MLlib)
Spark特点有哪些?
速度快、简单使用、通用性强、多种模式运行
Spark的运行模式?
- 本地模式
- 集群模式(StandAlone、YRAN、K8S)
- 云模式
Spark的运行角色(对比YARN)?
- Master:集群资源管理(类同ResourceManger)
- Worker:单机资源管理(类同与NodeManager)
- Driver:单任务管理者(类同与ApplicationMaster)
- Executor:单任务执行者(类同与YRAN容器内的Task)
Spark Core
spark Core
1、Spark作业执行的特点
- 只有遇到行动算子的适合,整个spark作业才会被触发执行
- 遇到几次行动算子,就执行几次
2、RDD:弹性分布式数据集
-
弹性:数据量可大可小
RDD类似于容器,但是本身存储的不是数据,是计算逻辑
当遇到行动算子的时候,整个spark作业才会被触发执行,是从第一个RDD开始执行,数据才开始产生流动
数据在RDD之间只是流动关系,不会存储
流动的数据量可以很大,也可以很小,所以称为弹性
-
分布式:
spark本质上它是需要从HDFS中读取数据的,HDFS是分布式,数据block块将来可能会在不同的datanode上
RDD中流动的数据,可能会来自不同的datanode中的block块数据
-
数据集:
计算流动过程中,可以短暂地将RDD看成一个容器,容器中有数据,默认情况下在内存中不会进行存储
可以将RDD的数据数据存储到磁盘中
3、RDD的五大特性!!!
-
RDD是由一系列分区构成
- 读文件时的minPartitions参数只能决定最小分区数,实际读取文件后的RDD分区数,由数据内容本身以及集群的分布来共同决定的
- 若设置minPartitions的大小比block数量还少的话,实际上以block块数量来决定分区数
- 产生shuffle的算子调用时,可以传入numPartitions,实际真正改变RDD的分区数,设置多少,最终RDD就多多少分区
-
算子是作用在每一个分区上的
-
RDD与RDD之间存在一些依赖关系
-
窄依赖 前一个RDD中的某一个分区数据只会到后一个RDD中的某一个分区 一对一的关系
-
宽依赖 前一个RDD中的某一个分区数据会进入到后一个RDD中的不同分区中 一对多的关系 也可以通过查看是否产生shuffle来判断
-
整个spark作业会被宽依赖的个数划分若干个stage,Num(stage) = Num(宽依赖) +1
-
当遇到产生shuffle的算子的时候,涉及到从前一个RDD写数据到磁盘中,从子盘中读取数据到后一个RDD的现象,
注意:第一次触发执行的时候,磁盘是没有数据的,所以会从第一个RDD产生开始执行,当重复触发相同的执行的时候,对于同一个DAG有向无环图而言,会直接从shuffle之后的RDD开始执行,可以直接从磁盘读取数据。
-
一个阶段中,RDD有几个分区,就会有几个并行task任务
-
-
kv算子只能作用在kv的RDD上
-
spark会提供最优的任务计算方式,只移动计算,不移动数据(优先从有数据的地方产生RDD)
4、map操作算子
将RDD中的数据依次取出,传递给后面的函数逻辑,将计算后的数据返回到新的RDD中
将RDD中的数据依次取出,处理完的数据返回下一个RDD直接继续执行后续的逻辑
5、filter操作算子
将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条
6、flatMap操作算组
将rdd中的数据每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
7、sample抽样
withReplacement = false, fraction = n
n表示你要抽取多少比例
这个函数主要在机器学习中遇到
8、groupBy算子的使用
1、gourpBy的算子,后面的分组条件是我们自己指定的
2、spark中的groupBy之后的,所有值会被封装到一个Iterable迭代器中存储
9、groupByKey算子
GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上,也就是说只有kv格式的RDD才能调用kv格式的算子。
sprak core中 groupBy算子与groupByKey算子的区别!!!?
-
groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD。
groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要再自己指定,返回值是键和值组成的迭代器构成的kv格式RDD
-
执行shuffle数据量来看。
groupBy产生的shuffle数据量再一定程度上要大于groupByKey产生的shuffle数据量,所以groupByKey算子的执行效率要比groupBy算子的执行效率要高。
10、reduceByKey算子
reduceByKey与groupByKey的区别!!!?
-
相同点
他们都是kv格式的算子,只有kv格式的RDD才能调用
-
不同点
- groupByKey只能是单纯地根据键进行分组,分组后的逻辑可以再后续的处理中调用其他的算子实现
- reduceByKey相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
- groupByKey的灵活度要比reduceByKey的灵活度高,reduceByKey无法做一些复杂的操作,比如求方差。但是groupByKey可以在分组之后的RDD进行方差操作。
11、Union算子
parallelize:将scala的集合变成spark中的RDD
两个RDD想要进行union合并,必须保证元素的格式和数据类型是一致的
分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定
12、join算子
join 内连接
right join右连接
left join 左连接
full join 全连接
13、MapValues算子
mapValues算子也是作用在kv格式的RDD上
将每个元素的值传递给后面的函数,进行处理得到新的值,键不变,这个处理后的组合重新返回到新的RDD中
14、mapPartitionBy算子
mapPartitions:一次处理一个分区中的数据
它与map的区别在于,map是每次处理一条数据就返回一条数据到下一个RDD而mapPartitions一次处理一个分区的数据,处理完在返回
最后处理的效果和map是一样的
mapPartitions可以优化与数据库连接的次数
15、sortBy排序
可以根据条件进行排序
16、foreach行动算子
行动算子,就可以触发一次作业执行,有几次行动算子调用,就会触发几次
rdd是懒加载的性质
17、collect
collect将rdd转成合适的scala中的数据结构,在使用collect之后foreach就是scala中的foreach,不会产生作业执行的
18、Cahce
缓存的目的是为了spark core作业执行的时候,缩短rdd的执行链,能够更快的得到结果
缓存的实现方式:
1、需要缓存的rdd调用cache函数
2、persist(StorageLevel.xxx)修改缓存级别
19、checkPoint
永久将执行过程中RDD中的流动的数据存储到磁盘中(hdfs)中
需要设置 checkpoint的路径,统一设置的
checkpoint也相当于一个行动算子,触发作业执行
第二次DAG有向无环图执行的时候,直接从最后一个有检查点的rdd开始向下执行
checkpoint和cache的区别!!!?
- cache是将一个复杂的RDD做缓存,将来执行的时候,只是这个rdd会从缓存中取(当计算逻辑很复杂,消耗性能,使用缓存)
- checkpoint是永久将rdd数据持久化,将来执行的时候,直接从检查点的rdd往后执行(当数据量很大,计算逻辑不复杂使用checkpoint)
写spark core程序的注意事项
1、RDD中无法嵌套使用RDD
2、RDD中无法是使用SparkContext
广播变量
使用SparkContext中的一个功能,将Driver端的变量广播到executor执行的节点上的blockManager中
为什么需要广播变量?
变量在Rriver端,也随着task任务一并发送到executor中执行,后期随着变量的数据量变大,也就意味着,每次发送任务,附带的数据量就会很大,无形之中,降低的执行速度。
standlone 模式 &yarn 模式
- client模式 本地提交执行
- cluster模式 本地集群提交执行
spark yarn client模式提交作业,好处是可以在提交的节点上能够看到详细的运行日志以及运行结果,不好的是,每次提交会在提交节点上启动一个driver,将来如果很多作业都需要提交执行,都以client模式提交的话,客户端会产生很多的driver,容易导致客户端压力过大崩溃,就会导致作业无法提交执行。
client模式一般是用于上线之前测试,当client模式通过没有问题的话,后续会使用yarn cluster模式进行提交。
spark yarn cluster模式相较于client模式而言,不会在提交作业的节点上产生Driver进程,而是选择一个相对空闲的子节点启动ApplicationMaster,这里的ApplicationMaster相当于一个Driver这样的话,就减轻了客户端的压力,可以提交更多的spark作业
提交作业的节点与Driver的启动节点不是同一个的,就会导致我们无法在提交·作业的节点上看到日志,如果是yarn cluster模式提交的话,我们只能够通过yarn logs -applicationld的命令查看yarn的作业日志来看到运行结果
上传到hdfs路径上
如果是打包到集群的话,这里的路径就是hdfs路径
如果是local的话,这里路径就是我们windows
Spark 任务调度
重试机制
- 正常情况下,task任务发送完毕,也执行结束,对应的进程会一次关闭释放
- 如果遇到失败的情况:
- task任务执行失败(网络,权限,外界条件导致的),会触发重试机制,首先task Schedluer重试了三次之后,任务还是失败的,就会由前一个DAG Scheduler重新发送taskSet,重试了4次;若这两个对象都重试了之后,任务还是失败的,那么整个job作业就会报错
- 如果遇到shuffle过程中,由于shuffle的小文件丢失了,导致任务失败,task Scheduler不会触发重试的,就会由DAG Scheduler重新规划taskSet,重新发送任务执行
推测执行机制
当某一个executor中 的任务数过多的时候,就右可能会导致某个任务执行十分缓慢或者失败,那么spark提供了推测执行的机制,讲缓慢执行的任务复制到其他相对空闲的且同属于一个spark作业的executor中执行,只要有一个executor中的该任务执行成功,就以这个任务的结果为准。
spark作业提交可能会用到的一些参数
--num-executors 所提供的executor个数
--executor-cores 每一个executor的核数
--executor-memory 每一个execuotr的内存
--total-executor-cores(代替num-executors 和 executor-cores)
为什么说spark执行的要比mr快
- MR属于细粒度资源调度
- MR中的map任务或者reduce任务,到要执行的时候,才会想yarn申请资源
- 任务的执行速度整体会变慢 --> Job作业执行速度变慢
- 优点:不会造成资源浪费
- 缺点:速度慢
- Spark属于粗粒度资源调度
- spark会在任务调度之前,先将所有的需要的资源先申请下来,然后再将task任务发送到执行
- 优点:速度快,执行过程中无需再次申请资源
- 缺点:有可能申请的资源不够,导致任务执行失败;也有可能申请的资源过多,再spark作业执行期间不会释放,造成资源浪费。
Spark累加器
在Apache Spark中,累加器(Accumulator)是一种可以在集群计算过程中对共享变量进行“加”操作的机制。这种机制使得累加器可以被多个任务(task)安全地更新,从而在分布式计算环境中实现高效的全局聚合。
累加器的特点:
- 只支持累加操作:累加器只支持累加(add)操作,不能进行读取和其他操作。这种设计确保了累加器的操作是幂等的(Idempotent),即多次执行相同的操作不会改变最终结果。
- 驱动程序端访问:虽然任务(task)可以更新累加器的值,但只有驱动程序(Driver Program)能够读取累加器的最终值。这是为了防止在分布式环境中读取不一致的中间值。
- 用于调试和监控:累加器经常用于调试和监控,因为它们可以收集任务执行过程中的一些统计信息,如处理的数据量、错误计数等。