首页 > 其他分享 >flink1.14.5集群(flink on yarn)部署1

flink1.14.5集群(flink on yarn)部署1

时间:2023-05-31 23:03:46浏览次数:42  
标签:flink yarn taskmanager high memory flink1.14 availability

先安装hadoop,yarn,zookeeper

 



配置环境变量

vim /etc/profile(注意新增了HADOOP_CLASSPATH变量)

export HADOOP_CLASSPATH=`/home/opt/hadoop-2.9.2/bin/hadoop classpath`

export FLINK_HOME=/home/opt/flink-1.14.5
export PATH=$PATH:$FLINK_HOME/bin

source /etc/profile

 

 

 



配置flink-conf.xml

jobmanager.rpc.address: hadoop34
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
yarn.application-attempts: 4
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop34:9000/flink/ha/
high-availability.zookeeper.quorum: hadoop34:2181,hadoop35:2181,hadoop36:2181
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.acl: open
state.backend: filesystem
state.checkpoints.dir: hdfs://hadoop34:9000/flink-checkpoints
state.savepoints.dir: hdfs://hadoop34:9000/flink-savepoints
jobmanager.execution.failover-strategy: region
rest.port: 8081
rest.address: 0.0.0.0
web.submit.enable: true
web.cancel.enable: true
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.refresh-interval: 10000

 

 



同步配置文件到其它机器

scp flink-conf.yaml root@hadoop35:/home/opt/flink-1.14.5/conf/
scp flink-conf.yaml root@hadoop36:/home/opt/flink-1.14.5/conf/

 

 



启动 yarn-session

yarn-session.sh test

flink1.14.5集群(flink on yarn)部署1_yarn

 

打开http://10.99.69.35:8081

flink1.14.5集群(flink on yarn)部署1_hadoop_02

 

 

 附件一个文本文档

Flink on yarn安装

集群启动步骤

步骤1:用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
步骤2:ResourceManager为该应用程序分配第一个Container,并与对应的Node-Manager通信,要求它在这个Container中启动应用程序的ApplicationMaster。
步骤3:ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。
步骤4:ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
步骤5:一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。
步骤6:NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
步骤7:各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。  在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
步骤8:应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己


配置环境变量
export HADOOP_CLASSPATH=`/usr/local/hadoop/bin/hadoop classpath`
export FLINK_HOME=/usr/local/flink


vim /etc/profile(注意新增了HADOOP_CLASSPATH变量)

export HADOOP_CLASSPATH=`/home/opt/hadoop-2.9.2/bin/hadoop classpath`

export FLINK_HOME=/home/opt/flink-1.14.5
export PATH=$PATH:$FLINK_HOME/bin

source /etc/profile




修改Hadoop集群的yarn-site.xml配置
<!-- master(JobManager)失败重启的最大尝试次数-->
<property>
    <name>yarn.resourcemanager.am.max-attempts</name>
    <value>4</value>
    <description>The maximum number of application master execution attempts.</description>
</property>
<!-- 关闭yarn内存检查 -->
<!-- 是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认为 true -->
<!-- 因为对于 flink 使用 yarn 模式下,很容易内存超标,这个时候 yarn 会自动杀掉 job,因此需要关掉-->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>










修改flink conf配置
#用户提交作业失败时,重新执行次数
yarn.application-attempts: 4

#设置Task在所有节点平均分配
cluster.evenly-spread-out-slots: true



jobmanager.rpc.address: hadoop34
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
yarn.application-attempts: 4
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop34:9000/flink/ha/
high-availability.zookeeper.quorum: hadoop34:2181,hadoop35:2181,hadoop36:2181
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.acl: open
state.backend: filesystem
state.checkpoints.dir: hdfs://hadoop34:9000/flink-checkpoints
state.savepoints.dir: hdfs://hadoop34:9000/flink-savepoints
jobmanager.execution.failover-strategy: region
rest.port: 8081
rest.address: 0.0.0.0
web.submit.enable: true
web.cancel.enable: true
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.refresh-interval: 10000


