首页 > 其他分享 >Flink任务提交流程分析

Flink任务提交流程分析

时间:2023-06-21 18:33:07浏览次数:44  
标签:流程 Flink final ProgramInvocationException 提交 new main throw

背景说明

在早期的Flink1.9时,为了对Flink任务的进行部署管理,对Flink任务提交的流程进行分析。刚好以前的博客图片失效了,那就用Flink1.13来再读一遍相关源码。

任务提交

flink任务提交的起点是flink脚本,以提交至Yarn为例,我们运行wordcount的脚本如下:

bin/flink run -t yarn-per-job examples/batch/WordCount.jar

Flink脚本

最后一行执行java命令

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

其中 org.apache.flink.client.cli.CliFrontend 就是Flink任务提交的入口类。

导入Flink源码,CliFrontend类属于flink-clients模块。

Flink任务从提交到执行分主要由三大块:

  1. 初始化配置信息
  2. 封装用户代码和任务配置
  3. 调用并执行用户代码

CliFrontend.java

代码

EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

// 1. find the configuration directory
//初始化Filnk配置信息
final String configurationDirectory = getConfigurationDirectoryFromEnv();

// 2. load the global configuration
final Configuration configuration =
        GlobalConfiguration.loadConfiguration(configurationDirectory);

// 3. load the custom command lines
//加载并封装配置参数
final List<CustomCommandLine> customCommandLines =
        loadCustomCommandLines(configuration, configurationDirectory);

int retCode = 31;
try {
    final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

    SecurityUtils.install(new SecurityConfiguration(cli.configuration));
    //执行任务代码
    retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
    final Throwable strippedThrowable =
            ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
    LOG.error("Fatal error while running command line interface.", strippedThrowable);
    strippedThrowable.printStackTrace();
} finally {
    System.exit(retCode);
}

相对来说Flink的代码还是比较易读,但也有淡疼的地方,封装调用层层嵌套,容易读着读着跑偏了。

在CliFrontend类中有个run方法,就对应我们命令行的run动作

final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);

// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
    CliFrontendParser.printHelpForRun(customCommandLines);
    return;
}

final CustomCommandLine activeCommandLine =
        validateAndGetActiveCommandLine(checkNotNull(commandLine));

final ProgramOptions programOptions = ProgramOptions.create(commandLine);

final List<URL> jobJars = getJobJarAndDependencies(programOptions);

final Configuration effectiveConfiguration =
        getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
    
    executeProgram(effectiveConfiguration, program);
}

ClientUtils.java

run方法最终调用ClientUtils.executeProgram方法来调用我们自己写的任务类代码

.....
try {
    program.invokeInteractiveModeForExecution();
} finally {
    ContextEnvironment.unsetAsContext();
    StreamContextEnvironment.unsetAsContext();
}
.....

PackagedProgram.java

再经过层层调用,PackagedProgram.callMainMethod最终通过反射执行用户代码逻辑。

Method mainMethod;
if (!Modifier.isPublic(entryClass.getModifiers())) {
    throw new ProgramInvocationException(
            "The class " + entryClass.getName() + " must be public.");
}

try {
    mainMethod = entryClass.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
    throw new ProgramInvocationException(
            "The class " + entryClass.getName() + " has no main(String[]) method.");
} catch (Throwable t) {
    throw new ProgramInvocationException(
            "Could not look up the main(String[]) method from the class "
                    + entryClass.getName()
                    + ": "
                    + t.getMessage(),
            t);
}

if (!Modifier.isStatic(mainMethod.getModifiers())) {
    throw new ProgramInvocationException(
            "The class " + entryClass.getName() + " declares a non-static main method.");
}
if (!Modifier.isPublic(mainMethod.getModifiers())) {
    throw new ProgramInvocationException(
            "The class " + entryClass.getName() + " declares a non-public main method.");
}

try {
    mainMethod.invoke(null, (Object) args);
} catch (IllegalArgumentException e) {
    throw new ProgramInvocationException(
            "Could not invoke the main method, arguments are not matching.", e);
} catch (IllegalAccessException e) {
    throw new ProgramInvocationException(
            "Access to the main method was denied: " + e.getMessage(), e);
} catch (InvocationTargetException e) {
    Throwable exceptionInMethod = e.getTargetException();
    if (exceptionInMethod instanceof Error) {
        throw (Error) exceptionInMethod;
    } else if (exceptionInMethod instanceof ProgramParametrizationException) {
        throw (ProgramParametrizationException) exceptionInMethod;
    } else if (exceptionInMethod instanceof ProgramInvocationException) {
        throw (ProgramInvocationException) exceptionInMethod;
    } else {
        throw new ProgramInvocationException(
                "The main method caused an error: " + exceptionInMethod.getMessage(),
                exceptionInMethod);
    }
} catch (Throwable t) {
    throw new ProgramInvocationException(
            "An error occurred while invoking the program's main method: " + t.getMessage(),
            t);
}

