首页 > 其他分享 >Flink (三):核心概念(并行度、算子链、任务槽)

Flink (三):核心概念(并行度、算子链、任务槽)

时间:2025-01-12 20:33:47浏览次数:3  
标签:slot task Flink TaskManager 算子 并行度

1. 作业提交

Client 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给 JobManager。之后,客户端可以断开连接(分离模式),或保持连接来接收进程报告(附加模式)。客户端可以作为触发执行 Java/Scala 程序的一部分运行,也可以在命令行进程./bin/flink run ...中运行。

可以通过多种方式启动 JobManager 和 TaskManager:直接在机器上作为standalone 集群启动、在容器中启动、或者通过YARN等资源框架管理并启动。TaskManager 连接到 JobManagers,宣布自己可用,并被分配工作。

1.1 JobManager

JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:

  • ResourceManager: ResourceManager 负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考TaskManagers)。Flink 为不同的环境和资源提供者(例如 YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。

  • Dispatcher: Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。

  • JobMaster:JobMaster 负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。

始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby

1.2 TaskManager

TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子

2. 并行度

一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。如图中Source 和Map 设置为了一个Task, 这个Task 的并行度为2 ,故存在两个sub_task。

2.1 设置并行度

一个 task 的并行度可以从多个层次指定:

2.1.1 算子层次

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");

每个算子后,都可以进行并行度设置,这时的并行度设置只会应用到设置的对应算子

2.1.2 执行环境层次

Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。

可以通过调用 setParallelism() 方法指定执行环境的默认并行度。如果想以并行度3来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = [...];
wordCounts.print();

env.execute("Word Count Example");

这时设置的并行度对运行程序中,所有算子生效

2.1.3 客户端层次

将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在 Java/Scala 程序中,可以通过如下方式指定并行度:

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}

2.1.4 系统层次

可以通过设置 Flink 配置文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。可以通过参考官方配置文档获取更多细节。

这几个层次的优先级 算子层次 >> 执行环境层次 >> 客户端层次 >> 系统层次

2.2 设置最大并行度

最大并行度在每个作业和每个算子粒度上进行设置,决定了有状态算子能够扩展的最大并行度。目前,作业启动后无法改变算子的最大并行度,除非丢弃该算子的状态。设置最大并行度的原因,区别于允许有状态算子无限扩展,是因为它会对应用程序的性能和状态大小产生影响。Flink需要维护特定的元数据来支持重新调整状态的能力,而这些元数据随着最大并行度的增加而线性增长。通常,您应该选择一个足够高的最大并行度,以满足未来的可扩展性需求,同时保持它足够低,以维持合理的性能。

最大并行度和并行度的主要区别在于:并行度决定了算子处理数据的sub_task 数目,但是对于有状态的算子,除了需要对应的处理数据,还需要额外的处理状态,那么这里最大并行度就是限制有状态算子的最大的sub_task的数目。

最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism() 方法修改并行度相似,你可以通过调用 setMaxParallelism() 方法来设定最大并行度。

默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2 的幂次方,注意默认最大并行度下限为 128,上限为 32768

注意:为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。从之前的作业恢复时,改变该作业的最大并行度将会导致状态不兼容。

3. 算子链

将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作)。此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求:

但是构建算子链是有一定条件的:

  • 上下游算子实例处于同一个 SlotSharingGroup 中

  • 两个算子间的物理分区逻辑是 ForwardPartitioner 

  • 两个算子间的 shuffle 方式不是批处理模式

  • 上下游算子实例的并行度相同

3.1 创建新链

基于当前算子创建一个新的算子链。后面两个 map 将被链接起来,而 filter 和第一个 map 不会链接在一起。

someStream.filter(...).map(...).startNewChain().map(...);

3.2 禁止链接

禁止和 map 算子链接在一起。

someStream.map(...).disableChaining();

3.3 配置 Slot 共享组

为某个算子设置 slot 共享组。Flink 会将同一个 slot 共享组的算子放在同一个 slot 中,而将不在同一 slot 共享组的算子保留在其它 slot 中。这可用于隔离 slot 。如果所有输入算子都属于同一个 slot 共享组,那么 slot 共享组从将继承输入算子所在的 slot。slot 共享组的默认名称是 “default”,可以调用 slotSharingGroup(“default”) 来显式地将算子放入该组。

someStream.filter(...).slotSharingGroup("name");

4. 任务槽

每个 worker(TaskManager)都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask。为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)。

每个 task slot 代表 TaskManager 中资源的固定子集。例如,具有 3 个 slot 的 TaskManager,会将其托管内存 1/3 用于每个 slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争托管内存,而是具有一定数量的保留托管内存。注意此处没有 CPU 隔离;当前 slot 仅分离 task 的托管内存。

通过调整 task slot 的数量,用户可以定义 subtask 如何互相隔离。每个 TaskManager 有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行(例如,可以在单独的容器中启动)。具有多个 slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 连接(通过多路复用)和心跳信息。它们还可以共享数据集和数据结构,从而减少了每个 task 的开销。

