首页 > 其他分享 >Spark 快速入门

Spark 快速入门

时间:2023-08-11 15:37:47浏览次数:38  
标签:入门 scala line RDD spark textFile 快速 Spark



Spark 快速入门

目录 [−]

使用Spark进行交互式分析

基本操作

更多的RDD操作

缓存

独立应用

深入了解

本教程快速介绍了Spark的使用。 首先我们介绍了通过Spark 交互式shell调用API( Python或者scala代码),然后演示如何使用Java, Scala或者Python编写独立程序。 你可以查看Spark编程指南了解完整的参考。


开始下面的快速入门之前,首先需要到Spark网站下载一份打包好的spark。 既然本教程中我们不使用HDFS,你可以随便下载一个适配任何Hadoop的版本的Spark。


本教程翻译时的Spark版本为1.1.1


使用Spark进行交互式分析

基本操作

Spark shell提供了一个简单方式去学习API,它也是一个交互式分析数据的强大工具。 你既可以使用Scala(运行在JVM之上,所以可以使用众多的Java库),也可以使用Python。运行Spark文件夹下的的命令:



./bin/spark- shell

Spark最主要的一个抽象出来的概念就是分布式的数据集合, 也就是弹性分布式数据集Resilient Distributed Dataset (RDD). RDD可以从Hadoop InputFormats (比如HDFS文件)创建, 也可以通过其它RDD转换(transforming)得到。 让我们从Spark源代码文件夹下的README文件创建一个RDD:




scala> val textFile = sc.textFile( "README.md")
textFile: spark.RDD[String] = spark.MappedRDD@ 2ee9b6e3

RDD包含action,可以返回数据, 也包含transformation,返回新的RDD的指针。 先看一些action的例子:



scala> textFile.count() // 此RDD中的item的数量
res0: Long = 126
 
scala> textFile.first() // 此RDD第一个item
res1: String = # Apache Spark

现在再看一个转换的例子。我们使用filter返回一个新的RDD, 新的RDD是文件中item的一个子集。



