首页 > 其他分享 >Flink安装部署

Flink安装部署

时间:2023-01-09 16:02:26浏览次数:60  
标签:bin Flink 部署 flink server export node1 安装

Flink安装部署

local本地模式-了解

原理

操作

1.下载安装包

https://archive.apache.org/dist/flink/

2.上传flink-1.13.1-bin-scala_2.12.tgz到node1的指定目录

3.解压

tar -zxvf flink-1.13.1-bin-scala_2.12.tgz

4.如果出现权限问题,需要修改权限

chown -R root:root /export/server/flink-1.13.1

5.改名或创建软链接

mv flink-1.13.1 flink

ln -s /export/server/flink-1.13.1 /export/server/flink

测试

1.准备文件/root/words.txt

vim /root/words.txt

hello me you her
hello me you
hello me
hello

2.启动Flink本地“集群”

/export/server/flink/bin/start-cluster.sh

3.使用jps可以查看到下面两个进程

- TaskManagerRunner

- StandaloneSessionClusterEntrypoint

4.访问Flink的Web UI

http://node1:8081/#/overview

slot在Flink里面可以认为是资源组,Flink是通过将任务分成子任务并且将这些子任务分配到slot来并行执行程序。

5.执行官方示例

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out

6.停止Flink

/export/server/flink/bin/stop-cluster.sh

启动shell交互式窗口(目前所有Scala 2.12版本的安装包暂时都不支持 Scala Shell)

/export/server/flink/bin/start-scala-shell.sh local

执行如下命令

benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()

退出shell

:quit

Standalone独立集群模式-了解

原理

操作

1.集群规划:

- 服务器: node1(Master + Slave): JobManager + TaskManager

- 服务器: node2(Slave): TaskManager

- 服务器: node3(Slave): TaskManager

2.修改flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml

jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true

#历史服务器
jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/

2.修改masters

vim /export/server/flink/conf/masters

node1:8081

3.修改slaves

vim /export/server/flink/conf/workers

node1
node2
node3

4.添加HADOOP_CONF_DIR环境变量

vim /etc/profile

export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop

5.分发

scp -r /export/server/flink node2:/export/server/flink

scp -r /export/server/flink node3:/export/server/flink

scp /etc/profile node2:/etc/profile

scp /etc/profile node3:/etc/profile

 for i in {2..3}; do scp -r flink node$i:$PWD; done

6.source

source /etc/profile

测试

1.启动集群,在node1上执行如下命令

/export/server/flink/bin/start-cluster.sh

或者单独启动

/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all

/export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all

2.启动历史服务器

​ /export/server/flink/bin/historyserver.sh start

3.访问Flink UI界面或使用jps查看

http://node1:8081/#/overview

http://node1:8082/#/overview

4.执行官方测试案例

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar

6.停止Flink集群

/export/server/flink/bin/stop-cluster.sh

Standalone-HA高可用集群模式-了解

原理

操作

1.集群规划

- 服务器: node1(Master + Slave): JobManager + TaskManager

- 服务器: node2(Master + Slave): JobManager + TaskManager

- 服务器: node3(Slave): TaskManager

2.启动ZooKeeper

zkServer.sh status

zkServer.sh stop

zkServer.sh start

3.启动HDFS

/export/serves/hadoop/sbin/start-dfs.sh

4.停止Flink集群

/export/server/flink/bin/stop-cluster.sh

5.修改flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml

增加如下内容

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

6.修改masters

vim /export/server/flink/conf/masters

hadoop-001:8081
hadoop-002:8081

7.同步

scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/

8.修改node2上的flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml

jobmanager.rpc.address: node2

9.重新启动Flink集群,node1上执行

/export/server/flink/bin/stop-cluster.sh

/export/server/flink/bin/start-cluster.sh

10.使用jps命令查看

发现没有Flink相关进程被启动

11.查看日志

cat /export/server/flink/log/flink-root-standalonesession-0-node1.log

发现如下错误

因为在Flink1.8版本后,Flink官方提供的安装包里没有整合HDFS的jar

12.下载jar包并在Flink的lib目录下放入该jar包并分发使Flink能够支持对Hadoop的操作

下载地址

https://flink.apache.org/downloads.html

13.放入lib目录

cd /export/server/flink/lib

14.分发

for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done

15.重新启动Flink集群,node1上执行

/export/server/flink/bin/stop-cluster.sh

/export/server/flink/bin/start-cluster.sh

16.使用jps命令查看,发现三台机器已经ok

测试

1.访问WebUI

http://node1:8081/#/job-manager/config

http://node2:8081/#/job-manager/config

2.执行wc

/opt/module/flink/bin/flink run /opt/module/flink/examples/batch/WordCount.jar

3.kill掉其中一个master

4.重新执行wc,还是可以正常执行

/opt/module/flink/bin/flink run /opt/module/flink/examples/batch/WordCount.jar

3.停止集群

/export/server/flink/bin/stop-cluster.sh

