首页 > 编程语言 >RDD编程

RDD编程

时间:2024-06-09 14:59:15浏览次数:8  
标签:val scala 编程 RDD sparkapp println spark 2.11

环境准备

1.VMware虚拟机(Linux操作系统)

2.Windows7~11

3.JDK

4.HadoopHadoop安装及集群环境配置_hadoop环境搭建与安装-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/2301_81921110/article/details/139362063?spm=1001.2014.3001.55015.spark和sbt

spark安装和编程实践(Spark2.1.0)-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/2301_81921110/article/details/139398991?spm=1001.2014.3001.55016.Xshell 7(用于连接虚拟机与Windows)

Xshell 7与Xftp 7使用教程-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/2301_81921110/article/details/139377831?spm=1001.2014.3001.55017.Xftp 7(用于虚拟机与Windows之间传输文件)

一、启动虚拟机并在Windows中使用Xshell 7连接虚拟机

二、在Ubuntu本地新建一个文件,命名为:2024.txt,内容如下:

China is my motherland

I love China

I am from China

请对文件内单词进行词频统计。

1、启动hadoop

start-all.sh

2、启动spark shell

①cd /usr/local/spark

②bin/spark-shell   # 启动spark shell

3、在Ubuntu本地新建一个文件,命名为:2024.txt,内容如下:

China is my motherland

I love China

I am from China

重新打开一个终端,在其中创建2024.txt文件;

vim 2024.txt

spark shell运行代码:

scala> val lines = sc.textFile("file:///home/hadoop/2024.txt")

scala> val words=lines.flatMap(line => line.split(" "))

scala> var wordcount = words.map(word=>(word,1)).groupByKey()

scala> wordcount.foreach(println)

scala> var wordcount1 = words.map(word=>(word,1)).reduceByKey((a,b)=>a+b)

scala> wordcount1.foreach(println)

Scala应用程序代码

在./sparkapp/src/main/scala下建立一个名为2024_1.scala的文件

vim ./sparkapp/src/main/scala/2024_1.scala

import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession



object WordCountProgram extends App {

  val sparkConf = new SparkConf().setAppName("Word Count Application")

  val spark = SparkSession.builder.config(sparkConf).getOrCreate()



  // 读取文件

  val lines = spark.sparkContext.textFile("file:///home/hadoop/2024.txt")



  // 分割单词

  val words = lines.flatMap(line => line.split(" "))



  // 使用groupByKey进行词频统计(不推荐,因为效率较低)

  val wordCountGroupByKey: RDD[(String, Iterable[Int])] = words.map(word => (word, 1)).groupByKey()

  println("Using groupByKey:")

  wordCountGroupByKey.foreach{case (word, counts) => println(s"$word -> ${counts.sum}")}



  // 使用reduceByKey进行高效的词频统计

  val wordCountReduceByKey: RDD[(String, Int)] = words.map(word => (word, 1)).reduceByKey(_ + _)

  println("\nUsing reduceByKey:")

  wordCountReduceByKey.foreach{case (word, count) => println(s"$word -> $count")}



  spark.stop()

}

cd ~/sparkapp

find .

通过如下代码将整个应用程序打包成JAR:

/usr/local/sbt/sbt package

打包成功会输出如下内容:

生成的jar包的位置为 ~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar。

通过spark-submit运行程序命令如下:

/usr/local/spark/bin/spark-submit --class "WordCountProgram" ~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar

运行结果如下:

三、给出一组键值对数据:("c",8), ("b",25), ("c",17), ("a",42), ("b",4), ("d",9), ("e",17), ("c",2), ("f",29), ("g",21), ("b",9),请分别根据key和values的大小进行降序排列。要求有代码和执行截图。

1、spark shell运行代码:

scala>val d1 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))  

scala>d1.reduceByKey(_+_).sortByKey(false).collect

scala>val d2 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))  

scala>d2.reduceByKey(_+_).sortBy(_._2,false).collect

2、Scala应用程序代码

在./sparkapp/src/main/scala下建立一个名为2024_2.scala的文件

①cd ~

②vim ./sparkapp/src/main/scala/2024_2.scala

import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession



object WordCountAndSortExample {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("Word Count and Sort Example")

    val spark = SparkSession.builder.config(sparkConf).getOrCreate()



    // 创建初始数据

    val data = Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9))

    

    // 第一部分:按key降序排序

    val d1 = spark.sparkContext.parallelize(data)

    val sortedByKey = d1.reduceByKey(_+_).sortByKey(false)

    println("Sorted by key in descending order:")

    sortedByKey.collect.foreach(println)



    // 第二部分:按value降序排序

    val d2 = spark.sparkContext.parallelize(data)

    val sortedByValue = d2.reduceByKey(_+_).sortBy(_._2, false)

    println("\nSorted by value in descending order:")

    sortedByValue.collect.foreach(println)



    spark.stop()

  }

}

③cd ~/sparkapp

通过如下代码将整个应用程序打包成JAR:

④/usr/local/sbt/sbt package

打包成功会输出如下内容:

生成的jar包的位置为 ~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar。

通过spark-submit运行程序命令如下:

/usr/local/spark/bin/spark-submit --class "WordCountAndSortExample" ~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar

运行结果如下:

按key降序排序:

按value降序排序:

四、给出一组数据,("spark",2), ("hadoop",6), ("hadoop",4), ("spark",6), ("hadoop",5), ("spark",4),请根据数据中的key,计算values的平均值。要求有代码和执行截图。

1、spark shell运行代码:

scala>val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))

scala>rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2+y._2)).mapValues(x => (x._1/x._2)).collect().foreach(println)

2、Scala应用程序代码

在./sparkapp/src/main/scala下建立一个名为2024_3.scala的文件

①cd ~

②vim ./sparkapp/src/main/scala/2024_3.scala

import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession



object AverageCountExample {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Average Count Example")

    val spark = SparkSession.builder.config(conf).getOrCreate()



    // 创建初始数据

    val data = Array(("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6))



    // 实现计算每个键的值的平均数

    val rdd = spark.sparkContext.parallelize(data)

    val avgRdd: RDD[(String, Double)] = rdd

        .mapValues(x => (x, 1)) // 转换每个值为一个二元组(value, 1)

        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) //按键聚合,累加value和计数

        .mapValues(x => x._1.toDouble / x._2) // 计算平均值

    avgRdd.collect().foreach(println)// 打印结果

    spark.stop() // 停止SparkSession

  }

}

③cd ~/sparkapp

通过如下代码将整个应用程序打包成JAR:

④/usr/local/sbt/sbt package

打包成功会输出如下内容:

生成的jar包的位置为 ~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar。

通过spark-submit运行程序命令如下:

/usr/local/spark/bin/spark-submit --class "AverageCountExample" ~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar

运行结果如下:

标签:val,scala,编程,RDD,sparkapp,println,spark,2.11
From: https://blog.csdn.net/2301_81921110/article/details/139561531

相关文章

  • CUDA编程学习笔记-02
    CUDA代码高效计算策略高效公式✒️Math代表数学计算量,Memory代表每个线程的内存......
  • JAVAEE之网络编程(1)_套接字、UDP数据报套接字编程及从代码实例
    前言什么是网络编程呢? 网络编程,指网络上的主机,通过不同的进程,以编程的方式实现网络通信(或称为网络数据传输)。当然,即便是同一个主机,只要是不同进程,基于网络来传输数据,也属于网络编程一、网路编程中的基本概念1.1发送端和接收端发送端:数据的发送方进程,称为发送端。发......
  • 实验6 C语言结构体、枚举应用编程
    //task4.c#include<stdio.h>#include<stdlib.h>#include<string.h>#defineN10typedefstruct{charisbn[20];//isbn号charname[80];//书名charauthor[80];//作者doublesales_price;//售价int......
  • 实验6 C语言结构体、枚举应用编程
    task.1#defineN3//运行程序输入测试时,可以把这个数组改小一些输入测试#include<stdlib.h>typedefstructstudent{intid;//学号charname[20];//姓名charsubject[20];//考试科目doubleperf;//......
  • 程序员学习Processing和TouchDesigner视觉编程相关工具
     ProessingProcessing是一种用于视觉艺术和创意编程的开发环境和编程语言。它最初是为了帮助非专业程序员学习编程,特别是那些对于创意编程和视觉表达感兴趣的人。Processing提供了简单易用的API,使得绘制图形、创建动画和交互式应用变得相对容易。在前端应用Processing......
  • 实验6 C语言结构体、枚举应用编程
    task1.c1//P286例8.172//对教材上的程序作了微调整,把输出学生信息单独编写成一个函数模块3//打印不及格学生信息和所有学生信息程分别调用45#include<stdio.h>6#include<string.h>7#defineN3//运行程序输入测试时,可以把这个数组......
  • 【机器学习基础】Python编程07:五个实用练习题的解析与总结
    Python是一种广泛使用的高级编程语言,它在机器学习领域中的重要性主要体现在以下几个方面:简洁易学:Python语法简洁清晰,易于学习,使得初学者能够快速上手机器学习项目。丰富的库支持:Python拥有大量的机器学习库,如scikit-learn、TensorFlow、Keras和PyTorch等,这些库提供了......
  • 【机器学习基础】Python编程08:五个实用练习题的解析与总结
    Python是一种广泛使用的高级编程语言,它在机器学习领域中的重要性主要体现在以下几个方面:简洁易学:Python语法简洁清晰,易于学习,使得初学者能够快速上手机器学习项目。丰富的库支持:Python拥有大量的机器学习库,如scikit-learn、TensorFlow、Keras和PyTorch等,这些库提供了......
  • Python_编程基础
    Python_编程基础Python编程基础0、简单介绍解释型语言:一边编译一边运行,不需要进行编译,运行效率比较低解释器JavaScript-浏览器python.exephp.exe编译型语言:运行前需要进行编译,运行效率比较高C.c->.exe组合:anaconda+pycharm、python+pycharm/sublime/geany/vs......
  • Postgres 查询中的宏/元编程
    如果我拥有与本问题中相同的示例数据,并另外声明了以下两个函数:创建或替换函数example.markout_666_example_666_price_table_666_price(_symboltext,_time_oftimestamptz,_startinterval,_durationinterval)返回float8LANGUAGEsqlSTABLESTRICTPARA......