首页 > 其他分享 >【亲测有效】Flink1.20分布式集群搭建-最新版本

【亲测有效】Flink1.20分布式集群搭建-最新版本

时间:2024-12-22 11:30:08浏览次数:8  
标签:flink hadoop01 Flink 作业 hadoop YARN Flink1.20 亲测 分布式

一、Flink部署模式

由于在一些企业应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求,所以Flink为各种场景提供了不同的部署模式,主要包含以下三种模式。

会话模式(Session Mode):如图所示,在会话模式中,会先启动一个Flink集群保持一个会话,然后通过客户端提交Flink作业。由于Flink集群启动时所有资源都已经确定,所以通过客户端提交的所有作业会竞争Flink集群的资源。该模式比较适合单个作业规模较小,而执行时间短的大量作业的应用场景。
在这里插入图片描述

单作业模式(Per-Job Mode):由于会话模式的资源共享特性会导致很多问题,所以为了更好地隔离资源,可以考虑为提交的每个Flink作业启动一个集群,这种模式就是单作业模式,如图所示。在单作业模式中,作业运行完成之后,Flink集群就会关闭,所有资源也会被释放。这种特性使得单作业模式在生产环境中运行更加稳定,是实际应用中的首选模式,但是该模式仅仅支持YARN运行模式,并且在Flink1.15及以后的版本中被废弃掉了。
在这里插入图片描述

应用模式(Application Mode):由于会话模式和单作业模式的Flink应用代码都是在客户端执行,然后由客户端提交给JobManager,所以会造成客户端需要占用大量的网络带宽,特别是提交作业使用的是同一个客户端。如图所示,应用模式的解决办法是,直接将Flink应用提交到JobManager上运行。这就意味着我们需要为每一个提交的Flink应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个Flink应用而存在,应用执行结束之后JobManager也会随之关闭,这就是应用模式。
在这里插入图片描述

应用模式与单作业模式的相同之处:都会专门为一个Flink作业运行一个集群。不同之处:在单作业模式中,Flink作业的main方法在客户端运行,而在应用模式中,Flink作业的main方法在JobManager上执行。

二、Flink分布式集群规划

这里需要提前准备hadoop01、hadoop02、hadoop03三个节点来搭建Flink集群,hadoop01节点部署JobManager和TaskManager,hadoop02和hadoop03只部署TaskManager。
在这里插入图片描述

三、 Flink Standalone运行模式

1.Standalone运行模式概述
Standalone运行模式是Flink最基本的运行模式,它不需要依赖任何外部框架就可以独立工作。在这种运行模式下,所有的Flink组件(如JobManager、TaskManager等)都在集群中运行,形成一个独立的Flink集群。在三种部署模式中,Standalone运行模式支持会话模式和应用模式部署,不支持单作业模式部署。
在这里插入图片描述
Standalone会话模式下作业提交流程如上图所示。
(1)客户端将作业提交给JobManager,Dispatcher负责作业的接收和调度。
(2)Dispathcer为作业启动一个JobMaster,并将工作交给JobManager(更准确地说是交给了JobMaster)。
(3)JobMaster是作业的主控节点,负责作业的调度和管理。JobMaster接收到作业信息之后,会进行详情的解析和规划,然后向ResourceManager请求相应的资源。
(4)ResourceManager负责集群资源的分配和管理。当JobMaster请求资源时,ResourceManager会根据集群的当前状态和作业的资源需求来分配资源,并将分配的资源信息返回给Job Master。
(5)ResourceManager分配的资源其实来自于TaskManager,TaskManager来提供空闲的资源(Slot)用于Task的执行。
(6)当获取到足够的资源后,JobMaster会开始启动作业中的各个Task,将Task分发到已经分配好的TaskManager节点上,并监控Task的执行情况。

