首页 > 其他分享 >56、Flink DataStream 的管理执行配置详解

56、Flink DataStream 的管理执行配置详解

时间:2024-07-03 10:00:07浏览次数:26  
标签:DataStream Flink 56 Kryo 默认 并行度 设置 序列化

1)概述
1.执行配置

StreamExecutionEnvironment 包含了 ExecutionConfig,它允许在运行时设置作业特定的配置值。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();

以下是可用的配置选项:(默认为粗体)

  • setClosureCleanerLevel()。closure cleaner 的级别默认设置为 ClosureCleanerLevel.RECURSIVE。closure cleaner 删除 Flink 程序中对匿名 function 的调用类的不必要引用。禁用 closure cleaner 后,用户的匿名 function 可能正引用一些不可序列化的调用类。这将导致序列化器出现异常。可设置的值是: NONE:完全禁用 closure cleaner ,TOP_LEVEL:只清理顶级类而不递归到字段中,RECURSIVE:递归清理所有字段。
  • getParallelism() / setParallelism(int parallelism)。为作业设置默认的并行度。
  • getMaxParallelism() / setMaxParallelism(int parallelism)。为作业设置默认的最大并行度。此设置决定最大并行度并指定动态缩放的上限。
  • getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries)。设置失败任务重新执行的次数。值为零会有效地禁用容错。-1 表示使用系统默认值(在配置中定义)。该配置已弃用,请改用重启策略。
  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay)。设置系统在作业失败后重新执行之前等待的延迟(以毫秒为单位)。在 TaskManagers 上成功停止所有任务后,开始计算延迟,一旦延迟过去,任务会被重新启动。该配置已被弃用,请改用重启策略 。
  • getExecutionMode() / setExecutionMode()。默认的执行模式是 PIPELINED。设置执行模式以执行程序。执行模式定义了数据交换是以批处理方式还是以流方式执行。
  • enableForceKryo() / disableForceKryo。默认情况下不强制使用 Kryo。强制 GenericTypeInformation 对 POJO 使用 Kryo 序列化器,即使可以将它们作为 POJO 进行分析。在某些情况下,应该优先启用该配置。例如,当 Flink 的内部序列化器无法正确处理 POJO 时。
  • enableForceAvro() / disableForceAvro()。默认情况下不强制使用 Avro。强制 Flink AvroTypeInfo 使用 Avro 序列化器而不是 Kryo 来序列化 Avro 的 POJO。
  • enableObjectReuse() / disableObjectReuse()。默认情况下,Flink 中不重用对象。启用对象重用模式会指示运行时重用用户对象以获得更好的性能。注意可能会导致bug。
  • getGlobalJobParameters() / setGlobalJobParameters()。此方法允许用户将自定义对象设置为作业的全局配置。由于 ExecutionConfig 可在所有用户定义的 function 中访问,因此这是一种使配置在作业中全局可用的简单方法。
  • addDefaultKryoSerializer(Class type, Serializer serializer)。为指定的类型注册 Kryo 序列化器实例。
  • addDefaultKryoSerializer(Class type, Class> serializerClass)。为指定的类型注册 Kryo 序列化器的类。
  • registerTypeWithKryoSerializer(Class type, Serializer serializer)。使用 Kryo 注册指定类型并为其指定序列化器。通过使用 Kryo 注册类型,该类型的序列化将更加高效。
  • registerKryoType(Class type)。如果类型最终被 Kryo 序列化,那么它将在 Kryo 中注册,以确保只有标记(整数 ID)被写入。如果一个类型没有在 Kryo 注册,它的全限定类名将在每个实例中被序列化,从而导致更高的 I/O 成本。
  • registerPojoType(Class type)。将指定的类型注册到序列化栈中。如果该类型最终被序列化为 POJO,那么该类型将注册到 POJO 序列化器中。如果该类型最终被 Kryo 序列化,那么它将在 Kryo 中注册,以确保只有标记被写入。如果一个类型没有在 Kryo 注册,它的全限定类名将在每个实例中被序列化,从而导致更高的I/O成本。

