一、Hadoop serialization
The Writable interface defines a simple, efficient serialization protocol built on DataInput and DataOutput.
Every key and value type used in MapReduce must implement the Writable interface.
Every key type must additionally implement the WritableComparable interface, because keys are sorted during the shuffle.
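As a quick illustration (a minimal sketch added here, not part of the original post; the class name MobileKey is made up), a custom key type only has to add compareTo() on top of the two Writable methods:

package serializable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type: a mobile number used as a sortable MapReduce key.
public class MobileKey implements WritableComparable<MobileKey> {

    private String mobileNumber = "";

    public MobileKey() {                        // no-arg constructor required by Hadoop
    }

    public MobileKey(String mobileNumber) {
        this.mobileNumber = mobileNumber;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(mobileNumber);             // serialize
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        mobileNumber = in.readUTF();            // deserialize in the same order
    }

    @Override
    public int compareTo(MobileKey other) {     // defines the sort order during the shuffle
        return mobileNumber.compareTo(other.mobileNumber);
    }

    @Override
    public int hashCode() {                     // used by the default HashPartitioner
        return mobileNumber.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MobileKey && mobileNumber.equals(((MobileKey) o).mobileNumber);
    }
}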
二、Implementing a MapReduce program with a custom Writable
1、Requirements
Date      Mobile number  Flow 1  Flow 2
20150403 1380013800 201 51
20150403 1390013800 202 52
20150403 1370013800 203 54
20150403 1360013800 204 55
For each user, compute the upstream traffic, the downstream traffic, and the total traffic.
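To make the expected result concrete: assuming Flow 1 is the upstream and Flow 2 the downstream traffic, the result for user 1380013800 in the sample above would be 201 (up), 51 (down) and 252 (total); the other three users are computed the same way. Every number occurs only once in this tiny sample, so each sum is a single row, but the same job also aggregates users that appear in many records.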
2、Implementation code
package serializable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Custom value type carrying the upstream and downstream traffic of one user.
 * Implementing Writable lets Hadoop serialize it between map and reduce.
 */
public class FlowBean implements Writable {

    private String mobileNumber;
    private long upFlow;
    private long downFlow;

    // Hadoop creates the bean via reflection, so a no-arg constructor is required.
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow, String mobileNumber) {
        this.mobileNumber = mobileNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    // Serialization: write the fields to the output stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(mobileNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    // Deserialization: read the fields back in exactly the order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.mobileNumber = in.readUTF();
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
    }

    public String getMobileNumber() {
        return mobileNumber;
    }

    public void setMobileNumber(String mobileNumber) {
        this.mobileNumber = mobileNumber;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    // toString() defines what TextOutputFormat writes as the value column:
    // upstream, downstream, and total traffic separated by tabs.
    @Override
    public String toString() {
        return this.upFlow + "\t" + this.downFlow + "\t" + (this.upFlow + this.downFlow);
    }
}
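Before moving on to the job itself, the following stand-alone snippet (a sketch added for illustration, not part of the original program; the class name FlowBeanRoundTrip is made up) round-trips a FlowBean through write() and readFields(), which is exactly what the framework does when it ships the bean from the map side to the reduce side:

package serializable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(201, 51, "1380013800");

        // Serialize: write() appends the three fields to a byte stream.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize: readFields() must read them back in the same order.
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);   // expected: 201  51  252 (tab-separated)
    }
}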
package serializable;

import java.io.IOException;
import java.net.URI;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumRunner {

    private static final String HDFS_PATH = "hdfs://cloud01:9000";

    public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        FlowBean flowBean = null;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split one input line on tabs; adjust the indices below to the
            // actual column positions of your data.
            String[] values = StringUtils.split(value.toString(), "\t");
            String mobileNumber = values[1];
            long upFlow = Long.parseLong(values[7]);
            long downFlow = Long.parseLong(values[8]);
            flowBean = new FlowBean(upFlow, downFlow, mobileNumber);
            // Emit <mobile number, flow bean>; the custom Writable is the map output value.
            context.write(new Text(mobileNumber), flowBean);
        }
    }

    public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        FlowBean v = new FlowBean();

        @Override
        protected void reduce(Text k, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            // Sum the upstream and downstream traffic of all records of one user.
            long sumUpFlow = 0;
            long sumDownFlow = 0;
            for (FlowBean value : values) {
                sumUpFlow += value.getUpFlow();
                sumDownFlow += value.getDownFlow();
            }
            v.setUpFlow(sumUpFlow);
            v.setDownFlow(sumDownFlow);
            context.write(k, v);
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(FlowSumRunner.class);
            // Explicitly names the job jar (takes precedence over the value
            // derived by setJarByClass).
            job.setJar("flowSumJob.jar");

            job.setMapperClass(FlowSumMapper.class);
            job.setReducerClass(FlowSumReducer.class);

            // Map output types.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            // Reduce (final) output types.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            Path inputPath = new Path(HDFS_PATH + "/flow/input");
            Path outputDir = new Path(HDFS_PATH + "/flow/output");
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputDir);

            // Remove a leftover output directory so the job does not fail on startup.
            FileSystem fs = FileSystem.get(new URI(HDFS_PATH), conf);
            if (fs.exists(outputDir)) {
                fs.delete(outputDir, true);
            }

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
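Before the result can be inspected, the job has to be packaged and submitted. Assuming the classes are packed into flowSumJob.jar (the file name hard-coded via job.setJar above), a typical submission looks roughly like this:

[hadoop@cloud01 ~]$ hadoop jar flowSumJob.jar serializable.FlowSumRunner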
3、Viewing the results
[hadoop@cloud01 ~]$ hadoop fs -cat /flow/output/part-r-00000
The detailed output is omitted here.