首页 > 其他分享 >深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解

深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解

时间:2024-01-29 19:03:49浏览次数:54  
标签:Graph Flink private Job JobGraph JobVertex 提交 执行

Flink Program 编程套路回顾

1、获取执行环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2、通过执行环境对象,注册数据源 Source,得到数据抽象
DataStream ds = env.socketTextStream(...)
3、调用数据抽象的各种Transformation执行逻辑计算
DataStream resultDS = ds.flatMap(...).keyBy(...).sum(...);
4、将各种Transformation执行完毕之后得到的计算结果数据抽象注册 Sink
resultDS.addSink(...)
5、提交Job执行
env.execute(...)

Flink Job 提交脚本解析

# Submission to an already running Flink on YARN cluster
./bin/flink run --target yarn-session
# Submission spinning up a Flink on YARN cluster in Per-Job Mode
./bin/flink run --target yarn-per-job
# Submission spinning up Flink on YARN cluster in Application Mode
./bin/flink run-application --target yarn-application

具体可以参考官网:

https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html

https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#advanced-cli

CliFrontend 提交分析

当用户把 Flink 应用程序打成 jar 使用 flink run ... 的 shell 命令提交的时候,底层是通过 CliFrontend 来处理。底层的逻辑,就是通过反射来调用用户程序的 main() 方法执行。

需要注意的是,Application 模式下,会通过 YarnClusterDescriptor.deployInternal 方法在 yarn 中部署一个 application 集群,返回 YarnRestClusterClient 对象。yarn 中会启动一个 EmbeddedJobClient,执行 submitJob 方法提交 jobGraph。

ExecutionEnvironment 源码解析

StreamExecutionEnvironment 是 Flink 应用程序的执行入口,提供了一些重要的操作机制:

1、提供了 readTextFile(), socketTextStream(), createInput(), addSource() 等方法去对接数据源。
2、提供了 setParallelism() 设置应用程序的并行度。
3、StreamExecutionEnvironment 管理了 ExecutionConfig 对象,该对象负责 Job 执行的一些行为配置管理。还管理了 Configuration 管理一些其他的配置。这个所谓的其他配置,还包含了 Checkpoint 的配置,这个 chekcpoint 的配置参数,会单独解析出来,存储在 CheckpontConfig 中
4、StreamExecutionEnvironment 管理了一个 List<Transformation<?>> transformations 成员变量,该成员变量,主要用于保存 Job 的各种算子转化得到的 Transformation,把这些 Transformation 按照逻辑拼接起来,就能得到 StreamGragh, 注意转换顺序:
UserFunction ==> StreamOperator ==> Transformation ==> StreamNode
5、StreamExecutionEnvironment 提供了 execute() 方法主要用于提交 Job 执行。该方法接收的参数就是:StreamGraph

Flink on YARN Per-job 模式提交流程分析

入口类:ApplicatoinMaster: YarnJobClusterEntryPoint

深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解_大数据

深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解_大数据_02

Job提交流程源码分析

getStreamGraph(jobName) 生成 StreamGraph 解析

// 入口
StreamGraph streamGraph = getStreamGraph(jobName, true){
    // 通过 StreamGraphGenerator 来生成 StreamGraph
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate(){
        streamGraph = new StreamGraph(....)
        for(Transformation<?> transformation : transformations) {
            transform(transformation);
        }
    }
}

transform(transformation){
    // 先递归处理该 Transformation 的输入
    Collection<Integer> inputIds = transform(transform.getInput());
    // 将 Transformation 变成 Operator 设置到 StreamGraph 中,其实就是添加 StreamNode
    streamGraph.addOperator(....);
    // 设置该 StreamNode 的并行度
    streamGraph.setParallelism(transform.getId(), parallelism);
    // 设置该 StreamNode 的入边 SreamEdge
    for(Integer inputId : inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
        // 内部实现
        // 构建 StreamNode 之间的 边(StreamEdge) 对象
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, ...){
            // TODO_MA 注释: 给 上游 StreamNode 设置 出边
            getStreamNode(edge.getSourceId()).addOutEdge(edge);
        }
        // TODO_MA 注释: 给 下游 StreamNode 设置 入边
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}

