1.实验目的
(1)掌握使用Spark访问本地文件和HDFS文件的方法
(2)掌握Spark应用程序的编写、编译和运行方法
2.实验平台
(1)操作系统:Ubuntu18.04(或Ubuntu16.04);
(2)Spark版本:2.4.0;
(3)Hadoop版本:3.1.3。
3.实验步骤
(1)Spark读取文件系统的数据
(1)在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数;
(2)在spark-shell中读取HDFS系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后,统计出文件的行数;
(3)编写独立应用程序(推荐使用Scala语言),读取HDFS系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后,统计出文件的行数;通过sbt工具将整个应用程序编译打包成 JAR包,并将生成的JAR包通过 spark-submit 提交到 Spark 中运行命令。
源代码:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class FileLineCounter {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("File Line Counter").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 读取HDFS文件
String filePath = "hdfs://node1:8020/user/hadoop/test.txt";
JavaRDD<String> lines = sc.textFile(filePath);
// 统计文件行数
long lineCount = lines.count();
System.out.println("文件总行数: " + lineCount);
sc.close();
}
}
(2)编写独立应用程序实现数据去重
对于两个输入文件A和B,编写Spark独立应用程序(推荐使用Scala语言),对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
输入文件B的样例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根据输入的文件A和B合并得到的输出文件C的样例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z
源代码:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class FileDeduplicator {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("File Deduplicator").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 读取文件A和文件B
String fileA = "hdfs://node1:8020/user/hadoop/fileA.txt";
String fileB = "hdfs://node1:8020/user/hadoop/fileB.txt";
JavaRDD<String> rddA = sc.textFile(fileA);
JavaRDD<String> rddB = sc.textFile(fileB);
// 合并文件并去重
JavaRDD<String> mergedRDD = rddA.union(rddB).distinct();
mergedRDD.saveAsTextFile("hdfs://node1:8020/user/hadoop/outputC");
System.out.println("去重后的数据已保存到HDFS的outputC目录");
sc.close();
}
}
(3)编写独立应用程序实现求平均值问题
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)
源代码:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class AverageScoreCalculator {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Average Score Calculator").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 读取各科成绩文件
String[] filePaths = {
"hdfs://node1:8020/user/hadoop/Algorithm.txt",
"hdfs://node1:8020/user/hadoop/Database.txt",
"hdfs://node1:8020/user/hadoop/Python.txt"
};
JavaRDD<String> allScores = sc.emptyRDD();
for (String path : filePaths) {
allScores = allScores.union(sc.textFile(path));
}
// 解析数据并计算平均值
JavaPairRDD<String, Tuple2<Integer, Integer>> studentScores = allScores.mapToPair(line -> {
String[] parts = line.split(" ");
return new Tuple2<>(parts[0], new Tuple2<>(Integer.parseInt(parts[1]), 1));
});
JavaPairRDD<String, Tuple2<Integer, Integer>> aggregatedScores = studentScores.reduceByKey((x, y) ->
new Tuple2<>(x._1 + y._1, x._2 + y._2)
);
JavaPairRDD<String, Double> averageScores = aggregatedScores.mapValues(x -> x._1 * 1.0 / x._2);
// 保存结果到HDFS
averageScores.saveAsTextFile("hdfs://node1:8020/user/hadoop/AverageScores");
System.out.println("学生平均成绩已保存到HDFS的AverageScores目录");
sc.close();
}
}