首页 > 其他分享 > Scala

Scala

时间:2023-10-25 14:11:06浏览次数:26  
标签:sclar String val people height import spark

import org.apache.spark.{SparkConf, SparkContext}  
import org.apache.spark.rdd.RDD  
import org.apache.spark.SparkContext._  
import org.apache.spark.SparkConf._  
import org.apache.spark.rdd.PairRDDFunctions._
/** Merges the lines of two text files, drops duplicate lines, and writes the result.
  *
  * The two inputs are combined with `union` (not `zip`): `zip` pairs the i-th line of
  * one file with the i-th line of the other and requires both RDDs to have identical
  * partitioning and element counts, which is neither guaranteed nor what a merge needs.
  *
  * @param fileA      path of the first input text file
  * @param fileB      path of the second input text file
  * @param outputPath directory the de-duplicated lines are written to
  */
def mergeAndRemoveDuplicates(fileA: String, fileB: String, outputPath: String): Unit = {
  val conf = new SparkConf().setAppName("MergeAndRemoveDuplicates")
  val sc = new SparkContext(conf)

  try {
    // Concatenate both inputs, then keep each distinct line exactly once.
    val mergedRdd = sc.textFile(fileA).union(sc.textFile(fileB)).distinct()

    // Collapse to a single partition so the output directory holds one part file.
    mergedRdd.coalesce(1).saveAsTextFile(outputPath)
  } finally {
    // This function created the context, so it is responsible for releasing it.
    sc.stop()
  }
}
import org.apache.spark.{SparkConf, SparkContext}  
import org.apache.spark.sql.SparkSession
/** Computes the average score stored in a text file and writes it out as text. */
object AverageScore {

  /** Parses one input line into a `(name, score)` pair.
    *
    * NOTE(review): the input layout is not shown in this file; this assumes two
    * fields per line separated by whitespace and/or a comma -- confirm against real data.
    *
    * @param line raw line of the input file
    * @return the parsed pair, or `None` for blank lines, lines that do not have
    *         exactly two fields, or lines whose score is not numeric
    */
  private def parseScore(line: String): Option[(String, Double)] =
    line.trim.split("[,\\s]+") match {
      case Array(name, score) => scala.util.Try((name, score.toDouble)).toOption
      case _                  => None
    }

  /** Entry point: averages the two hard-coded inputs into their output directories.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Average Score")
    // Build the session from the configuration; the session owns its SparkContext.
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    /** Reads `inputPath`, averages the score column, and writes the result to `outputPath`.
      *
      * @param inputPath  text file with one name/score record per line
      * @param outputPath directory the single-value result is written to
      */
    def calculateAverage(inputPath: String, outputPath: String): Unit = {
      // textFile yields a single string column, so each line is split into
      // (name, score) before the two column names are assigned.
      val df = spark.read
        .textFile(inputPath)
        .flatMap(line => parseScore(line).toList)
        .toDF("name", "score")
      df.createOrReplaceTempView("scores")

      // The text writer accepts only a single string column, hence the cast.
      val averageScore =
        spark.sql("SELECT CAST(AVG(score) AS STRING) AS average_score FROM scores")

      averageScore.write.text(outputPath)
    }

    try {
      calculateAverage("path/to/inputA", "path/to/outputA")
      calculateAverage("path/to/inputB", "path/to/outputB")
    } finally {
      spark.stop()
    }
  }
}
import org.apache.spark.{SparkConf, SparkContext}  
import org.apache.spark.sql.SparkSession  
  
/** Prints head-count and height statistics (max, min, average) for each gender. */
object PeopleStatistics {

  /** Aggregated figures for one gender group. */
  private final case class HeightStats(count: Long, max: Int, min: Int, mean: Double)

  /** Parses one `id,gender,height` line into a [[Person]].
    *
    * @param line raw line of the input file
    * @return the parsed record, or `None` when the line does not have exactly three
    *         comma-separated fields or `id` / `height` is not an integer
    */
  private def parsePerson(line: String): Option[Person] =
    line.split(",").map(_.trim) match {
      case Array(id, gender, height) =>
        scala.util.Try(Person(id.toInt, gender, height.toInt)).toOption
      case _ => None
    }

  /** Entry point: reads `path/to/people.txt` and prints the statistics to stdout.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Column aggregate functions are not in scope by default; import them locally.
    import org.apache.spark.sql.functions.{avg, count, max, min}

    val conf = new SparkConf().setAppName("People Statistics")
    // Build the session from the configuration; the session owns its SparkContext.
    val spark = SparkSession.builder().config(conf).getOrCreate()

    import spark.implicits._

    try {
      // Read raw lines (not CSV rows) so each one can be split and converted to a
      // Person; lines that fail to parse are skipped.
      val people = spark.read
        .textFile("path/to/people.txt")
        .flatMap(line => parsePerson(line).toList)

      // One grouped aggregation replaces a separate filter + aggregate job per figure.
      val statsByGender: Map[String, HeightStats] = people
        .groupBy($"gender")
        .agg(count("*"), max($"height"), min($"height"), avg($"height"))
        .collect()
        .map { row =>
          row.getString(0) ->
            HeightStats(row.getLong(1), row.getInt(2), row.getInt(3), row.getDouble(4))
        }
        .toMap

      val male = statsByGender.get("M")
      val female = statsByGender.get("F")

      // Renders one figure, or "N/A" when the gender has no records at all.
      def show[A](stats: Option[HeightStats])(field: HeightStats => A): String =
        stats.fold("N/A")(s => field(s).toString)

      // Print the results.
      println(s"Male count: ${male.fold(0L)(_.count)}")
      println(s"Female count: ${female.fold(0L)(_.count)}")
      println(s"Male max height: ${show(male)(_.max)}")
      println(s"Female max height: ${show(female)(_.max)}")
      println(s"Male min height: ${show(male)(_.min)}")
      println(s"Female min height: ${show(female)(_.min)}")
      println(s"Male average height: ${show(male)(_.mean)}")
      println(s"Female average height: ${show(female)(_.mean)}")
    } finally {
      spark.stop()
    }
  }
}
  
/** One record of the people data set.
  *
  * @param id     integer identifier of the record
  * @param gender gender code; the statistics job in this file compares it with "M" and "F"
  * @param height integer height value
  */
case class Person(
    id: Int,
    gender: String,
    height: Int
)

标签:sclar,String,val,people,height,import,spark
From: https://www.cnblogs.com/gbcmakehsht/p/17787092.html

相关文章