首页 > 其他分享 >大数据框架之一——Hadoop学习第三天

大数据框架之一——Hadoop学习第三天

时间:2024-08-08 15:42:09浏览次数:8  
标签:框架 Text 第三天 hadoop Hadoop job org apache import

1、MapReduce概述及原理

  • MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.

  • MapReduce是分布式运行的,由两个阶段组成:Map和Reduce,Map阶段是一个独立的程序,有很多个节点同时运行,每个节点处理一部分数据。Reduce阶段是一个独立的程序,有很多个节点同时运行,每个节点处理一部分数据【在这先把reduce理解为一个单独的聚合程序即可】.

  • MapReduce框架都有默认实现,用户只需要覆盖map()和reduce()两个函数,即可实现分布式计算,非常简单.

    • 这两个函数的形参和返回值都是<key、value>,使用的时候一定要注意构造<k,v>。

2、WordCount计算

2.1常用的Writable实现类

  • MapReduce中对应数据格式为Key-Value格式,并且Key和Value都是Writable实现类

image.png

2.2MapReduce计算-WordCount

  • 主函数入口代码如下
package com.mr.worcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


import java.io.FileNotFoundException;
import java.io.IOException;

public class WordCount {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // TODO MapReduce程序入口中的固定写法
        // TODO 1.获取Job对象 并设置相关Job任务的名称及入口类
//        Job job = new Job();
//        job.setJobName("word count");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        // 设置当前main方法所在的入口类
        job.setJarByClass(WordCount.class);
        // TODO 2.设置自定义的Mapper和Reducer类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // TODO 3.设置Mapper的KeyValue输出类 和 Reducer的输出类 (最终输出)
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置ReduceTask的数量为2
        job.setNumReduceTasks(2);

        // TODO 4.设置数据的输入和输出路径
        //  org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
        //  org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
        //在HDFS上进行文件的读取和写出
//        TextInputFormat.addInputPath(job,new Path("/data/words.txt"));
//        TextOutputFormat.setOutputPath(job,new Path("/api/wordCount"));

        // 本地路径
        FileSystem fileSystem = FileSystem.get(job.getConfiguration());
        Path outPath = new Path("hadoop/out/wordCount");
//        Path inpath = new Path("hadoop/data/words.txt");
        Path inpath = new Path("hadoop/data/words");
        if (!fileSystem.exists(inpath)) {
            throw new FileNotFoundException(inpath+"不存在");
//            System.out.println(inpath+"不存在");
//            System.exit(1);
        }
//        TextInputFormat.addInputPath(job,inpath);
        FileInputFormat.addInputPath(job,inpath);
        
        if (fileSystem.exists(outPath)) {
            System.out.println("路径存在,开始删除");
            fileSystem.delete(outPath,true);
        }
//        TextOutputFormat.setOutputPath(job,outPath);
        FileOutputFormat.setOutputPath(job,outPath);

        // TODO 5.提交任务开始执行
        job.waitForCompletion(true);
    }
}

  • Mapper代码
