标签:实战 hadoop MapReduce id org apache import 排序 order
辅助排序和二次排序案例
1)需求
有如下订单数据
订单id | 商品id | 成交金额 |
0000001 | Pdt_01 | 222.8 |
0000001 | Pdt_06 | 25.8 |
0000002 | Pdt_03 | 522.8 |
0000002 | Pdt_04 | 122.4 |
0000002 | Pdt_05 | 722.4 |
0000003 | Pdt_01 | 222.8 |
0000003 | Pdt_02 | 33.8 |
现在需要求出每一个订单中最贵的商品。
2)输入数据
0000001 Pdt_01 222.8
0000002 Pdt_06 722.4
0000001 Pdt_05 25.8
0000003 Pdt_01 222.8
0000003 Pdt_01 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4
输出数据预期:
0:3 222.8
1:2 722.4
2:1 222.8
3)分析
(1)利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce。
(2)在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值。
4)代码实现
(1)定义订单信息OrderBean
package com.atguigu.mapreduce.order; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable<OrderBean> { private int order_id; // 订单id号 private double price; // 价格 public OrderBean() { super(); } public OrderBean(int order_id, double price) { super(); this.order_id = order_id; this.price = price; } @Override public void write(DataOutput out) throws IOException { out.writeInt(order_id); out.writeDouble(price); } @Override public void readFields(DataInput in) throws IOException { order_id = in.readInt(); price = in.readDouble(); } @Override public String toString() { return order_id + "\t" + price; } public int getOrder_id() { return order_id; } public void setOrder_id(int order_id) { this.order_id = order_id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } // 二次排序 @Override public int compareTo(OrderBean o) { int result; if (order_id > o.getOrder_id()) { result = 1; } else if (order_id < o.getOrder_id()) { result = -1; } else { // 价格倒序排序 result = price > o.getPrice() ? -1 : 1; } return result; } } |
(2)编写OrderSortMapper
package com.atguigu.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> { OrderBean k = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 截取 String[] fields = line.split("\t"); // 3 封装对象 k.setOrder_id(Integer.parseInt(fields[0])); k.setPrice(Double.parseDouble(fields[2])); // 4 写出 context.write(k, NullWritable.get()); } } |
(3)编写OrderSortPartitioner
package com.atguigu.mapreduce.order; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; publicclass OrderPartitioner extends Partitioner<OrderBean, NullWritable> { @Override publicint getPartition(OrderBean key, NullWritable value, int numReduceTasks) { return (key.getOrder_id() & Integer.MAX_VALUE) % numReduceTasks; } } |
(4)编写OrderSortGroupingComparator
package com.atguigu.mapreduce.order; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; publicclass OrderGroupingComparator extends WritableComparator { protected OrderGroupingComparator() { super(OrderBean.class, true); } @SuppressWarnings("rawtypes") @Override publicint compare(WritableComparable a, WritableComparable b) { OrderBean aBean = (OrderBean) a; OrderBean bBean = (OrderBean) b; int result; if (aBean.getOrder_id() > bBean.getOrder_id()) { result = 1; } elseif (aBean.getOrder_id() < bBean.getOrder_id()) { result = -1; } else { result = 0; } return result; } } |
(5)编写OrderSortReducer
package com.atguigu.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; publicclass OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> { @Override protectedvoid reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } |
(6)编写OrderSortDriver
package com.atguigu.mapreduce.order; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; publicclass OrderDriver { publicstaticvoid main(String[] args) throws Exception, IOException { // 1 获取配置信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 设置jar包加载路径 job.setJarByClass(OrderDriver.class); // 3 加载map/reduce类 job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); // 4 设置map输出数据key和value类型 job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); // 5 设置最终输出数据的key和value类型 job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 6 设置输入数据和输出数据路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 10 设置reduce端的分组 job.setGroupingComparatorClass(OrderGroupingComparator.class); // 7 设置分区 job.setPartitionerClass(OrderPartitioner.class); // 8 设置reduce个数 job.setNumReduceTasks(3); // 9 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } } |
标签:实战,
hadoop,
MapReduce,
id,
org,
apache,
import,
排序,
order
From: https://blog.51cto.com/u_12654321/5843249