首页 > 编程语言 >【推荐系统算法实战】 Spark :大数据处理框架

【推荐系统算法实战】 Spark :大数据处理框架

时间:2022-11-30 11:08:43浏览次数:87  
标签:Mesos MapReduce YARN 算法 集群 数据处理 Spark spark


【推荐系统算法实战】 Spark :大数据处理框架_数据

Spark 简介

​http://spark.apache.org/​​​​https://github.com/to-be-architect/spark​

与​​Hadoop​​​和​​Storm​​​等其他大数据和​​MapReduce​​​技术相比,​​Spark​​有如下优势:

  • Spark提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求.
  • 官方资料介绍Spark可以将Hadoop集群中的应用在内存中的运行速度提升100倍,甚至能够将应用在磁盘上的运行速度提升10倍

架构及生态

通常当需要处理的数据量超过了单机尺度(比如我们的计算机有4GB的内存,而我们需要处理100GB以上的数据)这时我们可以选择spark集群进行计算,有时我们可能需要处理的数据量并不大,但是计算很复杂,需要大量的时间,这时我们也可以选择利用spark集群强大的计算资源,并行化地计算,其架构示意图如下:

【推荐系统算法实战】 Spark :大数据处理框架_hadoop_02

Spark组成(BDAS):全称伯克利数据分析栈,通过大规模集成算法、机器、人之间展现大数据应用的一个平台。也是处理大数据、云计算、通信的技术解决方案。

它的主要组件有:

  • SparkCore:将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。
  • SparkSQL:Spark Sql 是Spark来操作结构化数据的程序包,可以让我使用SQL语句的方式来查询数据,Spark支持 多种数据源,包含Hive表,parquest以及JSON等内容。
  • SparkStreaming: 是Spark提供的实时数据进行流式计算的组件。
  • MLlib:提供常用机器学习算法的实现库。
    ​​​https://spark.apache.org/docs/latest/mllib-guide.html​
  • GraphX:提供一个分布式图计算框架,能高效进行图计算。
  • BlinkDB:用于在海量数据上进行交互式SQL的近似查询引擎。
  • Tachyon:以内存为中心高容错的的分布式文件系统。

Spark结构设计

Spark运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(Worker Node)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)。其中,集群资源管理器可以是Spark自带的资源管理器,也可以是YARN或Mesos等资源管理框架。

【推荐系统算法实战】 Spark :大数据处理框架_数据_03

Spark运行基本流程

Spark的基本运行流程如下:

当一个Spark应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点(Driver)创建一个SparkContext,由SparkContext负责和资源管理器(Cluster Manager)的通信以及进行资源的申请、任务的分配和监控等。SparkContext会向资源管理器注册并申请运行Executor的资源;

资源管理器为Executor分配资源,并启动Executor进程,Executor运行情况将随着“心跳”发送到资源管理器上;

SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAG调度器(DAGScheduler)进行解析,将DAG图分解成多个“阶段”(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把一个个“任务集”提交给底层的任务调度器(TaskScheduler)进行处理;Executor向SparkContext申请任务,任务调度器将任务分发给Executor运行,同时,SparkContext将应用程序代码发放给Executor;

任务在Executor上运行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。

【推荐系统算法实战】 Spark :大数据处理框架_hadoop_04

Spark三种部署方式

Spark应用程序在集群上部署运行时,可以由不同的组件为其提供资源管理调度服务(资源包括CPU、内存等)。比如,可以使用自带的独立集群管理器(standalone),或者使用YARN,也可以使用Mesos。因此,Spark包括三种不同类型的集群部署方式,包括standalone、Spark on Mesos和Spark on YARN。

1.standalone模式

与MapReduce1.0框架类似,Spark框架本身也自带了完整的资源调度管理服务,可以独立部署到一个集群中,而不需要依赖其他系统来为其提供资源管理调度服务。在架构的设计上,Spark与MapReduce1.0完全一致,都是由一个Master和若干个Slave构成,并且以槽(slot)作为资源分配单位。不同的是,Spark中的槽不再像MapReduce1.0那样分为Map 槽和Reduce槽,而是只设计了统一的一种槽提供给各种任务来使用。

2.Spark on Mesos模式

Mesos是一种资源调度管理框架,可以为运行在它上面的Spark提供服务。Spark on Mesos模式中,Spark程序所需要的各种资源,都由Mesos负责调度。由于Mesos和Spark存在一定的血缘关系,因此,Spark这个框架在进行设计开发的时候,就充分考虑到了对Mesos的充分支持,因此,相对而言,Spark运行在Mesos上,要比运行在YARN上更加灵活、自然。目前,Spark官方推荐采用这种模式,所以,许多公司在实际应用中也采用该模式。

3. Spark on YARN模式

Spark可运行于YARN之上,与Hadoop进行统一部署,即“Spark on YARN”,其架构如图9-13所示,资源管理和调度依赖YARN,分布式存储则依赖HDFS。

“Spark on YARN”

【推荐系统算法实战】 Spark :大数据处理框架_数据_05

Hadoop和Spark统一部署

一方面,由于Hadoop生态系统中的一些组件所实现的功能,目前还是无法由Spark取代的,比如,Storm可以实现毫秒级响应的流计算,但是,Spark则无法做到毫秒级响应。另一方面,企业中已经有许多现有的应用,都是基于现有的Hadoop组件开发的,完全转移到Spark上需要一定的成本。因此,在许多企业实际应用中,Hadoop和Spark的统一部署是一种比较现实合理的选择。
由于Hadoop MapReduce、HBase、Storm和Spark等,都可以运行在资源管理框架YARN之上,因此,可以在YARN之上进行统一部署(如图9-16所示)。这些不同的计算框架统一运行在YARN中,可以带来如下好处:

 计算资源按需伸缩;
 不用负载应用混搭,集群利用率高;
 共享底层存储,避免数据跨集群迁移。

【推荐系统算法实战】 Spark :大数据处理框架_spark_06

MapReduce & Spark

【推荐系统算法实战】 Spark :大数据处理框架_数据_07

image

七个MapReduce作业意味着需要七次读取和写入HDFS,而它们的输入输出数据存在关联,七个作业输入输出数据关系如下图。

【推荐系统算法实战】 Spark :大数据处理框架_spark_08

image

基于MapReduce实现此算法存在以下问题:

  • 为了实现一个业务逻辑需要使用七个MapReduce作业,七个作业间的数据交换通过HDFS完成,增加了网络和磁盘的开销。
  • 七个作业都需要分别调度到集群中运行,增加了Gaia集群的资源调度开销。
  • MR2和MR3重复读取相同的数据,造成冗余的HDFS读写开销。

这些问题导致作业运行时间大大增长,作业成本增加。相比与MapReduce编程模型,Spark提供了更加灵活的DAG(Directed Acyclic Graph) 编程模型, 不仅包含传统的map、reduce接口, 还增加了filter、flatMap、union等操作接口,使得编写Spark程序更加灵活方便。使用Spark编程接口实现上述的业务逻辑如下图所示。

【推荐系统算法实战】 Spark :大数据处理框架_hadoop_09

image

相对于MapReduce,Spark在以下方面优化了作业的执行时间和资源使用。

  • DAG编程模型。 通过Spark的DAG编程模型可以把七个MapReduce简化为一个Spark作业。Spark会把该作业自动切分为八个Stage,每个Stage包含多个可并行执行的Tasks。Stage之间的数据通过Shuffle传递。最终只需要读取和写入HDFS一次。减少了六次HDFS的读写,读写HDFS减少了70%。
  • Spark作业启动后会申请所需的Executor资源,所有Stage的Tasks以线程的方式运行,共用Executors,相对于MapReduce方式,Spark申请资源的次数减少了近90%。
  • Spark引入了RDD(Resilient Distributed Dataset)模型,中间数据都以RDD的形式存储,而RDD分布存储于slave节点的内存中,这就减少了计算过程中读写磁盘的次数。RDD还提供了Cache机制,例如对上图的rdd3进行Cache后,rdd4和rdd7都可以访问rdd3的数据。相对于MapReduce减少MR2和MR3重复读取相同数据的问题。

环境安装

​http://spark.apache.org/downloads.html​

Spark的部署模式有Local、Local-Cluster、Standalone、Yarn、Mesos,我们选择最具代表性的Standalone集群部署模式。

进入到Spark安装目录

​cd /home/bigdata/hadoop/spark-2.1.1-bin-hadoop2.7/conf​

将slaves.template复制为slaves
将spark-env.sh.template 复制为 spark-env.sh
修改 slave 文件,将 work 的 hostname 输入:

【推荐系统算法实战】 Spark :大数据处理框架_数据_10

修改spark-env.sh文件,添加如下配置:

【推荐系统算法实战】 Spark :大数据处理框架_数据_11

将配置好的Spark文件拷贝到其他节点上

Spark集群配置完毕,目前是1个Master,2个Work,linux01上启动Spark集群
/opt/modules/spark-2.1.1-bin-hadoop2.7/sbin/start-all.sh

启动后执行jps命令,主节点上有Master进程,其他子节点上有Work进行,登录Spark管理界面查看集群状态(主节点):​​http://linux01:8080/​

jack@Jack-MacBook-Pro:~/soft/spark-3.0.0-preview-bin-hadoop2.7/sbin$ ./start-all.sh 
starting org.apache.spark.deploy.master.Master, logging to /Users/jack/soft/spark-3.0.0-preview-bin-hadoop2.7/logs/spark-jack-org.apache.spark.deploy.master.Master-1-Jack-MacBook-Pro.local.out
localhost: Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
Password:
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/jack/soft/spark-3.0.0-preview-bin-hadoop2.7/logs/spark-jack-org.apache.spark.deploy.worker.Worker-1-Jack-MacBook-Pro.local.out
localhost: bash: shell_session_update: command not found
jack@Jack-MacBook-Pro:~/soft/spark-3.0.0-preview-bin-hadoop2.7/sbin$
jack@Jack-MacBook-Pro:~/soft/spark-3.0.0-preview-bin-hadoop2.7/sbin$ jps
4160 Launcher
82225 Worker
23393 Preloader
71349 Launcher
43252 Launcher
82244 Jps
82068 Master
54599 ApplicationKt
59352 Main
59291 org.eclipse.equinox.launcher_1.5.600.v20191014-2022.jar
53807

到此为止,Spark集群安装完毕.

注意:如果遇到 “JAVA_HOME not set” 异常,可以在sbin目录下的spark-config.sh 文件中加入如下配置:
export JAVA_HOME=XXXX

快速开始

​http://spark.apache.org/examples.html​

【推荐系统算法实战】 Spark :大数据处理框架_数据_12

spark任务部署:

./bin/spark-submit --class org.apache.spark.examples.SparkPi \        #作业类名
--master yarn \ #spark模式
--deploy-mode cluster \ #spark on yarn 模式
--driver-memory 4g \ #每一个driver的内存
--executor-memory 2g \ #每一个executor的内存
--executor-cores 1 \ #每一个executor占用的core数量
--queue thequeue \ #作业执行的队列
examples/jars/spark-examples*.jar \ #jar包
10 #传入类中所需要的参数

spark 任务划分:

一个jar包就是一个Application

一个行动操作就是一个Job, 对应于Hadoop中的一个MapReduce任务

一个Job有很多Stage组成,划分Stage是从后往前划分,遇到宽依赖则将前面的所有转换换分为一个Stage

一个Stage有很多Task组成,一个分区被一个Task所处理,所有分区数也叫并行度。

​​​https://www.jianshu.com/p/11c4cfa094aa​


Kotlin 开发者社区

国内第一Kotlin 开发者社区公众号,主要分享、交流 Kotlin 编程语言、Spring Boot、Android、React.js/Node.js、函数式编程、编程思想等相关主题。

【推荐系统算法实战】 Spark :大数据处理框架_spark_13


越是喧嚣的世界,越需要宁静的思考。

标签:Mesos,MapReduce,YARN,算法,集群,数据处理,Spark,spark
From: https://blog.51cto.com/u_15236724/5897812

相关文章