package com.mr.worcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/*
    TODO MapTask阶段
        自定义类继承Mapper,该Mapper类为一个具体的类,并其中定义了一些泛型
            <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        通过之前的学习,知道MapTask阶段需要编写map函数,定义数据处理的逻辑
        KEYIN: 表示输入的Key的类型 表示map函数处理的Key类型 变量保存的数据是偏移量
                    读取数据的位置 字节数的位置非常大,需要使用Long类型 => LongWritable
        VALUEIN: 表示输入的Value类型  表示map函数处理的Value类型  表示的是一行字符串数据 String => Text
        KEYOUT: 表示输出的Key的类型 根据要处理的数据逻辑来进行定义 => 输出的Key为单词 => Java中的String类型 => Hadoop中的Text
        VALUEOUT:表示输出的Value的类型 根据要处理的数据逻辑来进行定义 => 输出的Value为1 => Java中的int类型 => Hadoop中的IntWritable

        注意:当数据在Hadoop中进行传递时,需要进行序列化,而Java中的序列化内容多,比较重,导致网络IO开销大
              为了计算速度快,Hadoop提供一套新的序列化类型

 */
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    /**
     *  map函数中定义了Task任务在Map阶段所做的数据处理任务
     *      当前函数中需要对获取到的一行字符串进行按照 空格切分,再将单词遍历 之后再形成 Key为单词  1为Value的数据形式
     *  TODO 注意:map方法在执行的过程中是一行数据对应调用一次该函数
     * @param key 变量保存的数据是偏移量
     * @param value 表示的是一行字符串数据 是从文本文件中按行读取出来的
     * @param context 表示的是 Mapper.Context的上下文对象,作用是连接 Map阶段和Reduce阶段的桥梁
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // value遍历中的数据 => hello hadoop
        // TODO 获取到的一行字符串进行按照 空格切分
        String[] words = value.toString().split(" ");

        // TODO 再将单词遍历
        for (String word : words) {
            // TODO 形成Key为单词  1为Value的数据形式
            // context 对象可以将Map阶段生成的数据发送给reduce阶段
            context.write(new Text(word),new IntWritable(1));
        }
    }
}
  • Reducer阶段
package com.mr.worcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
    TODO ReduceTask阶段
        自定义类继承Reducer,该Reducer类为一个具体的类,并其中定义了一些泛型
            <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        Reduce阶段的数据是由Map阶段发送过来的,所以Map阶段输出的类型就是Reduce阶段接收的类型
        根据处理逻辑:
            KEYIN: Text
            VALUEIN: IntWritable
        根据数据最终的要求:
            KEYOUT, VALUEOUT 表示最终每个单词出现的次数
            KEYOUT : Text
            VALUEOUT: IntWritable
 */
public class WordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {

