
Spark WordCount



Part 1: Start Hadoop and Spark

cd /usr/local/Cellar/hadoop/3.2.1/sbin
./start-all.sh


cd /usr/local/Cellar/spark-3.0.0-preview2/sbin
./start-all.sh
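
To verify that both clusters are up, jps should list the Hadoop daemons as well as Spark's standalone Master and Worker:

jps   # expect NameNode, DataNode, ResourceManager, NodeManager, Master, Worker, ...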

Part 2: Java WordCount


1. Add the dependencies

The dependency versions must match the versions of Hadoop and Spark installed in Part 1 (a quick check is shown after the dependency list).

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.0.0-preview2</version>
</dependency>

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>3.2.1</version>
</dependency>

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>23.6-jre</version>
</dependency>
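
If unsure which versions are installed, a quick sanity check (assuming hadoop is on the PATH and using the Spark install path from Part 1):

hadoop version
/usr/local/Cellar/spark-3.0.0-preview2/bin/spark-submit --version
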
2. word.txt
hello spark
spark sql
spark streaming
3. JavaWordCount.java
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;


public class JavaWordCount {
    public static void main(String[] args) {
        // "local" runs the job in-process, simulating a cluster on a single machine
        // local[N] simulates a cluster with N threads
        // local[*] uses all available local cores
        SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Read the text file; each line becomes one RDD element
        // [hello spark, spark sql, spark streaming]
        JavaRDD<String> rdd1 = jsc.textFile("/Users/mengday/Desktop/spark/word.txt");


        // Split each line on spaces so that each word becomes one element
        // [hello, spark, spark, sql, spark, streaming]
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });


        // Map each word to a tuple of the word and an initial count of 1: word -> (word, 1)
        // [(hello,1), (spark,1), (spark,1), (sql,1), (spark,1), (streaming,1)]
        JavaPairRDD<String,Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });


        // Reduce: group the tuples by key and sum the counts
        // [(spark,3), (hello,1), (streaming,1), (sql,1)]
        JavaPairRDD<String,Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });


        // Print the final result
        List<Tuple2<String,Integer>> list = rdd4.collect();
        for(Tuple2<String, Integer> t : list){
            System.out.println(t._1() + " : " + t._2());
        }

        // Write the result to the local file system
        rdd4.saveAsTextFile("/Users/mengday/Desktop/spark/out");
        // Write the result to HDFS (the output directory must not already exist)
        rdd4.saveAsHadoopFile("hdfs://localhost:8020/wordcount/output", String.class, Integer.class, TextOutputFormat.class);
        jsc.stop();
    }

}
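
The anonymous inner classes above are what make the Java version verbose. FlatMapFunction, PairFunction, and Function2 are single-method interfaces, so with Java 8 the same pipeline can be written with lambdas and method references. A minimal sketch, assuming the same input path (the class name JavaWordCountLambda is just for illustration):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaWordCountLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaWordCountLambda").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Same pipeline as JavaWordCount above: read, split, map to (word, 1), sum by key
        JavaPairRDD<String, Integer> counts = jsc
                .textFile("/Users/mengday/Desktop/spark/word.txt")
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum);

        counts.collect().forEach(t -> System.out.println(t._1() + " : " + t._2()));
        jsc.stop();
    }
}
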
4. Run

Option 1: run the main method directly.

Option 2: submit the packaged jar with spark-submit:

cd /usr/local/Cellar/spark-3.0.0-preview2/bin
./spark-submit --class org.example.wordcount.JavaWordCount ~/Temp/spark-wordcount-java/target/spark-wordcount-java-1.0-SNAPSHOT.jar
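
Note that Hadoop output formats refuse to write into an existing directory, so clear the HDFS output path before re-running; afterwards the results can be inspected (paths taken from the code above; part-00000 is the usual file name for a single-partition job):

hdfs dfs -rm -r -f /wordcount/output            # clear output from a previous run
hdfs dfs -cat /wordcount/output/part-00000      # result written to HDFS
cat /Users/mengday/Desktop/spark/out/part-00000 # result written locally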


Part 3: Scala WordCount

1. Install or update the Scala plugin


2. Create a Maven project (archetype: scala-archetype-simple)


3. Set up the Scala SDK


4. In pom.xml, set scala.version and the -target:jvm-1.8 compiler flag
<properties>
    <scala.version>2.12.10</scala.version>
</properties>

<build>
	<sourceDirectory>src/main/scala</sourceDirectory>
	<testSourceDirectory>src/test/scala</testSourceDirectory>
	<plugins>
	  <plugin>
	    <groupId>org.scala-tools</groupId>
	    <artifactId>maven-scala-plugin</artifactId>
	    <executions>
	      <execution>
	        <goals>
	          <goal>compile</goal>
	          <goal>testCompile</goal>
	        </goals>
	      </execution>
	    </executions>
	    <configuration>
	      <scalaVersion>${scala.version}</scalaVersion>
	      <args>
	        <arg>-target:jvm-1.8</arg>
	      </args>
	    </configuration>
	  </plugin>
	</plugins>
</build>
5. Delete the generated App class
6. Add the dependencies
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.0.0-preview2</version>
</dependency>

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>3.2.1</version>
</dependency>

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>23.6-jre</version>
</dependency>
7. ScalaWordCount
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object ScalaWordCount {

  def main(args:Array[String]): Unit = {
    val sparkConf:SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local")
    val sc:SparkContext = new SparkContext(sparkConf)
    val file:RDD[String] = sc.textFile("/Users/mengday/Desktop/spark/word.txt")

    // Equivalent one-liner:
    // val result:RDD[(String,Int)] = file.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    val words:RDD[String] = file.flatMap(_.split(" "))
    val wordAndCount:RDD[(String, Int)] = words.map(x=>(x, 1))
    val result:RDD[(String, Int)] = wordAndCount.reduceByKey(_ + _)

    // Print the result
    result.collect().foreach(println)

    // Write the result to the local file system
    result.saveAsTextFile("/Users/mengday/Desktop/spark/out")
    // Write the result to HDFS (the output directory must not already exist)
    result.saveAsHadoopFile("hdfs://localhost:8020/wordcount/output", classOf[String], classOf[Integer], classOf[TextOutputFormat[String, Integer]])
    sc.stop()
  }
}
8. Run ScalaWordCount#main


9. Run from the command line


cd /usr/local/Cellar/spark-3.0.0-preview2/bin
./spark-submit --class org.example.wordcount.ScalaWordCount ~/Temp/spark-wordcount-scala/target/spark-wordcount-scala-1.0-SNAPSHOT.jar


Spark provides APIs for multiple languages. Scala runs on the Java virtual machine and can interoperate with Java, and Spark itself is written in Scala. Comparing the two WordCount examples, the Scala version needs far less code than the anonymous-class Java version.


    网上关于Stormwordcount的例子很多,不过都是基于storm-0.9.0.1,在运行例子过程中torm0.9.0.1.jar在maven中央仓库没有找到只有0.9.3,在运行过程中出现了好多问题。所以记下笔记。方便以后查看。1,下载Storm,地址为http://www.apache.org/dyn/closer.cgi/storm/apache-storm-0.9.3/......