首页 > 编程语言 >Flink源码学习(6)StreamTask的初始化和执行

Flink源码学习(6)StreamTask的初始化和执行

时间:2024-05-03 12:46:00浏览次数:28  
标签:初始化 flink task Flink StreamTask streamTask state 源码 Task

Stream初始化

taskExecutor执行一个Task

当taskExecutor接受提交Task执行的请求,会调用:
CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout);
提交Task到TM 提交Job过来的JobManager和现在Active JobManager不是同一个的时候,就拒绝提交 初始化task的时候,内部初始化一个执行线程,就是task类启动,把TDD变成Task
Task task =
        new Task(
                jobInformation,
                taskInformation,
                tdd.getExecutionAttemptId(),
                tdd.getAllocationId(),
                tdd.getProducedPartitions(),
                tdd.getInputGates(),
                memoryManager,
                sharedResources,
                taskExecutorServices.getIOManager(),
                taskExecutorServices.getShuffleEnvironment(),
                taskExecutorServices.getKvStateService(),
                taskExecutorServices.getBroadcastVariableManager(),
                taskExecutorServices.getTaskEventDispatcher(),
                externalResourceInfoProvider,
                taskStateManager,
                taskManagerActions,
                inputSplitProvider,
                checkpointResponder,
                taskOperatorEventGateway,
                aggregateManager,
                classLoaderHandle,
                fileCache,
                taskManagerConfiguration,
                taskMetricGroup,
                partitionStateChecker,
                getRpcService().getScheduledExecutor(),
                channelStateExecutorFactoryManager.getOrCreateExecutorFactory(jobId));
  1. task的构造方法

    1. 封装一个TaskInfo,把task在执行过程中必备的描述信息放入
      this.taskInfo =
              new TaskInfo(
                      taskInformation.getTaskName(),
                      taskInformation.getMaxNumberOfSubtasks(),
                      executionAttemptID.getSubtaskIndex(),
                      taskInformation.getNumberOfSubtasks(),
                      executionAttemptID.getAttemptNumber(),
                      String.valueOf(slotAllocationId));

       

    2. 创建shuffle context
      final ShuffleIOOwnerContext taskShuffleContext =
              shuffleEnvironment.createShuffleIOOwnerContext(
                      taskNameWithSubtaskAndId, executionId, metrics.getIOMetricGroup());

       

    3. 初始化ResultPartition和ResultSubPartition
  1. 一个Task的执行有输入和输出,关于输出的抽象ResultPartition和ResultSubPartition
  2. 具体实现是resultPartition
  3. 批处理创建一个ReleaseOnConsumptionResultPartition,流处理创建ResultPartition
  4. subPartition的数量等于下游task的个数
  1. 输入的抽象是InputGate和InputChannel,也会init
    1.   singleInputGate创建一个给inputGate

        创建inputChannel数组,有两种情况,createUnknownInputChannel也可能是createKnownInputChannel,正常情况是known的。当前是本地获取数据的话localRecoveredInputChannel,正常是remote

  2. 初始化一个thread,task的构造方法
  3. sourceStreamTask和streamTask初始化
    1. 创建recordWriter
    2. 处理输入streamTask.processInput=new mailboxProcess
    3. 创建状态后端stateBackend
    4. 通过stateBackend创建checkpointStorage
    5. 获取分流器
      1. 用于调整并行度,如果上下游streamNode的并行度一致就用forwardPartition分发策略
      2. 如果不一致就使用RebalancePartition实现

    

提交job到standalone集群运行的时候,在client构建streamGraph顶点式streamNode,边是streamEdge的时候,根据生成的transformation为streamGraph生成stramNode,生成Node的hi后判断如果该operator是streamSource的话,就会指定该streamTask的invokableClass为sourceStreamTask   task.Run
流式引擎,上游task执行完一条数据计算,就会发送结果给下游task,规则由streamPartitioner来指定 一条数据在一个task执行完毕之后,就要发送给下游另外一个task,这个过程,网络数据传输过程,由netty支持的,具体是由inputChannel实现的
  1. 状态从created、改为deploying
  2. 拉起resultPartitionWriter和inputGate
    1. 构造task的时候,就已经初始化过了
    2. 注册result Partition Writer
    3. 如果注册过了就报错
    4. inputChannel分配buffer
  3. 包装task的各种组件 environment
  4. 通过反射实例化streamTask
    1. sourceStreamTask
      1. 是flink job最开始的task,对接source,有一个专门的线程接受数据
      2. source用于产生data的一个线程
    2. oneInputStreamTask
  5. 状态从deploying改为running
  6. 启动streamTask
  7. streamTask正常结束处理buffer数据
  8. 状态从running改为finished