    /**
     *  reduce函数中定义了 Reduce阶段中要执行的代码逻辑
     *      将相同单词的KeyValue数据汇集到一起,再将所有的Value值 1 进行相加 得到最终的结果
     *  TODO 注意:① 对于reduce函数需要等Mapper阶段执行完成后才能再执行
     *            ② 对于每个Key会调用一次reduce函数
     *            ③ 对于Key的处理是存在有先后顺序的 按照字典序进行排序
     * @param key  表示map端输出的Key数据 单词
     * @param values 类型为Iterable 表示相同Key的Value数据形成的迭代器
     * @param context 上下文对象  可以将数据写出到HDFS
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // 定义num 用于记录单词出现的次数
        int num = 0;
        // TODO 再将所有的Value值 1 进行相加 得到最终的结果
        for (IntWritable value : values) {
            num += value.get();
        }
        context.write(key,new IntWritable(num));
    }
}

3、Map Reduce关联分析

3.1案例:针对学生的各科成绩数据进行汇总

①主函数

package com.mr.count;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.FileNotFoundException;
import java.io.IOException;

public class Count {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // TODO MapReduce程序入口中的固定写法
        // TODO 1.获取Job对象 并设置相关Job任务的名称及入口类
//        Job job = new Job();
//        job.setJobName("word count");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "ReduceJoin");
        // 设置当前main方法所在的入口类
        job.setJarByClass(Count.class);
        // TODO 2.设置自定义的Mapper和Reducer类
        job.setMapperClass(CountMapper.class);
        job.setReducerClass(CountReducer.class);

        // TODO 3.设置Mapper的KeyValue输出类 和 Reducer的输出类 (最终输出)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // TODO 4.设置数据的输入和输出路径

        // 本地路径
        FileSystem fileSystem = FileSystem.get(job.getConfiguration());
        Path outPath = new Path("hadoop/out/count");
        Path inpath = new Path("hadoop/data/score.txt");
        if (!fileSystem.exists(inpath)) {
            throw new FileNotFoundException(inpath+"不存在");
        }
        TextInputFormat.addInputPath(job,inpath);
//        TextInputFormat.addInputPath(job,inpath);


        if (fileSystem.exists(outPath)) {
            System.out.println("路径存在,开始删除");
            fileSystem.delete(outPath,true);
        }
        TextOutputFormat.setOutputPath(job,outPath);

        // TODO 5.提交任务开始执行
        job.waitForCompletion(true);
    }
}

②对应的Mapper代码

package com.mr.count;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
/*
    TODO
        在编写代码之前需要先定义数据的处理逻辑
            MapTask阶段:
                根据读取的一行数据 进行按照,进行切分 获取学生ID和学生成绩  将学生ID作为key 成绩作为Value写出到 Reduce
            ReduceTask阶段:
                根据相同的学生ID将所有的成绩数据作为values获取到 形成迭代器,再进行遍历求其总和

        KEYIN: 表示输入的Key的类型 表示map函数处理的Key类型 变量保存的数据是偏移量
                    读取数据的位置 字节数的位置非常大,需要使用Long类型 => LongWritable
        VALUEIN: 表示输入的Value类型  表示map函数处理的Value类型  表示的是一行字符串数据 String => Text
        KEYOUT:  学生ID => 字符串 => Text
        VALUEOUT: 各科的成绩 => int  => IntWritable


 */
public class CountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * map函数中定义了Task任务在Map阶段所做的数据处理任务
     * 根据读取的一行数据 进行按照,进行切分 获取学生ID和学生成绩  将学生ID作为key 成绩作为Value写出到 Reduce
     *
     * @param key     变量保存的数据是偏移量
     * @param value   表示的是一行字符串数据 是从文本文件中按行读取出来的
     * @param context 表示的是 Mapper.Context的上下文对象,作用是连接 Map阶段和Reduce阶段的桥梁
     */

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String[] columns = value.toString().split(",");
        if (columns.length == 3) {
            String id = columns[0];
            String score = columns[2];
            context.write(new Text(id),new IntWritable(Integer.valueOf(score)));
        }
    }
}

③Reducer阶段

package com.mr.count;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
    TODO ReduceTask阶段
        自定义类继承Reducer,该Reducer类为一个具体的类,并其中定义了一些泛型
            <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        Reduce阶段的数据是由Map阶段发送过来的,所以Map阶段输出的类型就是Reduce阶段接收的类型
        根据处理逻辑:
            KEYIN: Text
            VALUEIN: IntWritable
        根据数据最终的要求:
            KEYOUT, VALUEOUT 表示最终每个单词出现的次数
            KEYOUT : Text
            VALUEOUT: IntWritable
 */
public class CountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {

    /**
     *  reduce函数中定义了 Reduce阶段中要执行的代码逻辑
     *      根据相同的学生ID将所有的成绩数据作为values获取到 形成迭代器,再进行遍历求其总和
     * @param key  表示map端输出的Key数据 学生ID
     * @param values 类型为Iterable 表示相同学生ID的成绩数据
     * @param context 上下文对象  可以将数据写出到HDFS
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int totalScore= 0;
        for (IntWritable score : values) {
            totalScore += score.get();
        }
        context.write(key,new IntWritable(totalScore));
    }
}

3.2ReduceJoin

  • 在map阶段,map函数同时读取两个文件File1和File2,注意区分的方法

①主入口

package com.mr.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.FileNotFoundException;
import java.io.IOException;

public class ReduceJoin {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // TODO MapReduce程序入口中的固定写法
        // TODO 1.获取Job对象 并设置相关Job任务的名称及入口类
//        Job job = new Job();
//        job.setJobName("word count");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "ReduceJoin");
        // 设置当前main方法所在的入口类
        job.setJarByClass(ReduceJoin.class);
        // TODO 2.设置自定义的Mapper和Reducer类
        job.setMapperClass(ReduceJoinMapper.class);
        job.setReducerClass(ReduceJoinReducer.class);

        // TODO 3.设置Mapper的KeyValue输出类 和 Reducer的输出类 (最终输出)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // TODO 4.设置数据的输入和输出路径

        // 本地路径
        FileSystem fileSystem = FileSystem.get(job.getConfiguration());
        Path outPath = new Path("hadoop/out/reducejoin");

        // TODO 对输入路径添加了学生基本信息数据和总分数据
        Path studentInpath = new Path("hadoop/data/students.txt");
        Path totalScoreInpath = new Path("hadoop/out/count");
        if (!fileSystem.exists(studentInpath)) {
            throw new FileNotFoundException(studentInpath+"不存在");
        }
        TextInputFormat.addInputPath(job,studentInpath);

        if (!fileSystem.exists(totalScoreInpath)) {
            throw new FileNotFoundException(totalScoreInpath+"不存在");
        }
        TextInputFormat.addInputPath(job,totalScoreInpath);


        if (fileSystem.exists(outPath)) {
            System.out.println("路径存在,开始删除");
            fileSystem.delete(outPath,true);
        }
        TextOutputFormat.setOutputPath(job,outPath);

        // TODO 5.提交任务开始执行
        job.waitForCompletion(true);
    }
}

②MapTask阶段

package com.mr.reduceJoin;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
    TODO
        在编写代码之前需要先定义数据的处理逻辑
           MapTask阶段
                ① 对于MapTask需要对不同文件的数据进行判断
                ② 将读取到的字符串进行切分处理,将学生ID作为Key,其他信息作为Value写出到ReduceTask阶段
                        其中其他信息包含: 学生的所有基本信息和 学生的成绩数据 两类
                注意:
                    对于Mapper阶段的输出KeyValue类型为 Text Text
           ReduceTask阶段
                ① 接收的数据类型,就是MapTask阶段输出的数据类型
                ② 针对相同Key的Value数据进行判断是否为成绩数据或者基本信息数据
                ③ 如果判断出来,再拼接成一个整的字符串,再进行输出

 */
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    /**
     *   通过Debug可以看到对于多个文件数据,先读取students.txt数据再读取 count的结果数据
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {


        String readLine = value.toString();
        if (readLine.contains(",")) {
            String[] columns = readLine.split(",");
            if (columns.length == 5){
                String id = columns[0];
                String name = columns[1];
                String age = columns[2];
                String gender = columns[3];
                String clazz = columns[4];
                context.write(new Text(id),new Text(name+","+age+","+gender+","+clazz));
            }
        }else {
            String[] columns = readLine.split("\t");
            if (columns.length == 2){
                String id = columns[0];
                String score = columns[1];
                context.write(new Text(id),new Text(score));
            }
        }
    }
}

③ReduceTask阶段

package com.mr.reduceJoin;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
    TODO ReduceTask阶段
        自定义类继承Reducer,该Reducer类为一个具体的类,并其中定义了一些泛型
            <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        Reduce阶段的数据是由Map阶段发送过来的,所以Map阶段输出的类型就是Reduce阶段接收的类型
        根据处理逻辑:
            KEYIN: Text
            VALUEIN: IntWritable
        根据数据最终的要求:
            KEYOUT, VALUEOUT 表示最终每个单词出现的次数
            KEYOUT : Text
            VALUEOUT: IntWritable
 */
public class ReduceJoinReducer extends Reducer<Text, Text,Text, Text> {