execute(StreamGraph) 解析

// 入口
JobClient jobClient = executeAsync(streamGraph){
    // 执行一个 SreamGraph
    executorFactory.getExecutor(configuration).execute(streamGraph, configuration){
        // 第一件事:由 StreamGraph 生成 JobGragh
        JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
        // 第二件事:通过 RestClusterClient 提交 JobGraph 到Flink集群
        clusterClient.submitJob(jobGraph)
    }
}

// 通过 RestClusterClient 来提交 JobGraph
RestClusterClient.submitJob(JobGraph jobGraph){
    // 继续提交
    RestClusterClient.sendRetriableRequest(){
        // 通过 RestClient 提交
        RestClient.sendRequest(webMonitorHost, webMonitorPort, ...){
            // 继续提交
            RestClient.submitRequest(targetAddress,targetPort,httpRequest,responseType)
        }
    }
}

最终通过 channel 把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 来执行处理。

小结

01、用户根据 Flink 应用程序的编写套路,写好应用程序,打成 jar 包,通过 flink run 的命令来执行提交
02、这个命令的底层,其实是执行: CliFrontend 组件来执行提交
03、这个 CliFrontend 的内部,会通过反射的技术,来转交执行到用户自定义应用程序的 main()
04、先获取 StreamExecutionEnvironment 执行环境对象实例
05、执行算子:其实就是从 算子 ---> function ---> StreamOperator ---> Transformation
06、执行 StreamExecutionEnvironment 的 executor 方法来执行提交
07、首先遍历 StreamExecutionEnvironment 的 transformations 这个 list 来生成 StreamGraph,之后会继续被构建成 JobGraph
08、具体的内部的提交是通过 RestClusterClient 来执行提交
09、在通过 RestClusterClient 提交之前,其实还会做一件事:把 SreamGraph 变成 JobGraph,也还会先把 JobGraph 持久化成为一个磁盘文件
10、在这个 RestClusterClient 的内部,其实是通过 RestClient 来提交
11、RestClient 其实在初始化的时候,就初始化了一个 Netty 客户端
12、通过封装一个 HttpRequest 对象,包含了需要提交的 JobGraph 文件和 Jar 包等,通过 Netty 客户端链接服务端,发送请求对象到服务端
13、Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler

深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解_源码_03

WebMonitorEndpoint 处理 RestClient 的 JobSubmit 请求

最终处理这个请求: Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler。

// JobManager 服务端处理入口
JobSubmitHandler.handleRequest(){
    // 恢复得到 JobGraph
    JobGraph jobGraph = loadJobGraph(requestBody, nameToFile);
    // 通过 Dispatcher 提交 JobGraph
    Dispatcher.submitJob(jobGraph, timeout);
}

JobMaster 启动源码剖析

关键方法: jobMasterServiceFactory.createJobMasterService核心的工作是:

  • 创建 JobMaster 这个 RpcEndpoint 组件,负责通信。内部会创建一个 DefaultScheduler 调度组件,在初始化该调度组件的时候,会调用 ExecutionGraphFactory 的相关方法,来把 JobGraph 转换成 ExectionGraph
  • JobMaster 启动,跳转到 onStart() 方法。内部的主要工作,就是以下这三:
  • 启动心跳机制,维持和 ResourceManager,和 TaskExecutor 之间的心跳
  • 启动 SlotPoolImpl 这个 slot 管理组件。
  • 从 ZK 获取 ResourceManager 的地址,从而进行 JobMaster 向 ResourceManager 的注册
  • 启动的这个 JobMaster 负责这个 Job 中的所有的 Task 的 slot 的申请和 任务的派发,状态的跟踪,容错,还有 checkpoint等各种操作

