首页 > 编程语言 >【博学谷学习记录】超强总结,用心分享 | MapReduec编程

【博学谷学习记录】超强总结,用心分享 | MapReduec编程

时间:2023-06-01 10:00:57浏览次数:48  
标签:String 编程 MapReduec class k2 job 超强 new public

【博学谷IT技术支持】

一、介绍

MapReduce是将一个大的计算任务拆分成一个个小任务,让小任务在不同的计算机中进行处理,最后将任务的结果进行汇总的过程。
MR的工作流程可以分为三个阶段,分别是map、shuffle、reduce

二、编程

Mapper阶段

自定义一个类来集成Mapper类,重写map方法,将方法中的k1、v1转化为k2、v2。同时输入输出的数据类型以键值对的形式,如<key, value>

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    /**
     *  Context context 整个MR的上下文对象
     * */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 根据字符串的不同来切开字符串,比如“1|2|3|4” 就切割|,如果是,就切割
        // 例如数组是hello|age|world|age|age  
        // k1代表各行文本相较于文本开头的偏移量,v1代表这行数据
        String[] wordArray = value.toString().split("|");
        // 根据得到的数组进行处理,根据需求设置k2,v2,同时这里String的类型是Text,Int的类型是LongWritable
        for (String k2 : wordArray) {
            // 这里写入的就是k2,v2
            context.write(new Text(k2), new LongWritable(1));
        }
    }
}

注意: 这里只有两个主要的位置,第一个是如何对进来的v1进去获取,第二点是设置k2,v2并使用context.write写入

Shuffle阶段

map阶段要对数据进行了切片,为每个切片分配一个MapTask任务,在通过处理后得到键值对。此后进入shuffle阶段。
shuffle阶段又分四个阶段分别是分区排序规约分组

分区

将不同键值对的数据,输入到不同的文件中,对数据进行拆分。自定义类,继承Partitioner类,将该类中重写getPartition方法,定义分区规则。getPartition有三个参数,分别是k,v以及设置reduce任务的数量,默认是1

public class MyPartitioner extends Partitioner<Text, Text> {
   
    @Override
    public int getPartition(Text text, Text text2, int i) {
        // 这里定义分区的规则
        //比如说这里给不同的文件打标记
        String newK2 = text.toString() + " " + text.toString().length();
        return (newK2.hashCode() & 2147483647) % i;//这里用来返回分区的编号
    }
}

排序

MR的排序只能根据K2进行排序,因此如果要排序,k2中应该包含关键字。

定义javabean类,同时需要实现WritableComparable接口。该类必须要满足能够序列化与反序列化,序列化与反序列化的过程字段的读写顺序要一致。这里的排序在compareTo方法中实现。

public class Covidbean implements WritableComparable<Covidbean> {
    private Integer cases;
    private Integer deaths;

    public Integer getCases() {
        return cases;
    }

    public void setCases(Integer cases) {
        this.cases = cases;
    }

    public Integer getDeaths() {
        return deaths;
    }

    public void setDeaths(Integer deaths) {
        this.deaths = deaths;
    }

    @Override
    public String toString() {
        return  cases + '\t' +  deaths;
    }

    @Override
    public int compareTo(Covidbean o) {
        int result = this.cases - o.cases;
        if(result == 0) {
            return this.deaths - o.deaths;
        } else {
            return result;
        }
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(state);
        dataOutput.writeUTF(country);
        dataOutput.writeInt(cases);
        dataOutput.writeInt(deaths);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.state = dataInput.readUTF();
        this.country = dataInput.readUTF();
        this.cases = dataInput.readInt();
        this.deaths = dataInput.readInt();
    }
}

规约

Combiner是MR的优化手段,将Map的数据进行提前聚合,减少Map端和Reduce端网络传输的数据量。combiner没有默认的实现,需要显式的设置在conf中才有作用。

public class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // 得到k2,v2,通过不同的逻辑处理,得到k3,v3
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}

分组

分组的作用就是根据k2进行去重,将相同的k2分入同一组,相当于group by k2的作用。如果没有指定分组规则,系统会默人调用k2类中的compareTo方法

  • 自定义分组类需要继承 WritableComparator 父类并重写 compare() 方法