state管理源码剖析 flink具有有状态计算的特点 state是flink job的task运行过程中产生的中间数据,辅助task执行计算,同时存储中间状态帮助flink job的重启恢复。保存状态非常重要 state要配合checkpoint机制保证flink失败之后正确的回复错误 flink自己周期性的快照叫checkpoint,手动保存叫savepoint flink状态分类 keyedState具体的key绑定,只能在keyedStream上的函数和算子中使用 OperatorState和operator的一个特定的并行实例绑定,例如kafka connector Keyed state也可以看作是operator state的一种分区partition形式,每一个key都关联一个状态分区state-partition 无论是operator state还是keyed state都有两种形式:Managed state和raw state。 所有跟state有关的都由stateBackend完成,核心有abstractStateBackend,三种存储类型 FileSystem将state存储在taskmanager的内存中,checkpoint存储在文件系统内存中,适合存储量大,文件系统中需要序列化反序列化的开销速度慢点,但是比memory好 Memory 保存在内存中,很快,适合小state任务,不能上生产。存储在内存中 RocksDB,基于内存的数据库系统,把状态存储在reocksDB,ckeckpoint存在文件系统,比memory快,GC少,支持异步snapshot持久化,用于存储state量大的,window窗口很长的一些job合适    
  1. StreamTask运行

 

  1. beforeInvoke
    1. operatorChain的初始化,recoveredChannelState初始化
    2. 当第一个operator执行完毕的时候就有collector收集计算结果再去调用第二个operator执行processElement,通过output收集处理之后的结果数据
      1. userFunction.run:function->transformation->streamOperator,headOperator的run方法
      2. socket连接读数据
      3. channelSelector确定目标channel决定record被分发到哪一个分区
  2. runMailboxLoop
不停的处理mail
  1. afterInvoke
完成task结束之前的操作
  1. cleanUpInvoke

 

标签:初始化,flink,task,Flink,StreamTask,streamTask,state,源码,Task
From: https://www.cnblogs.com/ak918xp/p/18171113

相关文章

  • 26-Spring源码分析(一)
    1.Spring架构设计Spring框架是一个分层架构,他包含一系列的功能要素,并被分为大约20个模块。1.1设计理念Spring是面向Bean的编程(BOP:BeanOrientedProgramming),Bean在Spring中才是真正的主角。Bean在Spring中作用就像Object对OOP的意义一样,没有对象的概念就像......
  • Spring6 当中的 Bean 循环依赖的详细处理方案+源码解析
    1.Spring6当中的Bean循环依赖的详细处理方案+源码解析@目录1.Spring6当中的Bean循环依赖的详细处理方案+源码解析每博一文案1.1Bean的循环依赖1.2singletion下的set注入下的Bean的循环依赖1.3prototype下的set注入下的Bean的循环依赖1.4singleton下的构造注......
  • 深入浅出Spring源码,终于把学Spring源码的技巧吃透了!
    前言本人从事Java架构十余年,也曾经在几家一线大厂任职多年,一直认为最难啃的当属Spring源码,为此我自己录制了一套Spring由浅入深的源码教程,根据自己多年来对于Spring源码整理的课纲一步步带你深入学习Spring源码,教程课件都打包好提供给你mian费学习!由于官方限制,对Spring源码感兴......
  • public void add(int index, E element)的方法源码分析
    publicclassArrayList<E>extendsAbstractList<E>implementsList<E>,RandomAccess,Cloneable,java.io.Serializable{publicvoidadd(intindex,Eelement){rangeCheckForAdd(index);//校验数组是否越界......
  • Apache DolphinScheduler支持Flink吗?
    随着大数据技术的快速发展,很多企业开始将Flink引入到生产环境中,以满足日益复杂的数据处理需求。而作为一款企业级的数据调度平台,ApacheDolphinScheduler也跟上了时代步伐,推出了对Flink任务类型的支持。Flink是一个开源的分布式流处理框架,具有高吞吐量、低延迟和准确性等特点,广泛......
  • 稳扎稳打 部署丝滑 开源即时通讯(IM)项目OpenIM源码部署流程(linux windows mac)
    背景OpenIM包含多个关键组件,每个都是系统功能必不可少的一部分。具体来说,MongoDB用于持久化存储;Redis用作缓存;Kafka用于消息队列;Zookeeper用于服务发现;Minio用于对象存储。这些组件的众多可能会增加部署的复杂性。此外,系统包含多个微服务模块,这要求有效管理进程的启动、停止......
  • JDK源码分析-TreeSet
    概述TreeSet是Java集合框架中用于存储唯一元素的树形数据结构,它实现了NavigableSet接口,这意味着TreeSet中的元素不仅是有序的,还支持一系列的导航方法。TreeSet的内部实现主要依赖于TreeMap,通过TreeMap的键来维护元素的排序。 类图从以上类图可以看到,TreeSet实现了三个接口,......
  • Linux内核源码-存储驱动之 QSPI Flash
    传输方式DIO/QIO/DOUT/QPIQPI模式(QuadPeripheralInterface),所有阶段都通过4线传输。与之相对的是SPI。SPI模式:纯种SPI(MISO/MOSI两个数据线)DOUT全称DualI/O,命令字和地址字均为单线,仅在数据阶段为双线。QOUT全称QuadI/O,命令字和地址字均为单线,仅在数据阶段为双线......
  • Flink 在 Debian 环境的安装
    目录基础环境下载安装运行基础环境启动模式:LocalDebian11.3Java11下载下载地址:https://flink.apache.org/zh/downloads/下载文件:flink-1.19.0-bin-scala_2.12.tgz安装解压安装包:tar-xvfflink-1.19.0-bin-scala_2.12.tgzcdflink-1.19.0修改flink-conf.yaml......
  • 大厂50万节点监控系统架构设计&Prometheus底层源码级剖析
    大厂50万节点监控系统架构设计&Prometheus底层源码级剖析 设计和实现一个大规模监控系统需要深入考虑架构设计、可伸缩性、性能优化等方面。下面是一个关于大规模监控系统架构设计的简要指南,以及有关Prometheus底层源码的剖析:大规模监控系统架构设计:1.架构设计原......