import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Map side of a reduce-side join between user stay-time records and the city table.
 *
 * <p>The kind of record is decided by the name of the file backing the input split:
 * <ul>
 *   <li>{@code part*} files: lines {@code phoneNum-cityId<TAB>time}, emitted as
 *       {@code (cityId, "#phoneNum-time")};</li>
 *   <li>{@code city*} files: lines {@code cityId,cityName[,...]}, emitted as
 *       {@code (cityId, "$cityName")}.</li>
 * </ul>
 * The {@code #} / {@code $} prefixes let the reducer tell the two record kinds apart.
 * Lines with too few fields are skipped and counted under
 * {@code WithCityMapper/MALFORMED_RECORDS} instead of failing the task.
 * Lines from files matching neither prefix are ignored.
 */
class WithCityMapper extends Mapper<LongWritable, Text, Text, Text> {
    /** Counter group used to report skipped input lines. */
    private static final String COUNTER_GROUP = "WithCityMapper";

    // Reused output objects: context.write serializes key/value immediately, so reuse is safe
    // and avoids two allocations per record.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    /** Name of the file backing this task's split; constant for the whole task. */
    private String fileName;

    /**
     * Resolves the input file name once per task instead of once per record.
     */
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        fileName = inputSplit.getPath().getName();
    }

    /**
     * Routes one input line to the parser matching its source file.
     *
     * @param key     byte offset of the line (unused)
     * @param value   the raw input line
     * @param context job context used for output and counters
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (fileName.startsWith("part")) {
            mapStayTime(line, context);
        } else if (fileName.startsWith("city")) {
            mapCity(line, context);
        }
    }

    /** Emits (cityId, "#phoneNum-time") for a {@code phoneNum-cityId<TAB>time} line. */
    private void mapStayTime(String line, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String[] info = line.split("\t");
        if (info.length < 2) {
            countMalformed(context);
            return;
        }
        String[] compositeKey = info[0].split("-");
        if (compositeKey.length < 2) {
            countMalformed(context);
            return;
        }
        String phoneNum = compositeKey[0];
        String cityId = compositeKey[1];
        String time = info[1];
        emit(context, cityId, "#" + phoneNum + "-" + time);
    }

    /** Emits (cityId, "$cityName") for a {@code cityId,cityName[,...]} line. */
    private void mapCity(String line, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String[] info = line.split(",");
        if (info.length < 2) {
            countMalformed(context);
            return;
        }
        emit(context, info[0], "$" + info[1]);
    }

    /** Writes one key/value pair using the reused output objects. */
    private void emit(Mapper<LongWritable, Text, Text, Text>.Context context, String k, String v) throws IOException, InterruptedException {
        outKey.set(k);
        outValue.set(v);
        context.write(outKey, outValue);
    }

    /** Records a skipped line so bad input is visible in the job counters. */
    private static void countMalformed(Mapper<LongWritable, Text, Text, Text>.Context context) {
        context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
    }
}
/**
 * Reduce side of the join: attaches the city name to every user record of one city id.
 *
 * <p>Values are either {@code "#phoneNum-time"} (user stay-time records) or
 * {@code "$cityName"} (the city-table row). One output line is written per user record.
 * If no city row arrived for this id, the city is reported as 未知 ("unknown").
 * City ids that have a city row but no user records produce no output.
 * Values with an unrecognised shape are skipped and counted under
 * {@code WithCityReducer/MALFORMED_RECORDS}.
 */
class WithCityReducer extends Reducer<Text, Text, Text, Text> {
    /** Counter group used to report skipped values. */
    private static final String COUNTER_GROUP = "WithCityReducer";

    /**
     * Joins all values of one city id.
     *
     * @param key     the city id
     * @param values  tagged user records ("#...") and city rows ("$...") for this id
     * @param context job context used for output and counters
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String cityId = key.toString();
        String cityName = null;
        // Values within a key arrive in no particular order (no secondary sort is configured),
        // so the city row may come after some user rows. Buffer the user rows as Strings
        // (the framework reuses the Text instance across iterations) and emit after the loop.
        // Previously only the last user row was kept, silently dropping every other user.
        // NOTE(review): memory grows with the number of users per city; switch to a
        // secondary sort that delivers the "$" row first if a city can have very many users.
        List<String> userRecords = new ArrayList<>();
        for (Text value : values) {
            String line = value.toString();
            if (line.startsWith("#")) {
                userRecords.add(line.substring(1));
            } else if (line.startsWith("$")) {
                cityName = line.substring(1);
            }
        }

        Text outKey = new Text("城市id:" + cityId);
        for (String record : userRecords) {
            // Split at the first '-': the mapper takes phoneNum from text before a '-', so it
            // contains none, while everything after it (the time) is kept intact.
            int sep = record.indexOf('-');
            if (sep < 0) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
                continue;
            }
            String phoneNum = record.substring(0, sep);
            String time = record.substring(sep + 1);
            // NOTE(review): the known-city value starts with a space and the unknown one does
            // not; kept as-is so existing consumers of the output are unaffected.
            String out;
            if (cityName != null) {
                out = " 城市:" + cityName + " 手机号:" + phoneNum + " 平均停留时间:" + time;
            } else {
                out = "城市:" + "未知" + " 手机号:" + phoneNum + " 平均停留时间:" + time;
            }
            context.write(outKey, new Text(out));
        }
    }
}
/**
 * Driver for the job that joins users' average stay-time records with city names.
 *
 * <p>Usage: {@code YHWithCity <input path> <output path>}. The input path must cover both
 * the stay-time files (names starting with {@code part}) and the city table (names starting
 * with {@code city}), because the mapper picks the parser from the file name.
 *
 * <p>Exit status: 0 if the job succeeds, 1 if it fails, 2 on wrong usage.
 */
public class YHWithCity {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: YHWithCity <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        // NOTE(review): NameNode address is hard-coded and overrides any fs.defaultFS
        // loaded from core-site.xml on the submitting machine.
        conf.set("fs.defaultFS", "hdfs://master:9000");
        Job job = Job.getInstance(conf);
        job.setJarByClass(YHWithCity.class);
        job.setJobName("用户平均停留时长数据关联城市名统计案例");
        job.setMapperClass(WithCityMapper.class);
        job.setReducerClass(WithCityReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            System.out.println("用户平均停留时长数据关联城市名案例mapreduce实现执行成功!>_-");
        } else {
            System.out.println("用户平均停留时长数据关联城市名案例mapreduce实现执行失败T_T");
        }
        // Propagate the job outcome so schedulers/scripts can detect a failed run.
        System.exit(succeeded ? 0 : 1);
    }
}
// 结果 (Result)