    /**
     *  ReduceJoin 是指数据关联是产生再Reduce阶段
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String info ="";
        String score ="";
        for (Text value : values) {
            if (!value.toString().contains(",")){
                // 当长度为1表示学生的成绩数据
                score = value.toString();
            }else {
                // 学生的基本信息数据
                info = value.toString();
            }
        }
        context.write(key,new Text(info+","+score));
    }
}

4、MapReduce过滤

  • 在MapReduce过程中可以根据判断逻辑选择适当的数据进行写出,同时MapReduce过程中允许只存在有Map过程

4.1过滤出总分大于450分的同学

①主函数

package com.mr.filter.more450;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.FileNotFoundException;
import java.io.IOException;

public class FilterMore450 {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // TODO MapReduce程序入口中的固定写法
        // TODO 1.获取Job对象 并设置相关Job任务的名称及入口类
//        Job job = new Job();
//        job.setJobName("word count");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "FilterMore450");
        // 设置当前main方法所在的入口类
        job.setJarByClass(FilterMore450.class);
        // TODO 2.设置自定义的Mapper和Reducer类
        job.setMapperClass(FilterMore450Mapper.class);


        // TODO 3.设置Mapper的KeyValue输出类 (最终输出)
        job.setMapOutputKeyClass(Text.class);
//        job.setMapOutputValueClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
//        job.setOutputValueClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // TODO 4.设置数据的输入和输出路径

        // 本地路径
        FileSystem fileSystem = FileSystem.get(job.getConfiguration());
        Path outPath = new Path("hadoop/out/filterMore450");
        Path inpath = new Path("hadoop/out/reducejoin");
        if (!fileSystem.exists(inpath)) {
            throw new FileNotFoundException(inpath+"不存在");
        }
        TextInputFormat.addInputPath(job,inpath);

        if (fileSystem.exists(outPath)) {
            System.out.println("路径存在,开始删除");
            fileSystem.delete(outPath,true);
        }
        TextOutputFormat.setOutputPath(job,outPath);

        // TODO 5.提交任务开始执行
        job.waitForCompletion(true);
    }
}

②MapTask阶段

package com.mr.filter.more450;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
/*
    TODO
        在编写代码之前需要先定义数据的处理逻辑
            MapTask阶段:
                可以直接读取ReduceJoin的结果数据
        注意:
            ① 对于MapReduce可以只有Mapper阶段
            ② 对于输出的数据,可以只有Key 对于Value的数据类型从可以使用 NullWritable

 */
public class FilterMore450Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    /**
     *
     */

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // 1500100008	符半双,22,女,理科六班,363
        String oneLine = value.toString();
        String[] split = oneLine.split("\t");
        String id = split[0];
        Integer score = Integer.valueOf(split[1].split(",")[4]);
        if (score > 450){
            context.write(new Text(id+","+split[1]),NullWritable.get());
        }
    }
}

4.2对于各班级中的学生总分进行排序,要求取出各班级中总分前三名学生

①主入口

package com.mr.groupby;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.FileNotFoundException;
import java.io.IOException;

public class Top3 {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // TODO MapReduce程序入口中的固定写法
        // TODO 1.获取Job对象 并设置相关Job任务的名称及入口类

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Sort");
        // 设置当前main方法所在的入口类
        job.setJarByClass(Top3.class);
        // TODO 2.设置自定义的Mapper和Reducer类
        job.setMapperClass(Top3Mapper.class);
        job.setReducerClass(Top3Reducer.class);

        // TODO 3.设置Mapper的KeyValue输出类 和 Reducer的输出类 (最终输出)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // TODO 4.设置数据的输入和输出路径

        // 本地路径
        FileSystem fileSystem = FileSystem.get(job.getConfiguration());
        Path outPath = new Path("hadoop/out/top3");
        Path inpath = new Path("hadoop/out/reducejoin");
        if (!fileSystem.exists(inpath)) {
            throw new FileNotFoundException(inpath+"不存在");
        }
        TextInputFormat.addInputPath(job,inpath);

        if (fileSystem.exists(outPath)) {
            System.out.println("路径存在,开始删除");
            fileSystem.delete(outPath,true);
        }
        TextOutputFormat.setOutputPath(job,outPath);

        // TODO 5.提交任务开始执行
        job.waitForCompletion(true);
    }
}

②MapTask阶段

