先安装hadoop,yarn,zookeeper
配置环境变量
vim /etc/profile(注意新增了HADOOP_CLASSPATH变量)
export HADOOP_CLASSPATH=`/home/opt/hadoop-2.9.2/bin/hadoop classpath`
export FLINK_HOME=/home/opt/flink-1.14.5
export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
配置flink-conf.xml
jobmanager.rpc.address: hadoop34
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
yarn.application-attempts: 4
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop34:9000/flink/ha/
high-availability.zookeeper.quorum: hadoop34:2181,hadoop35:2181,hadoop36:2181
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.acl: open
state.backend: filesystem
state.checkpoints.dir: hdfs://hadoop34:9000/flink-checkpoints
state.savepoints.dir: hdfs://hadoop34:9000/flink-savepoints
jobmanager.execution.failover-strategy: region
rest.port: 8081
rest.address: 0.0.0.0
web.submit.enable: true
web.cancel.enable: true
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.refresh-interval: 10000
同步配置文件到其它机器
scp flink-conf.yaml root@hadoop35:/home/opt/flink-1.14.5/conf/
scp flink-conf.yaml root@hadoop36:/home/opt/flink-1.14.5/conf/
启动 yarn-session
yarn-session.sh test
打开http://10.99.69.35:8081
附件一个文本文档
Flink on yarn安装
集群启动步骤
步骤1:用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
步骤2:ResourceManager为该应用程序分配第一个Container,并与对应的Node-Manager通信,要求它在这个Container中启动应用程序的ApplicationMaster。
步骤3:ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。
步骤4:ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
步骤5:一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。
步骤6:NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
步骤7:各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。 在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
步骤8:应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己
配置环境变量
export HADOOP_CLASSPATH=`/usr/local/hadoop/bin/hadoop classpath`
export FLINK_HOME=/usr/local/flink
vim /etc/profile(注意新增了HADOOP_CLASSPATH变量)
export HADOOP_CLASSPATH=`/home/opt/hadoop-2.9.2/bin/hadoop classpath`
export FLINK_HOME=/home/opt/flink-1.14.5
export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
修改Hadoop集群的yarn-site.xml配置
<!-- master(JobManager)失败重启的最大尝试次数-->
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>The maximum number of application master execution attempts.</description>
</property>
<!-- 关闭yarn内存检查 -->
<!-- 是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认为 true -->
<!-- 因为对于 flink 使用 yarn 模式下,很容易内存超标,这个时候 yarn 会自动杀掉 job,因此需要关掉-->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
修改flink conf配置
#用户提交作业失败时,重新执行次数
yarn.application-attempts: 4
#设置Task在所有节点平均分配
cluster.evenly-spread-out-slots: true
jobmanager.rpc.address: hadoop34
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
yarn.application-attempts: 4
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop34:9000/flink/ha/
high-availability.zookeeper.quorum: hadoop34:2181,hadoop35:2181,hadoop36:2181
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.acl: open
state.backend: filesystem
state.checkpoints.dir: hdfs://hadoop34:9000/flink-checkpoints
state.savepoints.dir: hdfs://hadoop34:9000/flink-savepoints
jobmanager.execution.failover-strategy: region
rest.port: 8081
rest.address: 0.0.0.0
web.submit.enable: true
web.cancel.enable: true
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.refresh-interval: 10000
编辑配置文件
vi flink-conf.yaml
# JobManager内存主要分为四部分:JVM Heap、Off-Heap Memory、JVM Metaspace、JVM Overhead
# JobManager总内存设置为2048m,则JVM Overhead可根据0.1的fraction换算得到204.8m,即JVM Overhead内存为205m
# JVM Metaspace默认为256m
# Off-Heap Memory默认为128m
# JVM Heap最终被推断为2048m-205m-256m - 128m = 1459m,即1.42g
# 但gc算法会占用一小部分固定内存作为Non-Heap,占用大小为0.05g
# JVM Heap实际大小为1.42g - 0.05g = 1.38g
jobmanager.rpc.address: node1
jobmanager.rpc.port: 6123
#JobManager jvm堆大小,主要取决于运行的作业数量、作业结构及用户代码的要求
jobmanager.heap.size: 1024m
#进程总内存
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 4096m
#每个TaskManager提供的任务Slots数量,建议与cpu核数一致
taskmanager.numberOfTaskSlots: 4
parallelism.default: 1
env.hadoop.conf.dir: /usr/local/hadoop/etc/hadoop
high-availability: zookeeper
# flink在重启时,尝试的最大次数
yarn.application-attempts: 10
high-availability.storageDir: hdfs://ns/flink/recovery
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.zookeeper.path.root: /flink
#用于存储和检查点状态
state.backend: filesystem
state.checkpoints.dir: hdfs://ns/flink/checkpoints
state.savepoints.dir: hdfs://ns/flink/savepoints
#故障转移策略
jobmanager.execution.failover-strategy: region
rest.port: 8081
#是否启动web提交
web.submit.enable: true
io.tmp.dirs: /usr/local/flink/data/tmp
env.log.dir: /usr/local/flink/data/logs
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
fs.hdfs.hadoopconf: /usr/local/hadoop/etc/hadoop
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.refresh-interval: 10000
启动flink集群
bin/yarn-session.sh -n 2
# 主节点中执行
bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 1
# -tm 表示每个 TaskManager 的内存大小
# -s 表示每个 TaskManager 的 slots 数量
# -d 表示以后台程序方式运行
新建文件夹
mkdir -p /usr/local/flink/data/tmp
mkdir -p /usr/local/flink/data/logs
添加jar包
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
启动flink yarn session模式
yarn-session.sh
# 查看命令参数
./bin/yarn-session.sh -h
# 用法:
必选
-n,--container <arg> 分配多少个yarn容器 (=taskmanager的数量)
可选
-D <arg> 动态属性
-d,--detached 独立运行
-jm,--jobManagerMemory <arg> JobManager的内存 [in MB]
-nm,--name 在YARN上为一个自定义的应用设置一个名字
-q,--query 显示yarn中可用的资源 (内存, cpu核数)
-qu,--queue <arg> 指定YARN队列.
-s,--slots <arg> 每个TaskManager使用的slots数量
-tm,--taskManagerMemory <arg> 每个TaskManager的内存 [in MB]
-z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
client必须要设置HADOOP_HOME,YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。
创建一个YARN模式的flink集群:
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
1
提交一个flink job到flink集群:
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
1
这次提交flink job,虽然没有指定对应yarn application的信息,确可以提交到对应的flink集群,原因在于/tmp/.yarn-properties-${user}文件中保存了上一次创建yarn session的集群信息。所以如果同一用户在同一机器上再次创建一个yarn session,则这个文件会被覆盖掉。
那如果删掉/tmp/.yarn-properties-${user}或者在另一个机器上提交作业能否提交到预期到yarn session中呢?这也是可以的,如果配置了HighAvailability,则可以根据cluster-id,从zookeeper上获取到JobManager的地址和端口,从而提交作业。
标签:flink,yarn,taskmanager,high,memory,flink1.14,availability From: https://blog.51cto.com/lenglingx/6390804