原理

两种模式

Session会话模式

Job分离模式

操作

1.关闭yarn的内存检查

vim /export/server/hadoop/etc/hadoop/yarn-site.xml

 <!-- 关闭yarn内存检查 -->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
		<property>
                <name>yarn.scheduler.minimum-allocation-mb</name>
                <value>512</value>
        </property>
		<property>
                <name>yarn.nodemanager.resource.memory-mb</name>
                <value>2048</value>
        </property>

2.分发

scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml

3.重启yarn

/export/server/hadoop/sbin/stop-yarn.sh

/export/server/hadoop/sbin/start-yarn.sh

测试

Session会话模式

在Yarn上启动一个Flink集群,并重复使用该集群,后续提交的任务都是给该集群,资源会被一直占用,除非手动关闭该集群----适用于大量的小任务

1.在yarn上启动一个Flink集群/会话,node1上执行以下命令

/export/servers/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

说明:

申请2个CPU、1600M内存

# -n 表示申请2个容器,这里指的就是多少个taskmanager

# -tm 表示每个TaskManager的内存大小

# -s 表示每个TaskManager的slots数量

# -d 表示以后台程序方式运行

注意:

该警告不用管

WARN org.apache.hadoop.hdfs.DFSClient - Caught exception

java.lang.InterruptedException

2.查看UI界面

http://node1:8088/cluster

img

3.使用flink run提交任务:

/export/servers/flink/bin/flink run /export/servers/flink/examples/batch/WordCount.jar

运行完之后可以继续运行其他的小任务

/export/servers/flink/bin/flink run /export/servers/flink/examples/batch/WordCount.jar

4.通过上方的ApplicationMaster可以进入Flink的管理界面

img

5.关闭yarn-session:

查看自己的应用信息:application_1609508087977_0005

yarn application -kill application_1609508087977_0005

Job分离模式--用的更多

针对每个Flink任务在Yarn上启动一个独立的Flink集群并运行,结束后自动关闭并释放资源,----适用于大任务

1.直接提交job

/export/servers/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/server/flink/examples/batch/WordCount.jar

# -m jobmanager的地址

# -yjm 1024 指定jobmanager的内存信息

# -ytm 1024 指定taskmanager的内存信息

2.查看UI界面

http://node1:8088/cluster

标签:bin,Flink,部署,flink,server,export,node1,安装
From: https://www.cnblogs.com/Mr-Sponge/p/17037291.html

相关文章

  • Flink合流操作
    合流1、概念将不同流中的数据汇聚在一起,然后可以进行一个统计等相关操作。2、基本合流操作union和connectunion算子可以合并多个同类型的数据流,并生成同类型的数据流......
  • Flink的状态
    State-理解原理即可Flink中状态的自动管理之前写的Flink代码中其实已经做好了状态自动管理,如发送hello,得出(hello,1)再发送hello,得出(hello,2)说明Flink已经自动......
  • 【集成开发环境 (IDE)】Dev-Cpp下载与安装 [ 图文教程 ]
    版权声明©本文作者:main工作室本文链接:https://www.cnblogs.com/main-studio/p/17037280.html版权声明:本文为博客园博主「main工作室」的原创文章,遵循署名-非商业性......
  • Flink设置Source数据源
    流处理说明有边界的流boundedstream:批数据无边界的流unboundedstream:真正的流数据Source基于集合packagecom.pzb.source;importorg.apache.flink.api.co......
  • dell N4050声卡驱动安装后仍然没有声音,改装XP完美驱动IDT声卡!实测可用!
    文中的驱动可以解决4月份出的机器的XPIDT高清声卡驱动在XP下无法正常安装的问题,但是这些操作对动手能力要求较高。大家可以尝试下,但无法保证100%。​​高速下载​​​  ......
  • pip安装hashlib失败
    问题描述:    解决方法:因为hashlib的最新版本还是2008年的,所以从Python2.7开始就已经自带hashlib库,可以直接通过import进行导入importhashlibhash......
  • Flink的高级应用watermake理论
    Time/Watermarker时间分类EventTime的重要性和Watermarker的引入代码演示-开发版-掌握https://ci.apache.org/projects/flink/flink-docs-release-1.12/de......
  • 3568开发板ubuntu环境下安装Visual Studio Code
    首先进入官网下载,进入如下页面,选择适合自身版本的 ​​Linux​​ 的.deb版本下载,也可以直接使用我们提供好的安装包。下载完成之后,拷贝到ubuntu上,如下图所示:然后使用以......
  • Portainer安装
    Portainer是一个可视化的容器镜像的图形管理工具,利用Portainer可以轻松构建,管理和维护Docker环境。而且完全免费,基于容器化的安装方式,方便高效部署。官网地址:https://www......
  • Flink的转换方法
    流处理说明Transformation基本操作map/flatMap/filter/keyBy/sum/reduce...和之前学习的Scala/Spark里面的一样的意思map方法、flatmap方法、keyBy方法、reduce方法m......