背景说明
在早期的Flink1.9时,为了对Flink任务的进行部署管理,对Flink任务提交的流程进行分析。刚好以前的博客图片失效了,那就用Flink1.13来再读一遍相关源码。
任务提交
flink任务提交的起点是flink脚本,以提交至Yarn为例,我们运行wordcount的脚本如下:
bin/flink run -t yarn-per-job examples/batch/WordCount.jar
Flink脚本
最后一行执行java命令
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
其中 org.apache.flink.client.cli.CliFrontend 就是Flink任务提交的入口类。
导入Flink源码,CliFrontend类属于flink-clients模块。
Flink任务从提交到执行分主要由三大块:
- 初始化配置信息
- 封装用户代码和任务配置
- 调用并执行用户代码
CliFrontend.java
代码
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory
//初始化Filnk配置信息
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
//加载并封装配置参数
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
int retCode = 31;
try {
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
//执行任务代码
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
} finally {
System.exit(retCode);
}
相对来说Flink的代码还是比较易读,但也有淡疼的地方,封装调用层层嵌套,容易读着读着跑偏了。
在CliFrontend类中有个run方法,就对应我们命令行的run动作
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
final List<URL> jobJars = getJobJarAndDependencies(programOptions);
final Configuration effectiveConfiguration =
getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
executeProgram(effectiveConfiguration, program);
}
ClientUtils.java
run方法最终调用ClientUtils.executeProgram方法来调用我们自己写的任务类代码
.....
try {
program.invokeInteractiveModeForExecution();
} finally {
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}
.....
PackagedProgram.java
再经过层层调用,PackagedProgram.callMainMethod最终通过反射执行用户代码逻辑。
Method mainMethod;
if (!Modifier.isPublic(entryClass.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " must be public.");
}
try {
mainMethod = entryClass.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " has no main(String[]) method.");
} catch (Throwable t) {
throw new ProgramInvocationException(
"Could not look up the main(String[]) method from the class "
+ entryClass.getName()
+ ": "
+ t.getMessage(),
t);
}
if (!Modifier.isStatic(mainMethod.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " declares a non-static main method.");
}
if (!Modifier.isPublic(mainMethod.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " declares a non-public main method.");
}
try {
mainMethod.invoke(null, (Object) args);
} catch (IllegalArgumentException e) {
throw new ProgramInvocationException(
"Could not invoke the main method, arguments are not matching.", e);
} catch (IllegalAccessException e) {
throw new ProgramInvocationException(
"Access to the main method was denied: " + e.getMessage(), e);
} catch (InvocationTargetException e) {
Throwable exceptionInMethod = e.getTargetException();
if (exceptionInMethod instanceof Error) {
throw (Error) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramParametrizationException) {
throw (ProgramParametrizationException) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramInvocationException) {
throw (ProgramInvocationException) exceptionInMethod;
} else {
throw new ProgramInvocationException(
"The main method caused an error: " + exceptionInMethod.getMessage(),
exceptionInMethod);
}
} catch (Throwable t) {
throw new ProgramInvocationException(
"An error occurred while invoking the program's main method: " + t.getMessage(),
t);
}
总结
顺手简单画了一个调用流程图
如果只关心Flink任务是如何提交到集群并执行的,那到此其实就可以结束了。相对而言还是比较简单的,不过里面的嵌套调用有点多,先理个主干出来就容易理解了。
当然小细节也是比较多的,比如加载命令行对象的过程中,对加载顺序是有一定要求的。
标签:流程,Flink,final,ProgramInvocationException,提交,new,main,throw From: https://www.cnblogs.com/panshan-lurenjia/p/17496896.html