首页 > 其他分享 >MapReduce自定义GroupingComparator

MapReduce自定义GroupingComparator

时间:2023-10-25 19:06:48浏览次数:38  
标签:GroupingComparator 自定义 MapReduce id 订单 key compareTo import public


需求

有如下订单明细数据

0000001	01	222.8
0000002	06	722.4
0000001	05	25.8
0000003	01	222.8
0000003	01	33.8
0000002	03	522.8
0000002	04	122.4

第一列是订单编号,第二列是商品id,第三列是商品金额,列与列之间用制表符分隔。

现在需要求出每一个订单中最贵的商品。

思路

将订单id和商品金额封装成一个对象作为 map 端的 key 输出,value 置为空。利用 map 端在输出数据时会默认按照 key 的 compareTo 方法进行排序这一特点,在 compareTo 方法中制定排序规则:先按照订单id 升序排序,订单id相同的再按照商品金额倒序排序。数据发送到 reduce 端后,再按照订单id进行分组,一组订单中的第一个数据就是该订单中最贵的商品。

代码实现

  1. 自定义订单信息OrderBean
package top8_order;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author 曲健磊
 * @date 2019-09-16 21:25:50
 */
public class OrderBean implements WritableComparable<OrderBean> {

    /**
     * 订单编号
     */
    private Integer orderId;

    /**
     * 商品id
     */
    private Integer shopId;

    /**
     * 商品价格
     */
    private Double price;

    public Integer getOrderId() {
        return orderId;
    }

    public void setOrderId(Integer orderId) {
        this.orderId = orderId;
    }

    public Integer getShopId() {
        return shopId;
    }

    public void setShopId(Integer shopId) {
        this.shopId = shopId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.orderId);
        out.writeInt(this.shopId);
        out.writeDouble(this.price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readInt();
        this.shopId = in.readInt();
        this.price = in.readDouble();
    }

    /**
     * 这个compareTo方法很关键:
     * map端在输出的时候默认会按照这个 key 的 compareTo 方法进行排序
     * reduce端在聚合的时候默认会按照这个 key 的 compareTo 方法进行分组(返回值为0的分为一组)
     *
     * @return
     */
    @Override
    public int compareTo(OrderBean o) {
        // 该需求要按照订单id进行升序排序,如果订单id相同,按照价格倒序排序(先不考虑分组)

        // 订单id相同的情况下(compareTo返回0),按照价格降序排序
        if (this.orderId.compareTo(o.getOrderId()) == 0) {
            return o.getPrice().compareTo(this.getPrice());
        }

        // compareTo不为0,则按订单id升序
        return this.orderId.compareTo(o.getOrderId());
    }

    @Override
    public String toString() {
        return "orderId:" + this.orderId + "\t shopId:" + this.shopId + "\t price:" + this.price;
    }
}
  1. Mapper
package top8_order;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author 曲健磊
 * @date 2019-09-17 10:32:44
 */
public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    OrderBean k = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1.读取输入文本中的一行数据
        String line = value.toString();

        // 2.按照制表符拆分字段
        String[] fields = line.split("\t");

        // 3.存入对象中
        k.setOrderId(Integer.parseInt(fields[0]));
        k.setShopId(Integer.parseInt(fields[1]));
        k.setPrice(Double.parseDouble(fields[2]));

        // 4.将数据从map端写出
        // 这一步操作就会调用 k 的 compareTo 方法进行排序
        context.write(k, NullWritable.get());
    }

}
  1. Reducer
package top8_order;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author 曲健磊
 * @date 2019-09-17 10:43:27
 */
public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

map端输出的数据在送到 reduce 方法之前会默认调用 key 的 compareTo方法进行分组,compareTo 方法返回 0 的分为一组。

注意:该需求的目标是把每笔订单中价格最高的商品输出, 在map输出时已经按照订单id排序,订单id相同的数据按照价格降序排序。

reduce方法特点:它只会把分到同一组的第一条记录的 key 作为 reduce 方法的key传递进来。

在本例中我们希望送到 reduce 方法的数据仅按照订单id分组,这样传递到 reduce 方法的每一个 key 都是一笔订单中价格最高的那个商品(OrderBean),我们在 reduce 方法中只需要把这个 key 输出出去就可以实现本例的需求。

但是,到目前为止,reduce 端的分组策略并不仅仅只是按照订单id 分组,而是整合了订单id和商品价格两个因素,因为分组默认调用的 key 的 compareTo 方法进行比较。所以,目前为止,没有办法实现该需求。

解决:所以我们不能让 reduce 使用默认的 key 的 compareTo 方法进行排序,需要自定义一个分组比较器来制定只按照订单id进行分组的规则。

  1. 自定义GroupingComparator
package top8_order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author 曲健磊
 * @date 2019-09-17 11:08:50
 * @description 制定reduce端的分组规则
 */
public class OrderSortGroupingComparator extends WritableComparator {

    /**
     * 重写空参构造方法(不写会在程序运行的时候报空指针异常)
     * 在实例化的时候实例一个指定类型的比较器对象
     */
    public OrderSortGroupingComparator() {
        // 第一个参数指定key的类型,第二个参数为true表示创建该比较器实例
        super(OrderBean.class, true);
    }

    /**
     * 重写 compare 方法
     *
     * @param a WritableComparable类型的对象
     * @param b WritableComparable类型的对象
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // OrderBean 实现了 WritableComparable 接口,属于 WritableComparable 类型的对象
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;

        // 订单id相同的分为一组
        return aBean.getOrderId().compareTo(bBean.getOrderId());
    }
}
  1. Driver驱动
package top8_order;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author 曲健磊
 * @date 2019-09-17 11:20:45
 */
public class OrderSortDriver {

