hadoop学习之MapReduce案例:输出每个班级中的成绩前三名的学生
所要处理的数据案例:
1500100001 施笑槐,22,女,文科六班,406
1500100002 吕金鹏,24,男,文科六班,440
1500100003 单乐蕊,22,女,理科六班,359
1500100004 葛德曜,24,男,理科三班,421
1500100005 宣谷芹,22,女,理科五班,395
1500100006 边昂雄,21,男,理科二班,314
...
1.Map端
package com.shujia.mr.top3;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
TODO
在编写代码之前需要先定义数据的处理逻辑
对于各班级中的学生总分进行排序,要求取出各班级中总分前三名学生
MapTask阶段:
① 读取ReduceJoin的处理结果,并对数据进行提取
② 按照学生的班级信息,对班级作为Key,整行数据作为Value写出到 ReduceTask 端
ReduceTask阶段:
① 接收到整个班级中的所有学生信息并将该数据存放在迭代器中
*/
public class Top3Mapper extends Mapper<LongWritable, Text, Text, Stu> {
/**
* 直接将学生对象发送到Reduce端进行操作
* ① 对于Stu学生自定义学生类,作为输出类型,需要将当前类进行序列化操作 implement Writable 接口
* ② 同时需要在自定义类中保证 类是具有无参构造的
* 运行时会出现:
* java.lang.RuntimeException: java.lang.NoSuchMethodException: com.shujia.mr.top3.Stu.<init>()
* 从日志上可以看到调用了 Stu.<init>() 指定的就是无参构造
* 从逻辑上:
* 在Mapper端 构建了Stu对象 => 通过调用其 write 对其进行了序列化操作
* 在Reducer端 需要对其进行反序列化 => 通过无参构造创建自身的空参对象 => 调用readFields方法进行 反序列化
* 将数据赋予给当前的空参对象属性
*/
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Stu>.Context context) throws IOException, InterruptedException {
// 1500100009 沈德昌,21,男,理科一班,251 => 表示读取到的数据
String[] split = value.toString().split("\t");
if (split.length == 2) {
String otherInfo = split[1];
String[] columns = otherInfo.split(",");
if (columns.length == 5) {
String clazz = columns[3];
Stu stu = new Stu(split[0], columns[0], Integer.valueOf(columns[1]), columns[2], columns[3], Integer.valueOf(columns[4]));
context.write(new Text(clazz), stu);
}
}
}
}
2.Reduce端
package com.shujia.mr.top3;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/*
TODO ReduceTask阶段
*/
public class Top3Reducer extends Reducer<Text, Stu, Text, NullWritable> {
/**
* 对一个班级中所有的学生成绩进行排序 =>
* 1.将数据存储在一个容器中
* 2.对容器中数据进行排序操作
* 对排序的结果进行取前三
*
* @param key 表示班级信息
* @param values 一个班级中所有的学生对象
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<Stu> values, Reducer<Text, Stu, Text, NullWritable>.Context context) throws IOException, InterruptedException {
/*
TODO 当程序执行到Reducer端时,需要对Values中的数据进行遍历,获取每一个学生对象
但是在添加过程中,ArrayList中所有的对象信息都变成一样的。
表示当前 ArrayList存储的对象为1个,每次添加的引用信息都是指向一个对象地址
如何解决?
每次获取到对象后,对其进行克隆一份(重新创建一个对象进行存储)
*/
ArrayList<Stu> stus = new ArrayList<>();
for (Stu stu : values) {
// 排序方案2:需要对Stu进行序列化
//TODO 每次获取到对象后,对其进行克隆一份(重新创建一个对象进行存储)
Stu stu1 = new Stu(stu.id, stu.name, stu.age, stu.gender, stu.clazz, stu.score);
stus.add(stu1);
}
// 进行排序操作,将stus集合传入函数进行排序
Collections.sort(stus,
// 设定排序规则
new Comparator<Stu>() {
/*CSDN:
return 0:不交换位置,不排序
return 1:交换位置
return -1:不交换位置
return o1-o2:升序排列
return o2-o1:降序排列
*/
@Override
public int compare(Stu o1, Stu o2) {
int compareScore = o1.score - o2.score;
// 保证成绩序列是降序排序,若成绩相同,则按照学号进行字典排序返回数值,最后进行字典 升序排序
//String中的compareTo方法:用字符串1跟字符串2作比较,如果字符串1的字典顺序在字符串2前面(较小),则返回一个负数。
// 若在后面,则返回一个正数。若两个字符串的字典顺序相同,则返回0。
return -compareScore > 0 ? 1 : (compareScore == 0 ? o1.id.compareTo(o2.id) : -1);
}
}
);
// 对排序的结果进行遍历
for (int i = 0; i < 3; i++) {
context.write(new Text(stus.get(i).toString()+","+(i+1)),NullWritable.get());
}
}
}
3.main方法
package com.shujia.mr.top3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.FileNotFoundException;
import java.io.IOException;
public class Top3 {
/*
TODO:将项目打包到Hadoop中进行执行。
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// TODO MapReduce程序入口中的固定写法
// TODO 1.获取Job对象 并设置相关Job任务的名称及入口类
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Top3");
// 设置当前main方法所在的入口类
job.setJarByClass(Top3.class);
// TODO 2.设置自定义的Mapper和Reducer类
job.setMapperClass(Top3Mapper.class);
job.setReducerClass(Top3Reducer.class);
// TODO 3.设置Mapper的KeyValue输出类 和 Reducer的输出类 (最终输出)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Stu.class);
job.setOutputKeyClass(Text.class);
/*
NullWritable是Writable的一个特殊类,实现方法为空实现,
不从数据流中读数据,也不写入数据,只充当占位符,如在MapReduce中,
如果你不需要使用键或值,你就可以将键或值声明为NullWritable,NullWritable是一个不可变的单实例类型。
*/
job.setOutputValueClass(NullWritable.class);
// TODO 4.设置数据的输入和输出路径
// 本地路径
FileSystem fileSystem = FileSystem.get(job.getConfiguration());
Path outPath = new Path("hadoop/out/new_top3");
// Path outPath = new Path("/data/hadoop/out/new_top3");
Path inpath = new Path("hadoop/out/reducejoin");
// Path inpath = new Path("/data/hadoop/out/reducejoin");
if (!fileSystem.exists(inpath)) {
throw new FileNotFoundException(inpath+"不存在");
}
TextInputFormat.addInputPath(job,inpath);
if (fileSystem.exists(outPath)) {
System.out.println("路径存在,开始删除");
fileSystem.delete(outPath,true);
}
TextOutputFormat.setOutputPath(job,outPath);
// TODO 5.提交任务开始执行
job.waitForCompletion(true);
}
}
4.创建的学生类
package com.shujia.mr.top3;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
public class Stu implements Writable {
String id;
String name;
int age;
String gender;
String clazz;
int score;
/*
TODO 使用Hadoop序列化的问题:
java.lang.RuntimeException: java.lang.NoSuchMethodException: com.shujia.mr.top3.Stu.<init>()
*/
//TODO 需要给定无参构造方法(序列化需要)
public Stu() {
}
public Stu(String id, String name, int age, String gender, String clazz, int score) {
this.id = id;
this.name = name;
this.age = age;
this.gender = gender;
this.clazz = clazz;
this.score = score;
}
@Override
public String toString() {
return id +
", " + name +
", " + age +
", " + gender +
", " + clazz +
", " + score;
}
// TODO 自定义类要重写下列方法才能进行序列化
/*
对于Write方法中是对当前的对象进行序列化操作
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeUTF(name);
out.writeInt(age);
out.writeUTF(gender);
out.writeUTF(clazz);
out.writeInt(score);
}
/*
readFields方法中是对当前对象进行反序列化操作
*/
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readUTF(); // 将0101数据反序列化数据并保存到当前属性中
this.name = in.readUTF();
this.age = in.readInt();
this.gender = in.readUTF();
this.clazz = in.readUTF();
this.score = in.readInt();
}
}
标签:java,String,前三名,hadoop,MapReduce,Stu,apache,import
From: https://blog.csdn.net/m0_58050808/article/details/139278477