注意:用 registerKryoType() 注册的类型对 Flink 的 Kryo 序列化器实例来说是不可用的。

  • disableAutoTypeRegistration()。自动类型注册在默认情况下是启用的。自动类型注册是将用户代码使用的所有类型(包括子类型)注册到 Kryo 和 POJO 序列化器。
  • setTaskCancellationInterval(long interval)。设置尝试连续取消正在运行任务的等待时间间隔(以毫秒为单位)。当一个任务被取消时,会创建一个新的线程,如果任务线程在一定时间内没有终止,新线程就会定期调用任务线程上的 interrupt() 方法。这个参数是指连续调用 interrupt() 的时间间隔,默认设置为 30000 毫秒,或 30秒

通过 getRuntimeContext() 方法在 Rich* function 中访问到的 RuntimeContext 也允许在所有用户定义的 function 中访问 ExecutionConfig

2.程序打包和分布式运行
a)打包程序

为了能够通过命令行或 web 界面执行打包的 JAR 文件,程序必须使用通过 StreamExecutionEnvironment.getExecutionEnvironment() 获取的 environment。当 JAR 被提交到命令行或 web 界面后,该 environment 会扮演集群环境的角色。如果调用 Flink 程序的方式与上述接口不同,该 environment 会扮演本地环境的角色。

打包程序只要简单地将所有相关的类导出为 JAR 文件,JAR 文件的 manifest 必须指向包含程序入口点(拥有公共 main 方法)的类。实现的最简单方法是将 main-class 写入 manifest 中(比如 main-class: org.apache.flinkexample.MyProgram)。main-class 属性与 Java 虚拟机通过指令 java -jar pathToTheJarFile 执行 JAR 文件时寻找 main 方法的类是相同的。

大多数 IDE 提供了在导出 JAR 文件时自动包含该属性的功能。

b)总结

调用打包后程序的完整流程包括两步:

  • 搜索 JAR 文件 manifest 中的 main-classprogram-class 属性。如果两个属性同时存在,program-class 属性会优先于 main-class 属性。对于 JAR manifest 中两个属性都不存在的情况,命令行和 web 界面支持手动传入入口点类名参数。
  • 系统接着调用该类的 main 方法。
3.并行执行
a)概述

一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。

使用 savepoints 时,应该考虑设置最大并行度。当作业从一个 savepoint 恢复时,可以改变特定算子或者整个程序的并行度,并且此设置会限定整个程序的并行度的上限。由于在 Flink 内部将状态划分为了 key-groups,且性能所限不能无限制地增加 key-groups,因此设定最大并行度是有必要的。

b)设置并行度

算子层面

单个算子、数据源和数据接收器的并行度可以通过调用 setParallelism()方法来指定。如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");

执行环境层次

Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。

可以通过调用 setParallelism() 方法指定执行环境的默认并行度。如果想以并行度3来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = [...];
wordCounts.print();

env.execute("Word Count Example");

客户端层次

将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。

在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在 Java/Scala 程序中,可以通过如下方式指定并行度:

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}

系统层次

可以通过设置 Flink 配置文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。

c)设置最大并行度

最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism() 方法修改并行度相似,可以通过调用 setMaxParallelism() 方法来设定最大并行度。

默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2 的幂次方,注意默认最大并行度下限为 128,上限为 32768

为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。

从之前的作业恢复时,改变该作业的最大并发度将会导致状态不兼容。

标签:DataStream,Flink,56,Kryo,默认,并行度,设置,序列化
From: https://blog.csdn.net/m0_50186249/article/details/140144343