2.配置Standalone运行模式的集群
(1)下载并解压Flink
下载flink-1.20.0-bin-scala_2.12.tgz安装包(地址:https://flink.apache.org/downloads/),选择hadoop01作为安装节点,然后上传至hadoop01节点的 /home/hadoop/app目录下进行解压安装,操作命令如下。

[hadoop@hadoop01 app]$ ls
flink-1.20.0-bin-scala_2.12.tgz
#解压
[hadoop@hadoop01 app]$ tar  -zxvf  flink-1.20.0-bin-scala_2.12.tgz
#创建软连接
[hadoop@hadoop01 app]$ ln  -s  flink-1.20.0  flink

(2)修改config.yaml配置文件
在hadoop01节点,进入Flink根目录下的conf文件夹中,修改config.yaml配置文件,核心内容如下所示。

[hadoop@hadoop01 conf]$ vi  config.yaml
#jobmanager配置
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: hadoop01
#taskmanager配置
taskmanager:
  bind-host: 0.0.0.0
  host: hadoop01
#Rest配置
rest:
  address: hadoop01
  bind-address: 0.0.0.0

(3)修改masters配置文件
在hadoop01节点,进入Flink根目录下的conf文件夹中,修改masters配置文件,将hadoop01节点指定为JobManager角色,具体内容如下所示。

[hadoop@hadoop01 conf]$ vi  masters
hadoop01:8081

(4)修改workers配置文件
在hadoop01节点,进入Flink根目录下的conf文件夹中,修改workers配置文件,将hadoop01、hadoop02和hadoop03节点指定为JobManager角色,具体内容如下所示。

[hadoop@hadoop01 conf]$ vi  workers
hadoop01
hadoop02
hadoop03

(5)分发安装目录
在hadoop01节点中,使用Linux远程拷贝命令scp将修改好的Flink配置文件,分别同步到hadoop02和hadoop03节点,具体操作如下所示。

#分发Flink配置文件
[hadoop@hadoop01 app]$ scp  -r  flink-1.20.0  hadoop@hadoop02:/home/hadoop/app/
[hadoop@hadoop01 app]$ scp  -r  flink-1.20.0  hadoop@hadoop03:/home/hadoop/app/
#创建软连接
[hadoop@hadoop02 app]$ ln  -s  flink-1.20.0  flink
[hadoop@hadoop03 app]$ ln  -s  flink-1.20.0  flink

Flink配置文件同步完成之后,需要分别到hadoop02和hadoop03节点上修改config.yaml配置文件,具体修改内容如下所示。

[hadoop@hadoop02 conf]$ vi  config.yaml
taskmanager:
  host: hadoop02
[hadoop@hadoop03 conf]$ vi config.yaml
taskmanager:
  host: hadoop03

3.会话模式部署
在会话模式中,需要提前启动Flink集群,然后在客户端提交作业。这里以WordCount作业为例,具体操作步骤如下所示。
(1)启动集群
在hadoop01节点上,进入Flink的bin目录,启动Flink Standalone集群,具体操作如下所示。

[hadoop@hadoop01 flink]$ cd bin/
[hadoop@hadoop01 bin]$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop01.
Starting taskexecutor daemon on host hadoop01.
Starting taskexecutor daemon on host hadoop02.
Starting taskexecutor daemon on host hadoop03.

(2)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。

[root@hadoop01 ~]# nc  -lk  9999

(3)提交Flink作业
将IDEA打好的learningFlink1.20-1.0-SNAPSHOT.jar包上传至hadoop01节点,然后通过Flink脚本提交WordCount作业,具体操作如下所示。

[hadoop@hadoop01 flink]$ bin/flink  run  -m  hadoop01:8081  -c  com.yangjun.WordCount 	/home/hadoop/shell/jar/learningFlink1.20-1.0-SNAPSHOT.jar
Job has been submitted with JobID b6a1b3473c28b169735ebb24ab050547

(4)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。

[root@hadoop01 ~]# nc  -lk  9999
flink
flink
flink

(5)查看运行结果
由于hadoop01、hadoop02和hadoop03节点都是TaskManager角色,所以提交的WordCount作业可能运行在其中任何一个节点中。如果想查看WordCount作业的输出结果,需要逐个查看每个节点的日志文件输出结果。根据查看情况,定位到作业运行在hadoop01节点,此时可进入Flink的log目录查看输出结果,具体操作如下所示。

[hadoop@hadoop01 flink]$ cd log/
[hadoop@hadoop01 log]$ tail flink-hadoop-taskexecutor-0-hadoop01.out 
(flink,1)
(flink,2)
(flink,3)

4.应用模式部署
在应用模式中,不会提前启动Flink集群,所以不能使用start-cluster.sh脚本启动集群服务。这里需要使用bin目录下的standalone-job.sh脚本单独启动JobManager服务,具体操作步骤如下所示。
(1)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。

[root@hadoop01 ~]# nc  -lk  9999

(2)准备应用程序包
将IDEA打好的项目包上传至hadoop01节点的/home/hadoop/app/flink/lib目录下,查看Flink lib目录下的项目包操作如下所示。

[hadoop@hadoop01 lib]$ pwd
/home/hadoop/app/flink/lib
[hadoop@hadoop01 lib]$ ls
learningFlink1.20-1.0-SNAPSHOT.jar

(3)启动JobManager服务
在hadoop01节点上,进入Flink的bin目录,启动JobManager服务并指定WordCount的作业入口,具体操作如下所示。

[hadoop@hadoop01 bin]$ ./standalone-job.sh  start  --job--classname  com.yangjun.WordCount
Starting standalonejob daemon on host hadoop01.

备注:这里通过–job–classname参数直接指定WordCount入口类,脚本会到lib目录中扫描所有的jar包,从而找到WordCount应用程序包。

(4)启动TaskManager服务
分别在hadoop01、hadoop02和hadoop03节点,进入Flink的bin目录启动TaskManager服务,具体操作如下所示。

[hadoop@hadoop01 bin]$ ./taskmanager.sh  start
Starting taskexecutor daemon on host hadoop01.
[hadoop@hadoop02 bin]$ ./taskmanager.sh  start
Starting taskexecutor daemon on host hadoop02.
[hadoop@hadoop03 bin]$ ./taskmanager.sh start
Starting taskexecutor daemon on host hadoop03.

(5)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。

[root@hadoop01 ~]# nc  -lk  9999
flink
flink
flink

(6)查看运行结果
通过查看定位到作业运行在hadoop01节点,此时可进入Flink的log目录查看输出结果,具体操作如下所示。

[hadoop@hadoop01 flink]$ cd  log/
[hadoop@hadoop01 log]$ tail  flink-hadoop-taskexecutor-0-hadoop01.out 
(flink,1)
(flink,2)
(flink,3)

四、 Flink YARN运行模式

1.YARN运行模式概述
Flink YARN运行模式是Flink利用Hadoop YARN进行资源管理和调度的部署方式。在YARN上,Flink作业动态申请资源,由YARN的ApplicationMaster启动JobManager和TaskManager。它支持会话模式和应用模式:前者允许在会话中提交多个作业,后者为每个作业启动单独的YARN应用。相比Standalone模式,YARN模式提供更高的资源利用率和弹性,支持资源隔离和优先级调度,使Flink作业能更好地共享大型分布式集群资源。
在这里插入图片描述
YARN会话模式下作业提交流程如上图所示。
(1)启动集群
如果没有集群,需要创建一个新的Session模式的集群。首先,将应用的配置文件上传至HDFS,然后通过客户端向YARN提交Flink创建集群的申请,YARN分配资源,在申请的YARN容器中初始化并启动Flink JobManager,在JobManager进程中运行YarnSessionClusterEntrypoint作为集群启动的入口,初始化Dispatcher、ResourceManager。启动相关的RPC服务,等待客户端通过Rest接口提交作业。

(2)提交作业
YARN集群准备好之后,开始提交作业。
1)Flink客户端通过Rest向Dispatcher提交作业。
2)Dispatcher是Rest接口,不负责实际的调度、执行方面的工作,当收到作业之后,为作业创建一个JobMaster,将工作交给JobMaster(负责作业调度、管理作业和Task的生命周期)。
3)JobMaster向Flink ResourceManager申请资源,开始调度作业的执行;初次提交作业,集群尚没有TaskManager,此时资源不足,开始申请资源。
4)Flink ResourceManager收到JobMaster的资源请求,如果当前有空闲的Slot,则将Slot分配给JobMaster,否则Flink ResourceManager将向YARN ResourceManager请求创建TaskManager。
5)Flink ResourceManager将资源请求加入等待请求队列,并通过心跳向YARN ResourceManager申请新的Container资源来启动TaskManager进程;YARN分配新的Container给TaskManager。
6)Flink ResourceManager从HDFS加载Jar文件等所需的相关资源,在容器中启动TaskManager。
7)TaskManager启动之后,向Flink ResourceManager进行注册,并把自己的Slot资源情况汇报给Flink ResourceManager。
8)Flink ResourceManager从等待队列中取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给了哪个JobMaster。
9)TaskManager向JobMaster提供Slot,JobMaster将Task调度到TaskManager的Slot上执行。

