1.Mapper文件WordCountMapper.java
package com.hdfs; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final Text outK = new Text(); private final IntWritable outV = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split(" "); for (String word : words) { outK.set(word); context.write(outK, outV); } } }
2.Reducer文件WordCountReducer.java
package com.hdfs; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private final IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } outV.set(sum); context.write(key, outV); } }
3.Driver文件WordCountDriver.java
package com.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 1.创建job实例 Configuration config = new Configuration(); Job job = Job.getInstance(config); // 2.设置jar job.setJarByClass(WordCountDriver.class); // 3.设置Mapper和Reducer job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 4.设置map输出的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5.设置最终输出的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6.设置输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path("/home/navy/Desktop/work/hadoop/HadoopOperate/input")); FileOutputFormat.setOutputPath(job, new Path("/home/navy/Desktop/work/hadoop/HadoopOperate/output")); // 7.提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
4.pom.xml文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>HdfsOperate</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.4</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>2.0.5</version> </dependency> </dependencies> </project>
5.项目结构
6.运行WordCountDriver.java里的main方法
标签:java,hadoop,MapReduce,job,io,org,apache,import From: https://www.cnblogs.com/navysummer/p/16971889.html