首页 > 编程语言 >【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化

【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化

时间:2022-10-27 11:34:48浏览次数:63  
标签:jsc scala JavaPairRDD tp mapToPair pom JavaRDD JavaAPI new


【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化_java

一、pom

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.12.10</scala.version>
<spark.version>3.0.0</spark.version>
<hadoop.version>3.2.1</hadoop.version>
<encoding>UTF-8</encoding>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>


<build>
<pLuginManagement>
<plugins>
<!--编译scala的插件-- ->
<pLugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!--编译java的插件-->
<pLugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pLuginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>



<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

二、Spark3.0-JavaAPI程序

实现Spark读取HDFS中的文本文件,实现单词计数,并2将结果输出到HDFS中。

2.1 java匿名实现类

// 1.创建配置
SparkConf conf = SparkConf().setAppName("JavaWordCount");

// 2.封装了SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);

// 3.jsc 创建 java RDD
JavaRDD<String> lines = jsc.textFile(args[0]);

// 4.切分压平
JavaRDD<String> words = lines.flatMap(new FlatMapFuntion<String,String>(){
@Override
public Iterator<String> call(String line) throws Expection{
return Arrays.asList(line.split(" ")).iterator;
}
});

// 5.单词和1组合
JavaPairRDD<String,Integer> wordAndOne = words.maoToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String,Integer> call(String words) throws Exception{
return Tuple2.apply(word,1);
}
});

// 6.分组聚合
JavaPairRDD<String,Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer v1,Integer v2) throws Exception{
return v1+v2;
}
});

// 7.交换kv顺序
JavaPairRDD<Integer,String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String,Integer>,Integer,String>(){
@Override
public Tuple2<Integer,String> call(Tuple2<String,Integer> tp) throws Exception{
return tp.swap();
}
});

// 8.排序
JavaPairRDD<Integer,String> sorted = swapped.sortedByKey(false);

// 9.交换kv顺序
JavaPairRDD<String,Integer> result = sorted.mapToPair(new PairFunction<Tuple2<String,Integer>,String,Integer>(){
@Override
public Tuple2<String,Integer> call(Tuple2<Integer,String> tp) throws Exception{
return tp.swap();
}
});

// 10.触发action保存到HDFS
result.saveAsTextFile(args[1]);

// 11.释放资源
jsc.stop();

2.2 Lambda表达式实现

// 1.创建配置
SparkConf conf = SparkConf().setAppName("LambdaJavaWordCount");

// 2.封装了SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);

// 3.jsc 创建 java RDD
JavaRDD<String> lines = jsc.textFile(args[0]);

// 4.切分压平
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());

// 5.单词 1
JavaPairRDD<String,Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w,1));

// 6.聚合
JavaPairRDD<String,Integer> reduced = wordAndOne.reduceByKey((i,j) -> i+j);

// 7.排序
JavaPairRDD<String,Integer> sorted = reduced.mapToPair(tp -> tp.swap()).sortByKey(false)
.mapToPair(tp -> tp.swap());

// 8.保存hdfs
sorted.saveAsTextFile(args[1]);

// 9.释放资源
jsc.stop();

2.3 程序打包

【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化_maven_02

2.4 上传到Linux

【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化_java_03

2.5 启动HDFS

hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode

2.6 Spark执行jar包

bin/spark-3.0.0-bin-hadoop3.2/bin/spark-submit  --master  spark://hadoop1:7077,hadoop2:7077,hadoop3:7077  --executor-memory 1g  --total-executor-cores 5  --class com.wang.spark.LambdaJavaWordCount  /root/spark-in-active-1.0-SNAPSHOT.jar  hdfs://hadoop1:9000/wc  hdfs://hadoop1:9000/out

【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化_java_04

2.7 查看结果

hdfs -dfs -cat /out/*

【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化_java_05

三、本机执行

本地测试,不会建立集群链接,再本地的一个进程运行。

// 1.创建配置【本地测试】
SparkConf conf = SparkConf().setAppName("LambdaJavaWordCount").setMaster("local[*]");

// 2.封装了SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);

// 3.jsc 创建 java RDD
JavaRDD<String> lines = jsc.textFile(args[0]);

// 4.切分压平
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());

// 5.单词 1
JavaPairRDD<String,Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w,1));

// 6.聚合
JavaPairRDD<String,Integer> reduced = wordAndOne.reduceByKey((i,j) -> i+j);

// 7.排序
JavaPairRDD<String,Integer> sorted = reduced.mapToPair(tp -> tp.swap()).sortByKey(false)
.mapToPair(tp -> tp.swap());

// 8.保存hdfs
sorted.saveAsTextFile(args[1]);

// 9.释放资源
jsc.stop();

【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化_java_06


【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化_spark_07


运行时候,传入参数本地数据或者hdfs的数据。

如果出现这个错误,需要将pom中的scala的****放开

或者全部读取本机的文件。

执行结果如下:

【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化_maven_08


标签:jsc,scala,JavaPairRDD,tp,mapToPair,pom,JavaRDD,JavaAPI,new
From: https://blog.51cto.com/u_15848894/5800572

相关文章

  • maven打包---pom配置
    <project....>...............<build><finalName>包名</finalName><plugins><plugin><groupId>org.apache.m......
  • pom.xml配置资源过滤
    <build><!--设置资源过滤--><resources><resource><directory>src/main/java</directory><includes>......
  • 下载插件失败? pom.xml
    parent标签下加,表示直接去远程仓库下载<relativePath/>比如<parent><artifactId>spring-boot-dependencies</artifactId><groupId>org.springfra......
  • 使用versions-maven-plugin插件批量修改pom.xml版本
    Java开发过程中,通常一个项目有多个子模块(项目结构如下),我们发布新版本后,需要创建新的分支并修改pom.xml中的版本号,如果模块过多,这是一个麻烦的事情,但我们有versions-maven-p......
  • pom.xml
    pom.xml<dependencies><!--servlet依赖--><dependency><groupId>javax.servlet</groupId><artifactId>javax.servlet-api......
  • Elasticsearch——JavaApi实现索引管理
    版本不同版本的elasticsearch-rest-high-level-client和elasticsearch之间存在兼容风险,请确保和elasticsearch版本一致,否则会出现无法预计的错误。es配置maven依赖<dep......
  • Linux Centos系统使用yum时出现:Error:Cannot retrieve repository metadata (repomd.xm
    一、问题描述:在安装Oracle数据库时,需要用到yum安装所需要的软件包时,出现了:Error:Cannotretrieverepositorymetadata(repomd.xml)forreposi......
  • Idea-->Maven-->Mybatis-->pom.xml配置
    <!--依赖列表--><dependencies><!--MyBatis依赖--><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</......
  • nexus上传pom文件问题
    原因:pom文件名问题原文件名:dpms-dependencies-0.3.0-RELEASE.pom成功上传文件名:`dpms-dependencies-0.3.0-RELEASE.pom```......
  • maven篇4:pom文件详解
    1、pom.xml文件详解<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://m......