2.配置YARN运行模式的集群
Flink On YARN运行模式的运行依赖与Hadoop集群(需要提前搭建好Hadoop集群),不需要配置Flink集群,只需要配置一个客户端提交Flink作业即可。
(1)配置环境变量
由于节点资源有限,这里我们选择hadoop01节点作为Flink客户端。hadoop01节点需要增加Hadoop环境变量,才能将Flink作业提交到YARN集群,具体操作如下所示。

[hadoop@hadoop01 ~]$ vi  ~/.bashrc
JAVA_HOME=/home/hadoop/app/jdk
HADOOP_HOME=/home/hadoop/app/hadoop
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATH
export JAVA_HOME CLASSPATH PATH HADOOP_HOME
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
#使配置生效
[hadoop@hadoop01 flink]$ source  ~/.bashrc

(2)启动Zookeeper集群
分别在hadoop01、hadoop02和hadoop03节点进入Zookeeper安装目录,启动Zookeeper服务,具体操作如下所示。

[hadoop@hadoop01 zookeeper]$ bin/zkServer.sh  start
[hadoop@hadoop02 zookeeper]$ bin/zkServer.sh  start
[hadoop@hadoop03 zookeeper]$ bin/zkServer.sh  start

(3)启动Hadoop集群
在hadoop01节点,进入Hadoop安装目录分别启动HDFS和YARN集群,具体操作如下所示。