package com.mr.groupby;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
/*
    TODO
        在编写代码之前需要先定义数据的处理逻辑
            对于各班级中的学生总分进行排序,要求取出各班级中总分前三名学生
            MapTask阶段:
                   ① 读取ReduceJoin的处理结果,并对数据进行提取
                   ② 按照学生的班级信息,对班级作为Key,整行数据作为Value写出到 ReduceTask 端
            ReduceTask阶段:
                   ① 接收到整个班级中的所有学生信息并将该数据存放在迭代器中

 */
public class Top3Mapper extends Mapper<LongWritable, Text, Text, Text> {


    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        // 1500100009	沈德昌,21,男,理科一班,251  => 表示读取到的数据
        String[] split = value.toString().split("\t");
        if (split.length == 2) {
            String otherInfo = split[1];
            String[] columns = otherInfo.split(",");
            if (columns.length == 5) {
                String clazz = columns[3];
                context.write(new Text(clazz), value);
            }
        }
    }
}

③ReduceTask阶段

package com.mr.groupby;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/*
    TODO ReduceTask阶段
        自定义类继承Reducer,该Reducer类为一个具体的类,并其中定义了一些泛型
            <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        Reduce阶段的数据是由Map阶段发送过来的,所以Map阶段输出的类型就是Reduce阶段接收的类型
        根据处理逻辑:
            KEYIN: Text
            VALUEIN: IntWritable
        根据数据最终的要求:
            KEYOUT, VALUEOUT 表示最终每个单词出现的次数
            KEYOUT : Text
            VALUEOUT: IntWritable
 */
public class Top3Reducer extends Reducer<Text, Text,Text, NullWritable> {