编辑配置文件
vi flink-conf.yaml
# JobManager内存主要分为四部分:JVM Heap、Off-Heap Memory、JVM Metaspace、JVM Overhead
# JobManager总内存设置为2048m,则JVM Overhead可根据0.1的fraction换算得到204.8m,即JVM Overhead内存为205m
# JVM Metaspace默认为256m
# Off-Heap Memory默认为128m
# JVM Heap最终被推断为2048m-205m-256m - 128m = 1459m,即1.42g
# 但gc算法会占用一小部分固定内存作为Non-Heap,占用大小为0.05g
# JVM Heap实际大小为1.42g - 0.05g = 1.38g
jobmanager.rpc.address: node1

jobmanager.rpc.port: 6123
#JobManager jvm堆大小,主要取决于运行的作业数量、作业结构及用户代码的要求
jobmanager.heap.size: 1024m
#进程总内存
jobmanager.memory.process.size: 2048m

taskmanager.memory.process.size: 4096m
#每个TaskManager提供的任务Slots数量,建议与cpu核数一致
taskmanager.numberOfTaskSlots: 4

parallelism.default: 1

env.hadoop.conf.dir: /usr/local/hadoop/etc/hadoop

high-availability: zookeeper
# flink在重启时,尝试的最大次数
yarn.application-attempts: 10

high-availability.storageDir: hdfs://ns/flink/recovery

high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

high-availability.zookeeper.path.root: /flink
#用于存储和检查点状态
state.backend: filesystem

state.checkpoints.dir: hdfs://ns/flink/checkpoints

state.savepoints.dir: hdfs://ns/flink/savepoints
#故障转移策略
jobmanager.execution.failover-strategy: region

rest.port: 8081
#是否启动web提交
web.submit.enable: true

io.tmp.dirs: /usr/local/flink/data/tmp

env.log.dir: /usr/local/flink/data/logs

taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
fs.hdfs.hadoopconf: /usr/local/hadoop/etc/hadoop

historyserver.web.address: 0.0.0.0

historyserver.web.port: 8082

historyserver.archive.fs.refresh-interval: 10000










启动flink集群
bin/yarn-session.sh -n 2
# 主节点中执行
bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 1

# -tm 表示每个 TaskManager 的内存大小
# -s 表示每个 TaskManager 的 slots 数量
# -d 表示以后台程序方式运行



新建文件夹
mkdir -p /usr/local/flink/data/tmp
mkdir -p /usr/local/flink/data/logs
添加jar包
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar


启动flink yarn session模式
yarn-session.sh


# 查看命令参数
./bin/yarn-session.sh -h
# 用法:
   必选
     -n,--container <arg>   分配多少个yarn容器 (=taskmanager的数量)
   可选
     -D <arg>                        动态属性
     -d,--detached                   独立运行
     -jm,--jobManagerMemory <arg>    JobManager的内存 [in MB]
     -nm,--name                     在YARN上为一个自定义的应用设置一个名字
     -q,--query                      显示yarn中可用的资源 (内存, cpu核数)
     -qu,--queue <arg>               指定YARN队列.
     -s,--slots <arg>                每个TaskManager使用的slots数量
     -tm,--taskManagerMemory <arg>   每个TaskManager的内存 [in MB]
     -z,--zookeeperNamespace <arg>   针对HA模式在zookeeper上创建NameSpace



client必须要设置HADOOP_HOME,YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。

创建一个YARN模式的flink集群:

./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
1
提交一个flink job到flink集群:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
1
这次提交flink job,虽然没有指定对应yarn application的信息,确可以提交到对应的flink集群,原因在于/tmp/.yarn-properties-${user}文件中保存了上一次创建yarn session的集群信息。所以如果同一用户在同一机器上再次创建一个yarn session,则这个文件会被覆盖掉。

那如果删掉/tmp/.yarn-properties-${user}或者在另一个机器上提交作业能否提交到预期到yarn session中呢?这也是可以的,如果配置了HighAvailability,则可以根据cluster-id,从zookeeper上获取到JobManager的地址和端口,从而提交作业。

 