JobMaster 和 ResourceManager/TaskExecutor 的心跳

深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解_大数据_04

JobMaster 向 ResourceManager 注册

// 启动 JobMaster
jobMaster.start(){
    JobMaster.onStart(){
        startJobExecution(){
            // 第一件大事:启动 JobMaster 必要的一些工作
            startJobMasterServices(){
                // 第一件事: 启动心跳机制
                this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
                this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices);
                // 第二件事: 启动 SlotPoolImpl
                slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());
                // 第三件事: 从 ZK 获取 ResourceManager 的地址
                // 这儿就是 JobMaster 向 ResourceManager 执行注册的入口
                resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
            }
            // 第二件大事:开始调度执行
            startScheduling();
        }
    }
}
ResourceManager.registerJobManager(){
    // ResourceManager 关于 JobMaster 的注册内部实现,重要的事情做了四件
    registerJobMasterInternal(jobMasterGateway, jobId, ....){
        // TODO_MA 马中华 注释: 生成 JobMaster 注册对象
        JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobManagerResourceId, ....);
        // TODO_MA 马中华 注释: 完成注册
        jobManagerRegistrations.put(jobId, jobManagerRegistration);
        jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
        // TODO_MA 马中华 注释: 加入心跳管理
        jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>() {});
        // TODO_MA 马中华 注释: 返回 JobMaster 注册成功
        return new JobMasterRegistrationSuccess(getFencingToken(), resourceId);
    }
}

Flink Graph 演变

深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解_流式计算_05

StreamGraph 构建和提交源码解析

深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解_流式计算_06

关于 StreamNode 的定义:

public class StreamNode {
    private final int id;
    private int parallelism;
    private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
    private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
    private final Class<? extends AbstractInvokable> jobVertexClass;
}

关于 StreamEdge 的定义:

public class StreamEdge implements Serializable {
    private final String edgeId;
    private final int sourceId;
    private final int targetId;
}

JobGraph 构建和提交源码解析

JobGraph: StreamGraph 经过优化后生成了 JobGraph,提交给 Flink 集群的数据结构。它包含的主要抽象概念有:

1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个 JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。
2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是 JobVertex,consumer 是 JobEdge。
3、JobEdge:代表了 job graph 中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex。

在 StreamGraph 构建 JobGragh 的过程中,最重要的事情就是 operator 的 chain 优化,那么到底什么样的情况的下 Operator 能chain 在一起呢 ?答案是要满足以下 9 个条件:

// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认是 HEAD)
!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是 ALWAYS)
!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的 shuffle 方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();

深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解_源码_07

构建逻辑的重点代码:

1、在 connect 之间,调用的 createChain() 就是先执行优化,然后再生成 JobVertex
2、然后 调用 connect 之后,是为了组织关系
    1、先生成 IntermediateDataSet 和 JobEdge
    2、把 IntermediateDataSet 和 当前 JobVertex 设置为 JobEdge 的 source 和 target
    3、把 JobEdge 设置为这个 IntermediateDataSet 的消费者

关于 JobVertex 的定义:

public class JobVertex implements java.io.Serializable {
    private final JobVertexID id;
    private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
    private final ArrayList<JobEdge> inputs = new ArrayList<>();
    private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
    private String invokableClassName;
}

关于 IntermediateDataSet 的定义:

public class IntermediateDataSet implements java.io.Serializable {
    private final IntermediateDataSetID id;
    private final JobVertex producer;
    private final List<JobEdge> consumers = new ArrayList<JobEdge>();
}

关于 JobEdge 的定义:

public class JobEdge implements java.io.Serializable {
    private final JobVertex target;
    private IntermediateDataSet source;
    private IntermediateDataSetID sourceId;
}

标签:Graph,Flink,private,Job,JobGraph,JobVertex,提交,执行
From: https://blog.51cto.com/u_16516690/9424094