public class GroupingComparator extends WritableComparator {
    public GroupingComparator() {
        super(CovidBean.class,true); //将k2类传给父类,并允许父类能够通过反射创建CovidBean对象
    }

    //3:重写compare方法,在该方法中指定分组的规则
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //分组规则:同一个州的数据分到一个组
        // 根据不同的需求设置分组规则
        CovidBean a1 = (CovidBean) a;
        CovidBean b1 = (CovidBean) b;

        //如果return返回的0,则就会将a1 和 b2分到同一组
        return a1.getState().compareTo(b1.getState());
    }
}

ReducerJoin

map端只完成文件合并 利用相同的关联条件id作为key输出到reducer端,reduce端完成join操作。一般是大表join大表

public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1:确定读取的是哪个源数据文件
        FileSplit fileSplit = (FileSplit) context.getInputSplit(); //获取文件切片
        String fileName = fileSplit.getPath().getName();           //获取源文件的名字

        String[] array = value.toString().split("\|");

        //2:处理订单文件
        if ("itheima_order_goods.txt".equals(fileName)) { //订单文件
            // 1|107860|7191
            //2.1:获取K2
            String k2 = array[1];
            //2.2:获取v2
            String v2 = "o_"+array[0] + "\t" + array[2];
            //2.3:将k2和v2写入上下文中
            context.write(new Text(k2), new Text(v2));
        }
        //3:处理商品文件
        if ("itheima_goods.txt".equals(fileName)) {        //商品文件
            // 107860|3786028|黑色硅胶腕带
            //3.1 获取K2
            String k2 = array[0];
            String v2 = "g_"+array[0] + "\t" + array[2];

            //3.2:将k2和v2写入上下文中
            context.write(new Text(k2), new Text(v2));
        }
    }
}

MapJoin

在map端实现文件合并 重写 setup和map方法 没有reduce。
一般用于小表join大表,小表全部加载到内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map端是进行了join操作,省去了reduce 运行的时间

setup方法会在map方法执行之前先执行,而且只会执行一次,主要用来做初始化工作

public class MapJoin1Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    HashMap<String, String> goodsMap = new HashMap<>();
    //将小表从分布式缓存中读取,存入Map集合
    @Override
    protected void setup(Context context) throws IOException {
        //缓冲流            字符流              字节流
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("itheima_goods.txt")));
        String line = null;
        while ((line = bufferedReader.readLine()) != null) {
            String[] array = line.split("\|");
            goodsMap.put(array[0], array[2]);
        }
      
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        String[] array = value.toString().split("\|");
        String k2 = array[1];
        String v2 = array[0] + "\t" + array[2];
        String mapValue = goodsMap.get(k2);
        context.write(new Text(v2 + "\t"+ mapValue), NullWritable.get());
    }
}

Reducer阶段

自定义类继承Reducer类,重写reduce方法,在该方法中将k2,v2转化为k3和v3

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}

主方法的调用

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        .......
    }
}

封装job任务

创建一个job对象,设置主类的名称

Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "WordCountBase");
job.setJarByClass(WordCountDriver.class);

设置文件的读取写入路径

FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);

关联Mapper

设置三个参数,自定义类,k2,v2

job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

关联Reducer

设置三个参数,自定义类,k3,v3

job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

设置shuffle分区类以及分区个数

job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(50);

设置Shuffle的Combiner

job.setCombinerClass(MyCombiner.class);

设置Shuffle的分组类

job.setGroupingComparatorClass(GroupingComparator.class);

文件多次执行的处理

