首页 > 其他分享 >Spark Core的知识碎片

Spark Core的知识碎片

时间:2024-07-15 19:41:02浏览次数:16  
标签:Core 碎片 RDD 算子 Spark 执行 数据 spark

spark初识

什么是spark?

  • Apache Spark 是一个开源集群计算系统,旨在快速进行数据分析。
  • 既好写运行时的也快

BDAS

BDAS 是由加利福尼亚大学伯克利分校的AMPLab开发的一套开源大数据分析工具集。其目的是为数据分析和机器学习提供高效、易用的工具。

  1. Spark

    • Spark 是 BDAS 的核心组件,是一个快速的、通用的大数据处理引擎。与 Hadoop MapReduce 相比,Spark 提供了一个更加灵活且高效的处理模型,支持批处理、交互式查询、流处理以及图计算等多种数据处理模式。
    • Spark的核心是 RDD(分布式弹性数据集),它允许用户以容错的方式在集群上进行并行操作。
  2. Spark SQL

    Spark SQL 是 Spark 中用于结构化数据处理的模块。它提供了一个名为 DataFrame 的编程抽象,类似于关系数据库中的表,可以使用 SQL 语句进行查询和操作。

    • 统一的数据访问接口:Spark SQL 支持从多种数据源(如 Hive、Parquet、JSON、JDBC、ORC)读取和处理数据,提供统一的接口来访问不同格式的数据。
    • 与SQL的集成:用户可以在 Spark SQL 中直接使用 SQL 语句进行数据查询和操作,同时也可以通过 DataFrame API 进行编程,灵活结合两者的优势。
    • 性能优化:Spark SQL 使用 Catalyst 优化器进行查询优化,通过一系列的规则和策略来生成高效的执行计划。
    • Hive兼容性:Spark SQL 与 Apache Hive 高度兼容,支持 Hive 的元数据存储、序列化格式和 UDF(用户自定义函数),可以在 Spark 上无缝运行 Hive 查询。
  3. Spark Streaming

    Spark Streaming 是 Spark 的实时数据流处理扩展。它允许用户从各种数据源(如 Kafka、Flume、Twitter、HDFS)接收和处理实时数据流。

    • 微批处理架构:Spark Streaming 将实时数据流分成小批次(微批次),然后使用 Spark 核心 API 进行批处理。每个微批次的数据被视为一个 RDD,进行并行处理。
    • 易于编程:Spark Streaming 提供与 Spark 核心 API 类似的高层次 API,用户可以使用熟悉的编程模式来开发实时数据处理应用。
    • 容错性和弹性:Spark Streaming 内置容错机制,可以自动从失败中恢复,并支持动态扩展以处理变化的工作负载。
    • 与其他Spark组件集成:Spark Streaming 可以与 Spark SQL、MLlib 和 GraphX 无缝集成,允许用户在流数据上执行复杂的数据处理、查询和分析。
  4. MLlib

    MLlib 是 Spark 的机器学习库,旨在提供可扩展的机器学习算法和实用工具

    • 丰富的算法:MLlib 包含各种常用的机器学习算法。
    • 高效的分布式计算:MLlib 的算法设计为在分布式环境中高效运行,能够处理大规模数据集。
    • 易于使用:MLlib 提供简单易用的 API,支持多种编程语言
    • 与其他Spark组件集成:MLlib 可以与 Spark SQL、Spark Streaming 和 GraphX 结合使用
  5. GraphX

    GraphX 是 Spark 的图计算库,支持图并行计算和图算法。

为什么Hadoop的共享数据慢,spark共享数据快

  1. Hadoop
    • 批处理模型:Hadoop 主要基于 MapReduce 批处理模型,每个任务都要从磁盘读取数据并将中间结果写回磁盘。由于频繁的磁盘 I/O 操作,数据处理速度较慢。
    • 数据存储在 HDFS:Hadoop 的数据存储在分布式文件系统(HDFS)上,数据读取和写入需要经过网络传输和磁盘 I/O,进一步增加了延迟。
  2. Spark
    • 内存计算模型:Spark 使用内存作为主要的数据处理和存储介质,采用内存中的 RDD(Resilient Distributed Dataset)来进行数据操作。大部分计算都在内存中进行,减少了磁盘 I/O 的频率。
    • 数据持久化选项:Spark 允许用户将中间结果持久化在内存中,并在多个操作之间共享数据,从而大幅提升数据处理速度。

