首页 > 其他分享 >Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器

时间:2023-09-02 11:35:34浏览次数:51  
标签:1.17 Flink standalone flink jar sh atguigu node001


集群角色

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_flink

集群启动

如果是部署在本地,本地访问,无需进行任何配置,直接启动即可。

如果是部署在服务器,需要远程访问,则需要将flink.conf中的localhost修改为服务器IP地址或是0.0.0.0

节点服务器

hadoop102

hadoop103

hadoop104

角色

JobManagerTaskManager

TaskManager

TaskManager

[atguigu@node001 module]$ cd flink
[atguigu@node001 flink]$ cd flink-1.17.0/
[atguigu@node001 flink-1.17.0]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node001.
Starting taskexecutor daemon on host node001.
Starting taskexecutor daemon on host node002.
Starting taskexecutor daemon on host node003.
[atguigu@node001 flink-1.17.0]$ jpsall 
================ node001 ================
3408 Jps
2938 StandaloneSessionClusterEntrypoint
3276 TaskManagerRunner
================ node002 ================
2852 TaskManagerRunner
2932 Jps
================ node003 ================
2864 TaskManagerRunner
2944 Jps
[atguigu@node001 flink-1.17.0]$

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_kubernetes_02

WebUI提交作业

打jar包maven插件配置

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_kubernetes_03

com.atguigu.wc.WordCountStreamUnboundedDemo

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_服务器_04

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_运行模式_05

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_flink_06

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_运行模式_07

命令行提交作业

bin/flink run -m node001:8081 -c com.atguigu.wc.WordCountStreamUnboundedDemo ../jar/FlinkTutorial-1.17-1.0-SNAPSHOT.jar

连接成功
Last login: Fri Jun 16 14:44:01 2023 from 192.168.10.1
[atguigu@node001 ~]$ cd /opt/module/flink/flink-1.17.0/

[atguigu@node001 flink-1.17.0]$ cd bin

[atguigu@node001 bin]$ ./start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node001.
Starting taskexecutor daemon on host node001.
Starting taskexecutor daemon on host node002.
Starting taskexecutor daemon on host node003.

[atguigu@node001 bin]$ jpsall 
================ node001 ================
2723 TaskManagerRunner
2855 Jps
2380 StandaloneSessionClusterEntrypoint
================ node002 ================
2294 TaskManagerRunner
2367 Jps
================ node003 ================
2292 TaskManagerRunner
2330 Jps

[atguigu@node001 bin]$ cd ..
         
[atguigu@node001 flink-1.17.0]$ bin/flink run -m node001:8081 -c com.atguigu.wc.WordCountStreamUnboundedDemo ../jar/FlinkTutorial-1.17-1.0-SNAPSHOT.jar 
Job has been submitted with JobID 59ae9d6532523b0c48cdb8b6c9105356

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_运行模式_08

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_kubernetes_09

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_运行模式_10

部署模式介绍

在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode)、应用模式(Application Mode)。

它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行——客户端(Client)还是JobManager。

Standalone运行模式

独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。

精简版脚本:

bin/standalone-job.sh start --job-classname com.atguigu.wc.WordCountStreamUnboundedDemo

bin/taskmanager.sh start

bin/taskmanager.sh stop

bin/standalone-job.sh stop

详细展示版:

[atguigu@node001 ~]$ cd /opt/module/flink/flink-1.17.0/bin

[atguigu@node001 bin]$ ./stop-cluster.sh 
Stopping taskexecutor daemon (pid: 2723) on host node001.
Stopping taskexecutor daemon (pid: 2294) on host node002.
Stopping taskexecutor daemon (pid: 2292) on host node003.
Stopping standalonesession daemon (pid: 2380) on host node001.

[atguigu@node001 bin]$ jpsall 
================ node001 ================
5120 Jps
================ node002 ================
3212 Jps
================ node003 ================
3159 Jps

[atguigu@node001 bin]$ ls
bash-java-utils.jar  flink             historyserver.sh          kubernetes-session.sh      sql-client.sh      start-cluster.sh           stop-zookeeper-quorum.sh  zookeeper.sh
config.sh            flink-console.sh  jobmanager.sh             kubernetes-taskmanager.sh  sql-gateway.sh     start-zookeeper-quorum.sh  taskmanager.sh
find-flink-home.sh   flink-daemon.sh   kubernetes-jobmanager.sh  pyflink-shell.sh           standalone-job.sh  stop-cluster.sh            yarn-session.sh

[atguigu@node001 bin]$ cd ../lib/

