一、Flink部署模式
由于在一些企业应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求,所以Flink为各种场景提供了不同的部署模式,主要包含以下三种模式。
会话模式(Session Mode):如图所示,在会话模式中,会先启动一个Flink集群保持一个会话,然后通过客户端提交Flink作业。由于Flink集群启动时所有资源都已经确定,所以通过客户端提交的所有作业会竞争Flink集群的资源。该模式比较适合单个作业规模较小,而执行时间短的大量作业的应用场景。
单作业模式(Per-Job Mode):由于会话模式的资源共享特性会导致很多问题,所以为了更好地隔离资源,可以考虑为提交的每个Flink作业启动一个集群,这种模式就是单作业模式,如图所示。在单作业模式中,作业运行完成之后,Flink集群就会关闭,所有资源也会被释放。这种特性使得单作业模式在生产环境中运行更加稳定,是实际应用中的首选模式,但是该模式仅仅支持YARN运行模式,并且在Flink1.15及以后的版本中被废弃掉了。
应用模式(Application Mode):由于会话模式和单作业模式的Flink应用代码都是在客户端执行,然后由客户端提交给JobManager,所以会造成客户端需要占用大量的网络带宽,特别是提交作业使用的是同一个客户端。如图所示,应用模式的解决办法是,直接将Flink应用提交到JobManager上运行。这就意味着我们需要为每一个提交的Flink应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个Flink应用而存在,应用执行结束之后JobManager也会随之关闭,这就是应用模式。
应用模式与单作业模式的相同之处:都会专门为一个Flink作业运行一个集群。不同之处:在单作业模式中,Flink作业的main方法在客户端运行,而在应用模式中,Flink作业的main方法在JobManager上执行。
二、Flink分布式集群规划
这里需要提前准备hadoop01、hadoop02、hadoop03三个节点来搭建Flink集群,hadoop01节点部署JobManager和TaskManager,hadoop02和hadoop03只部署TaskManager。
三、 Flink Standalone运行模式
1.Standalone运行模式概述
Standalone运行模式是Flink最基本的运行模式,它不需要依赖任何外部框架就可以独立工作。在这种运行模式下,所有的Flink组件(如JobManager、TaskManager等)都在集群中运行,形成一个独立的Flink集群。在三种部署模式中,Standalone运行模式支持会话模式和应用模式部署,不支持单作业模式部署。
Standalone会话模式下作业提交流程如上图所示。
(1)客户端将作业提交给JobManager,Dispatcher负责作业的接收和调度。
(2)Dispathcer为作业启动一个JobMaster,并将工作交给JobManager(更准确地说是交给了JobMaster)。
(3)JobMaster是作业的主控节点,负责作业的调度和管理。JobMaster接收到作业信息之后,会进行详情的解析和规划,然后向ResourceManager请求相应的资源。
(4)ResourceManager负责集群资源的分配和管理。当JobMaster请求资源时,ResourceManager会根据集群的当前状态和作业的资源需求来分配资源,并将分配的资源信息返回给Job Master。
(5)ResourceManager分配的资源其实来自于TaskManager,TaskManager来提供空闲的资源(Slot)用于Task的执行。
(6)当获取到足够的资源后,JobMaster会开始启动作业中的各个Task,将Task分发到已经分配好的TaskManager节点上,并监控Task的执行情况。
2.配置Standalone运行模式的集群
(1)下载并解压Flink
下载flink-1.20.0-bin-scala_2.12.tgz安装包(地址:https://flink.apache.org/downloads/),选择hadoop01作为安装节点,然后上传至hadoop01节点的 /home/hadoop/app目录下进行解压安装,操作命令如下。
[hadoop@hadoop01 app]$ ls
flink-1.20.0-bin-scala_2.12.tgz
#解压
[hadoop@hadoop01 app]$ tar -zxvf flink-1.20.0-bin-scala_2.12.tgz
#创建软连接
[hadoop@hadoop01 app]$ ln -s flink-1.20.0 flink
(2)修改config.yaml配置文件
在hadoop01节点,进入Flink根目录下的conf文件夹中,修改config.yaml配置文件,核心内容如下所示。
[hadoop@hadoop01 conf]$ vi config.yaml
#jobmanager配置
jobmanager:
bind-host: 0.0.0.0
rpc:
address: hadoop01
#taskmanager配置
taskmanager:
bind-host: 0.0.0.0
host: hadoop01
#Rest配置
rest:
address: hadoop01
bind-address: 0.0.0.0
(3)修改masters配置文件
在hadoop01节点,进入Flink根目录下的conf文件夹中,修改masters配置文件,将hadoop01节点指定为JobManager角色,具体内容如下所示。
[hadoop@hadoop01 conf]$ vi masters
hadoop01:8081
(4)修改workers配置文件
在hadoop01节点,进入Flink根目录下的conf文件夹中,修改workers配置文件,将hadoop01、hadoop02和hadoop03节点指定为JobManager角色,具体内容如下所示。
[hadoop@hadoop01 conf]$ vi workers
hadoop01
hadoop02
hadoop03
(5)分发安装目录
在hadoop01节点中,使用Linux远程拷贝命令scp将修改好的Flink配置文件,分别同步到hadoop02和hadoop03节点,具体操作如下所示。
#分发Flink配置文件
[hadoop@hadoop01 app]$ scp -r flink-1.20.0 hadoop@hadoop02:/home/hadoop/app/
[hadoop@hadoop01 app]$ scp -r flink-1.20.0 hadoop@hadoop03:/home/hadoop/app/
#创建软连接
[hadoop@hadoop02 app]$ ln -s flink-1.20.0 flink
[hadoop@hadoop03 app]$ ln -s flink-1.20.0 flink
Flink配置文件同步完成之后,需要分别到hadoop02和hadoop03节点上修改config.yaml配置文件,具体修改内容如下所示。
[hadoop@hadoop02 conf]$ vi config.yaml
taskmanager:
host: hadoop02
[hadoop@hadoop03 conf]$ vi config.yaml
taskmanager:
host: hadoop03
3.会话模式部署
在会话模式中,需要提前启动Flink集群,然后在客户端提交作业。这里以WordCount作业为例,具体操作步骤如下所示。
(1)启动集群
在hadoop01节点上,进入Flink的bin目录,启动Flink Standalone集群,具体操作如下所示。
[hadoop@hadoop01 flink]$ cd bin/
[hadoop@hadoop01 bin]$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop01.
Starting taskexecutor daemon on host hadoop01.
Starting taskexecutor daemon on host hadoop02.
Starting taskexecutor daemon on host hadoop03.
(2)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。
[root@hadoop01 ~]# nc -lk 9999
(3)提交Flink作业
将IDEA打好的learningFlink1.20-1.0-SNAPSHOT.jar包上传至hadoop01节点,然后通过Flink脚本提交WordCount作业,具体操作如下所示。
[hadoop@hadoop01 flink]$ bin/flink run -m hadoop01:8081 -c com.yangjun.WordCount /home/hadoop/shell/jar/learningFlink1.20-1.0-SNAPSHOT.jar
Job has been submitted with JobID b6a1b3473c28b169735ebb24ab050547
(4)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。
[root@hadoop01 ~]# nc -lk 9999
flink
flink
flink
(5)查看运行结果
由于hadoop01、hadoop02和hadoop03节点都是TaskManager角色,所以提交的WordCount作业可能运行在其中任何一个节点中。如果想查看WordCount作业的输出结果,需要逐个查看每个节点的日志文件输出结果。根据查看情况,定位到作业运行在hadoop01节点,此时可进入Flink的log目录查看输出结果,具体操作如下所示。
[hadoop@hadoop01 flink]$ cd log/
[hadoop@hadoop01 log]$ tail flink-hadoop-taskexecutor-0-hadoop01.out
(flink,1)
(flink,2)
(flink,3)
4.应用模式部署
在应用模式中,不会提前启动Flink集群,所以不能使用start-cluster.sh脚本启动集群服务。这里需要使用bin目录下的standalone-job.sh脚本单独启动JobManager服务,具体操作步骤如下所示。
(1)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。
[root@hadoop01 ~]# nc -lk 9999
(2)准备应用程序包
将IDEA打好的项目包上传至hadoop01节点的/home/hadoop/app/flink/lib目录下,查看Flink lib目录下的项目包操作如下所示。
[hadoop@hadoop01 lib]$ pwd
/home/hadoop/app/flink/lib
[hadoop@hadoop01 lib]$ ls
learningFlink1.20-1.0-SNAPSHOT.jar
(3)启动JobManager服务
在hadoop01节点上,进入Flink的bin目录,启动JobManager服务并指定WordCount的作业入口,具体操作如下所示。
[hadoop@hadoop01 bin]$ ./standalone-job.sh start --job--classname com.yangjun.WordCount
Starting standalonejob daemon on host hadoop01.
备注:这里通过–job–classname参数直接指定WordCount入口类,脚本会到lib目录中扫描所有的jar包,从而找到WordCount应用程序包。
(4)启动TaskManager服务
分别在hadoop01、hadoop02和hadoop03节点,进入Flink的bin目录启动TaskManager服务,具体操作如下所示。
[hadoop@hadoop01 bin]$ ./taskmanager.sh start
Starting taskexecutor daemon on host hadoop01.
[hadoop@hadoop02 bin]$ ./taskmanager.sh start
Starting taskexecutor daemon on host hadoop02.
[hadoop@hadoop03 bin]$ ./taskmanager.sh start
Starting taskexecutor daemon on host hadoop03.
(5)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。
[root@hadoop01 ~]# nc -lk 9999
flink
flink
flink
(6)查看运行结果
通过查看定位到作业运行在hadoop01节点,此时可进入Flink的log目录查看输出结果,具体操作如下所示。
[hadoop@hadoop01 flink]$ cd log/
[hadoop@hadoop01 log]$ tail flink-hadoop-taskexecutor-0-hadoop01.out
(flink,1)
(flink,2)
(flink,3)
四、 Flink YARN运行模式
1.YARN运行模式概述
Flink YARN运行模式是Flink利用Hadoop YARN进行资源管理和调度的部署方式。在YARN上,Flink作业动态申请资源,由YARN的ApplicationMaster启动JobManager和TaskManager。它支持会话模式和应用模式:前者允许在会话中提交多个作业,后者为每个作业启动单独的YARN应用。相比Standalone模式,YARN模式提供更高的资源利用率和弹性,支持资源隔离和优先级调度,使Flink作业能更好地共享大型分布式集群资源。
YARN会话模式下作业提交流程如上图所示。
(1)启动集群
如果没有集群,需要创建一个新的Session模式的集群。首先,将应用的配置文件上传至HDFS,然后通过客户端向YARN提交Flink创建集群的申请,YARN分配资源,在申请的YARN容器中初始化并启动Flink JobManager,在JobManager进程中运行YarnSessionClusterEntrypoint作为集群启动的入口,初始化Dispatcher、ResourceManager。启动相关的RPC服务,等待客户端通过Rest接口提交作业。
(2)提交作业
YARN集群准备好之后,开始提交作业。
1)Flink客户端通过Rest向Dispatcher提交作业。
2)Dispatcher是Rest接口,不负责实际的调度、执行方面的工作,当收到作业之后,为作业创建一个JobMaster,将工作交给JobMaster(负责作业调度、管理作业和Task的生命周期)。
3)JobMaster向Flink ResourceManager申请资源,开始调度作业的执行;初次提交作业,集群尚没有TaskManager,此时资源不足,开始申请资源。
4)Flink ResourceManager收到JobMaster的资源请求,如果当前有空闲的Slot,则将Slot分配给JobMaster,否则Flink ResourceManager将向YARN ResourceManager请求创建TaskManager。
5)Flink ResourceManager将资源请求加入等待请求队列,并通过心跳向YARN ResourceManager申请新的Container资源来启动TaskManager进程;YARN分配新的Container给TaskManager。
6)Flink ResourceManager从HDFS加载Jar文件等所需的相关资源,在容器中启动TaskManager。
7)TaskManager启动之后,向Flink ResourceManager进行注册,并把自己的Slot资源情况汇报给Flink ResourceManager。
8)Flink ResourceManager从等待队列中取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给了哪个JobMaster。
9)TaskManager向JobMaster提供Slot,JobMaster将Task调度到TaskManager的Slot上执行。
2.配置YARN运行模式的集群
Flink On YARN运行模式的运行依赖与Hadoop集群(需要提前搭建好Hadoop集群),不需要配置Flink集群,只需要配置一个客户端提交Flink作业即可。
(1)配置环境变量
由于节点资源有限,这里我们选择hadoop01节点作为Flink客户端。hadoop01节点需要增加Hadoop环境变量,才能将Flink作业提交到YARN集群,具体操作如下所示。
[hadoop@hadoop01 ~]$ vi ~/.bashrc
JAVA_HOME=/home/hadoop/app/jdk
HADOOP_HOME=/home/hadoop/app/hadoop
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATH
export JAVA_HOME CLASSPATH PATH HADOOP_HOME
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
#使配置生效
[hadoop@hadoop01 flink]$ source ~/.bashrc
(2)启动Zookeeper集群
分别在hadoop01、hadoop02和hadoop03节点进入Zookeeper安装目录,启动Zookeeper服务,具体操作如下所示。
[hadoop@hadoop01 zookeeper]$ bin/zkServer.sh start
[hadoop@hadoop02 zookeeper]$ bin/zkServer.sh start
[hadoop@hadoop03 zookeeper]$ bin/zkServer.sh start
(3)启动Hadoop集群
在hadoop01节点,进入Hadoop安装目录分别启动HDFS和YARN集群,具体操作如下所示。
[hadoop@hadoop01 hadoop]$ sbin/start-dfs.sh
[hadoop@hadoop01 hadoop]$ sbin/start-yarn.sh
3.会话模式部署
YANR的会话模式与Standalone集群略有不同,首先需要申请一个YARN会话即YARN Session,来启动Flink集群。
(1)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。
[root@hadoop01 ~]# nc -lk 9999
(2)启动YARN会话
在hadoop01节点,进入Flink安装目录,启动YARN会话,具体操作如下所示。
[hadoop@hadoop01 flink]$ bin/yarn-session.sh -nm wordcount
Found Web Interface hadoop03:45753 of application 'application_1734486011928_0001'.
JobManager Web Interface: http://hadoop03:45753
核心参数说明。
-d:分离模式,可以使YARN Session在后台运行。
-jm(–jobManagerMemory):配置JobManager所需内存,默认单位MB。
-nm(–name):配置在YARN UI界面上显示的任务名。
-qu(–queue):指定YARN队列名。
-tm(–taskManager):配置每个TaskManager所使用内存。
如上打印信息所示。YARN Session启动之后,会生成一个YARN application ID和一个Web UI地址。
(3)提交作业
Flink可以通过Web UI界面或者命令行来提交作业,由于Web UI界面操作较为简单,我们重点讲解通过命令行来提交Flink作业。
在hadoop01节点,新打开一个控制台,进入Flink安装目录,提交WordCount作业,具体操作如下所示。
[hadoop@hadoop01 flink]$ bin/flink run
-c com.yangjun.WordCount /home/hadoop/shell/jar/learningFlink1.20-1.0-SNAPSHOT.jar
Flink客户端可以自行确定JobManager的地址,也可以通过-m或者-jobmanager参数指定JobManager的地址,JobManager的地址在YARN Session的启动页面中可以找到。
(4)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。
[root@hadoop01 ~]# nc -lk 9999
flink
flink
flink
(5)查看运行结果
通过查看定位到作业运行在hadoop02节点,此时可进入Hadoop安装路径下的logs/userlogs目录查看输出结果,具体操作如下所示。
[hadoop@hadoop02 container_e42_1734486011928_0001_01_000002]$ tail taskmanager.out
(flink,1)
(flink,2)
(flink,3)
[hadoop@hadoop02 container_e42_1734486011928_0001_01_000002]$ pwd
/home/hadoop/app/hadoop/logs/userlogs/application_1734486011928_0001/container_e42_1734486011928_0001_01_000002
4.应用模式部署
在Flink On YARN运行模式中,由于有了外部平台做资源调度,所以我们也可以直接向YARN提交一个单独的Flink作业,从而启动一个Flink集群。
(1)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。
[root@hadoop01 ~]# nc -lk 9999
(2)提交作业
在hadoop01节点,进入Flink安装目录,直接提交WordCount作业,具体操作如下所示。
[hadoop@hadoop01 flink]$ bin/flink run-application -t yarn-application
-c com.yangjun.WordCount /home/hadoop/shell/jar/learningFlink1.20-1.0-SNAPSHOT.jar
......
YARN application has been deployed successfully.
Found Web Interface hadoop02:35108 of application 'application_1734486011928_0002'.
(3)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。
[root@hadoop01 ~]# nc -lk 9999
flink
flink
flink
(4)查看运行结果
通过查看定位到作业运行在hadoop02节点,此时可进入Hadoop安装路径下的logs/userlogs目录查看输出结果,具体操作如下所示。
[hadoop@hadoop02 container_e42_1734486011928_0002_01_000002]$ tail taskmanager.out
(flink,1)
(flink,2)
(flink,3)
[hadoop@hadoop02 container_e42_1734486011928_0002_01_000002]$ pwd
/home/hadoop/app/hadoop/logs/userlogs/application_1734486011928_0002/container_e42_1734486011928_0002_01_000002
到这里为止,我们已经成功完成了Flink On Standalone集群和Flink On YANR集群的搭建,同时分别进行了会话模式和应用模式的作业部署。
标签:flink,hadoop01,Flink,作业,hadoop,YARN,Flink1.20,亲测,分布式
From: https://blog.csdn.net/yangjun_1985/article/details/144607132