import org.apache.spark.{SparkConf, SparkContext}

def mergeAndRemoveDuplicates(fileA: String, fileB: String, outputPath: String): Unit = {
  val conf = new SparkConf().setAppName("MergeAndRemoveDuplicates")
  val sc = new SparkContext(conf)
  // Read the two files, union them, and map each line to a (line, None) pair
  // so that duplicate lines share the same key.
  val rdd = (sc.textFile(fileA) union sc.textFile(fileB)).map(line => (line, None))
  // reduceByKey keeps a single entry per distinct line; drop the placeholder value afterwards.
  val mergedRdd = rdd.reduceByKey((x, _) => x).map { case (key, _) => key }
  // coalesce(1) so the result lands in a single output file, then write it out.
  mergedRdd.coalesce(1).saveAsTextFile(outputPath)
}
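The same merge-and-dedup can be expressed more directly with union followed by distinct; the lines below are an equivalent sketch using the same sc, fileA, fileB and outputPath as the function above:

// Equivalent approach: union the two inputs and let distinct() drop duplicate lines.
val merged = sc.textFile(fileA).union(sc.textFile(fileB)).distinct()
merged.coalesce(1).saveAsTextFile(outputPath)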
import org.apache.spark.sql.SparkSession

object AverageScore {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Average Score").getOrCreate()
    import spark.implicits._

    def calculateAverage(inputPath: String, outputPath: String): Unit = {
      // Assume each line is whitespace-separated "name score"; split it into two columns.
      val df = spark.read.textFile(inputPath)
        .map { line =>
          val fields = line.split("\\s+")
          (fields(0), fields(1).toDouble)
        }
        .toDF("name", "score")
      df.createOrReplaceTempView("scores")
      val averageScore = spark.sql("SELECT AVG(score) AS average_score FROM scores")
      // write.text only accepts a single string column, so cast the average before writing.
      averageScore.select($"average_score".cast("string")).write.text(outputPath)
    }

    calculateAverage("path/to/inputA", "path/to/outputA")
    calculateAverage("path/to/inputB", "path/to/outputB")
    spark.stop()
  }
}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, max, min}

object PeopleStatistics {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("People Statistics").getOrCreate()
    import spark.implicits._

    // Read the file, split each line into its three fields, and convert it to a Person.
    val people = spark.read.textFile("path/to/people.txt")
      .map { line =>
        val Array(id, gender, height) = line.split(",")
        Person(id.trim.toInt, gender.trim, height.trim.toInt)
      }

    val males = people.filter(_.gender == "M")
    val females = people.filter(_.gender == "F")

    // Compute the statistics.
    val maleCount = males.count()
    val femaleCount = females.count()
    val maleMaxHeight = males.agg(max("height")).first().getInt(0)
    val femaleMaxHeight = females.agg(max("height")).first().getInt(0)
    val maleMinHeight = males.agg(min("height")).first().getInt(0)
    val femaleMinHeight = females.agg(min("height")).first().getInt(0)
    val maleAverageHeight = males.agg(avg("height")).first().getDouble(0)
    val femaleAverageHeight = females.agg(avg("height")).first().getDouble(0)

    // Print the results.
    println("Male count: " + maleCount)
    println("Female count: " + femaleCount)
    println("Male max height: " + maleMaxHeight)
    println("Female max height: " + femaleMaxHeight)
    println("Male min height: " + maleMinHeight)
    println("Female min height: " + femaleMinHeight)
    println("Male average height: " + maleAverageHeight)
    println("Female average height: " + femaleAverageHeight)

    spark.stop()
  }
}

case class Person(id: Int, gender: String, height: Int)
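A more compact alternative is a single aggregation per gender group instead of eight separate actions; the sketch below uses the same people Dataset and standard Spark SQL functions:

import org.apache.spark.sql.functions.{avg, count, max, min}
// One pass over the data: count, max, min and average height per gender.
people.groupBy("gender")
  .agg(count("*").as("count"),
       max("height").as("max_height"),
       min("height").as("min_height"),
       avg("height").as("avg_height"))
  .show()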
From: https://www.cnblogs.com/gbcmakehsht/p/17787092.html