首页 > 其他分享 >flinkv1.14启动过程分析

flinkv1.14启动过程分析

时间:2023-06-06 09:44:23浏览次数:41  
标签:分析 启动 flinkv1.14 创建 start job 集群 configuration 方法

今天阅读了一下flink v1.14的代码,首先分析一下flink启动的过程。
首先分2种,一种是SessionClusterEntrypoint,一种是JobClusterEntrypoint。分别对应session 模式和per-job模式。
session模式就是一次启动,可以执行多个job,执行完job还有后台进程在等待用户提交新的job。
per-job模式就是一次启动,执行一个job,执行完成就退出全部程序。

SessionClusterEntrypoint又分为(1)YarnSessionClusterEntryPoint;(2) KubernetesSessionClusterEntrypoint;(3)StandaloneSessionClusterEntrypoint;
见名知意:
第一个是基于Yarn的ResourceManager来管理集群,然后是启动后可以执行多次job;
第二个是基于Kubernetes来管理集群,然后是启动后可以执行多长job;
第三个是基于Flink自己的ResourceManager来管理集群,然后也是启动后可以执行多次job;

JobClusterEntrypoint又分为(1)YarnJobClusterEntrypoint;(2)StandaloneJobClusterEntrypoint;(3)MesosJobClusterEntrypoint;
见名知意:
第一个是基于Yarn的ResourceManager来管理集群,然后是一次运行一个job,运行完成一个job就退出整个集群;
第二个是基于Flink自己的ResourceManager来管理集群,然后是一次运行一个job,运行完成一个job就退出整个集群;
第三个是基于Mesos来管理集群,然后是一次运行一个job,运行完成一个job后就退出整个集群;

我们今天用StandaloneSessionClusterEntrypoint类来分析flink如何启动的。

打开StandaloneSessionClusterEntrypoint类,找到main()方法;

        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(
                LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        final EntrypointClusterConfiguration entrypointClusterConfiguration =
                ClusterEntrypointUtils.parseParametersOrExit(
                        args,
                        new EntrypointClusterConfigurationParserFactory(),
                        StandaloneSessionClusterEntrypoint.class);
        Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

        StandaloneSessionClusterEntrypoint entrypoint =
                new StandaloneSessionClusterEntrypoint(configuration);

        ClusterEntrypoint.runClusterEntrypoint(entrypoint);
    } 

这里就是启动的所有代码,最核心的我们看最后一行

 ClusterEntrypoint.runClusterEntrypoint(entrypoint);

这行代码,我们点runClusterEntrypoint进入ClusterEntrypoint类的runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) 方法;
runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) 方法中有

clusterEntrypoint.startCluster();

这行代码,我们点击startCluster进入ClusterEntrypoint类的startCluster()方法;
startCluster()方法中有

runCluster(configuration, pluginManager);

这行代码,我们点击runCluster进入ClusterEntrypoint类的runCluster(Configuration configuration, PluginManager pluginManager)方法;
runCluster(Configuration configuration, PluginManager pluginManager)方法中有

initializeServices(configuration, pluginManager);

这行代码,我们点击initializeServices进入ClusterEntrypoint类的initializeServices(Configuration configuration, PluginManager pluginManager)方法;
initializeServices(Configuration configuration, PluginManager pluginManager)方法中有几个重要的核心启动方法,截图如下:

上面的截图中最重要的是做了几件事情:
(1):从configuration中获取配置的RPC地址和portRange参数,根据配置地址和端口信息创建集群所需的公用的commonRpcService服务。更新configuration中的address和port配置,支持后续的集群高可用服务。

(2):创建ioExecutor线程池,用于集群组件的I/O操作,如本地文件数据读取和输出等。
(3):创建并启动haService,向集群组件提供高可用支持,集群中的组件都会通过haService创建高可用服务。
(4):创建并启动blobServer,存储集群所需要的Blob对象数据,例如JobGraph中的Jar包等。blobServer中存储的数据能够被JobMaster和TaskManager访问。
(5):创建heartbeatServices,主要用于创建集群组件之间的心跳检测,例如ResourceManager与JobManager之间的心跳服务。
(6):创建metricRegistry服务,用于注册集群监控指标收集。
(7):创建archivedExecutionGraphStore服务,用于压缩并存储集群中的ExecutionGraph,主要有FileArchivedExecutionGraphStore和MemoryArchivedExecutionGraphStore两种实现类型。

执行完initializeServices方法后,回到上面的runCluster(Configuration configuration, PluginManager pluginManager)方法里,这个方法里还有一行代码比较重要:

            final DispatcherResourceManagerComponentFactory
                    dispatcherResourceManagerComponentFactory =
                            createDispatcherResourceManagerComponentFactory(configuration);

