Principle 1: How Splits Determine MapTask Parallelism
As covered earlier, a MapTask is a process that runs during the map phase of a distributed MapReduce program; each instance manages a single map task. So what is a split?
As the name suggests, a split is a division of the input data. We saw earlier that data is stored on the cluster as blocks; a block is a physical division of the data. A split, by contrast, is a logical division: the data is not re-stored on disk according to the splits. The split is the basic unit of input that a MapReduce program computes over. In general, one split launches one MapTask, and the number of MapTasks is the map-phase parallelism — which is why we speak of the split and MapTask parallelism decision mechanism.
The details are as follows:
(1) The map-phase parallelism of a Job is determined by the number of splits computed by the client at submission time.
(2) Each split is assigned to one MapTask instance for processing.
(3) By default, the split size equals the block size.
(4) Splitting does not consider the dataset as a whole; each input file is split individually.
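Point (3) can be sketched in code. The formula below mirrors the default split-size rule (`max(minSize, min(maxSize, blockSize))`, as in Hadoop's `FileInputFormat.computeSplitSize`); the class name and constants here are a simplified stand-in for illustration, not the real Hadoop class:

```java
// Minimal sketch of the default split-size rule:
// splitSize = max(minSize, min(maxSize, blockSize))
public class SplitSizeDemo {
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // a 128 MB HDFS block
        long minSize = 1L;                   // default minimum split size
        long maxSize = Long.MAX_VALUE;       // default maximum split size
        // With the defaults, split size == block size, matching point (3).
        System.out.println(computeSplitSize(blockSize, minSize, maxSize) == blockSize);
    }
}
```

Lowering the maximum (or raising the minimum) is how the split size — and therefore the MapTask count — can be tuned away from the block size.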
Source Code Analysis
We analyze using the earlier WordCount example (running locally).
Job Submission Flow
Set a breakpoint at the line that submits the job.
Use Force Step Into in the debugger.
Step directly into Job's submit().
ensureState() verifies the job's state; this was already checked when we entered submit(). Stepping into the method shows that any state mismatch throws an exception:
```java
private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
        throw new IllegalStateException("Job in state " + this.state
            + " instead of " + state);
    } else if (state == Job.JobState.RUNNING && this.cluster == null) {
        throw new IllegalStateException("Job in state " + this.state
            + ", but it isn't attached to any job tracker!");
    }
}
```
Then step out and step into setUseNewAPI(), which configures which API (new or old) the job will use:
```java
private void setUseNewAPI() throws IOException {
    int numReduces = this.conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    this.conf.setBooleanIfUnset("mapred.mapper.new-api",
        this.conf.get(oldMapperClass) == null);
    String mode;
    if (this.conf.getUseNewMapper()) {
        mode = "new map API";
        this.ensureNotSet("mapred.input.format.class", mode);
        this.ensureNotSet(oldMapperClass, mode);
        if (numReduces != 0) {
            this.ensureNotSet("mapred.partitioner.class", mode);
        } else {
            this.ensureNotSet("mapred.output.format.class", mode);
        }
    } else {
        mode = "map compatibility";
        this.ensureNotSet("mapreduce.job.inputformat.class", mode);
        this.ensureNotSet("mapreduce.job.map.class", mode);
        if (numReduces != 0) {
            this.ensureNotSet("mapreduce.job.partitioner.class", mode);
        } else {
            this.ensureNotSet("mapreduce.job.outputformat.class", mode);
        }
    }
    if (numReduces != 0) {
        this.conf.setBooleanIfUnset("mapred.reducer.new-api",
            this.conf.get(oldReduceClass) == null);
        if (this.conf.getUseNewReducer()) {
            mode = "new reduce API";
            this.ensureNotSet("mapred.output.format.class", mode);
            this.ensureNotSet(oldReduceClass, mode);
        } else {
            mode = "reduce compatibility";
            this.ensureNotSet("mapreduce.job.outputformat.class", mode);
            this.ensureNotSet("mapreduce.job.reduce.class", mode);
        }
    }
}
```
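The core decision above is that the new API is enabled exactly when the old-style key mapred.mapper.class is not set. A toy model of that logic (using java.util.Properties as a stand-in for Hadoop's Configuration; the class name is hypothetical):

```java
import java.util.Properties;

// Toy model of the "new API" decision made by setUseNewAPI().
public class NewApiDemo {
    static boolean useNewMapper(Properties conf) {
        // Mirrors setBooleanIfUnset("mapred.mapper.new-api", oldMapperClass == null):
        // only set the flag if the user hasn't set it explicitly.
        if (conf.getProperty("mapred.mapper.new-api") == null) {
            conf.setProperty("mapred.mapper.new-api",
                String.valueOf(conf.getProperty("mapred.mapper.class") == null));
        }
        return Boolean.parseBoolean(conf.getProperty("mapred.mapper.new-api"));
    }

    public static void main(String[] args) {
        Properties fresh = new Properties();        // no old-API mapper configured
        System.out.println(useNewMapper(fresh));    // new API

        Properties legacy = new Properties();
        legacy.setProperty("mapred.mapper.class", "MyOldMapper"); // old-API mapper set
        System.out.println(useNewMapper(legacy));   // compatibility mode
    }
}
```

The ensureNotSet() calls in the real method then guard against mixing keys from both APIs in one job.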
Step into the connect() method; it constructs a new Cluster, the object representing the compute cluster we connect to.
Step into the constructor. As the tutorial describes, it calls this.initialize(jobTrackAddr, conf). Set a breakpoint on that line, step out, and resume execution; the debugger stops at the breakpoint. Then step into initialize():
```java
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
    this.initProviderList();
    IOException initEx = new IOException("Cannot initialize Cluster. Please check"
        + " your configuration for mapreduce.framework.name and the correspond"
        + " server addresses.");
    if (jobTrackAddr != null) {
        LOG.info("Initializing cluster for Job Tracker=" + jobTrackAddr.toString());
    }
    Iterator var4 = this.providerList.iterator();
    while (var4.hasNext()) {
        ClientProtocolProvider provider = (ClientProtocolProvider)var4.next();
        LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
        ClientProtocol clientProtocol = null;
        try {
            if (jobTrackAddr == null) {
                clientProtocol = provider.create(conf);
            } else {
                clientProtocol = provider.create(jobTrackAddr, conf);
            }
            if (clientProtocol != null) {
                this.clientProtocolProvider = provider;
                this.client = clientProtocol;
                LOG.debug("Picked " + provider.getClass().getName()
                    + " as the ClientProtocolProvider");
                break;
            }
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
        } catch (Exception var9) {
            String errMsg = "Failed to use " + provider.getClass().getName()
                + " due to error: ";
            initEx.addSuppressed(new IOException(errMsg, var9));
            LOG.info(errMsg, var9);
        }
    }
    if (null == this.clientProtocolProvider || null == this.client) {
        throw initEx;
    }
}
```
Inside there is an iterator over providerList, which is traversed provider by provider. Before the traversal finishes, the provider being tried is the YARN one; since we are running locally it returns null, so by the end of the traversal the Local provider has been picked as the client. Continue execution until connect() completes, then step into final JobSubmitter submitter = this.getJobSubmitter(this.cluster.getFileSystem(), this.cluster.getClient());
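The provider selection observed in the debugger can be modeled compactly: providers are tried in order and the first non-null create() wins. The provider class names below are the real Hadoop ones, but the demo's types and lookup are simplified stand-ins:

```java
import java.util.Arrays;
import java.util.List;

// Toy model of Cluster.initialize() picking a ClientProtocolProvider.
public class ProviderPickDemo {
    // Each entry: {provider name, framework it can serve}. YARN is tried first.
    static final List<String[]> PROVIDERS = Arrays.asList(
        new String[]{"YarnClientProtocolProvider", "yarn"},
        new String[]{"LocalClientProtocolProvider", "local"});

    // First provider that can serve the configured mapreduce.framework.name wins;
    // the real provider's create() returns null when it cannot serve the config.
    static String pick(String frameworkName) {
        for (String[] p : PROVIDERS) {
            if (p[1].equals(frameworkName)) {
                return p[0];
            }
        }
        throw new IllegalStateException("Cannot initialize Cluster");
    }

    public static void main(String[] args) {
        // Running locally: YARN is tried first, cannot serve "local",
        // so the loop falls through to the Local provider.
        System.out.println(pick("local"));
    }
}
```

This is why the client looked like YARN mid-traversal but ends up Local: the YARN provider is merely tried first and rejected.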
From the method's arguments you can see that it obtains the cluster's file system and the cluster's client; here the client is the local one, and the file system's configuration information is also visible.
Step out.
Continue to the breakpoint, then step into the submitter.submitJobInternal(Job.this, Job.this.cluster) call:
```java
this.status = (JobStatus)this.ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, Job.this.cluster);
    }
});
```
In the job-submission method, the first call is checkSpecs(); step into it:
```java
private void checkSpecs(Job job) throws ClassNotFoundException, InterruptedException, IOException {
    JobConf jConf = (JobConf)job.getConfiguration();
    // Old-API path: a map-only job not using the new mapper, or a job
    // not using the new reducer, validates via the deprecated OutputFormat.
    if (jConf.getNumReduceTasks() == 0
            ? !jConf.getUseNewMapper() : !jConf.getUseNewReducer()) {
        jConf.getOutputFormat().checkOutputSpecs(this.jtFs, jConf);
        return;
    }
    // New-API path: instantiate the OutputFormat and validate the output specs.
    OutputFormat<?, ?> output = (OutputFormat)ReflectionUtils.newInstance(
        job.getOutputFormatClass(), job.getConfiguration());
    output.checkOutputSpecs(job);
}
```
Set a breakpoint and step into the method that checks the output path.
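The essence of that output check is: fail fast if the output directory is unset or already exists. A minimal sketch, using java.io.File in place of the Hadoop FileSystem API (the class name is hypothetical):

```java
import java.io.File;
import java.io.IOException;

// Sketch of the output-path validation done by checkOutputSpecs():
// refuse to run if the output directory is unset or already present.
public class OutputSpecDemo {
    static void checkOutputSpecs(File outputDir) throws IOException {
        if (outputDir == null) {
            throw new IOException("Output directory not set.");
        }
        if (outputDir.exists()) {
            throw new IOException("Output directory " + outputDir + " already exists");
        }
    }

    public static void main(String[] args) throws IOException {
        checkOutputSpecs(new File("definitely-missing-output-dir")); // passes: absent
        try {
            checkOutputSpecs(new File("."));  // current directory exists
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This is why rerunning a MapReduce job without deleting its previous output directory fails immediately at submission time.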
Continue execution; the path the job is submitted to (its staging path) becomes visible.
After executing these two lines, the job's staging path now contains split files; the job's configuration XML is submitted there as well, and when running on a cluster the corresponding jar is also uploaded.
Continuing on, the job's state changes to RUNNING.
From: https://www.cnblogs.com/20203923rensaihang/p/17135740.html