首页 > 其他分享 >MapReduce框架原理

MapReduce框架原理

时间:2023-02-21 23:56:17浏览次数:28  
标签:框架 ensureNotSet MapReduce job mode conf Job 原理 class

原理一:切片与MapTask并行度决定机制

  MapTask之前了解到了,他是在分布式程序在map阶段的一个进程,管理之一个map任务类似于一个master。那么什么是切片?

  说起切片,很明显就是对数据的切分。在之前了解到数据是以block(数据块)的形式传输到集群上的,block是对数据的物理切分。数据切片则是对数据的逻辑切分,并不会根据切片在磁盘上存储。它是MapReduce程序计算输入数据的基本单位,一般一个数据切片对应启动一个MapTask,而MapTask的数量就是并行度,所以说切片和maptask的并行度决定机制。

  具体细节如下:

  (1)一个Job在map阶段并行度由客户端提交的切片数决定。

  (2)每个split切片分配一个MapTask并行实例处理。

  (3)默认情况下切片大小=block的size。

  (4)切片不考虑数据集整体,而是对每个文件单独进行切片。

源码分析

  以之前的WordCount为例去分析(在本地运行)

  Job提交流程

  再提交job处设置断点

  

 

 

强制进入

 

 直接进入job的submit()

 

 ensuerState()来确保工作的状态,在之前进入submit()已经做出过验证。进入方法可以看到只要状态不符合就会抛出异常。

private void ensureState(JobState state) throws IllegalStateException {
        if (state != this.state) {
            throw new IllegalStateException("Job in state " + this.state + " instead of " + state);
        } else if (state == Job.JobState.RUNNING && this.cluster == null) {
            throw new IllegalStateException("Job in state " + this.state + ", but it isn't attached to any job tracker!");
        }
    }

然后退出,进入设置setUseNewAPI(),里面就是设置api

 private void setUseNewAPI() throws IOException {
        int numReduces = this.conf.getNumReduceTasks();
        String oldMapperClass = "mapred.mapper.class";
        String oldReduceClass = "mapred.reducer.class";
        this.conf.setBooleanIfUnset("mapred.mapper.new-api", this.conf.get(oldMapperClass) == null);
        String mode;
        if (this.conf.getUseNewMapper()) {
            mode = "new map API";
            this.ensureNotSet("mapred.input.format.class", mode);
            this.ensureNotSet(oldMapperClass, mode);
            if (numReduces != 0) {
                this.ensureNotSet("mapred.partitioner.class", mode);
            } else {
                this.ensureNotSet("mapred.output.format.class", mode);
            }
        } else {
            mode = "map compatibility";
            this.ensureNotSet("mapreduce.job.inputformat.class", mode);
            this.ensureNotSet("mapreduce.job.map.class", mode);
            if (numReduces != 0) {
                this.ensureNotSet("mapreduce.job.partitioner.class", mode);
            } else {
                this.ensureNotSet("mapreduce.job.outputformat.class", mode);
            }
        }

        if (numReduces != 0) {
            this.conf.setBooleanIfUnset("mapred.reducer.new-api", this.conf.get(oldReduceClass) == null);
            if (this.conf.getUseNewReducer()) {
                mode = "new reduce API";
                this.ensureNotSet("mapred.output.format.class", mode);
                this.ensureNotSet(oldReduceClass, mode);
            } else {
                mode = "reduce compatibility";
                this.ensureNotSet("mapreduce.job.outputformat.class", mode);
                this.ensureNotSet("mapreduce.job.reduce.class", mode);
            }
        }

    }

进入connect()方法,发现它会new一个Cluster,我查了一下Cluster的意思(集群)

 

 步入方法,看到教程里讲述的在构造方法里 this.initialize(jobTrackAddr, conf); 设置断点,步出,然后向下执行代码,会直接走到断点处,然后步入initialize()方法

 

 看下initialize()方法

private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
        this.initProviderList();
        IOException initEx = new IOException("Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.");
        if (jobTrackAddr != null) {
            LOG.info("Initializing cluster for Job Tracker=" + jobTrackAddr.toString());
        }

        Iterator var4 = this.providerList.iterator();

        while(var4.hasNext()) {
            ClientProtocolProvider provider = (ClientProtocolProvider)var4.next();
            LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
            ClientProtocol clientProtocol = null;

            try {
                if (jobTrackAddr == null) {
                    clientProtocol = provider.create(conf);
                } else {
                    clientProtocol = provider.create(jobTrackAddr, conf);
                }

                if (clientProtocol != null) {
                    this.clientProtocolProvider = provider;
                    this.client = clientProtocol;
                    LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider");
                    break;
                }

                LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol");
            } catch (Exception var9) {
                String errMsg = "Failed to use " + provider.getClass().getName() + " due to error: ";
                initEx.addSuppressed(new IOException(errMsg, var9));
                LOG.info(errMsg, var9);
            }
        }

        if (null == this.clientProtocolProvider || null == this.client) {
            throw initEx;
        }
    }

