首页 > 其他分享 >Flink

Flink

时间:2023-09-19 21:36:18浏览次数:26  
标签:task 窗口 Flink 作业 yarn 线程

Flink

概念

Flink运行时由两种类型的进程组成:JobManager和TaskManager。

Flink Program可以理解为自己提交的jar包。构建出Dataflow(数据流),Optimizer Graph Builder(图构造优化器),Client(客户端)。

时域

事件时间

事件发生的时间。

到达时间

数据到达Flink时间。

处理时间

Flink开始处理时间。

很多时候等于到达时间。

窗口

Aggregating events (e.g., counts, sums) works differently on streams than in batch processing. For example, it is impossible to count all elements in a stream, because streams are in general infinite (unbounded). Instead, aggregates on streams (counts, sums, etc), are scoped by windows, such as “count over the last 5 minutes”, or “sum of the last 100 elements”.
聚合事件(例如,计数、总和)在流上的工作方式不同于在批处理中。例如,不可能统计流中的所有元素,因为流通常是无限的(无界的)。相反,流上的聚合(计数、和等)由窗口确定作用域,如“在过去5分钟内计数”或“最后100个元素的总和”。

Windows can be time driven (example: every 30 seconds) or data driven (example: every 100 elements). One typically distinguishes different types of windows, such as tumbling windows (no overlap), sliding windows (with overlap), and session windows (punctuated by a gap of inactivity).
Windows 可以是时间驱动的(例如: 每30秒)或数据驱动的(例如: 每100个元素)。通常区分不同类型的窗口,例如滚动窗口(无重叠)、滑动窗口(有重叠)和会话窗口(不活动间隔)。

分为:固定窗口、滑动窗口、会话窗口

固定窗口:通常按照固定的时间片进行划分;各个窗口之间不会有重叠也不会有间隙;

滑动窗口

在固定的窗口上增加了滑动的步长;

比如:每五分钟统计过去十分钟内的数据,即:窗口为10 ,滑动步长为5 ,则每次都有5分钟的窗口数据与上次统计的重叠;

窗口=步长:等同于固定窗口;

窗口>步长:有重叠;

窗口<步长:窗口之间出现空隙;

会话窗口

没有固定的窗口大小,由时间频率决定;

当一个窗口在大于Session gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的;

Watermark

进程

JobManager

至少有一个,高可用模式下由多个,一个active,其余的standby。协调Flink应用程序的分布式执行,决定何时调度下一个task,对完成、失败的task做出反应,协调checkPoint,故障恢复等。

ResourceManager

负责Flink集群中资源提供、回收、分配,管理task solts。

Dispatcher

提供REST接口,用来提交Flink程序,并未每个提交的启动一个新的JobMaster。提供webui。

JobMaster

负责管理单个JobGraph(生成逻辑图有向图)的执行,Flink集群中可以同时运行多个作业,每个作业都有自己的JobMaster。

TaskManager

执行作业流的task,缓存、交换数据。

能调度的最小单位是task solt。

一个taskmanager被分配1000m资源,5个solt,那么每个solt占用1000/5=200m资源;

tasks和算子链

Flink将算子的subtasks链接成tasks,每个task由一个线程执行。(减少线程切换、缓冲开销、件事时延和增加吞吐量)

如下图示例:source-->map()-->keyby()wondow()apply()-->sink
source和map为一一对应关系,可以优化一个算子链,一个task,并行度为2,有2个subtask(线程);
keyby()会对数据进行分区,所以不能和前面整合为一个算子链,并行度为2,有两个线程;
sink一个task,一个并行度,一个线程;
共三个task,五个线程。

本地配置30个solt,运行两个job,并行度默认为2。

算子链优化

Task Slots 和资源

每个TaskManager(worker节点)都是一个jvm进程。可以在单独的线程中的执行一个或多个subtask。为了控制一个TaskManager能执行多少个task/subtask,就有了task solt。

具有3个solt的TaskManager,每个solt占据1/3的内存,没有CPU隔离。

一个TaskManager有多个solt时,solt共享同一jvm,tcp连接数,心跳信息,数据集,数据结构等;

默认情况下运训subtask共享solt,即便不是同一个task的subtask,只要是来自同一个作业即可。

例子:将上边的作业的五个线程分配到solt

例子:将上边作业的source()-map()和keyby()-window()-apply()的并行度增加到6,共2*6+1(sink)=13个线程

运行模式

local模式原理

1. flink 程序由JobClient提交;
2. JobClient将作业提交给JobManager
3. JobManager负责协调资源分配和作业执行。资源分配完成,将任务分配给相应的TaskManager。
4. TaskManager启动一个线程开始执行,同时向JobManager报告执行状态。
5. 执行完成将结果反馈客户端JobClient。

slot在 Flink 里面可以认为是资源组, Flink 是通过将任务分成子任务并且将这些子任务分配到 slot 来并行执行程序。

on yarn模式

1. client上传jar包和配置文件到hdfs集群;
2. clinet向yarn 的ResourceManager申请资源;
3. ResourceManager分配container资源并启动AppliactionMaster,然后AP加载flink的jar包和配置构建环境,启动JobManager;
JobManager和ApplicationMaster运行在同一个container上。
ApplicationMatser也提供了flink的web服务接口。
yarn所分配的所有端口都是临时端口,允许用户并行
4. ApplicationMaster向ResourceManager申请工作资源,nodemanager记载flink程序和配置文件构建TaskManager。
5. TaskManager启动后,向JobManager发送心跳包,等待分配任务。

关闭yarn的内存检查