scala> val linesWithSpark = textFile.filter(line => line.contains( "Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@ 7dd4af09

将transformation和action串起来:



scala> textFile.filter(line => line.contains( "Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

更多的RDD操作

RDD的transformation和action可以组成起来完成复杂的计算。 比如查找包含最多单词的一行:



scala> textFile.map(line => line.split( " ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

第一步map一行包含的单词数到一个整数, 第二步调用reduce得到最大的单词数。map和reduce的参数都是lambda表达式(closures), 可以调用 Scala/Java库. 例如我们很容易的调用在其它地方声明的方法。 这里我们使用Math.max()函数简化代码:



scala> import java.lang.Math
import java.lang.Math
 
scala> textFile.map(line => line.split( " ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一个通用的数据流模式就是MapReduce,在Hadoop中相当流行. Spark实现MapReduce流很容易:


scala> val wordCounts = textFile.flatMap(line => line.split( " ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@ 71f027b8

此处我们使用flatMap, map 和 reduceByKey转换来计算文件中每个单词的频度。 为了收集单词频度结果,我们可以调用collect action:



scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means, 1), (under, 2), ( this, 3), (Because, 1), (Python, 2), (agree, 1), (cluster., 1), ...)

缓存

Spark也支持将数据集放入集群的内存中缓存起来. 当数据重复访问时特别有用, 比如查询一个小的 “hot”数据集或者运行一个交互式算法PageRank. 看一个简单的例子, 我们把上面的linesWithSpark数据集缓存起来:


scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@ 17e51082
 
scala> linesWithSpark.count()
res8: Long = 15
 
scala> linesWithSpark.count()
res9: Long = 15

当然使用Spark缓存一个100行的文本文件看起来有些傻,我们只是做个示范。 你可以将它用在非常大的数据集上,即使它们可能横跨几十甚至上百个节点。你也可以使用bin/spark-shell交互式实现此功能, 就像开发指南中描述的那样。


独立应用

下面我们想说一下怎样使用Spark API编写一个独立的应用程序。 这里使用Scala (SBT构建工具)和Java举例。 (Python官方文档中有,译者未翻译)


/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
 
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName( "Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains( "a")).count()
val numBs = logData.filter(line => line.contains( "b")).count()
println( "Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}

这个程序统计Spark README文件中包含字符a和b的行数。 注意你需要用你实际的Spark路径替换 YOUR_SPARK_HOME。 不像上面的Spark shell的例子, 我们初始化一个SparkContext 作为程序的一部分.


我们将一个SparkConf对象传给SparkContext的构造函数, 它包含了我们程序的信息。


我们的程序依赖Spark API,所以我们包含一个sbt配置文件:simple.sbt 指明Spark是一个依赖, 这个文件也增加了Spark依赖的仓库(repository):



name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"

为了保证sbt工作正常,我们需要将SimpleApp.scala和simple.sbt放入典型的sbt项目布局的文件夹中。 如此一来我们将应用代码可以打包成一个jar文件, 然后使用spark-submit脚本来运行此程序。



# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
 
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala- 2.10/simple-project_2. 10- 1.0.jar
 
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[ 4] \
target/scala- 2.10/simple-project_2. 10- 1.0.jar
...
Lines with a: 46, Lines with b: 23

或者使用Java


/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
 
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkConf conf = new SparkConf().setAppName( "Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logData = sc.textFile(logFile).cache();
 
long numAs = logData.filter( new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains( "a"); }
}).count();
 
long numBs = logData.filter( new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains( "b"); }
}).count();
 
System.out.println( "Lines with a: " + numAs + ", lines with b: " + numBs);
}
}

这个程序统计Spark README文件中包含字符a和b的行数。. 注意你需要用你实际的Spark路径替换 YOUR_SPARK_HOME。 不像上面的Spark shell的例子, 我们需要一个JavaSparkContext对象. 我们也创建了RDD (JavaRDD)然后运行transformations. 最后我们传递给Spark一个function对象, 这个function对象是一个匿名类,继承于 spark.api.java.function.Function. Spark开发指南描述了细节. (译者注: 这是Java 7的语法, 通过Java 8 Lambda表达式,上面的代码和scala一样的简化)


为了编译此程序,我们需要写一个Maven pom.xml文件, 增加Spark作为依赖. 注意Spark artifact带有Scala的版本.



<project>
<groupId>edu.berkeley </groupId>
<artifactId>simple-project </artifactId>
<modelVersion>4.0.0 </modelVersion>
<name>Simple Project </name>
<packaging>jar </packaging>
<version>1.0 </version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark </groupId>
<artifactId>spark-core_2.10 </artifactId>
<version>1.1.1 </version>
</dependency>
</dependencies>
</project>

使用Maven项目的布局:



$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在,我们使用Maven打包并使用./bin/spark-submit执行此程序.


# Package a jar containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project- 1.0.jar
 
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[ 4] \
target/simple-project- 1.0.jar
...
Lines with a: 46, Lines with b: 23

深入了解

亲爱的读者,恭喜你运行了你的第一个Spark应用程序!

你肯定不仅仅满足于此,以下是更多的深入学习的资料:


深度学习API和其它组件, 请参照Spark开发指南

学习在集群中运行程序,访问 发布概览.

最后, Spark发布包中的examples文件夹下包含几个例子 (Scala, Java, Python). 你可以运行它们:


# For Scala and Java, use run-example:
./bin/run-example SparkPi
 
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

翻译自 Quick Start


标签:入门,scala,line,RDD,spark,textFile,快速,Spark
From: https://blog.51cto.com/u_16087105/7048291

相关文章

  • Nginx+Lua开发入门
    Nginx入门本文目的是学习Nginx+Lua开发,对于Nginx基本知识可以参考如下文章:nginx启动、关闭、重启agentzh的Nginx教程http://openresty.org/download/agentzh-nginx-tutorials-zhcn.htmlNginx+Lua入门http://17173ops.com/2013/11/01/17173-ngx-lua-manual.shtmlnginx配置指令的......
  • spark社区bug
    1.SPARK-26114repartitionAndSortWithinPartitions后合并时PartitionedPairBuffer的内存泄漏   原因这个Spark源码的issue描述了在使用coalesce操作合并分区时可能会导致PartitionedPairBuffer内存泄漏的问题。具体来说,当在使用repartitionAndSortWithinPartitions操作进......
  • locate快速查找某文件路径会报以下错误
    部分版本的linux系统使用locate快速查找某文件路径会报以下错误:-bash:locate:commandnotfound其原因是没有安装mlocate这个包安装:yum-yinstallmlocate安装完再尝试用locate定位内容,发现依然不好使,报了新的错误:locate:cannotstat()`/var/lib/mlocate/mlocate.db':No......
  • ​​​软件开发入门教程网之​​MySQL LIKE 子句​
    我们知道在MySQL中使用SQLSELECT命令来读取数据,同时我们可以在SELECT语句中使用WHERE子句来获取指定的记录。WHERE子句中可以使用等号=来设定获取数据的条件,如"kxdang_author='RUNOOB.COM'"。但是有时候我们需要获取kxdang_author字段含有"COM"字符的所有记录,......
  • ​​​软件开发入门教程网之​​MySQL 连接的使用​
    在前几章节中,我们已经学会了如何在一张表中读取数据,这是相对简单的,但是在真正的应用中经常需要从多个数据表中读取数据。本章节我们将向大家介绍如何使用MySQL的JOIN在两个或多个表中查询数据。你可以在SELECT,UPDATE和DELETE语句中使用Mysql的JOIN来联合多表查询。JO......
  • iOS快速实现环形渐变进度条
    前言进度条相信我们大家都不陌生,往往我们很多时候需要使用到圆形进度条。这篇文章给大家分享了利用ios如何快速实现环形进度条,下面来一起看看。一:先制作一个不带颜色渐变的进度条自定义一个cycleview,在.m中实现drawrect方法-(void)drawrect:(cgrect)rect{cgcontextrefctx......
  • 软件开发入门教程网 Search之C++ 动态内存
       C++基本的输入输出   ......
  • 软件开发入门教程网 Search之C++ 环境设置
       C++基本的输入输出   ......
  • Nginx日志分析- AWK命令快速分析日志--封禁访问请求最多、最频繁的恶意ip
    Nginx日志常用分析命令示范(注:日志的格式不同,awk取的项不同。下面命令针对上面日志格式执行)1.分析日志的方法1)总请求数cd/usr/local/nginx/logs/wc-laccess.log|awk'{print$1}'166252)独立IP数awk'{print$1}'access.log|sort|uniq|wc-l4003)每秒客户端......
  • Pytorch框架CV开发-从入门到实战
    课程下载——Pytorch框架CV开发-从入门到实战提取码:hcjk分享课程——Pytorch框架CV开发-从入门到实战,附代码+PDF课件+数据集下载。课程目标:掌握pytorch深度学习框架在计算机视觉领域的开发技术,理解卷积神经网络的基础知识,学会使用相关的网络模型完成图像分类、对象检测、语义分......