Flink入门之Flink程序开发步骤(java语言)
文章目录
注:本篇章的flink学习均是基于java开发语言
我们如果要使用flink进行计算开发,一个完整的开发步骤是怎样的呢?
前情回顾:什么叫有界数据流,什么叫无界数据流(何为流处理,何为批处理)?
- Batch Analytics,右边是 Streaming Analytics。批量计算: 统一收集数据->存储到DB->对数据进行批量处理,对数据实时性邀请不高,比如生成离线报表、月汇总,支付宝年度账单(一年结束批处理计算)
- Streaming Analytics 流式计算,顾名思义,就是对数据流进行处理,如使用流式分析引擎如 Storm,Flink 实时处理分析数据,应用较多的场景如 实时报表、车辆实时报警计算等等。
(0)开发程序所需依赖
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.12.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- 打包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 设置jar包的入口类(可选) -->
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
(1)获取执行环境
flink程序开发,首要的便是需要获取其执行环境!
ex:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- 1
或者:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 1
如果使用StreamExecutionEnvironment
默认便是流式处理环境
但是flink1.12
开始,流批一体,我们可以自己指定当前计算程序的环境模式
指定为自动模式:AUTOMATIC
此设置后,flink将会自动识别数据源类型
有界数据流,则会采用批方式进行数据处理
无界束流,则会采用流方式进行数据处理
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
- 1
强制指定为批数据处理模式:BATCH
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- 1
强制指定为流数据处理模式:STREAMING
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
- 1
注意点:
在flink中,有界与无界数据流都可以强指定为流式运行环境,但是,如果明知一个数据来源为流式数据,就必须设置环境为AUTOMATIC
或STREAMING
,不可以指定为BATCH
否则程序会报错!
(2)加载/创建数据源
flink
,是一个计算框架,在计算的前提,肯定是要有数据来源啊!
flink
可以从多种场景读取加载数据,例如 各类DB 如Mysql
、SQL SERVER
、MongoDB
、各类MQ 如Kafka
、RabbitMQ
、以及很多常用数据存储场景 如redis
、文件(本地文件/HDFS)
、scoket
…
我们在加载数据源的时候,便知道,该数据是有界还是无界了!
ex:
flink
读取rabbitMQ
消息,是有界还是无界呢?当然是无界!因为flink
程序启动时,能通过连接知道什么时候MQ中有数据,什么时候没有数据吗?不知道,因为本身MQ中是否有消息或者消息有多少就是一个不能肯定确定的因素,因此其不得不保持一个类似于长连接的形式,一直等待MQ中有数据到来,然后处理。
flink
读取指定某个文件中的数据,那么此数据源是有界还是无界呢?当然是有界!因为文件中数据,flink读取会做记录,当文件内容读完了,数据源就相当于没有新的数据来到了嘛!
ex:
从集合中读取数据:
DataStream<String> elementsSource = env.fromElements("java,scala,php,c++","java,scala,php", "java,scala", "java");
- 1
那么,这是无界数据还是有界数据呢?很明显,有界数据!因为数据就这么多,当前数据源在读取时不会再凭空产生数据了。
从scoket
中读取数据:
DataStreamSource<String> elementsSource= env.socketTextStream("10.50.40.131", 9999);
- 1
这是无界数据还是有界数据呢?很明显,无界数据!因为scoket
一旦连接,flink
不会知道其数据源什么时候会数据结束,其不得不保持一个类似于长连接的状态,一直等待Scoket
中有数据到来,然后处理。
(3)数据转换处理
数据转换处理,就是flink
使用算子,对从数据源中获取的数据进行数据加工处理(例如 数据转换,计算等等)
例如:开窗口、低阶处理函数ProcessFuction
、各种算子:map(映射,与java8流中Map效果类似),flatmap(元素摊平,与java8流中Map效果类似)等等。
demo示例:
DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
"java,scala,php", "java,scala", "java");
// 数据处理
DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String element, Collector<String> out) throws Exception {
String[] wordArr = element.split(",");
for (String word : wordArr) {
out.collect(word);
}
}
});
flatMap.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value.toUpperCase();
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
(4)处理后数据放置/输出
将计算后的数据,进行放置(输出/存储),可以很地方,从什么地方读取数据,自然也可以将计算结果输出到该地点。
例如:输出到文件,输出到控制台,输出到MQ,输出到DB,输出到scoket
…
ex:输出到控制台
source.print();
- 1
(5)执行计算程序
flink程序需要启动才能执行任务,正如,spring-boot启动程序需要nohup java -jar xxxx.jar &
或者编译器中点击图标按钮启动
启动示例:
// 1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置模式 (流、批、自动)
// 2.加载数据源
// 3.数据转换
// 4.数据输出
// 5.执行程序
env.execute();
//或者 env.execute("指定当前计算程序名");
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
(6)完整示例
public class FlinkDemo {
public static void main(String[] args) throws Exception {
// 1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置运行模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2.加载数据源
DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
"java,scala,php", "java,scala", "java");
// 3.数据转换
DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String element, Collector<String> out) throws Exception {
String[] wordArr = element.split(",");
for (String word : wordArr) {
out.collect(word);
}
}
});
//DataStream 下边为DataStream子类
SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value.toUpperCase();
}
});
// 4.数据输出
source.print();
// 5.执行程序
env.execute("flink-hello-world");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
IDEA执行后,输出结果:
前边序号可以理解为多线程执行时的线程名字!
原文链接:https://blog.csdn.net/leilei1366615/article/details/115362824 标签:Flink,java,scala,数据源,flink,程序开发,env,数据 From: https://www.cnblogs.com/sunny3158/p/18021068