spark的四种运行模式

  1. 本地模式(Local Mode):以一个独立的进程通过配合其内部的多个线程来模拟整个spark运行时的环境(开发与测试)

  2. Kubernetes模式(容器集群):Spark 中的各个角色运行在Kubernetes的容器内部,并组成spark集群环境。

  3. 集群模式(Standalone Mode):spark中的各个角色以独立进程的形式存在,并组成spark集群环境。

  4. 集成模式(Cluster Mode):描述:Spark 与外部集群管理器集成,运行在真正的分布式环境中。

    • YARN 模式:Spark中的各个角色运行在YARN的容器内部,并组成Spark集群环境
    • Mesos 模式:Spark 与 Apache Mesos 集成,提供高效的资源隔离和共享,适合在多租户环境中运行。

初识小结

Spark解决了什么问题?

海量数据的计算,可以进行离线批处理以及实时流计算

Spark有哪些模块?

核心SparkCore、SQL计算(Spark SQL)、流计算(SparkStreming)、图计算(GraphX)、机器学习(MLlib)

Spark特点有哪些?

速度快、简单使用、通用性强、多种模式运行

Spark的运行模式?

  • 本地模式
  • 集群模式(StandAlone、YRAN、K8S)
  • 云模式

Spark的运行角色(对比YARN)?

  • Master:集群资源管理(类同ResourceManger)
  • Worker:单机资源管理(类同与NodeManager)
  • Driver:单任务管理者(类同与ApplicationMaster)
  • Executor:单任务执行者(类同与YRAN容器内的Task)

Spark Core

spark Core

1、Spark作业执行的特点

  1. 只有遇到行动算子的适合,整个spark作业才会被触发执行
  2. 遇到几次行动算子,就执行几次

2、RDD:弹性分布式数据集

  • 弹性:数据量可大可小

    RDD类似于容器,但是本身存储的不是数据,是计算逻辑

    当遇到行动算子的时候,整个spark作业才会被触发执行,是从第一个RDD开始执行,数据才开始产生流动

    数据在RDD之间只是流动关系,不会存储

    流动的数据量可以很大,也可以很小,所以称为弹性

  • 分布式:

    spark本质上它是需要从HDFS中读取数据的,HDFS是分布式,数据block块将来可能会在不同的datanode上

    RDD中流动的数据,可能会来自不同的datanode中的block块数据

  • 数据集:

    计算流动过程中,可以短暂地将RDD看成一个容器,容器中有数据,默认情况下在内存中不会进行存储

    可以将RDD的数据数据存储到磁盘中

3、RDD的五大特性!!!

  1. RDD是由一系列分区构成

    • 读文件时的minPartitions参数只能决定最小分区数,实际读取文件后的RDD分区数,由数据内容本身以及集群的分布来共同决定的
    • 若设置minPartitions的大小比block数量还少的话,实际上以block块数量来决定分区数
    • 产生shuffle的算子调用时,可以传入numPartitions,实际真正改变RDD的分区数,设置多少,最终RDD就多多少分区
  2. 算子是作用在每一个分区上的

  3. RDD与RDD之间存在一些依赖关系

    • 窄依赖 前一个RDD中的某一个分区数据只会到后一个RDD中的某一个分区 一对一的关系

    • 宽依赖 前一个RDD中的某一个分区数据会进入到后一个RDD中的不同分区中 一对多的关系 也可以通过查看是否产生shuffle来判断

    • 整个spark作业会被宽依赖的个数划分若干个stage,Num(stage) = Num(宽依赖) +1

    • 当遇到产生shuffle的算子的时候,涉及到从前一个RDD写数据到磁盘中,从子盘中读取数据到后一个RDD的现象,

      注意:第一次触发执行的时候,磁盘是没有数据的,所以会从第一个RDD产生开始执行,当重复触发相同的执行的时候,对于同一个DAG有向无环图而言,会直接从shuffle之后的RDD开始执行,可以直接从磁盘读取数据。

    • 一个阶段中,RDD有几个分区,就会有几个并行task任务

  4. kv算子只能作用在kv的RDD上

  5. spark会提供最优的任务计算方式,只移动计算,不移动数据(优先从有数据的地方产生RDD)