[hadoop@hadoop01 hadoop]$ sbin/start-dfs.sh
[hadoop@hadoop01 hadoop]$ sbin/start-yarn.sh

3.会话模式部署
YANR的会话模式与Standalone集群略有不同,首先需要申请一个YARN会话即YARN Session,来启动Flink集群。
(1)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。

[root@hadoop01 ~]# nc  -lk  9999

(2)启动YARN会话
在hadoop01节点,进入Flink安装目录,启动YARN会话,具体操作如下所示。

[hadoop@hadoop01 flink]$ bin/yarn-session.sh  -nm  wordcount
Found Web Interface hadoop03:45753 of application 'application_1734486011928_0001'.
JobManager Web Interface: http://hadoop03:45753

核心参数说明。
-d:分离模式,可以使YARN Session在后台运行。
-jm(–jobManagerMemory):配置JobManager所需内存,默认单位MB。
-nm(–name):配置在YARN UI界面上显示的任务名。
-qu(–queue):指定YARN队列名。
-tm(–taskManager):配置每个TaskManager所使用内存。
如上打印信息所示。YARN Session启动之后,会生成一个YARN application ID和一个Web UI地址。

(3)提交作业
Flink可以通过Web UI界面或者命令行来提交作业,由于Web UI界面操作较为简单,我们重点讲解通过命令行来提交Flink作业。
在hadoop01节点,新打开一个控制台,进入Flink安装目录,提交WordCount作业,具体操作如下所示。

[hadoop@hadoop01 flink]$ bin/flink  run  
-c  com.yangjun.WordCount /home/hadoop/shell/jar/learningFlink1.20-1.0-SNAPSHOT.jar

Flink客户端可以自行确定JobManager的地址,也可以通过-m或者-jobmanager参数指定JobManager的地址,JobManager的地址在YARN Session的启动页面中可以找到。

(4)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。

[root@hadoop01 ~]# nc  -lk  9999
flink
flink
flink

(5)查看运行结果
通过查看定位到作业运行在hadoop02节点,此时可进入Hadoop安装路径下的logs/userlogs目录查看输出结果,具体操作如下所示。

[hadoop@hadoop02 container_e42_1734486011928_0001_01_000002]$ tail  taskmanager.out 
(flink,1)
(flink,2)
(flink,3)
[hadoop@hadoop02 container_e42_1734486011928_0001_01_000002]$ pwd
/home/hadoop/app/hadoop/logs/userlogs/application_1734486011928_0001/container_e42_1734486011928_0001_01_000002

4.应用模式部署
在Flink On YARN运行模式中,由于有了外部平台做资源调度,所以我们也可以直接向YARN提交一个单独的Flink作业,从而启动一个Flink集群。
(1)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。

[root@hadoop01 ~]# nc  -lk  9999

(2)提交作业
在hadoop01节点,进入Flink安装目录,直接提交WordCount作业,具体操作如下所示。

[hadoop@hadoop01 flink]$ bin/flink  run-application  -t  yarn-application 
-c  com.yangjun.WordCount  /home/hadoop/shell/jar/learningFlink1.20-1.0-SNAPSHOT.jar
......
YARN application has been deployed successfully.
Found Web Interface hadoop02:35108 of application 'application_1734486011928_0002'.

(3)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。

[root@hadoop01 ~]# nc  -lk  9999
flink
flink
flink

(4)查看运行结果
通过查看定位到作业运行在hadoop02节点,此时可进入Hadoop安装路径下的logs/userlogs目录查看输出结果,具体操作如下所示。

[hadoop@hadoop02 container_e42_1734486011928_0002_01_000002]$ tail taskmanager.out 
(flink,1)
(flink,2)
(flink,3)
[hadoop@hadoop02 container_e42_1734486011928_0002_01_000002]$ pwd
/home/hadoop/app/hadoop/logs/userlogs/application_1734486011928_0002/container_e42_1734486011928_0002_01_000002

