首页 > 其他分享 >MapReduce分布式计算

MapReduce分布式计算

时间:2023-05-11 21:03:45浏览次数:31  
标签:map String 分布式计算 MapReduce public new 序列化 class

MapReduce是Hadoop系统核心组件之一,它是一种可用于大数据并行处理的计算模型、框架和平台,主要解决海量数据的计算,是目前分布式计算模型中应用较为广泛的一种。

练习:计算a.txt文件中每个单词出现的次数

hello world
hello hadoop
hello 51doit
hadoop mapreduce
mapreduce spark


public class WordCount {
    public static void main(String[] args) throws IOException {
        //获取到resource文件夹下a.txt的路径
        URL resource = WordCount.class.getClassLoader().getResource("a.txt");
        String path = resource.getPath();
        //使用FileUtils将文件读取成字符串
        String s = FileUtils.readFileToString(new File(path),"utf-8");
        //将文件使用空格进行切割  \s可以切割 空格 tab键
        String[] arr = s.split("\\s+");

        //创建Map集合
        Map<String,Integer> map = new HashMap<>();

        //遍历数组
        for (String s1 : arr) {
            //判断集合是否包含指定键
            if(!map.containsKey(s1)){
                //如果不包含 添加 单词 1
                map.put(s1,1);
            }else{
                //如果包含  获取当前键的次数 +1 在添加回集合
                Integer count = map.get(s1);
                map.put(s1,count+1);
            }
        }

        System.out.println(map);
    }
}

通过以上的方式 计算出来了a.txt文件中每个单词出现的次数,但是我们想一下 ,如果a.txt文件非常大,怎么办?

比如有一个a.txt文件10个T的大小。这时一台计算机就没有办法计算了,因为我们根本存储不了,计算不了,那么一台计算机无法计算,就使用多台计算机来进行计算!

MapReduce核心思想

​ MapReduce的核心思想是“分而治之”。所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,把各部分的结果组成整个问题的结果,这种思想来源于日常生活与工作时的经验,同样也完全适合技术领域。

为了更好地理解“分而治之”思想,我们光来举一个生活的例子。例如,某大型公司在全国设立了分公司,假设现在要统计公司今年的营收情况制作年报,有两种统计方式,第1种方式是全国分公司将自己的账单数据发送至总部,由总部统一计算公司今年的营收报表:第2种方式是采用分而治之的思想,也就是说,先要求分公司各自统计营收情况,再将统计结果发给总部进行统一汇总计算。这两种方式相比,显然第2种方式的策略更好,工作效率更高效。

MapReduce 作为一种分布式计算模型,它主要用于解决海量数据的计算问题。使用MapReduce操作海量数据时,每个MapReduce程序被初始化为一个工作任务,每个工作任务可以分为Map 和l Reducc两个阶段,具体介绍如下:

Map阶段::负责将任务分解,即把复杂的任务分解成若干个“简单的任务”来行处理,但前提是这些任务没有必然的依赖关系,可以单独执行任务。

Reduce阶段:负责将任务合并,即把Map阶段的结果进行全局汇总。下面通过一个图来描述上述MapReduce 的核心思想。

MapReduce就是“任务的分解与结和的汇总”。即使用户不懂分布式计算框架的内部运行机制,但是只要能用Map和 Reduce思想描述清楚要处理的问题,就能轻松地在Hadoop集群上实现分布式计算功能。

MapReduce编程模型

MapReduce是一种编程模型,用于处理大规模数据集的并行运算。使用MapReduce执行计算任务的时候,每个任务的执行过程都会被分为两个阶段,分别是Map和Reduce,其中Map阶段用于对原始数据进行处理,Reduce阶段用于对Map阶段的结果进行汇总,得到最终结果。

MapReduce编程模型借鉴了函数式程序设计语言的设计思想,其程序实现过程是通过map()和l reduce()函数来完成的。从数据格式上来看,map()函数接收的数据格式是键值对,生的输出结果也是键值对形式,reduce()函数会将map()函数输出的键值对作为输入,把相同key 值的 value进行汇总,输出新的键值对。

(1)将原始数据处理成键值对<K1,V1>形式。

(2)将解析后的键值对<K1,V1>传给map()函数,map()函数会根据映射规则,将键值对<K1,V1>映射为一系列中间结果形式的键值对<K2,V2>。