4、map操作算子

将RDD中的数据依次取出,传递给后面的函数逻辑,将计算后的数据返回到新的RDD中

将RDD中的数据依次取出,处理完的数据返回下一个RDD直接继续执行后续的逻辑

5、filter操作算子

将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条

6、flatMap操作算组

将rdd中的数据每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合

7、sample抽样

withReplacement = false, fraction = n

n表示你要抽取多少比例

这个函数主要在机器学习中遇到

8、groupBy算子的使用

1、gourpBy的算子,后面的分组条件是我们自己指定的

2、spark中的groupBy之后的,所有值会被封装到一个Iterable迭代器中存储

9、groupByKey算子

GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上,也就是说只有kv格式的RDD才能调用kv格式的算子。

sprak core中 groupBy算子与groupByKey算子的区别!!!?

  1. groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD。

    groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要再自己指定,返回值是键和值组成的迭代器构成的kv格式RDD

  2. 执行shuffle数据量来看。

    groupBy产生的shuffle数据量再一定程度上要大于groupByKey产生的shuffle数据量,所以groupByKey算子的执行效率要比groupBy算子的执行效率要高。

10、reduceByKey算子

reduceByKey与groupByKey的区别!!!?

  • 相同点

    他们都是kv格式的算子,只有kv格式的RDD才能调用

  • 不同点

    • groupByKey只能是单纯地根据键进行分组,分组后的逻辑可以再后续的处理中调用其他的算子实现
    • reduceByKey相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
    • groupByKey的灵活度要比reduceByKey的灵活度高,reduceByKey无法做一些复杂的操作,比如求方差。但是groupByKey可以在分组之后的RDD进行方差操作。

11、Union算子

parallelize:将scala的集合变成spark中的RDD

两个RDD想要进行union合并,必须保证元素的格式和数据类型是一致的

分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定

12、join算子

join 内连接

right join右连接

left join 左连接

full join 全连接

13、MapValues算子

mapValues算子也是作用在kv格式的RDD上

将每个元素的值传递给后面的函数,进行处理得到新的值,键不变,这个处理后的组合重新返回到新的RDD中

14、mapPartitionBy算子

mapPartitions:一次处理一个分区中的数据

它与map的区别在于,map是每次处理一条数据就返回一条数据到下一个RDD而mapPartitions一次处理一个分区的数据,处理完在返回

最后处理的效果和map是一样的

mapPartitions可以优化与数据库连接的次数

15、sortBy排序

可以根据条件进行排序

16、foreach行动算子

行动算子,就可以触发一次作业执行,有几次行动算子调用,就会触发几次

rdd是懒加载的性质

17、collect

collect将rdd转成合适的scala中的数据结构,在使用collect之后foreach就是scala中的foreach,不会产生作业执行的

18、Cahce

缓存的目的是为了spark core作业执行的时候,缩短rdd的执行链,能够更快的得到结果

缓存的实现方式:

1、需要缓存的rdd调用cache函数

2、persist(StorageLevel.xxx)修改缓存级别

19、checkPoint

永久将执行过程中RDD中的流动的数据存储到磁盘中(hdfs)中

需要设置 checkpoint的路径,统一设置的

checkpoint也相当于一个行动算子,触发作业执行

第二次DAG有向无环图执行的时候,直接从最后一个有检查点的rdd开始向下执行

checkpoint和cache的区别!!!?

  • cache是将一个复杂的RDD做缓存,将来执行的时候,只是这个rdd会从缓存中取(当计算逻辑很复杂,消耗性能,使用缓存)
  • checkpoint是永久将rdd数据持久化,将来执行的时候,直接从检查点的rdd往后执行(当数据量很大,计算逻辑不复杂使用checkpoint)

写spark core程序的注意事项

1、RDD中无法嵌套使用RDD

2、RDD中无法是使用SparkContext

广播变量

使用SparkContext中的一个功能,将Driver端的变量广播到executor执行的节点上的blockManager中

为什么需要广播变量?

