An Important MapReduce Component: the Partitioner
(1) The Partitioner lets the map side assign each key to a partition, so records are dispatched to different reducers according to their key;
(2) You can define your own dispatch rule for keys; for example, if the data file contains records from several provinces and the requirement is one output file per province;
(3) Hadoop ships a default HashPartitioner.
Defining a custom Partitioner:
(1) Extend the abstract class Partitioner and implement your own getPartition() method;
(2) Register it with job.setPartitionerClass() (a minimal sketch follows this list; the full example is in section 3.3 below);
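A minimal sketch of both steps (the class name, key prefix, and types here are illustrative, not from the original post):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        // Route keys starting with "bj" to partition 0, everything else to 1.
        return key.toString().startsWith("bj") ? 0 : 1;
    }
}

// In the driver:
// job.setPartitionerClass(MyPartitioner.class);
// job.setNumReduceTasks(2); // must cover every partition number returned above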
Partitioner use case:
Requirement: compute user traffic statistics separately for each province.
1. The per-province user traffic data to aggregate
2. Requirements
Generate one output file per region; the main concern is that a single output file would be too large, so the data must be split across files.
Assume: numbers starting with 137 belong to Beijing, area code 100
numbers starting with 138 belong to Tianjin, area code 200
numbers starting with 139 belong to Hebei, area code 300
all other numbers go into one file
To produce one file per region, records must be partitioned by the user's region.
3. Implementation
3.1 Use Hadoop's Partitioner hook to define a custom grouping (the default is HashPartitioner), then set the number of reduce tasks (default 1) to match, which satisfies the requirement.
3.2 Hadoop's default Partitioner implementation
/**
 * Partitions the key space.
 *
 * <p><code>Partitioner</code> controls the partitioning of the keys of the
 * intermediate map-outputs. The key (or a subset of the key) is used to derive
 * the partition, typically by a hash function. The total number of partitions
 * is the same as the number of reduce tasks for the job. Hence this controls
 * which of the <code>m</code> reduce tasks the intermediate key (and hence the
 * record) is sent for reduction.</p>
 *
 * Note: If you require your Partitioner class to obtain the Job's configuration
 * object, implement the {@link Configurable} interface.
 *
 * @see Reducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {

  /**
   * Get the partition number for a given key (hence record) given the total
   * number of partitions i.e. number of reduce-tasks for the job.
   *
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be partioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);

}

/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
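Note that key.hashCode() can be negative in Java; the bitwise AND with Integer.MAX_VALUE clears the sign bit so the modulo always yields a valid partition in [0, numReduceTasks). A quick standalone check of the same formula (plain Java, no Hadoop required; the phone numbers are made up):

public class HashPartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 4;
        for (String key : new String[] {"13712345678", "13887654321", "13900000000"}) {
            // Same formula as HashPartitioner.getPartition()
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(key + " -> partition " + partition);
        }
    }
}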
3.3 Custom implementation
The code:
package partition;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    // Prefix-to-partition table, built once when the class is loaded.
    public static Map<String, Integer> areaMap = new HashMap<String, Integer>();

    static {
        areaMap.put("137", 0); // Beijing
        areaMap.put("138", 1); // Tianjin
        areaMap.put("139", 2); // Hebei
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Partition 3 is the catch-all for every other prefix.
        int area = 3;
        Integer res = areaMap.get(key.toString().substring(0, 3));
        if (res != null) {
            area = res;
        }
        return area;
    }
}
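A quick way to sanity-check the mapping outside a job (this snippet is not in the original post; AreaPartitioner is generic, so any key/value types work as long as the key's toString() starts with the phone number, and getPartition() ignores the value):

package partition;

import org.apache.hadoop.io.Text;

public class AreaPartitionerCheck {
    public static void main(String[] args) {
        AreaPartitioner<Text, Text> p = new AreaPartitioner<Text, Text>();
        System.out.println(p.getPartition(new Text("13712345678"), null, 4)); // 0 (Beijing)
        System.out.println(p.getPartition(new Text("13811112222"), null, 4)); // 1 (Tianjin)
        System.out.println(p.getPartition(new Text("13912345678"), null, 4)); // 2 (Hebei)
        System.out.println(p.getPartition(new Text("15000000000"), null, 4)); // 3 (others)
    }
}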
package partition;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
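// Custom Writable carrying one subscriber's phone number plus upstream/downstream traffic totals.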
public class FlowBean implements Writable{
private String mobileNumber;
private long upFlow;
private long downFlow;
public void set(String mobileNumber,long upFlow,long downFlow){
this.mobileNumber = mobileNumber;
this.upFlow = upFlow;
this.downFlow = downFlow;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(mobileNumber);
out.writeLong(upFlow);
out.writeLong(downFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.mobileNumber = in.readUTF();
this.upFlow = in.readLong();
this.downFlow = in.readLong();
}
public String getMobileNumber() {
return mobileNumber;
}
public void setMobileNumber(String mobileNumber) {
this.mobileNumber = mobileNumber;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
@Override
public String toString() {
return this.mobileNumber + "\t" + this.upFlow + "\t" + this.downFlow + "\t" + (this.upFlow + this.downFlow);
}
}
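The Writable contract requires readFields() to consume fields in exactly the order write() produced them (mobileNumber, upFlow, downFlow here). A quick round-trip check using plain JDK streams (not part of the original post):

package partition;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean();
        in.set("13712345678", 1024L, 4096L);

        // Serialize with write(), then rebuild a bean with readFields().
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buf));
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(out); // 13712345678  1024  4096  5120
    }
}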
package partition;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FlowPartitionMapper extends Mapper<LongWritable,Text,Text,FlowBean>{
private Text k = new Text();
private FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String[] values = StringUtils.split(value.toString(), "\t");
String mobileNumber = values[1]; // field 1: phone number
long upFlow = Long.parseLong(values[7]); // field 7: upstream bytes
long downFlow = Long.parseLong(values[8]); // field 8: downstream bytes
k.set(mobileNumber);
v.set(mobileNumber, upFlow, downFlow);
context.write(k,v);
}
}
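The mapper assumes tab-separated input in which field 1 is the phone number and fields 7 and 8 are the upstream/downstream byte counts; that layout is inferred from the indexes above, and this sample line is hypothetical:

ts	13712345678	f2	f3	f4	f5	f6	1024	4096	200

Note that commons-lang StringUtils.split collapses adjacent separators and returns no empty tokens, so empty tab-separated fields would shift the indexes; the input is assumed to have no empty fields.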
package partition;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FlowPartitionReducer extends Reducer<Text, FlowBean, FlowBean,NullWritable> {
private FlowBean flowBean = new FlowBean();
@Override
protected void reduce(Text k, Iterable<FlowBean> values, Context context) throws IOException,
InterruptedException {
long sumUpFlow = 0;
long sumDownFlow = 0;
for(FlowBean value:values){
sumUpFlow += value.getUpFlow();
sumDownFlow += value.getDownFlow();
}
flowBean.setMobileNumber(k.toString());
flowBean.setUpFlow(sumUpFlow);
flowBean.setDownFlow(sumDownFlow);
context.write(flowBean, NullWritable.get());
}
}
package partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FlowPartitionRunner extends Configured implements Tool{
private static final String HDFS_PATH = "hdfs://cloud01:9000";
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapreduce.job.jar", "part.jar");
Job job = Job.getInstance(conf);
job.setJarByClass(FlowPartitionRunner.class);
job.setMapperClass(FlowPartitionMapper.class);
job.setReducerClass(FlowPartitionReducer.class);
job.setPartitionerClass(AreaPartitioner.class);
// Must be >= the number of partitions AreaPartitioner can return (4 here).
job.setNumReduceTasks(4);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(FlowBean.class);
job.setOutputValueClass(NullWritable.class);
Path inputPath = new Path(HDFS_PATH + "/flow/input");
Path outputDir = new Path(HDFS_PATH + "/flow/partition/output");
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputDir);
FileSystem fs = FileSystem.get(conf);
// Remove a stale output directory so resubmitting the job does not fail.
if (fs.exists(outputDir)) {
fs.delete(outputDir, true);
}
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) {
try {
int status = ToolRunner.run(new Configuration(), new FlowPartitionRunner(), args);
System.exit(status);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Run it and inspect the output:
[hadoop@cloud01 ~]$ hadoop jar /home/hadoop/workspace/HDFSdemo/part.jar partition.FlowPartitionRunner
[hadoop@cloud01 ~]$ hadoop fs -ls /flow/partition/output
With four reduce tasks, the output directory should contain four part files (part-r-00000 through part-r-00003), one per region.