发现里面有一个providerList的迭代器,然后遍历一下,在遍历结束之前客户端是Yarn的

 

 遍历结束后就变为Local的了。然后继续向下执行,执行完connect()。进入 final JobSubmitter submitter = this.getJobSubmitter(this.cluster.getFileSystem(), this.cluster.getClient());

从方法的参数可以看到得到集群的文件系统和得到集群的客户端,客户端是本地,还有文件系统的配置信息

 

 退出

 

 

进入断点,进入 submitter.submitJobInternal(Job.this, Job.this.cluster) 方法

this.status = (JobStatus)this.ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
            public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
                return submitter.submitJobInternal(Job.this, Job.this.cluster);
            }
        });

再提交任务的方法中首先是checkSpecs()方法,步入

   private void checkSpecs(Job job) throws ClassNotFoundException, InterruptedException, IOException {
        label23: {
            JobConf jConf = (JobConf)job.getConfiguration();
            if (jConf.getNumReduceTasks() == 0) {
                if (jConf.getUseNewMapper()) {
                    break label23;
                }
            } else if (jConf.getUseNewReducer()) {
                break label23;
            }

            jConf.getOutputFormat().checkOutputSpecs(this.jtFs, jConf);
            return;
        }

        OutputFormat<?, ?> output = (OutputFormat)ReflectionUtils.newInstance(job.getOutputFormatClass(), job.getConfiguration());
        output.checkOutputSpecs(job);
    }

设置断点,进入检查输出路径的方法

 

 

 

 向下执行,可以看到job提交的路径

 

 

 

 

 

 向下执行完这两行代码后,会发现提交的job路径多出了一些切片文件,同时在也会提交相关的配置xml信息,如果是运行在集群上还会提交对应的jar包。

 

 向下执行会发现任务的状态变为了running

 

 

 

 

标签:框架,ensureNotSet,MapReduce,job,mode,conf,Job,原理,class
From: https://www.cnblogs.com/20203923rensaihang/p/17135740.html

相关文章

  • 28-DRF框架-搭建环境开发restful接口
    #将程序中的一个数据结构类型转换为其他格式(字典、JSON、XML等),例如将Django中的模型类对象装换为JSON字符串,这个转换过程我们称为序列化#将其他格式(字典、JSON、XML等)转......
  • cpu开启节能或者关闭方法及原理
    情况说明CPU支持c-state特性,在负载较低时,会降低CPU频率而实现节能。原理介绍c-state有C0/C1/C2....级别,其中C0为CPU正常运行状态,运行在额定频率,数字越大代表CPU节能模式越深......
  • LVS三种工作模式及原理详解
    什么是LVS?摘自:https://blog.csdn.net/qq_59369367/article/details/124951685LVS是LinuxVirtualServer的简写,也就是Linux虚拟服务器,是一个虚拟的服务器集群系统,......
  • Vue第三方框架之ElementUi
    目录Vue第三方框架之ElementUi1.热门组件库2.elementui的使用Vue第三方框架之ElementUi1.热门组件库使用第三方插件插件和库:https://github.com/vuejs/awesome-vu......
  • Mybatis Plus 框架项目落地实践总结
    在使用了MybatisPlus框架进行项目重构之后,关于如何更好的利用Mybatisplus。在此做一些总结供大家参考。主要总结了以下这几个方面的实践。基础设计BaseEntity逻辑......
  • 【Hive】hive的架构原理
    Hive本质:将HQL(hiveSQL)转化成MapReduce程序Hive架构Hive的主要组成部分    Hive压根就不是数据库,hive除了语言类似之外,存储和计算都是使用hadoop来完成的。......
  • Spring框架3--Web
    Spring框架之WebJavaweb三大组件和四大域顺便:Javaweb中的四大域,作用范围如下:PageContext<Request<Session<ServletContext(Application)域对象属性的作用范围......
  • 计算机组成原理02
    一.计算机的发展史1.1.第一台电子计算机是ENIAC(埃尼亚克)。1.没有存储器。2.通过布线进行控制。1.1.1.第一台机械式计算机是法国帕斯卡发明的。1.2.微型计算机是使用......
  • koa中间件的实现原理
    koa中间件的实现原理如何?先来看一个例子。koa的执行顺序是这样的:constmiddleware=asyncfunction(ctx,next){console.log(1)awaitnext()console.log(6)......
  • Django框架课-创建游戏界面 (1)
    创建游戏界面(1)最后的结构:playground/|--ac_game_object|`--zbase.js|--game_map|`--zbase.js|--particle|`--zbase.js|--player|`--zbas......