变量在Rriver端,也随着task任务一并发送到executor中执行,后期随着变量的数据量变大,也就意味着,每次发送任务,附带的数据量就会很大,无形之中,降低的执行速度。

standlone 模式 &yarn 模式

  • client模式 本地提交执行
  • cluster模式 本地集群提交执行

spark yarn client模式提交作业,好处是可以在提交的节点上能够看到详细的运行日志以及运行结果,不好的是,每次提交会在提交节点上启动一个driver,将来如果很多作业都需要提交执行,都以client模式提交的话,客户端会产生很多的driver,容易导致客户端压力过大崩溃,就会导致作业无法提交执行。

client模式一般是用于上线之前测试,当client模式通过没有问题的话,后续会使用yarn cluster模式进行提交。

spark yarn cluster模式相较于client模式而言,不会在提交作业的节点上产生Driver进程,而是选择一个相对空闲的子节点启动ApplicationMaster,这里的ApplicationMaster相当于一个Driver这样的话,就减轻了客户端的压力,可以提交更多的spark作业

提交作业的节点与Driver的启动节点不是同一个的,就会导致我们无法在提交·作业的节点上看到日志,如果是yarn cluster模式提交的话,我们只能够通过yarn logs -applicationld的命令查看yarn的作业日志来看到运行结果

上传到hdfs路径上

如果是打包到集群的话,这里的路径就是hdfs路径

如果是local的话,这里路径就是我们windows

Spark 任务调度

重试机制

  1. 正常情况下,task任务发送完毕,也执行结束,对应的进程会一次关闭释放
  2. 如果遇到失败的情况:
    • task任务执行失败(网络,权限,外界条件导致的),会触发重试机制,首先task Schedluer重试了三次之后,任务还是失败的,就会由前一个DAG Scheduler重新发送taskSet,重试了4次;若这两个对象都重试了之后,任务还是失败的,那么整个job作业就会报错
    • 如果遇到shuffle过程中,由于shuffle的小文件丢失了,导致任务失败,task Scheduler不会触发重试的,就会由DAG Scheduler重新规划taskSet,重新发送任务执行

推测执行机制

当某一个executor中 的任务数过多的时候,就右可能会导致某个任务执行十分缓慢或者失败,那么spark提供了推测执行的机制,讲缓慢执行的任务复制到其他相对空闲的且同属于一个spark作业的executor中执行,只要有一个executor中的该任务执行成功,就以这个任务的结果为准。

spark作业提交可能会用到的一些参数

--num-executors 所提供的executor个数

--executor-cores 每一个executor的核数

--executor-memory 每一个execuotr的内存

--total-executor-cores(代替num-executors 和 executor-cores)

为什么说spark执行的要比mr快

  1. MR属于细粒度资源调度
    • MR中的map任务或者reduce任务,到要执行的时候,才会想yarn申请资源
    • 任务的执行速度整体会变慢 --> Job作业执行速度变慢
    • 优点:不会造成资源浪费
    • 缺点:速度慢
  2. Spark属于粗粒度资源调度
    • spark会在任务调度之前,先将所有的需要的资源先申请下来,然后再将task任务发送到执行
    • 优点:速度快,执行过程中无需再次申请资源
    • 缺点:有可能申请的资源不够,导致任务执行失败;也有可能申请的资源过多,再spark作业执行期间不会释放,造成资源浪费。

Spark累加器

在Apache Spark中,累加器(Accumulator)是一种可以在集群计算过程中对共享变量进行“加”操作的机制。这种机制使得累加器可以被多个任务(task)安全地更新,从而在分布式计算环境中实现高效的全局聚合。

累加器的特点:

  1. 只支持累加操作:累加器只支持累加(add)操作,不能进行读取和其他操作。这种设计确保了累加器的操作是幂等的(Idempotent),即多次执行相同的操作不会改变最终结果。
  2. 驱动程序端访问:虽然任务(task)可以更新累加器的值,但只有驱动程序(Driver Program)能够读取累加器的最终值。这是为了防止在分布式环境中读取不一致的中间值。
  3. 用于调试和监控:累加器经常用于调试和监控,因为它们可以收集任务执行过程中的一些统计信息,如处理的数据量、错误计数等。

标签:Core,碎片,RDD,算子,Spark,执行,数据,spark
From: https://www.cnblogs.com/yulugoat/p/18303853

