需求:
有如下订单明细数据
0000001 01 222.8
0000002 06 722.4
0000001 05 25.8
0000003 01 222.8
0000003 01 33.8
0000002 03 522.8
0000002 04 122.4
第一列是订单编号,第二列是商品id,第三列是商品金额,列与列之间用制表符分隔。
现在需要求出每一个订单中最贵的商品。
思路:
将订单id和商品金额封装成一个对象作为 map 端的 key 输出,value 置为空。利用 map 端在输出数据时会默认按照 key 的 compareTo 方法进行排序这一特点,在 compareTo 方法中制定排序规则:先按照订单id 升序排序,订单id相同的再按照商品金额倒序排序。数据发送到 reduce 端后,再按照订单id进行分组,一组订单中的第一个数据就是该订单中最贵的商品。
代码实现:
- 自定义订单信息OrderBean
package top8_order;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @author 曲健磊
* @date 2019-09-16 21:25:50
*/
public class OrderBean implements WritableComparable<OrderBean> {
/**
* 订单编号
*/
private Integer orderId;
/**
* 商品id
*/
private Integer shopId;
/**
* 商品价格
*/
private Double price;
public Integer getOrderId() {
return orderId;
}
public void setOrderId(Integer orderId) {
this.orderId = orderId;
}
public Integer getShopId() {
return shopId;
}
public void setShopId(Integer shopId) {
this.shopId = shopId;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.orderId);
out.writeInt(this.shopId);
out.writeDouble(this.price);
}
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readInt();
this.shopId = in.readInt();
this.price = in.readDouble();
}
/**
* 这个compareTo方法很关键:
* map端在输出的时候默认会按照这个 key 的 compareTo 方法进行排序
* reduce端在聚合的时候默认会按照这个 key 的 compareTo 方法进行分组(返回值为0的分为一组)
*
* @return
*/
@Override
public int compareTo(OrderBean o) {
// 该需求要按照订单id进行升序排序,如果订单id相同,按照价格倒序排序(先不考虑分组)
// 订单id相同的情况下(compareTo返回0),按照价格降序排序
if (this.orderId.compareTo(o.getOrderId()) == 0) {
return o.getPrice().compareTo(this.getPrice());
}
// compareTo不为0,则按订单id升序
return this.orderId.compareTo(o.getOrderId());
}
@Override
public String toString() {
return "orderId:" + this.orderId + "\t shopId:" + this.shopId + "\t price:" + this.price;
}
}
- Mapper
package top8_order;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author 曲健磊
* @date 2019-09-17 10:32:44
*/
public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
OrderBean k = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1.读取输入文本中的一行数据
String line = value.toString();
// 2.按照制表符拆分字段
String[] fields = line.split("\t");
// 3.存入对象中
k.setOrderId(Integer.parseInt(fields[0]));
k.setShopId(Integer.parseInt(fields[1]));
k.setPrice(Double.parseDouble(fields[2]));
// 4.将数据从map端写出
// 这一步操作就会调用 k 的 compareTo 方法进行排序
context.write(k, NullWritable.get());
}
}
- Reducer
package top8_order;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author 曲健磊
* @date 2019-09-17 10:43:27
*/
public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
map端输出的数据在送到 reduce 方法之前会默认调用 key 的 compareTo方法进行分组,compareTo 方法返回 0 的分为一组。
注意:该需求的目标是把每笔订单中价格最高的商品输出, 在map输出时已经按照订单id排序,订单id相同的数据按照价格降序排序。
reduce方法特点:它只会把分到同一组的第一条记录的 key 作为 reduce 方法的key传递进来。
在本例中我们希望送到 reduce 方法的数据仅按照订单id分组,这样传递到 reduce 方法的每一个 key 都是一笔订单中价格最高的那个商品(OrderBean),我们在 reduce 方法中只需要把这个 key 输出出去就可以实现本例的需求。
但是,到目前为止,reduce 端的分组策略并不仅仅只是按照订单id 分组,而是整合了订单id和商品价格两个因素,因为分组默认调用的 key 的 compareTo 方法进行比较。所以,目前为止,没有办法实现该需求。
解决:所以我们不能让 reduce 使用默认的 key 的 compareTo 方法进行排序,需要自定义一个分组比较器来制定只按照订单id进行分组的规则。
- 自定义GroupingComparator
package top8_order;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* @author 曲健磊
* @date 2019-09-17 11:08:50
* @description 制定reduce端的分组规则
*/
public class OrderSortGroupingComparator extends WritableComparator {
/**
* 重写空参构造方法(不写会在程序运行的时候报空指针异常)
* 在实例化的时候实例一个指定类型的比较器对象
*/
public OrderSortGroupingComparator() {
// 第一个参数指定key的类型,第二个参数为true表示创建该比较器实例
super(OrderBean.class, true);
}
/**
* 重写 compare 方法
*
* @param a WritableComparable类型的对象
* @param b WritableComparable类型的对象
* @return
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
// OrderBean 实现了 WritableComparable 接口,属于 WritableComparable 类型的对象
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;
// 订单id相同的分为一组
return aBean.getOrderId().compareTo(bBean.getOrderId());
}
}
- Driver驱动
package top8_order;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @author 曲健磊
* @date 2019-09-17 11:20:45
*/
public class OrderSortDriver {
public static void main(String[] args) throws Exception {
// 手动指定输入输出路径
args = new String[]{"d:/GroupingComparator.txt", "d:/output"};
// 任务的相关参数配置
// 有时候可能会对mapreduce程序进行一些参数调优,参数从这个Configuration中传入
Configuration conf = new Configuration();
// conf.set("key", "value");
Job job = Job.getInstance(conf);
job.setJarByClass(OrderSortDriver.class);
job.setMapperClass(OrderSortMapper.class);
job.setReducerClass(OrderSortReducer.class);
// 指定map输出的key和value的类型
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 指定reduce输出的key和value的类型
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
// 指定reduce端的分组所使用的比较器
job.setGroupingComparatorClass(OrderSortGroupingComparator.class);
// 设置输入数据和输出数据的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 执行任务
job.waitForCompletion(true);
}
}
程序运行结果:
原始的订单数据:
可以发现最终的结果输出了每一笔订单中商品金额最大的那一条记录。