默认情况下,Flink 允许 subtask 共享 slot,即便它们是不同的 task 的 subtask,只要是来自于同一作业即可。结果就是一个 slot 可以持有整个作业管道。允许 slot 共享有两个主要优点:

  • Flink 集群所需的 task slot 和作业中使用的最大并行度恰好一样。无需计算程序总共包含多少个 task(具有不同并行度)。

  • 容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(source/map())将阻塞和密集型 subtask(window) 一样多的资源。通过 slot 共享,我们示例中的基本并行度从 2 增加到 6,可以充分利用分配的资源,同时确保繁重的 subtask 在 TaskManager 之间公平分配。

5. 并行度和任务槽关系

  • Flink 在执行作业时,根据作业的并行度来决定需要多少个 slot。如果一个算子的并行度为 4,那么它将需要 4 个 slot 才能同时执行这 4 个并行任务。
  • 作业的并行度和 TaskManager 的可用 slot 数量直接决定了作业的执行情况。如果集群中没有足够的 slot,作业可能会等待调度。如果是standalone 会话模式启动flink集群,则并行度 > slot 数目则会报错,

    如:我们standalone 会话模式在本地启动flink 集群

此时可以发现,slot 数目为1,当我们在web界面提交作业设置并行度是2 时,就会出现如下报错,提示资源不足。
 

Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
	at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignResource$4(DefaultExecutionDeployer.java:226)
	... 40 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
	... 38 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

如果是使用YARN,则不会出现这种情况,YARN会根据任务所需要的情况,启动对应数目的TaskManager 来保证作业的稳定运行

标签:slot,task,Flink,TaskManager,算子,并行度
From: https://blog.csdn.net/weixin_41914554/article/details/145058688

相关文章

  • pytorch算子调用过程:以rand算子为例
    通过pytorch的torch.profiler带调用栈采集运行过程可以看到如下信息,通过chrome://tracing查看,图上每个小条条表示一个traceEvent,json中的信息如下图所示,其中cat表示traceEvent的类型,有cpu_op,python_function等,前者表示torch的cpp代码中定义的操作,后者表示pytorch的python代码......
  • Spark vs Flink分布式数据处理框架的全面对比与应用场景解析
    1.引言1.1什么是分布式数据处理框架随着数据量的快速增长,传统的单机处理方式已经无法满足现代数据处理需求。分布式数据处理框架应运而生,它通过将数据分片分布到多台服务器上并行处理,提高了任务的处理速度和效率。分布式数据处理框架的主要特点包括:水平扩展性:通过增加......
  • laplacian算子
    拉普拉斯算子(LaplacianOperator)是图像处理中的一种二阶导数算子,用于检测图像中的边缘。它可以增强图像中灰度变化较大的区域,从而突出边缘特征。数学定义拉普拉斯算子在二维情况下定义为:[\Deltaf(x,y)=\frac{\partial^2f}{\partialx^2}+\frac{\partial^2f}{\partial......
  • flinkcdc 实现数据监听
    1.概述FlinkCDC是一个用于实时数据和批处理数据的分布式数据集成工具。他可以监听数据库表的变化。实现将数据变化写到其他的数据源中。我们可以使用java实现自定义的数据写出。下面是实现细节。2.实现代码2.1项目依赖<dependencies><dependency><......
  • Flink同步mysql写入Iceberg异常,一秒写入一次
    1、现象在Iceberg数据湖治理过程中发现,同步任务运行7天没有写入数据,运行7天后突然大批量产生Commit,一秒产生一个Commit。 2、问题Flink写入checkpoint时会在checkpoint中先记录一个递增id,commit后会在Iceberg表中记录一个递增commitID,记录了两个id,默认两个id相等。通过不......
  • PySpark学习笔记2-RDD算子,RDD持久化
    RDD定义RDD是弹性分布式数据集,是spark中的最基本的数据抽象,里面的元素可以并行计算RDD的五大特性RDD是有分区的,它的分区是数据存储的最小单位RDD的方法会作用在所有分区上RDD之间是有依赖关系的KV型的RDD可以有分区器RDD的分区会尽量靠近数据所在的服务器,尽量保证本......
  • Flink如何设置合理的并行度
    一个Flink程序由多个Operator组成(source、transformation和sink)。一个Operator由多个并行的Task(线程)来执行,一个Operator的并行Task(线程)数目就被称为该Operator(任务)的并行度(Parallel)。即并行度就是相对于Operator来说的。合理设置并行度可以有效提高Flink作业的性......
  • Prometheus+Grafana监控flink任务指标
    Prometheus+Grafana监控flink任务指标前期准备Prometheus是一款基于时序数据库的开源监控告警系统,由go语言开发,Prometheus的基本原理是通过HTTP协议周期性抓取被监控组件的状态,任意组件只要提供对应的HTTP接口就可以接入监控。Grafana是一款采用Go语言编写的开源应用,前端由Re......
  • 计算机毕业设计PyFlink+Hadoop广告推荐系统 广告预测 广告数据分析可视化 广告爬虫 大
    温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO......
  • Flink源码解析之:如何根据JobGraph生成ExecutionGraph
    Flink源码解析之:如何根据JobGraph生成ExecutionGraph在上一篇Flink源码解析中,我们介绍了Flink如何根据StreamGraph生成JobGraph的流程,并着重分析了其算子链的合并过程和JobGraph的构造流程。对于StreamGraph和JobGraph的生成来说,其都是在客户端生成的,本文将会讲述JobGraph......