Flink Installation and Deployment
Local Mode (for reference)
Principle
Setup
1. Download the installation package
https://archive.apache.org/dist/flink/
2. Upload flink-1.13.1-bin-scala_2.12.tgz to the target directory on node1
3. Extract it
tar -zxvf flink-1.13.1-bin-scala_2.12.tgz
4. If you run into permission problems, fix the ownership
chown -R root:root /export/server/flink-1.13.1
5. Rename the directory or create a symlink
mv flink-1.13.1 flink
ln -s /export/server/flink-1.13.1 /export/server/flink
Testing
1. Prepare the file /root/words.txt
vim /root/words.txt
hello me you her
hello me you
hello me
hello
2. Start the Flink local "cluster"
/export/server/flink/bin/start-cluster.sh
3. jps should now show the following two processes
- TaskManagerRunner
- StandaloneSessionClusterEntrypoint
4. Access the Flink Web UI (by default at http://node1:8081)
A slot in Flink can be thought of as a resource group: Flink runs a program in parallel by splitting it into subtasks and assigning those subtasks to slots.
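To see these numbers for the running cluster, you can also query the REST API that backs the Web UI; a minimal sketch, assuming the default port 8081:
# The JSON response includes slots-total and slots-available
curl http://node1:8081/overview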
5. Run the official example
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out
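To check the result, print the output written via --output:
cat /root/out   # if the job ran with parallelism > 1, /root/out is a directory; list its files instead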
6. Stop Flink
/export/server/flink/bin/stop-cluster.sh
Start the interactive Scala shell (note: at the moment none of the Scala 2.12 builds support the Scala Shell)
/export/server/flink/bin/start-scala-shell.sh local
Run the following command
benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
Exit the shell
:quit
Standalone Cluster Mode (for reference)
Principle
Setup
1. Cluster plan:
- Server: node1 (Master + Slave): JobManager + TaskManager
- Server: node2 (Slave): TaskManager
- Server: node3 (Slave): TaskManager
2. Edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true
# History server
jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
3. Edit masters
vim /export/server/flink/conf/masters
node1:8081
4. Edit workers
vim /export/server/flink/conf/workers
node1
node2
node3
5. Add the HADOOP_CONF_DIR environment variable
vim /etc/profile
export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
6. Distribute to node2 and node3
scp -r /export/server/flink node2:/export/server/flink
scp -r /export/server/flink node3:/export/server/flink
scp /etc/profile node2:/etc/profile
scp /etc/profile node3:/etc/profile
or
for i in {2..3}; do scp -r flink node$i:$PWD; done
7. Source the profile (on every node it was copied to)
source /etc/profile
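A quick sanity check that the variable is now set:
echo $HADOOP_CONF_DIR   # should print /export/server/hadoop/etc/hadoop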
Testing
1. Start the cluster; on node1 run
/export/server/flink/bin/start-cluster.sh
or start the daemons individually
/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
/export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all
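For example, based on the usage strings above, adding one extra TaskManager on the current node and stopping it again would look like:
/export/server/flink/bin/taskmanager.sh start
/export/server/flink/bin/taskmanager.sh stop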
2. Start the history server
/export/server/flink/bin/historyserver.sh start
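The archive directory configured earlier (jobmanager.archive.fs.dir) should exist in HDFS; if in doubt, create it up front:
hdfs dfs -mkdir -p /flink/completed-jobs/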
3. Access the Flink UI (http://node1:8081; history server at http://node1:8082) or check with jps
4. Run the official test case
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
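Without --input/--output the example runs on its built-in sample data and prints the result to the client console. You can also set the parallelism explicitly with the standard -p flag, for example:
/export/server/flink/bin/flink run -p 2 /export/server/flink/examples/batch/WordCount.jar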
5. Stop the Flink cluster
/export/server/flink/bin/stop-cluster.sh
Standalone HA Cluster Mode (for reference)
Principle
Setup
1. Cluster plan
- Server: node1 (Master + Slave): JobManager + TaskManager
- Server: node2 (Master + Slave): JobManager + TaskManager
- Server: node3 (Slave): TaskManager
2. Start ZooKeeper (on each ZooKeeper node)
zkServer.sh status
zkServer.sh stop
zkServer.sh start
3. Start HDFS
/export/server/hadoop/sbin/start-dfs.sh
4. Stop the Flink cluster
/export/server/flink/bin/stop-cluster.sh
5. Edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
Add the following content
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
6. Edit masters
vim /export/server/flink/conf/masters
node1:8081
node2:8081
7. Sync the changes to the other nodes
scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/
8. On node2, edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node2
9. Restart the Flink cluster; on node1 run
/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh
10. Check with jps
You will find that no Flink processes have started
11. Check the log
cat /export/server/flink/log/flink-root-standalonesession-0-node1.log
The log contains an error: starting with Flink 1.8, the official release packages no longer bundle the jar that integrates with HDFS
12. Download the Hadoop uber jar, put it in Flink's lib directory, and distribute it so that Flink can work with Hadoop/HDFS
Download from
https://flink.apache.org/downloads.html
13. Put it in the lib directory
cd /export/server/flink/lib
14. Distribute it
for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done
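A quick check that the jar is now present on every node:
for i in {1..3}; do ssh node$i "ls /export/server/flink/lib | grep hadoop"; done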
15. Restart the Flink cluster; on node1 run
/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh
16. Check with jps; all three machines should now show the expected processes
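With HA enabled, Flink keeps its leader-election state in ZooKeeper; you can inspect it with the ZooKeeper CLI (a sketch, assuming the default znode root /flink):
zkCli.sh -server node1:2181
ls /flink   # run inside the ZooKeeper shell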
Testing
1. Access the WebUI
http://node1:8081/#/job-manager/config
http://node2:8081/#/job-manager/config
2. Run the WordCount job
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
3. Kill one of the masters
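One way to do this, using the JobManager process name shown by jps earlier (replace <pid> with the PID jps prints):
jps | grep StandaloneSessionClusterEntrypoint
kill -9 <pid>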
4.重新执行wc,还是可以正常执行
/opt/module/flink/bin/flink run /opt/module/flink/examples/batch/WordCount.jar
5. Stop the cluster
/export/server/flink/bin/stop-cluster.sh
Flink on YARN (used in development)
Principle
Two modes
Session mode
Per-job mode
Setup
1. Disable YARN's memory checks
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
<!-- Disable YARN physical and virtual memory checks -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
<!-- Minimum container allocation and total memory available to the NodeManager -->
<property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>512</value>
</property>
<property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>2048</value>
</property>
2. Distribute
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml
3. Restart YARN
/export/server/hadoop/sbin/stop-yarn.sh
/export/server/hadoop/sbin/start-yarn.sh
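After the restart, confirm the NodeManagers have re-registered (standard YARN CLI):
yarn node -list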
Testing
Session mode
Start one Flink cluster on YARN and reuse it: all subsequently submitted jobs run on that cluster, and its resources stay occupied until the cluster is shut down manually. Suited to large numbers of small jobs.
1. Start a Flink cluster/session on YARN; run the following on node1
/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
Explanation:
In total this requests 2 CPUs and 1600 MB of memory
# -n: number of containers to request, i.e. the number of TaskManagers
# -tm: memory per TaskManager, in MB
# -s: number of slots per TaskManager
# -d: run as a detached background process
Note: recent Flink releases deprecate -n and allocate TaskManagers on demand; if your version rejects the flag, drop it.
Note:
The following warning can be ignored
WARN org.apache.hadoop.hdfs.DFSClient - Caught exception
java.lang.InterruptedException
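Once the session is up, you can confirm it from the YARN side with the standard CLI:
yarn application -list   # should show a running Flink session application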
2. Check the UI
3. Submit jobs with flink run:
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
After it finishes, you can keep submitting other small jobs
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
4. The ApplicationMaster link above (in the YARN UI) takes you to the Flink management interface
5. Shut down the yarn-session:
Look up your application ID (e.g. via yarn application -list): application_1609508087977_0005
yarn application -kill application_1609508087977_0005
Per-job mode (used more often)
Each Flink job gets its own dedicated Flink cluster on YARN; when the job finishes, the cluster shuts down automatically and its resources are released. Suited to large jobs.
1. Submit the job directly
/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/server/flink/examples/batch/WordCount.jar
# -m: the JobManager address; yarn-cluster submits to a new cluster on YARN
# -yjm 1024: memory for the JobManager, in MB
# -ytm 1024: memory for the TaskManager, in MB
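A common variant adds -d so the client detaches after submission (same flags as above; an illustrative sketch):
/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 -d /export/server/flink/examples/batch/WordCount.jar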
2. Check the UI