    /**
     *  对一个班级中所有的学生成绩进行排序 =>
     *         1.将数据存储在一个容器中
     *         2.对容器中数据进行排序操作
     *  对排序的结果进行取前三
     * @param key  表示班级信息
     * @param values 一个班级中所有的学生数据
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // 1500100009	沈德昌,21,男,理科一班,251
        List<Stu> stus = new ArrayList<>();
        for (Text stu : values) {
            String[] split = stu.toString().split("\t");
            String[] columns = split[1].split(",");
            stus.add(new Stu(split[0],columns[0],Integer.valueOf(columns[1]),columns[2],columns[3],Integer.valueOf(columns[4])));
        }
        // 对List中的数据进行排序操作
        Collections.sort(
                stus,
                new Comparator<Stu>() {
                    @Override
                    public int compare(Stu o1, Stu o2) {
                        int compareScore = o1.score - o2.score;
                        return - compareScore > 0 ? 1: (compareScore == 0? o1.id.compareTo(o2.id):-1);
                    }
                }
        );

        // 对排序的结果进行遍历
        for (int i = 0; i < 3; i++) {
            context.write(new Text(stus.get(i).toString()+","+(i+1)),NullWritable.get());
        }

    }
}

④Student类

package com.mr.groupby;

public class Stu {
    String id;
    String name;
    int age;
    String gender;
    String clazz;
    int score;

    public Stu(String id, String name, int age, String gender, String clazz, int score) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.clazz = clazz;
        this.score = score;
    }

    @Override
    public String toString() {
        return id +
                ", " + name +
                ", " + age +
                ", " + gender +
                ", " + clazz +
                ", " + score;
    }
}

标签:框架,Text,第三天,hadoop,Hadoop,job,org,apache,import
From: https://www.cnblogs.com/shmil/p/18349022

相关文章

  • 全网最全:一文入门最热的LLM应用开发框架LangChain_langchain框架
    f####1.LangChain简介1.1.LangChain发展史LangChain的作者是HarrisonChase,最初是于2022年10月开源的一个项目,在GitHub上获得大量关注之后迅速转变为一家初创公司。2017年HarrisonChase还在哈佛上大学,如今已是硅谷的一家热门初创公司的CEO,这对他来说是......
  • 比肩DRF,轻量级、快速且强大的 API 开发:探索 DN 框架
    Django-Ninja框架,简称DN框架,是一个用于快速构建API的现代化框架。它基于Django构建,但专注于简洁性和性能,使用Pydantic进行数据验证,使得开发体验更加流畅和高效。为什么选择DN框架?DN框架结合了Django的稳定性和Pydantic的强大数据处理能力,适用于需要快速迭代和高......
  • .NET 与 LayUI 实现高效敏捷开发框架
    前言WaterCloud是一个集成了LayUI的高效敏捷开发框架,专为.NET开发者设计。它不仅支持多种.NET版本(.NET4.5、.NETCore3.1、.NET5、.NET6),还内置了丰富的功能,如权限管理、流程表单设计以及多数据库支持下的多租户架构。使用了ORM(SqlSugar和Chloe)能够轻松应对复杂......
  • Laravel --最优雅的 PHP 框架
    在PHP生态系统中,有许多框架可供开发者选择。在众多选择中,Laravel突出其优雅的设计、简洁的语法和强大的功能,迅速成为最受欢迎的PHP框架之一。本文将探讨Laravel的一些关键特性,通过数据支持、实际操作案例和代码示例来展示其优雅性和实用性。1.Laravel的优雅设计1.1......
  • 常见的框架漏洞
    框架        Web框架(Webframework)或者叫做Web应⽤框架(Webapplicationframework),是⽤于进⾏Web开发的⼀套软件架构。⼤多数的Web框架提供了⼀套开发和部署⽹站的⽅式。为Web的⾏为提供了⼀套⽀持⽀持的⽅法。使⽤Web框架,很多的业务逻辑外的功能不需要⾃⼰再去......
  • 034.CI4框架CodeIgniter,纯净windows系统,一步步安装composer和CodeIgniter 4.5.4
    安装git选择路径 一路回车安装 安装phpstudy 安装好的界面 下载php8.2.9  点一下默认配置,确定 php版本要选择php8.2.9 需要安装的php扩展如下 点开网站的管理,设置一个根目录 php,启动 在根目录创建一个index.html的文件,用浏览器打开,看看能不能访......
  • HTML5+CSS3笔记(Xmind格式):第三天
    Xmind鸟瞰图:简单文字总结:过渡transition:过渡属性过渡时间运动曲线何时开始 2D变形transform:  1.平移:translate(单位px)  2.缩放:scale(默认1,大于1放大,小于1缩小)  3.旋转:rotate(单位deg)  4.倾斜:skew(单位deg)3D变形transform:  1.rotateX......
  • Hadoop3.4.0跑wordcount程序报错:org.apache.hadoop.mapreduce.v2.app.MRAppMaster
    部署完Hadoop3.4.0HA后跑wordcount程序报错,在日志文件里 http://rsnode:8042/logs/userlogs 里看到报错日志说不能加载主类 org.apache.hadoop.mapreduce.v2.app.MRAppMaster网上给的办法大多都是让执行hadoopclasspath然后把那一长串配置到 mapred-site.xml。如图 ......
  • 部署伪分布式 Hadoop集群
    部署伪分布式Hadoop集群一、JDK安装配置1.1下载JDK1.2上传解压1.3java环境配置二、伪分布式Hadoop安装配置2.1Hadoop下载2.2上传解压2.3Hadoop文件目录介绍2.4Hadoop配置2.4.1修改core-site.xml配置文件2.4.2修改hdfs-site.xml配置文件2.4.3修改ha......
  • 新手小白的Hadoop分布式和集群简述
    Hadoop分布式简介:ApacheHadoop是一个开源的分布式计算框架,它允许用户在节点组成的集群中处理和分析大数据。Hadoop是“Hadoop之父”DougCutting的著作,最初是在Nutch搜索引擎项目中开发的,用于解决网页爬虫的存储和搜索问题。Hadoop的核心由以下几个部分组成:HDFS(Hadoop......