flink1.14.5集群(flink on yarn)部署1_yarn_03

标签:flink,yarn,taskmanager,high,memory,flink1.14,availability
From: https://blog.51cto.com/lenglingx/6390804

相关文章

  • Flink CEP的使用
    探索如何使用FlinkCEP写在前面前言的前言在学习Flink的过程中,我看过很多教程。无论是视频还是博文,几乎都把FlinkCEP作为进阶内容来讲授。究其原因,大概是CEP涉及到的计算机基础知识很多,而我对于诸如NFA、DFA之类名词的印象,基本只停留在很多年前编译原理的课本上。那么如何在仅了解......
  • 【Flink系列十八】History Server 重新登场,如何跟Yarn进行集成
    先看Flink的官方文档本文适用于Flink-1.11+HistoryServer至少Flink-1.16+JobManagerThearchivingofcompletedjobshappensontheJobManager,whichuploadsthearchivedjobinformationtoafilesystemdirectory.Youcanconfigurethedirectorytoarchiveco......
  • Hadoop之YARN详解
    YARN的由来从Hadoop2开始,官方把资源管理单独剥离出来,主要是为了考虑后期作为一个公共的资源管理平台,任何满足规则的计算引擎都可以在它上面执行。所以YARN可以实现HADOOP集群的资源共享,不仅仅可以跑MapRedcue,还可以跑Spark、Flink。YARN架构分析咱们之前部署Hadoop集群的时候也......
  • yarn安装报错网络问题解决方案
    yarn安装报错网络问题解决方案报错为infoThereappearstobetroublewithyournetworkconnection.Retrying...解决方案:更换安装依赖的镜像,使用淘宝镜像安装安装好后更换淘宝镜像yarnconfigsetregistryhttps://registry.npm.taobao.org移除原代理yarn......
  • flink计算引擎
    第1章Flink简介1.1初识Flink1)Flink项目的理念是:“ApacheFlink是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。2)ApacheFlink是一个框架和分布式处理引擎,用于对无界(nclk9999)和有界数据(一个文档)流进行有状态计算。Flink被设计在所有......
  • Flink白话解析Watermark
    一、摘要如果想使用Flink,Flink的Watermark是很难绕过去的概念。本文帮大家梳理Watermark概念 二、Watermark疑问1、Flink应用的常见需求是什么如公司运营一个官网,想统计下过去一分钟有多少用户访问官网。一分钟可以理解为Flink的窗口,在这一分钟统计有多少用户。窗口的作用......
  • macOS下由yarn与npm差异引发的Electron镜像地址读取问题
    记录macOS下由yarn与npm差异引发的Electron镜像地址读取问题写在前面:该问题仅仅出现在Linux和macOS上,Windows上不存在该问题!初始背景最近笔者重新拾起了Electron,把最新版Electron的官方文档阅读了一遍。众所周知,Electron作为依赖在安装的时候,其二进制文件下载在国内一直以来都......
  • MapReduce和Yarn原理
    MapReduce原理 问题1.什么是计算,什么是分布式计算?答案:计算指的是从海量数据中提取出有效的价值信息的过程(广义上解释),狭义上指的是1+1=2即:数学运算.分布式计算指的是多台机器协调,共同完成同1个计算任务.问题2:分布式计算的两种模式?答案:分散汇......
  • flink CEP 讲解 和实例
     1,Flink介绍  Flink是一个分布式的基于状态计算的流处理计算引擎,或者说框架,可以处理有边界流数据和无边界流数据,在内存中执行计算,而且具有任意扩展计算能力。最初由柏林工业大学的xxx小组研发,后被阿里巴巴收购。初略看起来,和spark功能类似,但是某些特征优于spark。 Flin......
  • 【深入浅出 Yarn 架构与实现】6-4 Container 生命周期源码分析
    本文将深入探讨AM向RM申请并获得Container资源后,在NM节点上如何启动和清理Container。将详细分析整个过程的源码实现。一、Container生命周期介绍Container的启动由ApplicationMaster通过调用RPC函数ContainerManagementProtocol#startContainers()发起请求,NM......