[atguigu@node001 lib]$ ls
flink-cep-1.17.0.jar              flink-dist-1.17.0.jar        flink-table-api-java-uber-1.17.0.jar   FlinkTutorial-1.17-1.0-SNAPSHOT.jar  log4j-core-2.17.1.jar
flink-connector-files-1.17.0.jar  flink-json-1.17.0.jar        flink-table-planner-loader-1.17.0.jar  log4j-1.2-api-2.17.1.jar             log4j-slf4j-impl-2.17.1.jar
flink-csv-1.17.0.jar              flink-scala_2.12-1.17.0.jar  flink-table-runtime-1.17.0.jar         log4j-api-2.17.1.jar

[atguigu@node001 lib]$ cd ../

[atguigu@node001 flink-1.17.0]$ bin/standalone-job.sh start --job-classname com.atguigu.wc.WordCountStreamUnboundedDemo
Starting standalonejob daemon on host node001.

[atguigu@node001 flink-1.17.0]$ jpsall 
================ node001 ================
5491 StandaloneApplicationClusterEntryPoint
5583 Jps
================ node002 ================
3326 Jps
================ node003 ================
3307 Jps

[atguigu@node001 flink-1.17.0]$ bin/taskmanager.sh 
Usage: taskmanager.sh (start|start-foreground|stop|stop-all)

[atguigu@node001 flink-1.17.0]$ bin/taskmanager.sh start
Starting taskexecutor daemon on host node001.

[atguigu@node001 flink-1.17.0]$ jpsall 
================ node001 ================
5491 StandaloneApplicationClusterEntryPoint
5995 Jps
5903 TaskManagerRunner
================ node002 ================
3363 Jps
================ node003 ================
3350 Jps

[atguigu@node001 flink-1.17.0]$ bin/taskmanager.sh stop
Stopping taskexecutor daemon (pid: 5903) on host node001.

[atguigu@node001 flink-1.17.0]$ bin/standalone-job.sh stop
No standalonejob daemon (pid: 5491) is running anymore on node001.

[atguigu@node001 flink-1.17.0]$ xcall jps
=============== node001 ===============
6682 Jps
=============== node002 ===============
3429 Jps
=============== node003 ===============
3419 Jps

YARN运行模式_环境准备

YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。

[atguigu@node001 flink-1.17.0]$ source /etc/profile.d/my_env.sh 
[atguigu@node001 flink-1.17.0]$ myhadoop.sh s
Input Args Error...
[atguigu@node001 flink-1.17.0]$ myhadoop.sh start
 ================ 启动 hadoop集群 ================
 ---------------- 启动 hdfs ----------------
Starting namenodes on [node001]
Starting datanodes
Starting secondary namenodes [node003]
 --------------- 启动 yarn ---------------
Starting resourcemanager
Starting nodemanagers
 --------------- 启动 historyserver ---------------
[atguigu@node001 flink-1.17.0]$ jpsall 
================ node001 ================
9200 JobHistoryServer
8416 NameNode
8580 DataNode
9284 Jps
8983 NodeManager
================ node002 ================
3892 ResourceManager
3690 DataNode
4365 Jps
4015 NodeManager
================ node003 ================
3680 DataNode
3778 SecondaryNameNode
3911 NodeManager
4044 Jps
[atguigu@node001 flink-1.17.0]$

YARN运行模式_会话模式

这段命令是用于启动 Apache Flink 的 YARN 会话(session)的脚本,如下是每个选项和参数的含义:

  • yarn-session.sh:这是 Apache Flink 提供的用于在 YARN 上启动会话的脚本。
  • -d:这是一个选项,表示以分离模式(detached mode)启动会话。在分离模式下,会话将在后台运行,并且脚本会立即返回。
  • -nm test:这是另一个选项,用于指定会话的名称。在这个例子中,会话的名称被设置为 “test”。

综合起来,该命令的目的是在 YARN 上启动一个名为 “test” 的 Apache Flink 会话,并以分离模式运行。启动后,该会话将在后台运行,并且命令行提示符会立即返回,允许您继续执行其他操作。

[atguigu@node001 bin]$ ./yarn-session.sh --help
[atguigu@node001 bin]$ ./yarn-session.sh
[atguigu@node001 bin]$ ./yarn-session.sh -d -nm test

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_服务器_11

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_jar_12

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_jar_13

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_服务器_14

YARN运行模式_会话模式的停止

单作业模式部署

在YARN环境中,由于有了外部平台做资源调度,所以我们也可以直接向YARN提交一个单独的作业,从而启动一个Flink集群。

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_flink_15

停止job:

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_运行模式_16

YARN运行模式_单作业模式

单作业模式部署

(1)执行命令提交作业

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_运行模式_17

YARN运行模式_应用模式

应用模式同样非常简单,与单作业模式类似,直接执行

flink run-application

命令即可。如:

bin/flink run-application -t yarn-application -c com.atguigu.wc.WordCountStreamUnboundedDemo ./FlinkTutorial-1.17-1.0-SNAPSHOT.jar
[atguigu@node001 flink-1.17.0]$ bin/flink run-application -t yarn-application -c com.atguigu.wc.WordCountStreamUnboundedDemo ./FlinkTutorial-1.17-1.0-SNAPSHOT.jar 

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink/flink-1.17.0/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2023-06-19 14:31:05,693 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-atguigu.
2023-06-19 14:31:05,693 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-atguigu.
2023-06-19 14:31:06,142 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/module/flink/flink-1.17.0/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-06-19 14:31:06,632 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node002/192.168.10.102:8032
2023-06-19 14:31:07,195 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_服务器_18

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_运行模式_19

jar存放在hdfs的情况:

[atguigu@node001 flink-1.17.0]$ bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://node001:8020/flink-dist" -c com.atguigu.wc.WordCountStreamUnboundedDemo hdfs://node001:8020/flink-jars/FlinkTutorial-1.17-1.0-SNAPSHOT.jar

K8S 运行模式

容器化部署是如今业界流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(k8s),而Flink也在最近的版本中支持了k8s部署模式。基本原理与YARN是类似的,具体配置可以参见官网说明,这里我们就不做过多讲解了。

历史服务器History Server

运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。

Flink提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有当作业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。

此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_服务器_20

启动historyserver:

[atguigu@node001 flink-1.17.0]$ bin/historyserver.sh start

Starting historyserver daemon on host node001.

[atguigu@node001 flink-1.17.0]$ bin/flink run -t yarn-per-job -d -c com.atguigu.wc.WordCountStreamUnboundedDemo ../jar/FlinkTutorial-1.17-1.0-SNAPSHOT.jar

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink/flink-1.17.0/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器_服务器_21




标签:1.17,Flink,standalone,flink,jar,sh,atguigu,node001
From: https://blog.51cto.com/zhangxueliang/7331347

相关文章

  • Flink 1.17教程:WebUI提交作业及打jar包maven插件配置
    打jar包maven插件配置<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version>......
  • Flink 1.17教程:集群角色及集群启动
    集群角色集群启动如果是部署在本地,本地访问,无需进行任何配置,直接启动即可。如果是部署在服务器,需要远程访问,则需要将flink.conf中的localhost修改为服务器IP地址或是0.0.0.0节点服务器hadoop102hadoop103hadoop104角色JobManagerTaskManagerTaskManagerTaskManager[atguigu@node001......
  • Flink 1.17教程:wordcount maven工程java代码示例(批、流实现方式)
    批、流实现wordcount代码示例pom.xml<properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><......
  • Flink 1.17教程:DataStream实现Wordcount——读socket(无界流)
    pom.xml<properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>fli......
  • Java代码:flink wordcount代码示例及解读
    WordCountWordCountPojo.java代码packagewordCount;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.common.functions.ReduceFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.Executio......
  • Flink On K8s实战课程2023
    点击下载:FlinkOnK8s实战课程2023  提取码:2y46目前项目中用到Flink作为离线ETL处理构建相关的特征系统,而特征系统主要是为数据科学家、数据工程师、机器学习工程师去使用,用来去构建AI特征库,用来做模型的训练、用来做数据测试以及一些数据的预测及模型的在线服务,主要特征系统是......
  • 16、Flink 的table api与sql之连接外部系统_ 读写外部系统的连接器和格式以及Apache H
    (文章目录)本文介绍了ApacheHive连接器的使用,以具体的示例演示了通过java和flinksqlcli创建catalog。本文依赖环境是hadoop、zookeeper、hive、flink环境好用,本文内容以flink1.17版本进行介绍的,具体示例是在1.13版本中运行的(因为hadoop集群环境是基于jdk8的,flink1.17版本需......
  • Flink的3中API
    DataStream/DataSet/TableAPI是ApacheFlink提供的三种不同的API,用于处理不同类型的数据和实现不同的计算模型。1.DataStreamAPI:DataStreamAPI是基于流式数据的API,用于处理连续不断到达的数据流。它适用于实时数据处理和流式计算场景。DataStreamAPI提供了丰富的操作符和函......
  • flink的源码编译方法
    1、下载flink源码2、修改整数限制numUnapprovedLicenses---改成100(在pom.xml文件)3、执行编译命令./mvnwcleanpackage-DskipTests4、如果只想名义子工程拷贝根目录的mvnw执行脚本到对应子目录,然后再执行./mvnwcleanpackage-DskipTests5、代码修改后编译,如果碰到chec......
  • Flink 1.12.2样例
    pom.xml<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://mave......