实验7
Spark初级编程实践
1.实验目的
(1)掌握使用Spark访问本地文件和HDFS文件的方法
(2)掌握Spark应用程序的编写、编译和运行方法
2.实验平台
(1)操作系统:Ubuntu18.04(或Ubuntu16.04);
(2)Spark版本:2.4.0;
(3)Hadoop版本:3.1.3。
3.实验步骤
(1)Spark读取文件系统的数据
(1)在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数;
val localFile = sc.textFile("file:///home/hadoop/test.txt")
val linesCount = localFile.count()
println(s"Total number of lines in the file: $linesCount")
(2)在spark-shell中读取HDFS系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后,统计出文件的行数;
val hdfsFile = sc.textFile("hdfs://namenode:8020/user/hadoop/test.txt")
val linesCount = hdfsFile.count()
println(s"Total number of lines in the HDFS file: $linesCount")
(2)编写独立应用程序(推荐使用Scala语言),读取HDFS系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后,统计出文件的行数;通过sbt工具将整个应用程序编译打包成 JAR包,并将生成的JAR包通过 spark-submit 提交到 Spark 中运行命令。
import org.apache.spark.sql.SparkSession
object CountLines {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("Count Lines").getOrCreate()
val sc = spark.sparkContext
if (args.length < 1) {
println("Usage: CountLines <input_file>")
sys.exit(1)
}
val inputFile = args(0)
val lines = sc.textFile(inputFile)
val linesCount = lines.count()
println(s"Total number of lines in the file: $linesCount")
spark.stop()
}
}
使用sbt工具编译打包,然后通过spark-submit提交到Spark集群运行:
sbt package
spark-submit --class "CountLines" target/scala-2.12/count-lines_2.12-1.0.jar hdfs://namenode:8020/user/hadoop/test.txt
(2)编写独立应用程序实现数据去重
对于两个输入文件A和B,编写Spark独立应用程序(推荐使用Scala语言),对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
输入文件B的样例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根据输入的文件A和B合并得到的输出文件C的样例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z
import org.apache.spark.sql.{DataFrame, SparkSession}
object MergeAndDeduplicate {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("Merge and Deduplicate").getOrCreate()
import spark.implicits._
if (args.length < 3) {
println("Usage: MergeAndDeduplicate <input_file_A> <input_file_B> <output_file>")
sys.exit(1)
val inputFileA = args(0)
val inputFileB = args(1)
val outputFile = args(2)
// Load files into DataFrames
val dfA = spark.read.textFile(inputFileA).map(_.split("\\s+")).map{ case Array(date, value) => (date, value) }.toDF("date", "value")
val dfB = spark.read.textFile(inputFileB).map(_.split("\\s+")).map{ case Array(date, value) => (date, value) }.toDF("date", "value"
// Union and remove duplicates
val mergedDF: DataFrame = dfA.union(dfB).distinct().orderBy("date", "value")
// Write result to output file
mergedDF.write.option("header", "false").option("delimiter", "\t").csv(outputFile)
spark.stop()
}
}
sbt package
spark-submit --class "MergeAndDeduplicate" target/scala-2.12/merge-and-deduplicate_2.12-1.0.jar /path/to/fileA /path/to/fileB /path/to/output
(3)编写独立应用程序实现求平均值问题
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)
import org.apache.spark.sql.{DataFrame, SparkSession}
objet AverageScores {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("Average Scores").getOrCreate()
import spark.implicits._
if (args.length < 2) {
println("Usage: AverageScores <input_files_path> <output_file>")
sys.exit(1)
}
spark.read.textFile(inputPath).map(_.split("\\s+")).map{ case Array(name, score) => (name, score.toDouble) }.toDF("name", "score")
val avgScoresDF = scoresDF.groupBy("name").avg("score").withColumnRenamed("avg(score)", "average_score")
avgScoresDF.show()
avgScoresDF.write.option("header", "true").option("delimiter", "\t").csv(outputPath)
spark.stop()
}
}
4.实验报告
题目: |
Spark初级编程实践 |
姓名 |
王士英 |
日期12.16 |
实验环境:Ubuntu18.04,Haddop3.1.3 |
||||
实验内容与完成情况:完成 |
||||
出现的问题:在数据去重过程中,重复数据未能正确去除 |
||||
解决方案(列出遇到的问题和解决办法,列出没有解决的问题):使用Spark的distinct或reduceByKey操作来去除重复数据 |
标签:文件,val,10.27,args,value,spark,Spark From: https://www.cnblogs.com/jais/p/18647912