相关文章

  • 将.net core项目部署到IIS上?
    如何将.netcore项目部署到IIS上? 如何将.netcore项目部署到IIS上?1.新建一个.netcoremvc项目2.运行.netcore项目3.发布项目4.部署到IIS1.新建一个.netcoremvc项目1.点击左侧“最近的项目模板”或者右侧“ASP.NETCoreWeb应用程序”均可。2.填写你的项目名称,并且选择项......
  • 一个pyspark 开发练习实例
    实例功能说明:1,使用pyspark开发了一个数据ETL,分析的练习项目。2,实例功能为,从mysql读取表数据,按照一定规则进行ETL。以csv格式保存到hadoop.并特别的使用了Spark提供的3种API进行统计分析,分别是RDD算子,Dataframe算子,SQL编程算子,进行了数量统计,3,组件版本:pyspark:3.3.1......
  • 云原生周刊:Score 成为 CNCF 沙箱项目|2024.7.15
    开源项目TridentTrident是由NetApp维护的全面支持的开源项目。它从头开始设计,旨在通过行业标准接口(如容器存储接口CSI)帮助您满足容器化应用程序对持久性存储的需求。MonokleMonokle通过提供用于编写YAML清单、验证策略和管理实时集群的统一可视化工具,简化了创建、分析......
  • 基于EF Core存储的Serilog持久化服务
    前言Serilog是.NET上的一个原生结构化高性能日志库,这个库能实现一些比内置库更高度的定制。日志持久化是其中一个非常重要的功能,生产环境通常很难挂接调试器或者某些bug的触发条件很奇怪。为了在脱离调试环境的情况下尽可能保留更多线索来辅助解决生产问题,持久化的日志就显得很......
  • 1、多线程同步——CPU、core核、线程、内存
    CPU的运行原理控制单元在时序脉冲的作用下,将指令计数器里所指向的指令地址(这个地址是在内存里的)送到地址总线上去,然后CPU将这个地址里的指令读到指令寄存器进行译码。对于执行指令过程中所需要用到的数据,会将数据地址也送到地址总线,然后CPU把数据读到CPU的内部存储单元(就......
  • Spark算子综合案例 - Scala篇
    文章目录第1关:WordCount-词频统计代码第1关:WordCount-词频统计任务描述本关任务:使用SparkCore知识编写一个词频统计程序。编程要求请仔细阅读右侧代码,根据方法内的提示,在Begin-End区域内进行代码补充,具体任务如下:对文本文件内的每个单词都统计出其出......
  • Spark _Exam_ 20240711
    SparkExam20240711Conclusion比较可惜,做前面AB的时候状态不错,但是后面就不行了,C题直接想错了一个点,然后又没有继续想,D题确实不知道一些技巧,但是其实已经凑齐了正解的全部拼图,可以拿到60-70pts.score240|rnk3|est260|ideal360|idealrnk2A.flandreStatement定义一个序列......
  • C#面:dot net core管道里面的map拓展有什么作用?
    在.NETCore管道中,Map拓展方法用于将中间件添加到请求处理管道中。它的作用是根据请求的路径或其他条件来选择性地执行中间件。具体来说,Map方法接受一个路径参数和一个委托参数。当请求的路径与指定的路径匹配时,该委托中的中间件将被执行。这使得我们可以根据不同的路径来应用......
  • [rCore学习笔记 015]特权级机制
    写在前面本随笔是非常菜的菜鸡写的。如有问题请及时提出。可以联系:[email protected]:https://github.com/WindDevil(目前啥也没有官方文档仍然是一上来就丢出来的官方文档.只摘抄了我觉得有意思的部分:实现特权级机制的根本原因是应用程序运行的安全性不可充分信任......
  • 2024 年,Hadoop 已经被 Apache Spark 全面取代了吗?
    Hadoop是一个开源的分布式计算平台,能够处理大规模数据集,并且具备高可靠性和可扩展性。Hadoop生态系统庞大,包含了多个组件,如HDFS(HadoopDistributedFileSystem,Hadoop分布式文件系统)、YARN(YetAnotherResourceNegotiator,另一种资源协调者)、Hive、HBase等。这些组件共同构成了......