首页 > 其他分享 >hadoop学习之MapReduce案例:输出每个班级中的成绩前三名的学生

hadoop学习之MapReduce案例:输出每个班级中的成绩前三名的学生

时间:2024-05-28 21:31:26浏览次数:16  
标签:java String 前三名 hadoop MapReduce Stu apache import

hadoop学习之MapReduce案例:输出每个班级中的成绩前三名的学生

所要处理的数据案例:

1500100001 施笑槐,22,女,文科六班,406
1500100002 吕金鹏,24,男,文科六班,440
1500100003 单乐蕊,22,女,理科六班,359
1500100004 葛德曜,24,男,理科三班,421
1500100005 宣谷芹,22,女,理科五班,395
1500100006 边昂雄,21,男,理科二班,314
...

1.Map端

package com.shujia.mr.top3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


/*
    TODO
        在编写代码之前需要先定义数据的处理逻辑
            对于各班级中的学生总分进行排序,要求取出各班级中总分前三名学生
            MapTask阶段:
                   ① 读取ReduceJoin的处理结果,并对数据进行提取
                   ② 按照学生的班级信息,对班级作为Key,整行数据作为Value写出到 ReduceTask 端
            ReduceTask阶段:
                   ① 接收到整个班级中的所有学生信息并将该数据存放在迭代器中
 */

public class Top3Mapper extends Mapper<LongWritable, Text, Text, Stu> {

    /**
     *  直接将学生对象发送到Reduce端进行操作
     *      ① 对于Stu学生自定义学生类,作为输出类型,需要将当前类进行序列化操作 implement Writable 接口
     *      ② 同时需要在自定义类中保证 类是具有无参构造的
     *          运行时会出现:
     *              java.lang.RuntimeException: java.lang.NoSuchMethodException: com.shujia.mr.top3.Stu.<init>()
     *          从日志上可以看到调用了 Stu.<init>() 指定的就是无参构造
     *          从逻辑上:
     *              在Mapper端 构建了Stu对象 => 通过调用其 write 对其进行了序列化操作
     *              在Reducer端  需要对其进行反序列化 => 通过无参构造创建自身的空参对象 => 调用readFields方法进行 反序列化
     *                  将数据赋予给当前的空参对象属性
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Stu>.Context context) throws IOException, InterruptedException {
        // 1500100009  沈德昌,21,男,理科一班,251  => 表示读取到的数据
        String[] split = value.toString().split("\t");
        if (split.length == 2) {
            String otherInfo = split[1];
            String[] columns = otherInfo.split(",");
            if (columns.length == 5) {
                String clazz = columns[3];
                Stu stu = new Stu(split[0], columns[0], Integer.valueOf(columns[1]), columns[2], columns[3], Integer.valueOf(columns[4]));
                context.write(new Text(clazz), stu);
            }

        }

    }
}

2.Reduce端

package com.shujia.mr.top3;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/*
    TODO ReduceTask阶段

 */
public class Top3Reducer extends Reducer<Text, Stu, Text, NullWritable> {

    /**
     * 对一个班级中所有的学生成绩进行排序 =>
     * 1.将数据存储在一个容器中
     * 2.对容器中数据进行排序操作
     * 对排序的结果进行取前三
     *
     * @param key     表示班级信息
     * @param values  一个班级中所有的学生对象
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<Stu> values, Reducer<Text, Stu, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        /*
            TODO 当程序执行到Reducer端时,需要对Values中的数据进行遍历,获取每一个学生对象
                    但是在添加过程中,ArrayList中所有的对象信息都变成一样的。
                  表示当前 ArrayList存储的对象为1个,每次添加的引用信息都是指向一个对象地址
             如何解决?
                  每次获取到对象后,对其进行克隆一份(重新创建一个对象进行存储)

         */
        ArrayList<Stu> stus = new ArrayList<>();
        for (Stu stu : values) {
            // 排序方案2:需要对Stu进行序列化
            //TODO 每次获取到对象后,对其进行克隆一份(重新创建一个对象进行存储)
            Stu stu1 = new Stu(stu.id, stu.name, stu.age, stu.gender, stu.clazz, stu.score);
            stus.add(stu1);
        }

