Code reference:
MapReduce Lab - CodeDancing - 博客园 (cnblogs.com)
Complete program code:
IDE: IntelliJ IDEA
Notes:
1. When running the different tasks, change the value of cmd accordingly.
2. Replace the address in conf.set("fs.default.name","hdfs://node1:8020"); with your own NameNode URI. (fs.default.name is the deprecated name of fs.defaultFS; either key works here.)
3. System.setProperty("HADOOP_USER_NAME","root");
Without this line the job fails with a permissions error: Permission denied: user=1234, access=WRITE, inode="/":root:supergroup:drwxr-xr-x
The reason is that IDEA connects to HDFS as the local Windows user. My Windows user is 1234, so the client identifies itself as user=1234, but no such user exists in HDFS.
My fix: System.setProperty("HADOOP_USER_NAME","root"); makes the client identify itself as root, and root has write and modify permissions in HDFS.
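As a quick sanity check (my own sketch, not part of the original lab code; it assumes the same node1:8020 address), the following standalone snippet connects to HDFS and attempts a small write, which would fail with the AccessControlException above under the wrong user:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsUserCheck {
    public static void main(String[] args) throws Exception {
        // Must be set before the FileSystem object is created, otherwise the
        // client identifies itself as the local OS user.
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node1:8020"); // current name of fs.default.name

        FileSystem fs = FileSystem.get(conf);
        System.out.println("Home directory: " + fs.getHomeDirectory()); // expect /user/root
        // A write that triggers the HDFS permission check:
        Path probe = new Path("/user/root/permission-test");
        fs.mkdirs(probe);
        fs.delete(probe, true);
        fs.close();
    }
}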
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Merge {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // fs.default.name is the deprecated name of fs.defaultFS; replace with your NameNode URI.
        conf.set("fs.default.name", "hdfs://node1:8020");
        // Connect to HDFS as root instead of the local Windows user (see note 3 above).
        System.setProperty("HADOOP_USER_NAME", "root");

        Job job = Job.getInstance(conf);
        job.setJarByClass(Merge.class);

        // 1 = merge and de-duplicate, 2 = sort and rank, 3 = grandchild-grandparent mining.
        int cmd = 3;
        // String[] file = {"input" + cmd, "output" + cmd};
        String[] file = {"/user/root/input", "/user/root/output"};

        if (cmd == 1) {
            job.setMapperClass(Merge.Mapper1.class);
            job.setCombinerClass(Merge.Reducer1.class);
            job.setReducerClass(Merge.Reducer1.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, new Path(file[0]));
            FileOutputFormat.setOutputPath(job, new Path(file[1]));
        } else if (cmd == 2) {
            job.setMapperClass(Merge.Mapper2.class);
            job.setReducerClass(Merge.Reducer2.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.setInputPaths(job, new Path(file[0]));
            FileOutputFormat.setOutputPath(job, new Path(file[1]));
        } else {
            job.setMapperClass(Merge.Mapper3.class);
            job.setReducerClass(Merge.Reducer3.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, new Path(file[0]));
            FileOutputFormat.setOutputPath(job, new Path(file[1]));
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // Task 1: emit every input line as a key; the shuffle groups duplicate lines together.
    public static class Mapper1 extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, new Text(""));
        }
    }

    // Task 1: write each distinct line exactly once (also used as the combiner).
    public static class Reducer1 extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    // Task 2: parse one integer per line; the shuffle sorts the IntWritable keys.
    public static class Mapper2 extends Mapper<Object, Text, IntWritable, IntWritable> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            IntWritable data = new IntWritable();
            data.set(Integer.parseInt(value.toString()));
            context.write(data, new IntWritable(1));
        }
    }

    // Task 2: number the sorted values 1, 2, 3, ... The static counter assumes a
    // single reducer (the default); with several reducers each would restart at 1.
    public static class Reducer2 extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static int sum = 1;

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable num : values) {
                context.write(new IntWritable(sum), key);
                sum++; // duplicate numbers each get their own rank
            }
        }
    }

    // Task 3: for each "child parent" line, emit the relation from both ends so that a
    // person's parents ("old#...") and children ("young#...") meet at one reduce key.
    public static class Mapper3 extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] splStr = value.toString().split(" ");
            String child = splStr[0];
            String parent = splStr[1];
            if (child.equals("child") && parent.equals("parent")) // skip the header line
                return;
            context.write(new Text(child), new Text("old#" + parent));
            context.write(new Text(parent), new Text("young#" + child));
        }
    }

    // Task 3: join each person's children with their parents to produce
    // grandchild-grandparent pairs. The static head flag writes the output
    // header once (again assuming a single reducer).
    public static class Reducer3 extends Reducer<Text, Text, Text, Text> {
        private static boolean head = true;

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            if (head) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                head = false;
            }
            ArrayList<String> grandchild = new ArrayList<String>();
            ArrayList<String> grandparent = new ArrayList<String>();
            String[] temp;
            for (Text val : values) {
                temp = val.toString().split("#");
                if (temp[0].equals("young"))
                    grandchild.add(temp[1]);
                else
                    grandparent.add(temp[1]);
            }
            if (grandchild.size() == 0 || grandparent.size() == 0)
                return;
            for (String gc : grandchild)
                for (String gp : grandparent)
                    context.write(new Text(gc), new Text(gp));
        }
    }
}
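All three tasks below assume the input files already sit in /user/root/input on HDFS. You can upload them with hdfs dfs -put, or straight from IDEA; a minimal sketch of the latter (the local file names are illustrative, matching Task 1):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Upload {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node1:8020");
        FileSystem fs = FileSystem.get(conf);
        // Hypothetical local files; swap in text1.txt etc. for Task 2.
        fs.copyFromLocalFile(new Path("A.txt"), new Path("/user/root/input/A.txt"));
        fs.copyFromLocalFile(new Path("B.txt"), new Path("/user/root/input/B.txt"));
        fs.close();
    }
}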
Task 1: merge two files and remove duplicate lines
(1) Create A.txt and B.txt
(2) Upload them to HDFS
(3) Result of the program run
The code is the complete program above; just set cmd to 1. A sample run is sketched below.
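For illustration only (hypothetical file contents, not the lab's official data): if the two input files contain the lines below, the output is their union with duplicates removed, in the sorted order produced by the shuffle.

A.txt:
20150101 x
20150102 y

B.txt:
20150102 y
20150103 z

Merged output:
20150101 x
20150102 y
20150103 z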
Task 2: sort integers from multiple files and number them by rank
(1) Prepare text1.txt, text2.txt, and text3.txt, and upload them to HDFS
(2) Program
Use the complete program with cmd set to 2; see the example below.
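For illustration (hypothetical numbers): if the three files together contain 3, 1, 2, and 2, one per line, the single reducer receives the keys in sorted order and Reducer2's counter produces rank-value pairs (tab-separated by the default TextOutputFormat):

1	1
2	2
3	2
4	3

Note that duplicates each keep their own rank, as the code comment points out.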
Task 3: mine grandchild-grandparent relations from a child-parent table
Note: when writing the child-parent data, separate the two columns with a single space and nothing else, since the mapper splits each line on a single space.
(1) Data preparation
(2) Program
Use the complete program with cmd set to 3; see the example below.
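For illustration (hypothetical names): given the input below, Steven's parent is Jack and Jack's parent is Alice, so the two relations meet at the reduce key Jack and the join emits one grandchild-grandparent pair.

Input:
child parent
Steven Jack
Jack Alice

Output:
grandchild grandparent
Steven Alice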