MapReduce是一种分布式运算程序的编程框架,是用户开发“基于hadoop数据分析应用”的核心框架。
核心功能是用户编写的业务逻辑代码和系统自带的组件组合在一起,构成一个分布式运算程序,并发运行在Hadoop集群上。
MapReduce优缺点
MapReduce易于编程,简单实现它的接口,就可以完成一个分布式程序。并且分布式程序可以分布在大量廉价的机器上运行。这使得用户在编写分布式应用程序时跟编写简单的串行应用程序相同。
良好的扩展性:当计算资源得不到满足时可以简单地增加机器来扩展他的计算能力。
高容错性:MapReduce设计就是为了程序能够在廉价的机器上运行,当一台机器出现故障时他会将计算任务移交给其他的节点来完成,这个过程不需要人工参与由hadoop内部来完成。
适合PB级别海量数据的离线处理:主要是可以实现上千台以上的服务器集群并发工作,提供数据处理能力。
MapReduce缺点
不擅长实时计算
不擅长流式计算
补充:
。MapReduce输入的数据是静态的,不能动态变化。它自身的设计特点决定了他输入的数据资源不能动态变化。
不擅长有向无环图计算
多道应用程序存在依赖关系时(后一个应用程序的输入作为前一个应用程序的输出),如果MapReduce做的话,它会将每个MapReduce作业的输出结果写入到磁盘上,这会产生大量的磁盘io,导致性能十分低下。
MapReduce的核心编程思想——以统计单词次数为例
(1)分布式应用程序总共分为两个阶段——Map阶段和Reduce阶段
(2)Map阶段的MapTask(1 读数据,按行处理。2 按空格切分行内单词。3 形成kv键值对(单词,1)。4)完全并发运行互不干扰。
(3)Reduce阶段ReduceTask并发运行互不干扰,但数据依赖map阶段的输出。
(4)MapReduce编程模型只包含一个map阶段和reduce阶段,当用户业务逻辑十分复杂,就需要多道MapReduce应用程序串行运行(效率不高,所有有了Spark)
MapReduce进程
MapReduce程序在分布式运行有三个进程
MrAppMaster负责整个程序过程的调度和资源协调。
MapTask:负责map阶段的整个数据处理流程。
ReduceTask:负责reduce阶段的整个数据处理流程。
MapReduce的数据类型——自己的数据序列化类型
ByteWritable、BooleanWritable、IntWritable,FloatWritable、DoubleWritable、MapWritable、ArrayWritable、NullWritable、Text
MapReduce的变成规范
用户编写的程序有三个类,Mapper类、Reducer类和Driver类
Map阶段
(1)用户编写的mapper类要继承自己的父类
(2)用户的业务逻辑封装在重写父类的map()方法里。
(3)Mapper输入、输出数据都是<K,V>类型,KV类型可由用户自己定义.
(4)map()方法(MapTask)对每一个<k,v>执行一次。
Reduce阶段
(1)用户编写的reducer类要继承自己的父类
(2)用户的业务逻辑封装在重写父类的reduce()方法里。
(3) Reducer输入的数据类型对应Map阶段的输出数据类型
(4)ReduceTask对于每一个相同k的<k,v>组调用一次reduce方法。
Driver阶段
相当于YARN集群的客户端,用于提交程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。
代码实例——wordcount
Mapper类
package com.rsh.mapreduce.wordcount2; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * KEYIN map阶段输入的key值 LongWritable * VALUEIN map阶段输入的value值 Text * KEYOUT map阶段输出的key值 Text * VALUEOUT map阶段输出的value值 IntWritable */ public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> { private Text outK = new Text(); private IntWritable outV = new IntWritable(1) ; @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //获取单行数据 String line = value.toString(); //对单行数据切分 String[] words = line.split(" "); //循环写出 for (String word : words) { //封装OUTKEY outK.set(word); //写出OUTVALUE context.write(outK,outV); } } }
Reducer类
package com.rsh.mapreduce.wordcount2; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * KEYIN map阶段输入的key值 LongWritable * VALUEIN map阶段输入的value值 Text * KEYOUT map阶段输出的key值 Text * VALUEOUT map阶段输出的value值 IntWritable */ public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> { private IntWritable outV = new IntWritable();; @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } outV.set(sum); context.write(key,outV); } }
Driver类
package com.rsh.mapreduce.wordcount2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //1、获取job Configuration configuration = new Configuration(); Job instance = Job.getInstance(configuration); //2、设置jar包路径 instance.setJarByClass(WordCountDriver.class); //3、关联mapper和reducer instance.setMapperClass(WordCountMapper.class); instance.setReducerClass(WordCountReducer.class); //4、设置map输出的kv类型 instance.setMapOutputKeyClass(Text.class); instance.setMapOutputValueClass(IntWritable.class); //5、设置最终输出的kv类型 instance.setOutputKeyClass(Text.class); instance.setOutputValueClass(IntWritable.class); //6、设置输入路径和输出路径 FileInputFormat.setInputPaths(instance,new Path(args[0])); FileOutputFormat.setOutputPath(instance,new Path(args[1])); //7、提交job boolean b = instance.waitForCompletion(true); System.exit(b ? 0 : 1); } }
标签:map,Text,MapReduce,hadoop,概述,apache,import From: https://www.cnblogs.com/20203923rensaihang/p/17133817.html