首页 > 编程语言 >Flink入门之Flink程序开发步骤(java语言)

Flink入门之Flink程序开发步骤(java语言)

时间:2024-02-19 14:45:17浏览次数:34  
标签:Flink java scala 数据源 flink 程序开发 env 数据

Flink入门之Flink程序开发步骤(java语言)

文章目录


注:本篇章的flink学习均是基于java开发语言

我们如果要使用flink进行计算开发,一个完整的开发步骤是怎样的呢?

前情回顾:什么叫有界数据流,什么叫无界数据流(何为流处理,何为批处理)?

image-20210307225350028

- Batch Analytics,右边是 Streaming Analytics。批量计算: 统一收集数据->存储到DB->对数据进行批量处理,对数据实时性邀请不高,比如生成离线报表、月汇总,支付宝年度账单(一年结束批处理计算)

- Streaming Analytics 流式计算,顾名思义,就是对数据流进行处理,如使用流式分析引擎如 Storm,Flink 实时处理分析数据,应用较多的场景如 实时报表、车辆实时报警计算等等。

(0)开发程序所需依赖

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.12.2</flink.version>
</properties>
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
</dependencies>
    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- 打包插件(会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- 设置jar包的入口类(可选) -->
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109

(1)获取执行环境

flink程序开发,首要的便是需要获取其执行环境!

ex:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  • 1

或者:

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 1

如果使用StreamExecutionEnvironment 默认便是流式处理环境

但是flink1.12 开始,流批一体,我们可以自己指定当前计算程序的环境模式

指定为自动模式:AUTOMATIC

此设置后,flink将会自动识别数据源类型

有界数据流,则会采用批方式进行数据处理

无界束流,则会采用流方式进行数据处理

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  • 1

强制指定为批数据处理模式:BATCH

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  • 1

强制指定为流数据处理模式:STREAMING

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
  • 1

注意点:

在flink中,有界与无界数据流都可以强指定为流式运行环境,但是,如果明知一个数据来源为流式数据,就必须设置环境为AUTOMATICSTREAMING,不可以指定为BATCH否则程序会报错!

(2)加载/创建数据源

flink,是一个计算框架,在计算的前提,肯定是要有数据来源啊!

flink可以从多种场景读取加载数据,例如 各类DB 如MysqlSQL SERVERMongoDB、各类MQ 如KafkaRabbitMQ、以及很多常用数据存储场景 如redis文件(本地文件/HDFS)scoket

我们在加载数据源的时候,便知道,该数据是有界还是无界了!

ex:

flink读取rabbitMQ消息,是有界还是无界呢?当然是无界!因为flink程序启动时,能通过连接知道什么时候MQ中有数据,什么时候没有数据吗?不知道,因为本身MQ中是否有消息或者消息有多少就是一个不能肯定确定的因素,因此其不得不保持一个类似于长连接的形式,一直等待MQ中有数据到来,然后处理。


flink读取指定某个文件中的数据,那么此数据源是有界还是无界呢?当然是有界!因为文件中数据,flink读取会做记录,当文件内容读完了,数据源就相当于没有新的数据来到了嘛!

ex:

从集合中读取数据:

DataStream<String> elementsSource = env.fromElements("java,scala,php,c++","java,scala,php", "java,scala", "java");
  • 1

那么,这是无界数据还是有界数据呢?很明显,有界数据!因为数据就这么多,当前数据源在读取时不会再凭空产生数据了。

scoket中读取数据:

 DataStreamSource<String> elementsSource= env.socketTextStream("10.50.40.131", 9999);
  • 1

这是无界数据还是有界数据呢?很明显,无界数据!因为scoket一旦连接,flink不会知道其数据源什么时候会数据结束,其不得不保持一个类似于长连接的状态,一直等待Scoket中有数据到来,然后处理。

(3)数据转换处理

数据转换处理,就是flink使用算子,对从数据源中获取的数据进行数据加工处理(例如 数据转换,计算等等)

例如:开窗口、低阶处理函数ProcessFuction、各种算子:map(映射,与java8流中Map效果类似),flatmap(元素摊平,与java8流中Map效果类似)等等。

demo示例:

DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
                                                           "java,scala,php", "java,scala", "java");
// 数据处理
DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String element, Collector<String> out) throws Exception {
        String[] wordArr = element.split(",");
        for (String word : wordArr) {
            out.collect(word);
        }
    }
});
flatMap.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return value.toUpperCase();
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

(4)处理后数据放置/输出

将计算后的数据,进行放置(输出/存储),可以很地方,从什么地方读取数据,自然也可以将计算结果输出到该地点。

例如:输出到文件,输出到控制台,输出到MQ,输出到DB,输出到scoket

ex:输出到控制台

source.print();
  • 1

(5)执行计算程序

