MapReduce提交过程
在Xshell中输入
bash -X
命令可以在Bash shell中启用debug模式,显示执行过程中的详细信息,例如每条命令的执行结果以及执行的步骤。
-
Hadoop提交执行
-
开始使用Java命令执行 java org.apache.hadoop.util.RunJar hadoop-1.0-SNAPSHOT.jar com.shujia.mr.worcount.WordCount
-
开始运行RunJar类中的main方法
public static void main(String[] args) throws Throwable { new RunJar().run(args); => args表示Java命令运行时对应的传参 // 参数: hadoop-1.0-SNAPSHOT.jar com.shujia.mr.worcount.WordCount }
-
开始调用run方法
public void run(String[] args) throws Throwable { String usage = "RunJar jarFile [mainClass] args..."; if (args.length < 1) { System.err.println(usage); System.exit(-1); } int firstArg = 0; // fileName = hadoop-1.0-SNAPSHOT.jar String fileName = args[firstArg++]; File file = new File(fileName); if (!file.exists() || !file.isFile()) { System.err.println("JAR does not exist or is not a normal file: " + file.getCanonicalPath()); System.exit(-1); } // mainClassName 主类名称 => Hadoop jar包中要运行的具体类 String mainClassName = null; JarFile jarFile; try { jarFile = new JarFile(fileName); } catch (IOException io) { throw new IOException("Error opening job jar: " + fileName) .initCause(io); } // 获取jar包中定义的主类 不用 Manifest manifest = jarFile.getManifest(); if (manifest != null) { mainClassName = manifest.getMainAttributes().getValue("Main-Class"); } jarFile.close(); // mainClassName在jar包中没有定义 => maven打包 if (mainClassName == null) { if (args.length < 2) { System.err.println(usage); System.exit(-1); } // firstArg =1 => 对应 com.shujia.mr.worcount.WordCount mainClassName = args[firstArg++]; } // 类路径的名称 mainClassName = mainClassName.replaceAll("/", "."); // java.io.tmpdir 临时的目录 File tmpDir = new File(System.getProperty("java.io.tmpdir 临时的目录 ")); ensureDirectory(tmpDir); ... // createClassLoader 类加载器方法 ClassLoader loader = createClassLoader(file, workDir); // 通过创建的类加载器loader 可以加载给定jar包中的类 Thread.currentThread().setContextClassLoader(loader); // Class.forName 可以构建 WordCount.class的类对象 Class<?> mainClass = Class.forName(mainClassName, true, loader); // WordCount.class的类对象 getMethod通过反射的方式获取类中的main方法 Method main = mainClass.getMethod("main", String[].class); // 将剩余的参数包装 再传入 main方法中 List<String> newArgsSubList = Arrays.asList(args) .subList(firstArg, args.length); String[] newArgs = newArgsSubList .toArray(new String[newArgsSubList.size()]); // invoke可以对当前Method对象中的方法进行执行 // new Object[] {newArgs} 表示main方法中的传参 => try { main.invoke(null, new Object[] {newArgs}); } catch (InvocationTargetException e) { throw e.getTargetException(); } }
-
开始运行自定类中的main方法 => 加载配置信息
job.waitForCompletion(true); // 开始提交执行
-
进入waitForCompletion方法内
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); // 当程序提交了 如果是再Hadoop集群中,是需要提交给Yarn运行 // mapreduce.JobSubmitter: Submitting tokens for job: job_1716520379305_0009 } return isSuccessful(); }
-
submit方法
public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); // 设置新的API => Hadoop中有一些老API存在所以需要进行设置 connect(); // 异步创建cluster对象 在该对象中包含了有多个集群连接信息 => cluster 表示Yarn集群客户端 final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { // 开始正式提交任务 return submitter.submitJobInternal(Job.this, cluster); } }); state = JobState.RUNNING; LOG.info("The url to track the job: " + getTrackingURL()); }
- 注意:
- LocalJobRunner对应的是本地的数据运行
- YARNRunner是对应将数据提交到YARN上运行