    public static void main(String[] args) throws Exception {

        // 手动指定输入输出路径
        args = new String[]{"d:/GroupingComparator.txt", "d:/output"};

        // 任务的相关参数配置
        // 有时候可能会对mapreduce程序进行一些参数调优,参数从这个Configuration中传入
        Configuration conf = new Configuration();
        // conf.set("key", "value");
        Job job = Job.getInstance(conf);

        job.setJarByClass(OrderSortDriver.class);

        job.setMapperClass(OrderSortMapper.class);
        job.setReducerClass(OrderSortReducer.class);

        // 指定map输出的key和value的类型
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 指定reduce输出的key和value的类型
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 指定reduce端的分组所使用的比较器
        job.setGroupingComparatorClass(OrderSortGroupingComparator.class);

        // 设置输入数据和输出数据的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 执行任务
        job.waitForCompletion(true);

    }
}

程序运行结果:

MapReduce自定义GroupingComparator_mapreduce

原始的订单数据:

MapReduce自定义GroupingComparator_apache_02

可以发现最终的结果输出了每一笔订单中商品金额最大的那一条记录。


标签:GroupingComparator,自定义,MapReduce,id,订单,key,compareTo,import,public
From: https://blog.51cto.com/u_14655640/8023719

相关文章

  • 基于RuoYi-Flowable-Plus的若依ruoyi-nbcio支持自定义业务表单流程的集成方法与步骤(二
    更多ruoyi-nbcio功能请看演示系统gitee源代码地址演示地址:RuoYi-Nbcio后台管理系统前面讲了集成的后端部分内容,下面简单介绍一下前端的内容 1、前端生成的页面需要进行修改,增加流程状态启动等相关信息,如demo的index修改如下<template><divclass="app-container"><el-form......
  • 基于RuoYi-Flowable-Plus的若依ruoyi-nbcio支持自定义业务表单流程的集成方法与步骤(一
    更多ruoyi-nbcio功能请看演示系统gitee源代码地址演示地址:RuoYi-Nbcio后台管理系统由于大家最自定义业务表单的整个集成方法还不熟悉,下面大概介绍一下这个流程与方法。1、首先需要建立数据库表,根据自己业务进行数据表的建立,目前系统需要在另外sql进行数据库表的建立,以后可以考虑系......
  • 基于RuoYi-Flowable-Plus的若依ruoyi-nbcio支持自定义业务表单流程(五)
    更多ruoyi-nbcio功能请看演示系统gitee源代码地址演示地址:RuoYi-Nbcio后台管理系统今天讲一下wf_demo表单的一些修改1、demo的实现类修改如下:主要是增加一个服务名称,后面要用到,同时继承于WfCallBackServiceI,以便进行调用。@Service("wfDemoService")publicclassWfDemoServiceImp......
  • 基于RuoYi-Flowable-Plus的若依ruoyi-nbcio支持自定义业务表单流程(四)
    更多ruoyi-nbcio功能请看演示系统gitee源代码地址演示地址:RuoYi-Nbcio后台管理系统自定义业务表单里的流程历史需要单独设计,所以下面就这部分进行介绍。1、后端部分,这部分增加单独的接口,只需要单独的dataID就可以了,如下:/***流程详情信息**@paramdataId业务数......
  • 基于RuoYi-Flowable-Plus的若依ruoyi-nbcio支持自定义业务表单流程(三)
    更多ruoyi-nbcio功能请看演示系统gitee源代码地址演示地址:RuoYi-Nbcio后台管理系统相应的后端也要做一些调整1、启动流程修改如下:/***启动流程实例*/privateRstartProcess(ProcessDefinitionprocDef,Map<String,Object>variables){if(ObjectUti......
  • 基于RuoYi-Flowable-Plus的若依ruoyi-nbcio支持自定义业务表单流程(二)
    更多ruoyi-nbcio功能请看演示系统gitee源代码地址演示地址:RuoYi-Nbcio后台管理系统   之前讲到了流程保存的时候还要看是否是自定义业务流程应用类型,若是保存的时候不再检查是否有关联表单。    那接下来就需要一个自己进行自定义表的流程关联工作了。1、见下图,在流程管......
  • 新手教程系类:群晖NAS如何自定义域名?保姆级教程包教包会
    感谢各位亲的大力支持,本店推出一些列新手教程希望能帮到你。对于个性化或者访问速度有着更高要求的用户,往往最后都会想给自己整个自定义域名,毕竟能够拥有一个专属的域名来访问自己的NAS,还是很方便的,今天就来更新一下DSM7版本的保姆级教程01申请公网IP公网IP是一定要有的,......
  • SpringBoot内容协商(Content Negotiation)二 —— 自定义消息转换器(MessageConverter)
    SpringBoot内置的消息转换器SpringBoot没有处理返回yaml格式的数据,这里需要手动添加处理这种返回格式的支持。导入依赖<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-yaml</artifactId></dependency>添加配......
  • [linux] 自定义触摸板功能
    现在ubuntu最新版本使用wayland管理输入。而不是x11了,网上有很多教程建议使用的软件都不能用,搞不好还就把原来系统内置的一些东西搞坏了。在x11(xorg)下可以使用touchegg搭配touche使用,但是在wayland下不行。这里发一个目前实测可以用的自定义触摸板功能的软件叫fusumahttps://githu......
  • 如何在iEDA中添加自定义Tcl命令
    注:ScriptEngine和UserShell头文件和实现在iEDA/src/utility/tcl/ScriptEngine.hh路径下1使用ScriptEngine自定义Tcl命令ScriptEngine是Tcl命令解析器,包含命令、命令选项、解析器等一系列工具。用户可以使用ScriptEngine中的接口轻松实现自定义Tcl命令文件结......