(3)将中间形式的键值对<K2,V2>形成<K2,{V2,....>形式传给reduce()函数处理,把具有相同key的value合并在一起,产生新的键值对<K3,V3>,此时的键值对<K3,V3>就是最终输出的结果。

词频统计

因为我们的数据都存储在不同的计算机中,那么将对象中的数据从网络中传输,就一定要用到序列化!

/*
	JDK序列化对象的弊端 
	我们进行序列化 其实最主要的目的是为了 序列化对象的属性数据
	比如如果序列化一个Person对象 new Person("柳岩",38); 其实我们想要的是 柳岩 38
	但是如果直接序列化一个对象的话 JDK为了反序列化方便 会在文件中加入其他的数据 这样
	序列化后的文件会变的很大,占用空间
*/
public class Test {
    public static void main(String[] args) throws Exception {
        ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("d:\\person.txt"));

        //JDK序列化对象 
        Person p = new Person();
        p.setName("柳岩");
        p.setAge(38);
        oos.writeObject(p);
        oos.close();
    }
}

本来其实数据就占几个字节,序列化后,多占用了很多字节,这样如果序列化多的话就会浪费很多空间.

/*
	可以通过序列化属性的方式解决问题
	只序列化属性 可以减小序列化后的文件大小
*/
public class Test {
    public static void main(String[] args) throws Exception {
        ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("d:\\person.txt"));

        Person p = new Person();
        p.setName("柳岩");
        p.setAge(38);

        //只序列化属性
        oos.writeUTF(p.getName());
        oos.writeInt(p.getAge());
        oos.close();
    }
}

/*
	需要注意
	反序列化时 需要按照序列化的顺序来反序列化
*/
public class Test {
    public static void main(String[] args) throws Exception {
        ObjectInputStream ois = new ObjectInputStream(new FileInputStream("d:\\person.txt"));
		//先反序列化name 在反序列化age
        String name = ois.readUTF();
        int age = ois.readInt();
        System.out.println(name + " "+age);
        ois.close();
    }
}

Hadoop对java的序列化又进行了优化,对一些类型进行了进一步的封装,方便按照自己的方式序列化

Integer  ----> IntWritable
Long     ----> LongWritable
String   ----> Text
Double   ----> DoubleWritable
Boolean  ----> BooleanWritable

WorldCount代码编写

map函数定义

/*
	KEYIN: K1   
	VALUIN: V1  
	KEYOUT:K2   
	VALUEOUT:V2 
*/
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  
   protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context 		context) throws IOException, InterruptedException {
    		
    }
}

我们只需要继承Mapper类,重写map方法就好

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
     K1 : 行起始位置   数字 Long  ---- > LongWritable
     V1 : 一行数据  字符串 String  -----> Text
     K2 : 单词     字符串 String  ----->  Text
     V2 : 固定数字1 数组  Long -----> LongWritable
 */
public class WordCountMapper  extends Mapper<LongWritable, Text,Text,LongWritable> {


    /**
     *
     * @param key   K1
     * @param value V1
     * @param context  上下文对象 将map的结果 输出给reduce
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //将一行数据 转换成字符串 按照空格切割 
        String[] arr = value.toString().split("\\s+");
        for (String k2 : arr) {
              //将单词输出给reduce
              context.write(new Text(k2),new LongWritable(1));
        }
    }
}

reduce函数定义

/*
	KEYIN:K2 
	VALUEIN:V2
	KEYOUT:K3
	VALUEOUT:V3
*/
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, 					VALUEOUT>.Context context) throws IOException, InterruptedException {

    }
}

我们只需要继承Reducer类型重写reduce方法就好

/*
    K2:单词        String  ----> Text
    V2:固定数字 1   Long    ----> LongWritable
    K3:单词        String  ----> Text
    V3:相加后的结果 Long    ----> LongWritable
 */
