// 计算每个人的平均成绩
JavaPairRDD<String, Double> averages = scores.join(counts).mapValues(new Function<Tuple2<Integer, Integer>, Double>() {
@Override
public Double call(Tuple2<Integer, Integer> tuple) {
return (double) tuple._1 / tuple._2;
}
});
在这个修正后的代码中,我们使用 mapValues
来转换 scores.join(counts)
的值。mapValues
接受一个 Function
,这个 Function
只作用于 Tuple2
的值部分,即 Integer
和 Integer
的对。我们计算这两个数的商,得到平均成绩。
完整的程序应该如下所示:
package com.example;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import java.util.List;
public class AverageScores {
public static void main(String[] args) {
// 配置Spark
SparkConf conf = new SparkConf()
.setAppName("AverageScores")
.setMaster("local[*]"); // 根据你的环境修改
JavaSparkContext sc = new JavaSparkContext(conf);
// 读取HDFS文件
JavaRDD<String> lines = sc.textFile("hdfs://node1:8020/test/1.txt");
// 解析文件内容并按人名分组成绩
JavaPairRDD<String, Integer> scores = lines.filter(line -> line.contains(" "))
.mapToPair(line -> new Tuple2<>(line.split(" ")[0], Integer.parseInt(line.split(" ")[1])));
// 计算每个人的总成绩
JavaPairRDD<String, Integer> totalScores = scores.reduceByKey((a, b) -> a + b);
// 计算每个人成绩的数量
JavaPairRDD<String, Integer> scoreCounts = lines.filter(line -> line.contains(" "))
.mapToPair(line -> new Tuple2<>(line.split(" ")[0], 1))
.reduceByKey((a, b) -> a + b);
// 计算每个人的平均成绩
JavaPairRDD<String, Double> averages = totalScores.join(scoreCounts).mapValues(new Function<Tuple2<Integer, Integer>, Double>() {
@Override
public Double call(Tuple2<Integer, Integer> tuple) {
return (double) tuple._1 / tuple._2;
}
});
// 收集并输出结果
List<Tuple2<String, Double>> output = averages.collect();
for (Tuple2<String, Double> tuple : output) {
System.out.printf("(%s, %.2f)%n", tuple._1, tuple._2);
}
// 关闭Spark上下文
sc.close();
}
}
标签:2024.12,tuple,JavaPairRDD,Tuple2,new,import,line
From: https://www.cnblogs.com/258-333/p/18585240