相关文章

  • 从0到1Flink的成长之路(二十)-Flink 高级特性(二)之自动重启策略和恢复 ,固定延迟重启策
    从0到1Flink的成长之路(二十)-Flink高级特性(二)之自动重启策略和恢复,,固定延迟重启策略(开发中使用)自动重启策略和恢复1)、重启策略配置方式配置文件在flink-conf.yml中可以进行配置,示例如下:restart-strategy:fixed-delayrestart-strategy.fixed-delay.attempts:3restart-strat......
  • 用ADSP-21569做A2B的开发的保姆级教程六:Fireworks源码
    作者的话我在前面已经写了5篇,补充一篇更新:Fireworks源码关于A2B,我写过非常多的文章,都是基于ADI公司的A2B开发板参考设计来写的,在真实世界里,主机厂们用到了ADSP-21565、ADSP-21569,甚至ADSP-21593来做座舱的音响设计,整车的NVH,这些应用全部都涉及到了A2B,也就是所谓的汽车音......
  • 56.Ajax操作
    【一】Ajax1)简介Ajax:异步的Javascript和XML,即JavaScript语言与服务器进行异步交互,传输的数据为XML可用与服务器交换数据并更新部分网页内容同步交互:客户端发出一个请求后,需要等待服务器响应结束后,才能发出第二个请求异步同步:客户端发出一个请求后,无需等待服务器......
  • 大数据面试题之Flink(1)
    目录Flink架构 Flink的窗口了解哪些,都有什么区别,有哪几种?如何定义? Flink窗口函数,时间语义相关的问题 介绍下Flink的watermark(水位线),watermark需要实现哪个实现类,在何处定义?有什么作用? Flink的窗口(实现)机制 说下Flink的CEP 说一说Flink的Checkpoint机制 ......
  • 大数据面试题之Flink(2)
    Flink中Checkpoint超时原因 Flink的ExactlyOnce语义怎么保证? Flink的端到端ExactlyOnce Flink的水印(Watermark),有哪几种? Flink的时间语义 Flink相比于其它流式处理框架的优点? Flink和Spark的区别?什么情况下使用Flink?有什么优点? FlinkbackPressure反压机......
  • 大数据面试题之Flink(3)
    如何确定Flink任务的合理并行度? Flink任务如何实现端到端一致? Flink如何处理背(反)压? Flink解决数据延迟的问题 Flink消费kafka分区的数据时flink件务并行度之间的关系 使用flink-client消费kafka数据还是使用flink-connector消费 如何动态修改Flink的配置,前提......
  • ADS1256芯片说明
    本篇文章先总结一下24位的8通道24bit高精度采集的24位ADS1256,本篇文章不是纯粹的datasheet的抄袭,而是datasheet的总结,高度概括,以及对我们编程有用的思路,我大概看了一下网上流传的版本,大多数都是STM32,另外有一份是verilog不知道是谁写的,各个网都有,它是多通道采集,仅仅使用了一种模式......
  • FlinkCDCSQL数据同步mysql->clickhouse
    FlinkCDC(ChangeDataCapture)SQL用于实现数据库的数据变更捕获,并通过SQL接口进行处理。以下是一个基本的示例,全量+增量数据mysql同步到clickhouse,展示如何使用FlinkCDCSQL进行数据同步。首先,确保你有Flink和FlinkCDC的环境配置好。1.mysql测试source表(准备......
  • NRF52840DK PCA10056 BLE Mesh Light例程记录
    1.创建项目在打开的VSCode窗口,打开nRFConnect选项卡,"Createanewapplication" 选择"Copyasample" 输入"light", 选择"BluetoothMeshlight". 选择copy后,保存的路径。 键盘"Enter"一下。 点击"AddBuildConfiguration&qu......
  • RK3568 android13 预置APK
    现象:需要预置一个测试APK,按照以往得方法,将APK放到device/rockchip/rk356x/rk3568_t/preinstall目录下面。然后编译成新得固件。发现桌面没有显示APK。 查看OUT目录下面有预置得APK,out/target/product/rk3568_t/odm/bundled_persist-app/autotest/autotest.apk。说明APK是有......