Spark概述与搭建
1、离线计算,基于内存,所以比MapReduce(基于磁盘)快(Flink真正实时型框架)
2、spark处理量级在GB量级
3、spark构成:BDAS,将数据变成DataFrame(DF基于pandas框架,表结构,有行有列)进行处理(ResilientDistributedDataset (弹性分布式数据集 )
1、SparkSql:
2、Spark Streaming:流,做实时处理(伪实时,微批处理(用时间度量),类似于实时处理)
3、MLlib:机器学习
4、Graphx:图像处理
发展历史
hadoop历史16年
spark历史12年
4、Spark模式
Local 多用于测试
Standalone
Mesos
YARN 最具前景
spark比MapReduce快的原因:
1、基于内存
2、 DAG:有向无环图,按流程一个方向,并行计算,
3、粗粒度资源调度(将需要的资源一次性全部申请)
5、搭建spark环境(Local)
maven环境Scala依赖spark——core依赖2.4.5版本、
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkWordCount {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("wordcount")
conf.setMaster("local[*]")//设置spark运行方式
val sc: SparkContext = new SparkContext(conf)
//1、加载数据
//RDD弹性数据集
val linesRDD: RDD[String] = sc.textFile("F:\\IdeaProjects\\Spark\\src\\main\\data\\words.txt ")
//2、将数据进行flatmap切分
val wordfsRDD: RDD[String] = linesRDD.flatMap(_.split(","))
//3、将每一行数据进行分组
/**
* iterable和List的区别
* iterable中的数据是只要在需要的时候才会加载
* List中的数据是完全加载到内存中的
*/
val grpRDD: RDD[(String, Iterable[String])] = wordfsRDD.groupBy(word => word)
//4、将分组过后的map集合按照kv格式,进行输出
val result = grpRDD.map(kv => s"${kv._1 },${ kv._2.size}")
result.saveAsTextFile("F:\\IdeaProjects\\Spark\\src\\main\\data\\result")
//5.链式调用
// sc
// .textFile("F:\\IdeaProjects\\Spark\\src\\main\\data\\words.txt ")
// .flatMap(_.split(","))
// .groupBy(word => word)
// .map(kv=>kv._1+","+kv._2.size)
// .foreach(println)
}
}
pom.xml文件依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.shujia</groupId>
<artifactId>Spark</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.version>2.11.12</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Scala Compiler -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
6.spark环境搭建(StandAlone模式)
-
1、上传解压
tar -zxvf spark-2.4.5-bin-hadoop2.7.tgz -C /usr/local/soft mv spark-2.4.5-bin-hadoop2.7 spark-2.4.5
-
2、修改配置文件
# 重命名文件 cp spark-env.sh.template spark-env.sh cp slaves.template slaves
增加配置:
vim spark-env.sh
master相当于RM worker相当于NM
export SPARK_MASTER_IP=master01 export SPARK_MASTER_PORT=7077 export SPARK_WORKER_CORES=2 export SPARK_WORKER_INSTANCES=1 export SPARK_WORKER_MEMORY=2g export JAVA_HOME=/usr/local/soft/jdk1.8.0_171
增加从节点配置:
vim slaves
以node1、node2作为从节点
node01 node02
-
3、复制到其它节点
cd /usr/local/soft/ scp -r spark-2.4.5 node01:`pwd` scp -r spark-2.4.5 node02:`pwd`
-
4、配置环境变量
-
5、在主节点执行启动命令
注意:start-all.sh 与Hadoop的sbin目录中的启动命令有冲突
cd /usr/local/soft/spark-2.4.5/ ./sbin/start-all.sh
-
6、访问Spark Web UI
http://master01:8080/
-
7、测试及使用
切换目录:
cd /usr/local/soft/spark-2.4.5/examples/jars
standalone client模式 :日志在本地输出,一般用于上线前测试
-
提交自带的SparkPi任务
spark-submit --class org.apache.spark.examples.SparkPi --master spark://master01:7077 --executor-memory 512m --total-executor-cores 1 spark-examples_2.11-2.4.5.jar 100
standalone cluster模式:上线使用,不会在本地打印日志
-
提交自带的SparkPi
spark-submit --class org.apache.spark.examples.SparkPi --master spark://master01:7077 --executor-memory 512M --total-executor-cores 1 --deploy-mode cluster spark-examples_2.11-2.4.5.jar 100
-
-
8、其他运行方式
-
spark-shell spark 提供的一个交互式的命令行,可以直接写代码
spark-shell master spark://master01:7077
-
7、On Yarn模式
在公司一般不适用standalone模式
因为公司一般已经有yarn 不需要搞两个资源管理框架
Spark整合yarn只需要在一个节点整合, 可以删除node1 和node2中所有的Spark 文件
-
1、停止Spark Standalone模式集群
# 切换目录 cd /usr/local/soft/spark-2.4.5/ # 停止集群 ./sbin/stop-all.sh
-
2、增加hadoop 配置文件地址
vim spark-env.sh # 增加HADOOP_CONF_DIR export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop
-
3、关闭Yarn
stop-yarn.sh
-
4、修改Yarn配置
cd /usr/local/soft/hadoop-2.7.6/etc/hadoop/ vim yarn-site.xml # 加入如下配置 <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
-
5、同步到其他节点
scp -r yarn-site.xml node1:`pwd` scp -r yarn-site.xml node2:`pwd`
-
6、启动Yarn
start-yarn.sh
-
7、测试及使用
切换目录:
cd /usr/local/soft/spark-2.4.5/examples/jars
Spark on Yarn Client模式:日志在本地输出,一班用于上线前测试
-
提交自带的SparkPi任务
spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 512M --num-executors 2 spark-examples_2.11-2.4.5.jar 100
Spark on Yarn Cluster模式:上线使用,不会在本地打印日志
-
提交自带的SparkPi任务
spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --executor-memory 512m --num-executors 2 --executor-cores 1 spark-examples_2.11-2.4.5.jar 100
-
获取yarn程序执行日志 执行成功之后才能获取到
yarn logs -applicationId application_1652086375126_0002
-
-
8、开启Spark On Yarn的WEB UI
修改配置文件:
# 切换目录 cd /usr/local/soft/spark-2.4.5/conf # 去除后缀 cp spark-defaults.conf.template spark-defaults.conf # 修改spark-defaults.conf vim spark-defaults.conf # 加入以下配置 spark.eventLog.enabled true spark.eventLog.dir hdfs://master01:9000/user/spark/applicationHistory spark.yarn.historyServer.address master01:18080 spark.eventLog.compress true spark.history.fs.logDirectory hdfs://master01:9000/user/spark/applicationHistory spark.history.retainedApplications 15
创建HDFS目录用于存储Spark History日志
hdfs dfs -mkdir -p /user/spark/applicationHistory
启动Spark History Server
cd /usr/local/soft/spark-2.4.5/ ./sbin/start-history-server.sh
启动 hadoop/sbin/mr-jobhistory-daemon.sh start historyserver