总结

顺手简单画了一个调用流程图

如果只关心Flink任务是如何提交到集群并执行的,那到此其实就可以结束了。相对而言还是比较简单的,不过里面的嵌套调用有点多,先理个主干出来就容易理解了。

当然小细节也是比较多的,比如加载命令行对象的过程中,对加载顺序是有一定要求的。

标签:流程,Flink,final,ProgramInvocationException,提交,new,main,throw
From: https://www.cnblogs.com/panshan-lurenjia/p/17496896.html

相关文章

  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步
    这篇教程将展示如何基于FlinkCDC快速构建MySQL到Databend的实时数据同步。本教程的演示都将在FlinkSQLCLI中进行,只涉及SQL,无需一行Java/Scala代码,也无需安装IDE。假设我们有电子商务业务,商品的数据存储在MySQL,我们需要实时把它同步到Databend中。接下来的内......
  • 远程提交代码冲突合并流程
    步骤1:获取源代码,本地进行分支合并.gitfetchorigingitcheckout当前分支gitmergeorigin/xxx这里没太明白,反正就是远端的分支,origin/远端分支版本步骤2:查看冲突文件,本地解决冲突.步骤3:推送至源分支,修改直接展示在当前合并请求中.gitadd.gitcommit-m'fix......
  • springboot启动流程 (3) 自动装配
    在SpringBoot中,EnableAutoConfiguration注解用于开启自动装配功能。本文将详细分析该注解的工作流程。EnableAutoConfiguration注解启用SpringBoot自动装配功能,尝试猜测和配置可能需要的组件Bean。自动装配类通常是根据类路径和定义的Bean来应用的。例如,如果类路径上有tomcat-......
  • POSTGRESQL 事务控制(三) 事务关闭与怎么设置PG 异步提交提高性能
    。最近接到网友反馈,说次系列是打开5秒钟系列,打开5秒后就关闭了,其实我想说的是,我更难,写的脑袋疼,不过估计疼一段时间就不会再疼的,看一段时间就可以看更长的时间,人的进步是吧。接上期,事务在执行完毕后,结束的动作分为两种,1事务提交2 事务回滚至于事务回滚时的问题主要也分手动终止以......
  • 使用IDEA回滚某次提交的代码步骤,和回滚某次已经commit的代码但是没有push的代码
    使用IDEA回滚某次提交的代码步骤1.已经push的代码回滚选中提交的版本:右击RevertCommit会新增一个Revert“xxxCommit”的Commit记录,并将"xxxCommit"中的代码全部回滚。如果是已经push到远端的Commit,RevertCommit后还需要进行push。 2.已经commit但是没有pus......
  • 14. SpringMVC执行流程
    14.1、SpringMVC常用组件DispatcherServlet:前端控制器,不需要工程师开发,由框架提供作用:统一处理请求和响应,整个流程控制的中心,由它调用其它组件处理用户的请求HandlerMapping:处理器映射器,不需要工程师开发,由框架提供作用:根据请求的url、method等信息查找Handler,即控制......
  • XXL-job开源框架相关的源码流程解析。
    XXL-job框架是一个分布式的定时任务框架。他简单快捷。配置方便。而且用途广泛。所以他的源码非常值得一看。对于我来说。其中其自写的RPC框架。以及处理发布多个定时任务的高并发处理。是我打开微服务的大门。这是一篇xxl-job源码的解析与流程分析。比较偏口语化。在这篇随笔中......
  • iOS开发笔记 - App上架流程(视频分享)
    具体的文档可以看一下我的《iOS开发笔记-上线流程》iOS项目上线流程视频百度云盘分享下面是一些相关的官方文档:https://developer.apple.com/app-store/review/guidelines/-项目审核指南http://www.apple.com/legal/intellectual-property/guidelinesfor3rdparties.htmlhttps......
  • pixel 3xl 编译安卓与内核并烧入全流程(含安卓源码部分编译)
    pixel3xl编译安卓与内核并烧入全流程(含安卓源码部分编译)目录pixel3xl编译安卓与内核并烧入全流程(含安卓源码部分编译)环境搭建安卓源码下载一、准备下载环境1、安装Python3.92、安装git3、安装curl4、配置环境变量安装repo二、下载源代码1、创建目录2、初始化仓库3、同步安......
  • 解决svn提交失败情况
    情况一:更新后出现若干个冲突文件,文件所在的文件夹也会有红色感叹号解决方法:1.Revert看下有哪些冲突文件 2.Revert列表建议按Status排序,这样我们需要解决的冲突对象就会聚在一起(那些红色的Conflicted文件) 在开始处理冲突之前,需要先判断这些文件是否与自己的修改内容有......