核心是调用了createDispatcherResourceManagerComponentFactory(configuraion)方法,这个方法点进去,会发现它是一个抽象类方法,具体的实现类有好多种:

实现类:

我们今天只分析StandaloneSessionClusterEntrypoint类,所以进去StandaloneSessionClusterEntrypoint类,找到createDispatcherResourceManagerComponentFactory(Configuration configuration) 方法:

    @Override
    protected DefaultDispatcherResourceManagerComponentFactory
            createDispatcherResourceManagerComponentFactory(Configuration configuration) {
        return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
                StandaloneResourceManagerFactory.getInstance());
    }

这里会发现StandaloneResourceManagerFactory.getInstance(),会返回一个StandaloneResourceManagerFactory类单例对象。
StandaloneResourceManagerFactory顾名思义就是用来创建StandaloneResourceManager的一个工厂对象。这是后边要用到的一个工厂对象,不过呢现在被包在更大的DefaultDispatcherResourceManagerComponentFactory类里了

可以看到createDispatcherResourceManagerComponentFactory方法返回的类型是DefaultDispatcherResourceManagerComponentFactory类,顾名思义,DefaultDispatcherResourceManagerComponentFactory是用于创建DefaultDispatcherResourceManagerComponent对象的工厂。
接着我们去看看DefaultDispatcherResourceManagerComponentFactory这个工厂对象的create()方法,在这个方法里,首先会创建和启动WebMointorEntrypoint对象,作为Dispatcher对应的Rest entrypoint,通过Rest API将JobGraph提交到Dispatcher上,同时,WebMonitorEndpoint也会提供Web UI需要的Rest API接口实现。

紧接着,DefaultDispatcherResourceManagerComponentFactory这个工厂对象的create()方法里还调用了创建ResourceManager组件的方法,创建了ResourceManager并启动。如下图

紧接着,DefaultDispatcherResourceManagerComponentFactory这个工厂对象的create()方法里还调用了创建DispatcherRunner组件的方法,创建了DispatcherRunner并启动。如下图

在上图中还有

resourceManagerService.start();

这句代码点进去看看,会进入到ResourceManagerService接口的start()方法,ResourceManagerService接口类中有一个实现类是ResourceManagerServiceImpl类。
所以看一下ResourceManagerServiceImpl类的start()方法

    @Override
    public void start() throws Exception {
        synchronized (lock) {
            if (running) {
                LOG.debug("Resource manager service has already started.");
                return;
            }
            running = true;
        }

        LOG.info("Starting resource manager service.");

        leaderElectionService.start(this);
    }

最后一行代码leaderElectionService.start(this);点进去
会进入到LeaderElectionService接口类的start()方法;该接口类有几种类的实现,其中一种是DefaultLeaderElectionService类。
我们看看DefaultLeaderElectionService类的start()方法:

    @Override
    public final void start(LeaderContender contender) throws Exception {
        checkNotNull(contender, "Contender must not be null.");
        Preconditions.checkState(leaderContender == null, "Contender was already set.");

        synchronized (lock) {
            leaderContender = contender;
            leaderElectionDriver =
                    leaderElectionDriverFactory.createLeaderElectionDriver(
                            this,
                            new LeaderElectionFatalErrorHandler(),
                            leaderContender.getDescription());
            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);

            running = true;
        }
    }

在该方法中,我们重点关注一下createLeaderElectionDriver(···)方法,我们跟踪进去:
会进入到LeaderElectionDriverFactory接口类的createLeaderElectionDriver()方法,该接口类也有几种实现,其中一种是ZooKeeperLeaderElectionDriverFactory类,我们来看看这个类的createLeaderElectionDriver方法实现。

    @Override
    public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
            LeaderElectionEventHandler leaderEventHandler,
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        return new ZooKeeperLeaderElectionDriver(
                client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
    }

可以看到最核心的代码是

new ZooKeeperLeaderElectionDriver(
                client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);

我们跟踪进去ZooKeeperLeaderElectionDriver(···)方法,会进入到ZooKeeperLeaderElectionDriver类的ZooKeeperLeaderElectionDriver()方法,该方法的具体实现我们看一下:

    public ZooKeeperLeaderElectionDriver(
            CuratorFramework client,
            String path,
            LeaderElectionEventHandler leaderElectionEventHandler,
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        checkNotNull(path);
        this.client = checkNotNull(client);
        this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
        this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

        leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
        leaderLatch = new LeaderLatch(client, leaderLatchPath);
        this.cache =
                ZooKeeperUtils.createTreeCache(
                        client,
                        connectionInformationPath,
                        this::retrieveLeaderInformationFromZooKeeper);

        running = true;

        leaderLatch.addListener(this);
        leaderLatch.start();

        cache.start();

        client.getConnectionStateListenable().addListener(listener);
    }

这里核心关注2行代码:

        leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
        leaderLatch = new LeaderLatch(client, leaderLatchPath);

可以看到选举也是用了LeaderLatch机制,这个机制其实好多分布式软件都在用,大家可以去了解一下LeaderLatch选主节点的机制。

标签:分析,启动,flinkv1.14,创建,start,job,集群,configuration,方法
From: https://www.cnblogs.com/lukairui/p/17458817.html

相关文章

  • 对粮食产量进行大数据分析
    一、选题背景 近年来,我国各个省份的粮食总产量以及增量增速逐渐倍受关注,如何增加粮食产量也成为了人们关注的热点话题。通过互联网上的信息发布网站,我获取并整合了各省粮食产量数据。其中,“中国产业信息网”每年发布的国内新一年的粮食产量信息。网站发布的信息包括近些年的粮......
  • R语言ARMA-GARCH模型金融产品价格实证分析黄金价格时间序列
    全文链接:http://tecdat.cn/?p=32677原文出处:拓端数据部落公众号研究黄金价格的动态演变过程至关重要。文中以黄金交易市场下午定盘价格为基础,帮助客户利用时间序列的相关理论,建立了黄金价格的ARMA-GARCH模型,并对数据进行了实证分析,其结果非常接近。利用该模型可动态刻画黄金......
  • 对人力资源分析案例研究数据集进行数据分析
    一.选题背景近年就业面临着诸多挑战。一方面,经济的不景气和就业市场的不稳定性使得就业难度加大,就业形势越来越严峻。另一方面,高校毕业生的数量不断增加,而就业岗位的数量却没有相应增加,导致竞争激烈,难以找到合适的工作。此外,还有一些特殊的问题,如女性就业歧视、农村学生就业难等,......
  • Python爬取郑州安居客租房数据采集分析
    一、选题背景在现在,虽然我国实行楼市调控,使得总体的房价稳定下来,但是我国房价还是处于一个高水平之上。在这种情况下,大批在郑奋斗的年轻人选择租房,所以此次数据分析可以使在郑的年轻人了解郑州租房现状,让年轻人在租房时可以选到更加适合的房源。二、爬虫设计方案1、爬虫网址郑......
  • 【Python网络爬虫课程设计】B站up主——老番茄视频数据爬取+数据可视化分析
    一、选题背景1.背景随着大数据时代的来临,网络爬虫在互联网中的地位将越来越重要。互联网中的数据是海量的,如何自动高效地获取互联网中我们感兴趣的信息并为我们所用是一个重要的问题,而爬虫技术就是为了解决这些问题而生的。对于身为数据科学与大数据技术专业的学生来说,网络......
  • 大数据分析--南瓜籽品种分类
    大数据分析--南瓜籽品种分类 1.选题背景南瓜籽为南瓜的种子,葫芦科南瓜属植物南瓜CucurbitamoschataDuch.的种子。一端略尖,外表黄白色,边缘稍有棱,表面带有毛茸。除去种皮,可见绿色菲薄的胚乳,具有健脑提神、降压镇痛、驱虫等功效。南瓜籽因其含有足量的蛋白质、脂肪、碳水化合物......
  • python机器学习——点评评论分析
    (一)选题背景:随着广大用户“即需要、即外卖、即使用”的方便快捷的“外卖生活方式”的形成和普及,如今外卖行业不仅可以满足用户餐饮商品的在线即时购物需求,还可以满足饮食、水果、酒水饮料、家居日用、母婴用品、数码家电、服饰鞋包、美妆护肤、医药等各种品类商品。对于服务行业来......
  • CC6链子分析
    <1>环境分析实际上CC6链子又是CC1的一个变种没有版本限制找到这个漏洞的人又开辟出一个新的线路通过TiedMapEntry.hashcode()去触发CC1里的LazyMap.get()这里我们是继续沿用的CC1的项目jdk版本:jdk8u65pom.xml内容:<dependencies><dependency>......
  • Loadrunner考核试卷分析
    XXX科技学院试卷20  /20  学年   第 学期        课程所属部门:                  课程名称:                        课程编号:               考试方式:(A、B、开、闭)卷使用班级:         ......
  • CATIA-CATIA V5-6R2017 WIN10 64位版本安装+许可证的安装配置(CATIA启动时必须要调用许
    CATIAV5-6R2017WIN1064位安装步骤:1.先使用“百度网盘客户端”下载CATIAV5-6R2017软件安装包到电脑磁盘英文路径文件夹下,并解压缩,安装前先断开电脑网络,然后双击打开CATIAV5R2017文件夹,找到setup.exe,鼠标右击选择【以管理员身份运行】2.正在准备安装中,稍等片刻自动进入安......