        // 进行排序操作,将stus集合传入函数进行排序
        Collections.sort(stus,
                // 设定排序规则
                new Comparator<Stu>() {
                    /*CSDN:
                    return 0:不交换位置,不排序
                    return 1:交换位置
                    return -1:不交换位置
                    return o1-o2:升序排列
                    return o2-o1:降序排列
                     */
                    @Override
                    public int compare(Stu o1, Stu o2) {
                        int compareScore = o1.score - o2.score;
                        // 保证成绩序列是降序排序,若成绩相同,则按照学号进行字典排序返回数值,最后进行字典 升序排序
                        //String中的compareTo方法:用字符串1跟字符串2作比较,如果字符串1的字典顺序在字符串2前面(较小),则返回一个负数。
                        // 若在后面,则返回一个正数。若两个字符串的字典顺序相同,则返回0。
                        return -compareScore > 0 ? 1 : (compareScore == 0 ? o1.id.compareTo(o2.id) : -1);
                    }
                }
        );


        // 对排序的结果进行遍历
        for (int i = 0; i < 3; i++) {
            context.write(new Text(stus.get(i).toString()+","+(i+1)),NullWritable.get());
        }

    }
}

3.main方法

package com.shujia.mr.top3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.FileNotFoundException;
import java.io.IOException;

public class Top3 {
    /*
        TODO:将项目打包到Hadoop中进行执行。
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // TODO MapReduce程序入口中的固定写法
        // TODO 1.获取Job对象 并设置相关Job任务的名称及入口类
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Top3");
        // 设置当前main方法所在的入口类
        job.setJarByClass(Top3.class);


        // TODO 2.设置自定义的Mapper和Reducer类
        job.setMapperClass(Top3Mapper.class);
        job.setReducerClass(Top3Reducer.class);


        // TODO 3.设置Mapper的KeyValue输出类 和 Reducer的输出类 (最终输出)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Stu.class);
        job.setOutputKeyClass(Text.class);
        /*
        NullWritable是Writable的一个特殊类,实现方法为空实现,
        不从数据流中读数据,也不写入数据,只充当占位符,如在MapReduce中,
        如果你不需要使用键或值,你就可以将键或值声明为NullWritable,NullWritable是一个不可变的单实例类型。
         */
        job.setOutputValueClass(NullWritable.class);


        // TODO 4.设置数据的输入和输出路径
        // 本地路径
        FileSystem fileSystem = FileSystem.get(job.getConfiguration());
        Path outPath = new Path("hadoop/out/new_top3");
//        Path outPath = new Path("/data/hadoop/out/new_top3");
        Path inpath = new Path("hadoop/out/reducejoin");
//        Path inpath = new Path("/data/hadoop/out/reducejoin");
        if (!fileSystem.exists(inpath)) {
            throw new FileNotFoundException(inpath+"不存在");
        }
        TextInputFormat.addInputPath(job,inpath);

        if (fileSystem.exists(outPath)) {
            System.out.println("路径存在,开始删除");
            fileSystem.delete(outPath,true);
        }
        TextOutputFormat.setOutputPath(job,outPath);


        // TODO 5.提交任务开始执行
        job.waitForCompletion(true);

    }
}

4.创建的学生类

package com.shujia.mr.top3;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;

public class Stu implements Writable {
    String id;
    String name;
    int age;
    String gender;
    String clazz;
    int score;

    /*
        TODO 使用Hadoop序列化的问题:
            java.lang.RuntimeException: java.lang.NoSuchMethodException: com.shujia.mr.top3.Stu.<init>()
     */

    //TODO 需要给定无参构造方法(序列化需要)
    public Stu() {
    }

    public Stu(String id, String name, int age, String gender, String clazz, int score) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.clazz = clazz;
        this.score = score;
    }

    @Override
    public String toString() {
        return id +
                ", " + name +
                ", " + age +
                ", " + gender +
                ", " + clazz +
                ", " + score;
    }


    // TODO 自定义类要重写下列方法才能进行序列化
    /*
        对于Write方法中是对当前的对象进行序列化操作
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(name);
        out.writeInt(age);
        out.writeUTF(gender);
        out.writeUTF(clazz);
        out.writeInt(score);
    }

    /*
        readFields方法中是对当前对象进行反序列化操作
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF(); // 将0101数据反序列化数据并保存到当前属性中
        this.name = in.readUTF();
        this.age = in.readInt();
        this.gender = in.readUTF();
        this.clazz = in.readUTF();
        this.score = in.readInt();
    }
}

标签:java,String,前三名,hadoop,MapReduce,Stu,apache,import
From: https://blog.csdn.net/m0_58050808/article/details/139278477

相关文章

  • Hadoop学习之hdfs的操作
    Hadoop学习之hdfs的操作1.将HDFS中的文件复制到本地packagecom.shujia.hdfs;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.junit.After;importorg.junit.Before;importor......
  • 数据是如何写入到Hadoop HDFS中的?
    胡弦,视频号2023年度优秀创作者,互联网大厂P8技术专家,SpringCloudAlibaba微服务架构实战派(上下册)和RocketMQ消息中间件实战派(上下册)的作者,资深架构师,技术负责人,极客时间训练营讲师,四维口袋KVP最具价值技术专家,技术领域专家团成员,2021电子工业出版社年度优秀作者,获得2023电......
  • Hadoop HDFS NameNode核心原理分析
    胡弦,视频号2023年度优秀创作者,互联网大厂P8技术专家,SpringCloudAlibaba微服务架构实战派(上下册)和RocketMQ消息中间件实战派(上下册)的作者,资深架构师,技术负责人,极客时间训练营讲师,四维口袋KVP最具价值技术专家,技术领域专家团成员,2021电子工业出版社年度优秀作者,获得2023电......
  • 边缘计算|Hadoop——边缘计算,有没有对应的软件?例如数据中心或云计算环境进行数据处理
    边缘计算确实没有直接对应于Hadoop这样的单一软件框架,因为边缘计算更多的是一个概念或技术架构,它涵盖了在网络的边缘(即设备或数据源附近)进行数据处理和计算的能力。然而,这并不意味着边缘计算没有相应的软件支持或解决方案。在边缘计算环境中,通常会使用各种软件、工具和框架来支持......
  • 边缘计算|Hadoop——边缘计算和Hadoop是什么关系?
    边缘计算和Hadoop之间存在关联,但它们是两种不同的技术,分别应用于不同的计算场景。以下是它们之间关系的详细解释:定义与功能:边缘计算:边缘计算是指在靠近物或数据源头的一侧,采用网络、计算、存储、应用核心能力为一体的开放平台,就近提供最近端服务。它降低了延迟,节省了带宽,并允......
  • hadoop3.2.3+flink1.13.0+hbase2.4.8集群搭建
    hadoop3.2.3+flink1.13.0+hbase2.4.8集群搭建hadoop3.2.3+flink1.13.0+hbase2.4.8集群搭建1.准备3台centos7服务器配置hosts(可能需要重启才生效)/etc/hosts192.168.10.209master192.168.10.155slave1192.168.10.234slave2123456免密登录ssh-keygen-trsass......
  • 计算机毕业设计hadoop+hive知识图谱漫画推荐系统 动漫推荐系统 漫画分析可视化大屏 漫
    流程:1.DrissionPage+Selenium自动爬虫工具采集知音漫客动漫数据存入mysql数据库;2.Mapreduce对采集的动漫数据进行数据清洗、拆分数据项等,转为.csv文件上传hadoop的hdfs集群;3.hive建库建表导入.csv动漫数据;4.一半指标使用hive_sql分析得出,一半指标使用Spark之Scala完成;5.sq......
  • Hadoop创建文件、上传文件、下载文件、修改文件名、删除文件精细全流程
    目录一、起步流程1.创建配置参数对象---Configuration类(org.apache.hadoop.conf.Configuration)2.通过配置参数对象指定hdfs的地址3.创建HDFS文件系统的对象---带配置项---FileSystem类​二、具体操作(1)创建目录:/wordcount(2)下载文件:/data/input/word.txt下载到D:/hadoop......
  • Hadoop 学习
    Hadoop三种运行模式:1.本地模式(学习)1.没有HDFS,使用当前系统下的文件系统2.没有YARN,使用的是Linux中的资源3.使用了Map-ReduceFramework2.伪分布式模式(学习)1.只有单台机器2.使用HDFS、Yarn、MapReduce3.分布式模式(企业级)1.多台服务器2.集群模式,包含整......
  • 大数据技术原理(二):搭建hadoop伪分布式集群这一篇就够了
    (实验一搭建hadoop伪分布式)--------------------------------------------------------------------------------------------------------------------------------一、实验目的1.理解Hadoop伪分布式的安装过程实验内容涉及Hadoop平台的搭建和配置,旨在提高对大数据处理框......