环境: Flink 1.13.6和Flink 1.14.4
yarn-session模式:
--启动yarn seeion
bin/yarn-session.sh \
-s 8 \
-jm 4g \
-tm 16g \
-nm yarn-session-flink \
-d
yarn-session.sh -jm 1g -tm 8g -s 4 -d
参数解释:
-jm 1024 表示jobmanager 1024M内存
-tm 1024表示taskmanager 1024M内存
-s 每一个TaskManager上的slots数量。
-d 任务后台运行
-nm,--name YARN上为一个自定义的应用设置一个名字
-D<property=value> 动态属性 类似于这种-Dparallelism.default=3
-q,--query显示可用的YARN资源(内存,内核)
-qu,--queue 指定YARN队列。
-t,--ship 传输指定目录下的文件(t用于传输)
-nl,--nodeLabel 为YARN应用程序指定YARN节点标签
-z,--Zookeeper Namespace 命名空间,用于创建高可用模式下的Zookeeper子路径
-j,--jar <arg> Flink jar文件的路径
--提交到创建的这个yarn-session上执行
flink run -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar
flink run -t yarn-session -Dyarn.application.id=application_1650018331890_0018 -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar
flink run -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar
yarn-per-job模式:
提交任务的时候如果需要自己指定tm和jm的内存大小,可以注释掉配置文件里面相关的内存配置。
--旧版本的flink提交per-job任务是通过 -m yarn-cluster来提交的
flink run -m yarn-cluster -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar
参数列表
./flink run \
-m yarn-cluster \
-yjm 1024 \
-ytm 1024 \
-ynm ?????? \
-c org.apache.flink.examples.java.wordcount.WordCount \
-yj /u01/isi/application/component/flink-1.13.6/examples/batch/WordCount.jar
-m 执行模式为yarn-cluster
也可以指定要连接的JobManager的地址。使用这个标志可以连接到配置中指定的不同的JobManager。注意:只有高可用性配置为NONE时才会考虑此选项。
-yjm 指定JobManager所在的Container内存。单位:MB
-ytm 每一个TaskManager Container的内存,单位MB。
-ys 每一个TaskManager中slots的数量。
-ynm YARN中application的名称。
-c 指定Job对应的jar包中主函数所在类名。
-yj,--yarnjar <arg> jar包位置
-yt,--yarnship 传输指定目录下的文件(t用于传输)
-yqu,--yarnqueue <arg> 指定yarn队列
-yD <property=value> 自定义参数
-yid,--yarnapplicationId <arg> 指定yarnid执行
-yq,--yarnquery 显示可用的YARN资源(内存,核心)
-d,--detached 后台执行
--新版本的Flink可以通过-t参数提交统一格式的任务
flink run -t yarn-per-job -Dyarn.application.name=nmmd -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar
参数列表
./bin/flink run \
# 指定yarn的Per-job模式,-t等价于-Dexecution.target
-t yarn-per-job \
# yarn应用的自定义name
-Dyarn.application.name=consumerDemo \
# 未指定并行度时的默认并行度值, 该值默认为1
-Dparallelism.default=3 \
# JobManager进程的内存
-Djobmanager.memory.process.size=2048mb \
# TaskManager进程的内存
-Dtaskmanager.memory.process.size=2048mb \
# 每个TaskManager的slot数目, 最佳配比是和vCores保持一致
-Dtaskmanager.numberOfTaskSlots=2 \
# 防止日志中文乱码
-Denv.java.opts="-Dfile.encoding=UTF-8" \
# 支持火焰图, Flink1.13新特性, 默认为false, 开发和测试环境可以开启, 生产环境建议关闭
-Drest.flamegraph.enabled=true \
# 入口类
-c xxxx.MainClass \
# 提交Job的jar包
xxxx.jar
-t
给定应用程序的部署目标,相当于“run”。-Dexecution.target。
对于“run”操作,当前可用的目标是:“remote”,“local”,“kubernetes-session”,“yarn-per-job”,“yarn-session”。
对于“run-application”操作,当前可用的目标是:“kubernetes-application”,“yarn-application”。
--list 下面需要的FlinkJobID可以通过这个命令查看
参数:
-a,--all显示所有程序及其jobid
-r,--running只显示正在运行的程序及其jobid
-s,--scheduled只显示已调度的程序及其jobid
flink list \
-a \
-t yarn-session \
-Dyarn.application.id=application_1650018331890_0010
--cancel 此时无法指定savepoint
flink cancel \
-t yarn-session \
# Yarn的ApplicationID值, 可以通过Yarn的webUI直接查看
-Dyarn.application.id=${YarnApplicationID} \
# Flink的JobID, 可以通过Yarn找到Flink的WebUI进行查看
${FlinkJobID}
--stop 可以指定savepoint,执行完成可以看到savepoint的具体地址,用于Job启动时的savepoint地址
参数:
-d,--drain 在获取保存点和停止连接之前发送MAX_WATERMARK
-p,--savepointPath <savepointPath> savepoint保存地址,等价于-Dstate.savepoints.dir=xxx
flink stop \
-t yarn-session \
-Dyarn.application.id=${YarnApplicationID} \
# 指定savepoint存放位置
--savepointPath hdfs://xx:8020/test/checkpoint_test \
${FlinkJobID}
--从savepoint启动
./bin/flink run \
-t yarn-session \
-Djobmanager.memory.process.size=2048mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Denv.java.opts="-Dfile.encoding=UTF-8" \
-Drest.flamegraph.enabled=true \
# 指定savepoint地址
--fromSavepoint hdfs://xx:8020/test/checkpoint_test/savepoint-e28bdd-ef7febad087e \
-c xxx \
xxx.jar
yarn-session.sh \
-s 8 \
-jm 4g \
-tm 16g \
-nm yarn-session-flink \
-d
标签:Flink,--,flink,jar,yarn,application,session,提交,整理 From: https://www.cnblogs.com/zzz01/p/17486118.html