一、pom
<properties>
    <!-- Custom property holding the character encoding name -->
    <encoding>UTF-8</encoding>

    <!-- Java language level for source and generated bytecode -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>

    <!-- Versions of the Scala, Spark and Hadoop libraries -->
    <scala.version>2.12.10</scala.version>
    <spark.version>3.0.0</spark.version>
    <hadoop.version>3.2.1</hadoop.version>
</properties>
<dependencies>
    <!-- Scala standard library; version comes from the scala.version property -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark core built for Scala 2.12; version comes from the spark.version property -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop client; uses the hadoop.version property instead of a duplicated literal -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
<build>
    <!-- Pins plugin versions; the plugins are activated in the plugins section below -->
    <pluginManagement>
        <plugins>
            <!-- Plugin that compiles Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- Plugin that compiles Java sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <!-- Bound to process-resources, a phase that runs before compile -->
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <!-- Bound to process-test-resources, a phase that runs before test-compile -->
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Builds a single shaded jar during the package phase -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <!-- Drop signature files from every artifact merged into the jar -->
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Left empty: section 2.6 names the entry class with the spark-submit class option -->
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
二、Spark3.0-JavaAPI程序
实现Spark读取HDFS中的文本文件,实现单词计数,并将结果输出到HDFS中。
2.1 java匿名实现类
// Word count with anonymous classes: reads text from args[0], counts words,
// sorts by count in descending order and writes the result to args[1].

// 1. Create the configuration. No master is set here; section 2.6 passes it to spark-submit.
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
// 2. JavaSparkContext wraps SparkContext for the Java API.
JavaSparkContext jsc = new JavaSparkContext(conf);
// 3. Create a Java RDD of lines from the input path.
JavaRDD<String> lines = jsc.textFile(args[0]);
// 4. Split every line on single spaces and flatten into one RDD of words.
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String line) throws Exception {
        return Arrays.asList(line.split(" ")).iterator();
    }
});
// 5. Pair every word with the count 1.
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String word) throws Exception {
        return Tuple2.apply(word, 1);
    }
});
// 6. Sum the counts of identical words.
JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});
// 7. Swap key and value so the count becomes the key, because sortByKey orders by key.
JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
    @Override
    public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
        return tp.swap();
    }
});
// 8. Sort by count; false selects descending order.
JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
// 9. Swap back to (word, count). The function's input type is the (count, word) pair.
JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
        return tp.swap();
    }
});
// 10. Action: triggers the job and saves the result to the output path.
result.saveAsTextFile(args[1]);
// 11. Release resources.
jsc.stop();
2.2 Lambda表达式实现
// Word count with lambdas: reads text from args[0], counts words,
// sorts by count in descending order and writes the result to args[1].

// 1. Create the configuration. No master is set here; section 2.6 passes it to spark-submit.
SparkConf conf = new SparkConf().setAppName("LambdaJavaWordCount");
// 2. JavaSparkContext wraps SparkContext for the Java API.
JavaSparkContext jsc = new JavaSparkContext(conf);
// 3. Create a Java RDD of lines from the input path.
JavaRDD<String> lines = jsc.textFile(args[0]);
// 4. Split every line on single spaces and flatten into one RDD of words.
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());
// 5. Pair every word with the count 1.
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w, 1));
// 6. Sum the counts of identical words.
JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((i, j) -> i + j);
// 7. Sort by count, descending: swap to (count, word), sort by key, swap back.
JavaPairRDD<String, Integer> sorted = reduced.mapToPair(tp -> tp.swap())
        .sortByKey(false)
        .mapToPair(tp -> tp.swap());
// 8. Action: triggers the job and saves the result to the output path.
sorted.saveAsTextFile(args[1]);
// 9. Release resources.
jsc.stop();
2.3 程序打包
2.4 上传到Linux
2.5 启动HDFS
# Start the HDFS namenode and datanode daemons.
# NOTE(review): Hadoop 3.x reports hadoop-daemon.sh as deprecated in favor of
# "hdfs --daemon start <name>" - confirm against the installed Hadoop version.
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
2.6 Spark执行jar包
# Submit the word count jar to the standalone masters listed in --master.
# The last two arguments are the HDFS input and output paths read as args[0] and args[1].
bin/spark-3.0.0-bin-hadoop3.2/bin/spark-submit \
  --master spark://hadoop1:7077,hadoop2:7077,hadoop3:7077 \
  --executor-memory 1g \
  --total-executor-cores 5 \
  --class com.wang.spark.LambdaJavaWordCount \
  /root/spark-in-active-1.0-SNAPSHOT.jar \
  hdfs://hadoop1:9000/wc \
  hdfs://hadoop1:9000/out
2.7 查看结果
# Print every file in the job's output directory (the subcommand is "dfs", not "-dfs").
hdfs dfs -cat /out/*
三、本机执行
本地测试,不会建立集群连接,在本地的一个进程中运行。
// Local-mode word count with lambdas: reads text from args[0], counts words,
// sorts by count in descending order and writes the result to args[1].

// 1. Create the configuration for local testing; the master is set to "local[*]".
SparkConf conf = new SparkConf().setAppName("LambdaJavaWordCount").setMaster("local[*]");
// 2. JavaSparkContext wraps SparkContext for the Java API.
JavaSparkContext jsc = new JavaSparkContext(conf);
// 3. Create a Java RDD of lines from the input path.
JavaRDD<String> lines = jsc.textFile(args[0]);
// 4. Split every line on single spaces and flatten into one RDD of words.
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());
// 5. Pair every word with the count 1.
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w, 1));
// 6. Sum the counts of identical words.
JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((i, j) -> i + j);
// 7. Sort by count, descending: swap to (count, word), sort by key, swap back.
JavaPairRDD<String, Integer> sorted = reduced.mapToPair(tp -> tp.swap())
        .sortByKey(false)
        .mapToPair(tp -> tp.swap());
// 8. Action: triggers the job and saves the result to the path given in args[1].
sorted.saveAsTextFile(args[1]);
// 9. Release resources.
jsc.stop();
运行时,传入的参数可以是本地数据的路径,也可以是HDFS上数据的路径。
如果出现这个错误,需要将pom中的scala的****放开
或者全部读取本机的文件。
执行结果如下: