taskManager是flink的worker节点,负责slot的资源管理和task执行 一个taskManager就是一台服务器的抽象 TaskManager基本资源单位是slot,一个作业的task会部署在一个TM的slot上运行,TM会负责维护本地的slot资源列表,并与Master和JobManager进行通信 启动主类:TaskManagerRunner TaskManagerRunner.main() 启动函数:runTaskManagerProcessSecurely(args); Flink集群主节点和从节点,每个节点都有一个全局的唯一ID,叫ResourceID
- load解析main方法参数和flink-conf.yaml配置信息
-
runTaskManagerProcessSecurely
- 在主节点启动的时候启动了PluginManager
-
然后再runTaskManager里面通过线程来启动TaskManager
SecurityUtils.getInstalledContext() .runSecured(() -> runTaskManager(configuration, pluginManager));
-
构建tm实例,taskManagerRunner是standalone下TaskManager的可执行入口点。构造相关组件(network,IO,memoryManager,RPCService等)并启动
-
New TaskManagerRunner初始化各种基础服务组件
- 初始化各种组件
- WebMonitorEndpoint
- Dispatcher
- resourceManager
- start方法是发送一个start消息确认启动
-
New TaskManagerRunner初始化各种基础服务组件
TaskManager启动分三件事
-
启动基础服务
- 启动7大基础服务
-
初始化taskManagerService
- startTaskManager
- 线程池,处理回调
- Flink中大量的异步编程
- CompletableFeature大量提交请求的执行和回调的执行都是由线程池执行
- feature.xxx(()->xxx(),executor)
- 定时的线程池(NewScheduledThreadPool):创建一个线程池,它可安排在延迟时间后运行命令或者定期地执行
-
HA高可用服务,用zookeeper来做,在flink-conf.yaml中配置high-availability = zookeeper模式
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( configuration, executor, AddressResolution.NO_ADDRESS_RESOLUTION, rpcSystem, this);
-
createRpcService
- createAndStart初始化actor System
-
使用了代理模式
- 代理模式是一种结构型设计模式,它允许一个对象代表另一个对象,从而控制对这个对象的访问。在Flink的RPC服务中,代理模式被用于创建远程调用的代理对象,以便在客户端和服务器之间进行通信。
- 具体来说,在Flink的RPC框架中,RpcService负责创建和启动RpcEndpoint组件的RpcServer,并且提供了与远程RpcServer建立连接的能力。当调用RpcService的connect方法时,它会创建一个代理对象,这个代理对象实现了RpcGateway接口,用于与远程的RpcEndpoint进行通信。
- 当一个TaskManager需要与ResourceManager通信时,它会通过RpcService创建一个连接。RpcService会使用Akka框架创建一个ActorRef,然后通过这个ActorRef创建一个AkkaInvocationHandler或者FencedAkkaInvocationHandler。这个InvocationHandler会被用来创建一个RpcGateway的代理对象,该代理对象可以在本地代表远程的RpcEndpoint
-
heartbeat服务
- taskExecutor心跳组件启动,跟resourceManager维持心跳
- jobMaster心跳组件启动,跟resourceManager维持心跳
- interval=10s并且timeout=50s,也就是连续5次心跳没有就挂了,在flink-conf.yaml设置
- metrics监控服务
-
Blob资源删除服务:两个定时任务,定时检查,删除过期的job资源文件,通过引用计数法来判断文件是否过期
- 主节点启动BlobServer
- 从节点启动BlobCacheService
-
在startTaskManager的最后,初始化TaskExecutor
-
Task资源
final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
- 1台物理节点的资源:cpu,memory。network
-
Task资源
-
taskManagerService
- 初始化了很多的taskManager要运行过程中需要的服务,真正需要的用来对外提供服务的组件
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, taskExecutorBlobService.getPermanentBlobService(), taskManagerMetricGroup.f1, ioExecutor, fatalErrorHandler, workingDirectory);
- 里面有个shuffleEnvironment,上下游StreamTask有shuffle动作,在过程中,许哟啊很组件工作,这个组件为shuffle提供创建组件的环境支持
-
TaskSlotTable<Task> taskSlotTable
- 哪些task在哪些slot里执行,使用table关联
- 当一个slot被调度执行了一个task,就会生成taskslot对象
- 一个task是由一个线程执行
- Flink:resourceManager+taskManager(slot管理+task执行)
-
方法返回TaskExecutor,初始化taskManager最重要的目标就是启动taskExecutor
return new TaskExecutor( rpcService, taskManagerConfiguration, highAvailabilityServices, taskManagerServices, externalResourceInfoProvider, heartbeatServices, taskManagerMetricGroup.f0, metricQueryServiceAddress, taskExecutorBlobService, fatalErrorHandler, new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()), delegationTokenReceiverRepository);
- 额外
Standalone
1个Yarn Container JobManager 启动ResourceManager 启动Dispatcher 其他的 Yarn Container TaskExecutor 应用程序提交的时候,启动一个JobMaster,再去调度StreamTask执行,向JobMaster汇报心跳 Dispatcher调度一个JobMaster,
-
TaskManager/TaskExecutor注册和心跳
- 提交task
- 申请和释放slot
-
TaskExecutor的初始化:
- 初始化2个心跳管理器,jobManagerHeartbeat resourceManagerHeartbeat
-
hardware Description把硬件资源抽象成一个对象
this.hardwareDescription = HardwareDescription.extractFromSystem(taskExecutorServices.getManagedMemorySize());
-
onStart方法
- 继承的RpcEndpoint,所以需要执行
-
启动taskExecutorService
-
监控ResourceManager
- 连接ResourceManager
- 注册
- 维持心跳
- 当前TaskExecutor会监控RM的变更
- 启动TaskSlotTable服务
- 监控JobMaster
- 启动FileCache
-
监控ResourceManager