public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {

    /**
     *
     * @param key  K2
     * @param values  V2的集合 {1,1,1,1}
     * @param context 上下文对象 输出结果
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

        int count = 0;
        //将次数相加
        for (LongWritable value : values) {
               count+=value.get();
        }
        //写出 k3 v3
        context.write(key,new LongWritable(count));
    }
}

最后编写启动程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Test {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //创建配置对象
        Configuration conf = new Configuration();
        //创建工作任务
        Job job = Job.getInstance(conf, "wordCount");

        //设置Map类
        job.setMapperClass(WordCountMapper.class);
        //设置Reduce类
        job.setReducerClass(WordCountReducer.class);

        //设置map的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //设置reduce的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //设置读取文件位置  可以是文件 也可以是文件夹
        FileInputFormat.setInputPaths(job,new Path("d:\\work\\abc"));
        //设置输出文件位置
        FileOutputFormat.setOutputPath(job,new Path("d:\\work\\abc\\out_put"));

        //提交任务 并等待任务结束
        job.waitForCompletion(true);
    }
}

如果抛这个异常 需要查看windows环境
Exception in thread "main"java.lang .UnsatisfiedLinkError: org.apache .hadoop.io.nativeio.NativeIO$windows.access0(Ljava/lang/string;1) .
 如果已经配置了环境 还不行 在src新建包 org.apache.hadoop.io.nativeio
 然后hadoop02文件夹中的 NativeIO.java添加到这个包下 重新运行尝试

若要显示报错信息在resouces目录下添加log4j.properties
内容如下:

log4j.rootCategory=INFO,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

标签:map,String,分布式计算,MapReduce,public,new,序列化,class
From: https://www.cnblogs.com/paopaoT/p/17392209.html

相关文章

  • Mapreduce二次排序时,将jar包上传至Hadoop上运行时,抛出异常"java.util.NoSuchElementEx
    查询原因后发现是java中实现实现Mapper时StringTokenizer类时使用了一个方法nextToken()会抛出这个异常,”我们可以使用hasMoreTokens()和hasMoreElements()方法来避免异常。如果标记器的字符串中有更多标记可用,则这两种方法都返回true。只有当hasMoreTokens()方法返回Tr......
  • mapreduce测试时出现INFO client.RMProxy: Connecting to ResourceManager at 0.0.0.0
    如运行wordcount后出现INFOclient.RMProxy:ConnectingtoResourceManagerat0.0.0.0:8032长时间不动,我尝试修改我的yarn-site.xml配置后可以成功运行  <property>    <name>yarn.nodemanager.aux-services</name>    <value>mapreduce_shuffle</value>  </pr......
  • ray-分布式计算框架-集群与异步Job管理
    0.ray简介ray是开源分布式计算框架,为并行处理提供计算层,用于扩展AI与Python应用程序,是ML工作负载统一工具包RayAIRuntimeML应用程序库集RayCore通用分布式计算库Task--Ray允许任意Python函数在单独的Pythonworker上运行,这些异步Python函数称为任务Actor......
  • MapReduce原理
         MapReduce运行流程  MapReduce容错机制 ......
  • hiveSQL mapreduce任务调优
    sethive.merge.mapredfiles=true;--在Map-Reduce的任务结束时合并小文件setmapred.max.split.size=30000000;--决定每个map处理的最大的文件大小,单位为B--setmapred.min.split.size=10000000;--公司集群默认值--setmapred.min.split.size.per.node=;......
  • 并行计算、分布式计算、集群计算和网格计算的介绍,以及主要有哪些区别?
    并行计算(ParallelComputing)并行计算或称平行计算是相对于串行计算来说的。并行计算(ParallelComputing)是指同时使用多种计算资源解决计算问题的过程。为执行并行计算,计算资源应包括一台配有多处理机(并行处理)的计算机、一个与网络相连的计算机专有编号,或者两者结合使用。......
  • Hadoop的生态体系,HDFS和MapReduce等的具体介绍
    Hadoop的两大核心就是HDFS和MapReduce,而整个Hadoop的体系结构主要是通过HDFS的分布式存储作为底层数据支持的。并且会通过MapReduce来进行计算分析。Hadoop1.x的核心:HadoopCommonHadoopDistributedFileSystem(HDFS)HadoopMapReduceHadoop2.x的核心:HadoopCommonHadoopDistribu......
  • 分布式计算技术(下):Impala、Apache Flink、星环Slipstream
    实时计算的发展历史只有十几年,它与基于数据库的计算模型有本质区别,实时计算是固定的计算任务加上流动的数据,而数据库大多是固定的数据和流动的计算任务,因此实时计算平台对数据抽象、延时性、容错性、数据语义等的要求与数据库明显不同,面向实时计算的数据架构也就发展起来。本篇我......
  • MIT 6.5840 2023 Spring(6.824)LAB1:MapReduce
    MIT6.58402023Spring(6.824)LAB1:MapReduce前言本次lab主要是完成一个基于RPC远程调用的单机单文件系统的简单MapReduce框架,并完成单词计数任务。基于golang实现,单Master,多Worker。实现worker的奔溃恢复(FaultTorrance),通过超时重新执行实现。主要的任务有,RPC调用参数及返回参数......
  • 分布式计算技术(上):经典计算框架MapReduce、Spark 解析
    当一个计算任务过于复杂不能被一台服务器独立完成的时候,我们就需要分布式计算。分布式计算技术将一个大型任务切分为多个更小的任务,用多台计算机通过网络组装起来后,将每个小任务交给一些服务器来独立完成,最终完成这个复杂的计算任务。本篇我们介绍两个经典的计算框架MapReduce和Sp......