到这里为止,我们已经成功完成了Flink On Standalone集群和Flink On YANR集群的搭建,同时分别进行了会话模式和应用模式的作业部署。


标签:flink,hadoop01,Flink,作业,hadoop,YARN,Flink1.20,亲测,分布式
From: https://blog.csdn.net/yangjun_1985/article/details/144607132

相关文章

  • Spring Boot 集成 Zookeeper:构建高可用分布式应用基石
    SpringBoot集成Zookeeper:构建高可用分布式应用基石在当今分布式系统蓬勃发展的时代,确保服务的高可用性、可靠性以及协调一致性至关重要。SpringBoot作为广受欢迎的Java开发框架,为快速构建应用提供了便捷,而Zookeeper则像是分布式世界里的“协调大师”,掌控着集群中......
  • 解锁分布式系统的关键:Spring Boot 与 Redis 分布式锁实战
    解锁分布式系统的关键:SpringBoot与Redis分布式锁实战在当今分布式系统架构广泛应用的时代,如何确保多个实例或线程在访问共享资源时的一致性和正确性,成为了开发人员面临的关键挑战之一。分布式锁作为解决这类问题的核心工具,在众多场景中发挥着不可或缺的作用。本文将深......
  • JetBrains WebStorm 2024 破解教程附资源(亲测可用)
    1、下载安装包WebStorm20242、安装教程1、双击安装,弹窗安装对话框  2、更改安装目录至D盘,点击下一步 3、 都进行勾选,点击下一步 4、默认,点击安装 5、安装过程中,等待安装完成,选择之后重启,点击完成 6、激活,打开随行下载的文件夹,找到激活工具,双击执行  ......
  • 鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作
    鸿蒙技术分享:HarmonyOSNext深度解析:分布式能力与跨设备协作实战随着万物互联时代的到来,操作系统作为连接设备、应用与用户体验的核心,扮演着不可或缺的角色。华为最新发布的HarmonyOSNext(鸿蒙操作系统下一代版本)不仅在技术架构上实现了颠覆性升级,更在生态体验上迈向了一个新的......
  • 分布式事务
    分布式事务名词解析全局事务:整个分布式事务分支事务:分布式事务中包含的每个子系统的事务最终一致性:各分支事务分别执行并提交,如果有不一致的情况,想办法补偿恢复,达到数据的最终一致性强一致性:各事务执行完业务不要提交,等待彼此结束,之后统一提交或回滚XA模式强一致性分阶......
  • 老生常谈——分布式限流:部分Sentinal源码解读
    基础知识HTTPCODE=429“请求过多”A.限流的类型服务端客户端限流的标的IP用户...基本要求准确限制过量的请求。低延时。限流器不能拖慢HTTP响应时间。尽量占用较少的内存。这是一个分布式限流器,可以在多个服务器或者进程之间共享。......
  • 关于分布式锁的的思考
    关于分布式锁的的思考结论先行:对于分布式锁我们在考虑不同方案的时候需要先思考需要的效果是什么?为了效率(efficiency),协调各个客户端避免做重复的工作。即使锁偶尔失效了,只是可能把某些操作多做一遍而已,不会产生其它的不良后果。比如重复发送了一封同样的email(当然这取决于......
  • HDFS分布式存储的高可用,高性能和高吞吐量架构设计
    胡弦,视频号2023年度优秀创作者,互联网大厂P8技术专家,SpringCloudAlibaba微服务架构实战派(上下册)和RocketMQ消息中间件实战派(上下册)的作者,资深架构师,技术负责人,极客时间训练营讲师,四维口袋KVP最具价值技术专家,技术领域专家团成员,2021电子工业出版社年度优秀作者,获得2023电......
  • Transformer大数据分布式因果推断在美团履约平台的探索与实践13
     1.背景中国有句古话:“民以食为天”。对食物的分析和理解,特别是识别菜肴的食材,在健康管理、卡路里计算、烹饪艺术、食物搜索等领域具有重要意义。但是,算法技术尽管在目标检测[1]-[3]、通用场景理解[4][5]和跨模态检索[6]-[8]方面取得了很大进展,却没有在食物相关的场景中取得......
  • Springboot(五十四)SpringBoot3实现redis分布式锁
    我们在Springboot项目中分别整合了redis和redission框架。 下边我记录一下再框架中分别使用redis和redission实现分布式锁的代码。 一:redis-lua脚本实现分布式锁lua本身是不具备原子性的,但由于Redis的命令是单线程执行的,它会把整个Iua脚本作为一个命令执行,会阻塞其间接......