是否启动线程检查每个任务使用的虚拟内存量,超出直接杀掉,默认是true;

yarn-site.xml

<!-- 关闭yarn内存检查 -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

复制到各个节点上,重启;

session模式

特点:需要实现申请资源,启动JobManager和TaskManager,每次作业执行完成不是放flink集群资源;
有点:不需要每次递交作业申请资源,二十使用已经申请好的资源,从而提升执行效率;
缺点:作业执行完以后,资源不会释放,因此会一直占用系统资源;
适用场景:适合作业递交比较频繁的场景,小而多的作业;

提交任务

/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

# -n 申请两个容器,这里指的是多少个taskmanager;
# -tm 表示每个taskmanager的内存大小;
# -s 表示每个taskmanager的solts数量;
# -d 表示后台运行;

per-job模式

特点:每次递交作业都需要申请一次资源;
优点:作业运行完成,资源会立刻释放,不会一直占用资源;
缺点:每次递交作业申请资源会影响执行效率;
适用场景:适合作业比较少的场景,大作业的场景

提交job

/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /flink/server/flink/examples/batch/wordCount.jar

# -m jobmananger地址;
# -yjm 1024指定jobmananger的内存信息;
# -ytm 1024指定taskmanager的内存信息;

可在yarn的application界面查看;

如果使用yarn方式执行任务,想要切换回standalone模式有报错,可以删除/tmp/.yarn-properties-root,因为默认查找当前yarn集群中已经有的yarn-session信息中的jobmanager

application模式

标签:task,窗口,Flink,作业,yarn,线程
From: https://www.cnblogs.com/CHEN-zuhe/p/17715858.html

相关文章

  • 27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2-2)
    Flink系列文章[1、Flink部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接][13、Flink的tableapi与sql的基本概念、通用api介绍及入门示例][14、Flink的tableapi与sql之数据类型:内置数据类型以及它们的属性][15、Flink的t......
  • 云主机测试Flink磁盘满问题解决
    问题描述:使用云主机测试Flink时,根目录满了。经排查发现运行Flink任务后根目录空间一直在减少,最后定位持续增加的目录是/tmp目录解决方法:修改Flink配置使用一个相对较大的磁盘目录做为Flink运行时目录#Overridethedirectoriesfortemporaryfiles.Ifnotspecified,the#sy......
  • 【Flink系列十八】HDFS_DELEGATION_TOKEN过期的问题解决汇总
    问题类别Spark框架自身的问题Hadoop全家桶的问题开发者通过Hive,HDFS,HBASE访问HDFS的问题排查已知Hadoop-common-2.6.0的UGI存在bug,代码为HADOOP-10786,该问题在CDH发行版中已经修复,但Apache版本存在问题。已知HDFS也存在一个HDFS_DELEGATION_TOKEN过期的bug,代码为HDFS-9......
  • Flink CDC 原理、实践和优化
    本文转载自:https://zhuanlan.zhihu.com/p/430182083 CDC变更数据捕获技术可以将源数据库的增量变动记录,同步到一个或多个数据目的。本文基于腾讯云Oceanus提供的FlinkCDC引擎,着重介绍Flink在变更数据捕获技术中的应用。一、CDC是什么?CDC是变更数据捕获(ChangeData......
  • Iceberg从入门到精通系列之十一:Flink DataStream读取Iceberg表
    Iceberg从入门到精通系列之十一:FlinkDataStream读取Iceberg表一、完整代码二、效果如下所示一、完整代码importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;i......
  • Iceberg从入门到精通系列之九:flink sql修改Iceberg表和删除Iceberg表
    Iceberg从入门到精通系列之九:flinksql修改Iceberg表一、修改表属性二、修改表名三、删除表一、修改表属性ALTERTABLE`hive_catalog`.`default`.`sample`SET('write.format.default'='avro');二、修改表名ALTERTABLE`hive_catalog`.`default`.`sample`RENAMETO`hive_cat......
  • Iceberg从入门到精通系列之八:flink sql 创建Iceberg表
    Iceberg从入门到精通系列之八:flinksql创建Iceberg表一、创建数据库二、创建表三、创建分区表四、使用LIKE语法建表五、创建主键表一、创建数据库createdatabaseiceberg_db;useiceberg_db;二、创建表createtable`hive_catalog`.`default`.`sample`(idbigintcomment'un......
  • Iceberg从入门到精通系列之七:Flink SQL创建Catalog
    Iceberg从入门到精通系列之七:FlinkSQL创建Catalog一、语法说明二、flink集成hivejar包三、放到指定目录四、启动hivemetastore服务五、创建hivecatalog六、查看catalog七、HadoopCatalog八、创建sql-client初始化文件九、启动flinksql指定初始化文件一、语法说明createcat......
  • Iceberg从入门到精通系列之六:Flink集成Iceberg
    Iceberg从入门到精通系列之六:Flink集成Iceberg一、下载Flink二、解压Flink安装包三、配置环境变量四、激活环境变量五、下载Icebergflinkjar包六、部署Icebergflinkjar包七、修改flink配置八、启动flink九、启动flinksqlclient一、下载Flink下载Flink:https://www.apache.o......
  • 在flink-1.17中测试执行流处理版本的单词计数程序时,出现"Exception in thread "Thread
    场景描述采用单作业模式提交作业后发现报错了 报错内容Exceptioninthread“Thread-5”java.lang.IllegalStateException:Tryingtoaccessclosedclassloader.Pleasecheckifyoustoreclassloadersdirectlyorindirectlyinstaticfields.Ifthestacktrace......