FileSystem fileSystem = FileSystem.get(new URI("file:///"), new Configuration());
boolean is_exists = fileSystem.exists(outputPath);
if(is_exists == true) {
    //如果目标文件存在,则删除
    fileSystem.delete(outputPath, true);

yran执行

将job提交给yarn执行
```js
boolean bl = job.waitForCompletion(true);
System.exit(bl ? 0 : 1);

JobControl控制作业流顺序

ControlledJob cj1 = new ControlledJob(configuration);
cj1.setJob(job1);
ControlledJob cj2 = new ControlledJob(configuration);
cj2.setJob(job2);

//设置作业之间的依赖关系
cj2.addDependingJob(cj1);

// 创建主控制器
JobControl jc = new JobControl("myCtrl");
jc.addJob(cj1);
jc.addJob(cj2);

// 使用线程启动JobControl
Thread t = new Thread(jc);
t.start();

while (true) {
    if (jc.allFinished()) {
        jc.stop();
        break;
    }
}

标签:String,编程,MapReduec,class,k2,job,超强,new,public
From: https://www.cnblogs.com/neilniu/p/17448108.html

相关文章

  • 【博学谷学习记录】超强总结,用心分享 | java基础
    【博学谷IT技术支持】前言java是一门非常好的语言,比较有必要学习一下:随着科技发展,只会前端是不行的,学习一门后端序言非常有必要,这里记录下自己学习的过程。一、数据类型java是一种强类型语言,数据必须明确类型。基础数据类型有8种,分别是整数类型:byte,short,int,long,......
  • 博学谷学习记录】超强总结,用心分享 | 常用api
    【博学谷IT技术支持】常用APIMath类的常用方法方法名说明publicstaticintabs(inta)返回参数的绝对值publicstaticdoubleceil(doublea)向上取整publicstaticdoublefloor(doublea)向下取整publicstaticintround(floata)四舍五入publicstaticintmax(......
  • 【博学谷学习记录】超强总结,用心分享 | 集合
    【博学谷IT技术支持】集合集合根据存储分为单列集合java.util.Collection和双列结合java.util.Map,集合的长度是可变的,集合只能存引用数据类型,如果要存基本数据类型,需要存对应的包装类。数组可以存基本数据类型和引用数据类型Collection单列集合的跟接口,用于存储一系列......
  • 【博学谷学习记录】超强总结,用心分享 | python基础学习(数据类型,运算符)
    【博学谷IT技术支持】基础数据类型Python中的变量不需要声明。每个变量在使用前都必须赋值,变量赋值以后该变量才会被创建赋值方式直接赋值a=1#整型变量b=1.0#浮点型变量c='abc'#字符串多个赋值a=b=c=1a,b,c=1,2,3标准数据类型标准数据类型......
  • 博学谷学习记录】超强总结,用心分享 | mongodb基础用法
    【博学谷IT技术支持】数据库连接后端数据库连接语法:mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]mongodb://是固定搭配,后边是可选参数用户名加密码,host是要连接服务器的地址,portx是指定的端口,默认27017da......
  • 走进Linux编程的大门
    随着Linux的不断普及,使用Linux的人也越来越多了。然而在Linux中如何进行程序设计,用什么样的开发工具好呢?本文就以我初学Linux编程的一点心得体会,和大家共同探讨。在Linux中进行程序设计,可以使用各种编程语言和开发工具,以下是一些常用的方法:1、C/C++编程C/C++是Linux系统中......
  • Linux系统下C语言的编程技巧
    Linux系统能够为人们提供更加安全实用的效果,保证计算机系统能够稳定的运行。利用Linux系统下首先要进行C语言的编程,掌握编程的技巧能够更好的发挥计算机的作用。如何掌握Linux系统下计算机C语言的编程技巧是计算机发展的关键要素。本文对Linux系统下计算机C语言的编程技巧进行相......
  • 掌握嵌入式Linux编程0简介
    简介多年来,Linux一直是嵌入式计算的主流。然而,涵盖这一主题的书籍却少之又少:本书旨在填补这一空白。术语"嵌入式Linux"没有很好的定义,可以应用于从恒温器到Wi-Fi路由器到工业控制单元等各种设备内部的操作系统。然而,它们都是建立在相同的基本开源软件上。这些就是我在本书中描......
  • 【博学谷学习记录】超强总结,用心分享 | spark知识点总结2
    【博学谷IT技术支持】Action动作算子reduce:通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的collect:在驱动程序中,以数组的形式返回数据集的所有元素count:返回RDD的元素个数first:返回RDD的第一个元素(类似于take(1))take:返回一个由数据集的前n个元......
  • 函数式编程和java
    函数式编程和java在计算机科学中,函数式编程是一种编程范式,通过应用和组合函数来构建程序。它是一种声明式编程范式(对应命令式编程),其中函数定义是将数值映射到其他数值的表达式树,而不是更新程序运行状态的命令式语句序列。函数的定义数学上的函数是自变量到因变量的映射关系,......