flink程序需要启动才能执行任务,正如,spring-boot启动程序需要nohup java -jar xxxx.jar & 或者编译器中点击图标按钮启动

启动示例:

// 1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置模式 (流、批、自动)
// 2.加载数据源
// 3.数据转换
// 4.数据输出
// 5.执行程序
env.execute();
//或者 env.execute("指定当前计算程序名");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

(6)完整示例

public class FlinkDemo {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置运行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2.加载数据源
        DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
                "java,scala,php", "java,scala", "java");
        // 3.数据转换
        DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String element, Collector<String> out) throws Exception {
                String[] wordArr = element.split(",");
                for (String word : wordArr) {
                    out.collect(word);
                }
            }
        });
        //DataStream 下边为DataStream子类
        SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });
        // 4.数据输出
        source.print();
        // 5.执行程序
        env.execute("flink-hello-world");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

IDEA执行后,输出结果:

image-20210331222234445

前边序号可以理解为多线程执行时的线程名字!

原文链接:https://blog.csdn.net/leilei1366615/article/details/115362824

标签:Flink,java,scala,数据源,flink,程序开发,env,数据
From: https://www.cnblogs.com/sunny3158/p/18021068

相关文章

  • 前端知识回顾概览--JavaScript 高级
     掌握JS语言,针对闭包、原型链等有深入理解对typescript静态化工具熟练掌握精通常见设计模式了解函数式编程 1.this指针/闭包/作用域this指针详解闭包的概念及应用场景作用域(全局作用域/函数作用域)默认绑定、显式绑定、隐式绑定存储空间、执行上下文2.面向对象编......
  • 前端知识回顾概览--小程序开发
    1. 小程序入门小程序的基础使用小程序生命周期小程序架构-双线程模型运行机制自己仿一个简易小程序2. 微信小程序详解微信小程序开发API详解微信小程序开发/发布/上线流程详解微信小程序原理解析3. 工程化开发小程序小程序工程化详解4. 百度/支付宝小程序......
  • Java中==和equals有什么区别
    原文网址:​​Java中==和equals有什么区别_IT利刃出鞘的博客-CSDN博客​​简介本文介绍java中==和equals的区别。分享Java技术星球(自学精灵),有面试真题和架构技术等:​​https://learn.skyofit.com/​​区别区别是:一个是运算符,一个是方法。==比较变量的值是否相同。如果比较......
  • 在script标签写export为什么会抛错|type module import ES5 ES6 预处理 指令序言 JavaS
    今天我们进入到语法部分的学习。在讲解具体的语法结构之前,这一堂课我首先要给你介绍一下JavaScript语法的一些基本规则。脚本和模块首先,JavaScript有两种源文件,一种叫做脚本,一种叫做模块。这个区分是在ES6引入了模块机制开始的,在ES5和之前的版本中,就只有一种源文件类型(就......
  • Java方法重写与重载
    一、方法重载(overload)概念方法重载指同一个类中定义的多个方法之间的关系,满足下列条件的多个方法相互构成重载 多个方法在同一个类中 多个方法具有相同的方法名 多个方法的参数不相同,类型不同或者数量不同 所谓方法重载就是指我们可以定义一些名称相同的方法,通过定......
  • java普通项目转springboot项目
    添加启动类@SpringBootApplicationpublicclassSpringBootMain{publicstaticvoidmain(String[]args){SpringApplication.run(SpringBootMain.class,args);}}添加依赖<parent><groupId>org.springframework.boot</grou......
  • java 日期计算
      importjava.util.Calendar;publicclassMain{publicstaticvoidmain(String[]args){//创建一个Calendar对象并设置为当前时间Calendarcalendar=Calendar.getInstance();//获取当前年份、月份和日期inty......
  • 一位普通Javaer的成长之路
    前言此文章用于记录自己作为Java开发者的成长历程永远置顶于我的博客为什么要做Java其实本来是想学C#做桌面应用程序的,奈何Java的火热和易上手,加上好找工作些,所以入行了Java当然,也不影响我现在偶尔会学学C#,做windows下的桌面应用程序以《斗破苍穹》的斗气段位来代表计算机专......
  • JAVA基础-正则表达式
    1,正则表达式  正则表达式,又称规则表达式,(RegularExpression,在代码中常简写为regex、regexp或RE),是一种文本模式,包括普通字符(例如,a到z之间的字母)和特殊字符(称为"元字符"),是计算机科学的一个概念。正则表达式使用单个字符串来描述、匹配一系列匹配某个句法规则的字符串,通常被用......
  • JAVA基础-类的成员
    1.类的成员-属性属性这里就是成员变量,也叫成员变量,直接定义在类中的。在方法体外声明的变量称之为成员变量实例变量(不以static修饰)类变量(以static修饰)在方法体内部声明的变量称之为局部变量形参(方法、构造器中定义的变量)方法局部变量(在方法体内定义)代码块变量(在代码......