相关文章

  • Flink 中的容错机制
    在Flink中,有一套完整的容错机制来保证故障后的恢复,其中最重要的就是检查点。1.检查点(Checkpoint)在流处理中,我们可以用存档读档的思路,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”(checkpoint)。遇到故障重启的时候,我们可以从检查点中“读档”,恢复......
  • Flink 中的状态管理
    1.Flink中的状态1.概述在Flink中,算子任务可以分为无状态和有状态两种情况。无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。我们之前讲到的基本转换算子,如map、filter、flatMap,计算时不依赖其他数据,就都属于无状态的算子。而有状态的算子任务,......
  • 洛谷题解-P1938 [USACO09NOV] Job Hunt S
    https://www.luogu.com.cn/problem/P1938题目描述Bessieisrunningoutofmoneyandissearchingforjobs.FarmerJohnknowsthisandwantsthecowstotravelaroundsohehasimposedarulethathiscowscanonlymakeD(1<=D<=1,000)dollarsinac......
  • 《Confusion Graph: Detecting Confusion Communities in Large Scale Image Classifi
    论文标题《ConfusionGraph:DetectingConfusionCommunitiesinLargeScaleImageClassification》混淆图:在大规模图像分类中检测混淆社区作者RuochunJin、YongDou、YueqingWang和XinNiu来自国防科技大学并行和分布式处理国家实验室,和上一篇是姊妹篇。初读摘要......
  • OpenSceneGraph (OSG)
    OpenSceneGraph(OSG)是一个开源的三维引擎,广泛应用于多个领域,如可视化仿真、游戏、虚拟现实、科学计算、三维重建、地理信息、太空探索、石油矿产等。它由标准C++和OpenGL编写而成OpenGL(英语:OpenGraphicsLibrary,译名:开放图形库或者“开放式图形库”)是用于渲染2D、3D矢量图形......
  • 初中英语优秀范文100篇-070A Job I Want to Do in the Future-我未来想做的工作
    PDF格式公众号回复关键字:SHCZFW070记忆树1Everyonehashisdreamjob.SodoI.翻译每个人都有他的理想工作。我也一样。简化记忆工作句子结构主语:Everyone(每个人)谓语:has(有)宾语:hisdreamjob(他的理想工作)SodoI是一个倒装句,表示"Ialsohavemydreamjob",其......
  • xxl-job之API的方式接入
    目录1xxl-job1.1简介1.2分析1.3学***l-job源码1.4改造项目1.4.1接口调用1.4.1.1对接登录接口1.4.1.2对接执行器接口1.4.1.3对接任务接口1.4.2创建新注解1.4.3自动注册核心1.4.4自动装配1xxl-job1.1简介xxl-job是一款非常优秀的任务调度中间件,轻量级、使用简单、......
  • pprof_graphviz.bat
    @echooffSETLOCALEnableDelayedExpansionfor/d%%din(%USERPROFILE%\sdk\*)do(setsdk_dir=%%d)SETLOCALDisableDelayedExpansionrem下面这行可能需要根据机器修改一下set"go_dirs=%sdk_dir%\bin;%USERPROFILE%\go\bin"set"graphviz_dir=%~dp......
  • SpringBoot中集成XXL-JOB分布式任务调度平台,轻量级、低侵入实现定时任务
    场景XXL-JOBhttps://www.xuxueli.com/xxl-jobXXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。特性:1、简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手;2、动态:支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生......
  • WorkFlow,Process,Job,Task各详细解释和他们的区别
    来自你的消息:请详细解释WorkFlow,Process,Job,Task各详细解释和他们的区别来自WeTabAI的消息:当谈到工作流程(Workflow),流程(Process),任务(Job)和任务(Task)时,它们在企业管理和信息技术领域中有不同的含义和用法。下面是它们的详细解释和区别:工作流程(Workflow):工作流程是一系列有序的......