首页 > 其他分享 >Flink DataStream API

Flink DataStream API

时间:2023-07-04 11:45:56浏览次数:55  
标签:DataStream StreamExecutionEnvironment 读取 Flink Source API String

Flink 的 DataSet 和 DataStream 的 API,并模拟了实时计算的场景。

说好的流批一体呢

现状

Flink 很重要的一个特点是“流批一体”,然而事实上 Flink 并没有完全做到所谓的“流批一体”,即编写一套代码,可以同时支持流式计算场景和批量计算的场景。目前截止 1.10 版本依然采用了 DataSet 和 DataStream 两套 API 来适配不同的应用场景。

DateSet 和 DataStream 的区别和联系

在官网或者其他网站上,都可以找到目前 Flink 支持两套 API 和一些应用场景,但大都缺少了“为什么”这样的思考。

Apache Flink 在诞生之初的设计哲学是:用同一个引擎支持多种形式的计算,包括批处理、流处理和机器学习等。尤其是在流式计算方面,Flink 实现了计算引擎级别的流批一体。那么对于普通开发者而言,如果使用原生的 Flink ,直接的感受还是要编写两套代码。

整体架构如下图所示:

在 Flink 的源代码中,我们可以在 flink-java 这个模块中找到所有关于 DataSet 的核心类,DataStream 的核心实现类则在 flink-streaming-java 这个模块。

在上述两张图中,我们分别打开 DataSet 和 DataStream 这两个类,可以发现,二者支持的 API 都非常丰富且十分类似,比如常用的 map、filter、join 等常见的 transformation 函数。

对于 DataSet 而言,Source 部分来源于文件、表或者 Java 集合;而 DataStream 的 Source 部分则一般是消息中间件比如 Kafka 等。

(一)Environment

Flink有以下几种Environment

1. 批处理Environment,ExecutionEnvironment

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

2.流处理Environment,StreamExecutionEnvironment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

3. 本机Environment,LocalEnvironment

ExecutionEnvironment env = LocalEnvironment.getExecutionEnvironment();

4. java集合Environment,CollectionEnvironment

ExecutionEnvironment env = CollectionEnvironment.getExecutionEnvironment();

创建Environment的方法

1. getExecutionEnvironment ,含义就是本地运行就是 createLocalEnvironment,如果是通过client提交到集群上,就返回集群的环境

Creates an execution environment that represents the context in which the program is currently executed.
    * If the program is invoked standalone, this method returns a local execution environment, as returned by
    * {@link #createLocalEnvironment()}. If the program is invoked from within the command line client to be
    * submitted to a cluster, this method returns the execution environment of this cluster.

2. createLocalEnvironment ,返回本地执行环境,需要在调用时指定默认的并行度,比如

LocalStreamEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment(1);
 
LocalEnvironment env2 = ExecutionEnvironment.createLocalEnvironment(1);

3. createRemoteEnvironment, 返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager 的 IP 和端口号,并指定要在集群中运行的 Jar 包,比如

StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8080, "/path/word_count.jar");
 
ExecutionEnvironment env2 = ExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8080, "/path/word_count.jar");

在工作中我们一般使用IntelliJ IDEA开发工具进行代码开发,为了能方便快速的调试Flink和了解Flink程序的运行情况,我们希望本地开发工具中运行Flink时能查看到WebUI,这就可以在编写Flink程序时开启本地WebUI。

在Flink1.15版本之前根据使用Scala版本在Java Flink项目或Scala Flink项目中添加对应Scala版本的依赖。在Flink1.15版本之后,无论是Java Flink项目还是Scala Flink项目,添加如下依赖,不需额外依赖Scala版本。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web</artifactId>
  <version>${flink.version}</version>
</dependency>

Flink Java 代码启动本地WebUI:

Configuration conf = new Configuration();
//设置WebUI绑定的本地端口
conf.setString(RestOptions.BIND_PORT,"8081");
//使用配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

执行

env.execute();

(二)DataStream API

DataStream是Flink编写流处理作业的API。我们前面说过一个完整的Flink处理程序应该包含三部分:数据源(Source)、转换操作(Transformation)、结果接收(Sink)。下面我们从这三部分来看DataStream API。

(三)数据源(Source)

Flink应用程序从数据源获取要处理的数据,DataStream通过StreamExecutionEnvironment.addResource(SourceFunction) 来添加数据源。为了方便使用,Flink预提几类预定义的数据源,比如读取文件的Source、通过Sockt读取的Source、从内存中获取的Source等。

1.基于集合的预定义Source

基于集合的数据源一般是指从内存集合中直接读取要处理的数据,StreamExecutionEnvironment提供了4类预定义方法。

fromCollection

fromCollection是从给定的集合创建DataStream,StreamExecutionEnvironment提供了4种重载方法:

  • fromCollection(Collection<T> data):通过给定的集合创建DataStream。返回数据类型为集合元素类型。
  • fromCollection(Collection<T> data,TypeInformation<T> typeInfo):通过给定的非空集合创建DataStream。返回数据类型为typeInfo。
  • fromCollection(Iterator<T> data,Class<T> type):通过给定的迭代器创建DataStream。返回数据类型为type。
  • fromCollection(Iterator<T> data,TypeInformation<T> typeInfo):通过给定的迭代器创建DataStream。返回数据类型为typeInfo。

fromParallelCollection

fromParallelCollection和fromCollection类似,但是是并行的从迭代器中创建DataStream

  • fromParallelCollection(SplittableIterator<T> data,Class<T> type)
  • fromParallelCollection(SplittableIterator<T>,TypeInfomation typeInfo)

和Iterable中Spliterator类似,这是JDK1.8新增的特性,并行读取集合元素。

fromElements

fromElements从给定的对象序列创建DataStream,StreamExecutionEnvironment提供了2种重载方法:

  • fromElements(T... data):从给定对象序列中创建DataStream,返回的数据类型为该对象类型自身。
  • fromElements(Class<T> type,T... data):从给定对象序列中创建DataStream,返回的数据类型type。

generateSequence

generateSequence(long from,long to)给定间隔的数字序列创建DataStream,比如from为1,to为10,则会生成1~10的序列。

2.基于Socket的预定义Source

我们还可以通过Socket来读取数据,通过Sockt创建的DataStream能够从Socket中无限接收字符串,字符编码采用系统默认字符集。当Socket关闭时,Source停止读取。Socket提供了5个重载方法,但是有两个方法已经标记废弃。

  • socketTextStream(String hostname,int port):指定Socket主机和端口,默认数据分隔符为换行符(\n)。
  • socketTextStream(String hostname,int port,String delimiter):指定Socket主机和端口,数据分隔符为delimiter。
  • socketTextStream(String hostname,int port,String delimiter,long maxRetry):该重载方法能够当与Socket断开时进行重连,重连次数由maxRetry决定,时间间隔为1秒。如果为0则表示立即终止不重连,如果为负数则表示一直重试。
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 7777);

3.基于文件的预定义Source

基于文件创建DataStream主要有两种方式:readTextFile和readFile。(readFileStream已废弃)。readTextFile就是简单读取文件,而readFile的使用方式比较灵活。

readTextFile

readTextFile提供了两个重载方法:

  • readTextFile(String filePath):逐行读取指定文件来创建DataStream,使用系统默认字符编码读取。
  • readTextFile(String filePath,String charsetName):逐行读取文件来创建DataStream,使用charsetName编码读取。
DataSource<String> lineDS = env.readTextFile("D:\\test\\test.txt");
DataStreamSource<String> lineDS = env.readTextFile("D:\\test\\test.txt");

readFile

readFile通过指定的FileInputFormat来读取用户指定路径的文件。对于指定路径文件,我们可以使用不同的处理模式来处理,FileProcessingMode.PROCESS_ONCE模式只会处理文件数据一次,而FileProcessingMode.PROCESS_CONTINUOUSLY会监控数据源文件是否有新数据,如果有新数据则会继续处理。

readFile(FileInputFormat<T> inputFormat,
      String filePath,
        FileProcessingMode watchType,
          long interval,TypeInformation typrInfo) 
参数说明实例
inputFormat 创建DataStream指定的输入格式
filePath 读取的文件路径,为URI格式。既可以读取普通文件,可以读取HDFS文件 file:///some/local/file 或hdfs://host:port/file/path
watchType 文件数据处理方式 FileProcessingMode.PROCESS_ONCE或FileProcessingMode.PROCESS_CONTINUOUSLY
interval 在周期性监控Source的模式下(PROCESS_CONTINUOUSLY),指定每次扫描的时间间隔 10
typeInformation 返回数据流的类型  

readFile提供了几个便于使用的重载方法,但它们最终都是调用上面这个方法的。

  • readFile(FileInputFormat<T> inputFormat,String filePath):处理方式默认使用FileProcessingMode.PROCESS_ONCE。
  • readFile(FileInputFormat<T> inputFormat,String filePath,FileProcessingMode watchType,long interval):返回类型默认为inputFormat类型。

需要注意:在使用FileProcessingMode.PROCESS_CONTINUOUSLY时,当修改读取文件时,Flink会将文件整体内容重新处理,也就是打破了"exactly-once"。

4.自定义Source

除了预定义的Source外,我们还可以通过实现SourceFunction来自定义Source,然后通过StreamExecutionEnvironment.addSource(sourceFunction)添加进来。比如读取Kafka数据的Source:

addSource(new FlinkKafkaConsumer08<>);

我们可以实现以下三个接口来自定义Source:

  • SourceFunction:创建非并行数据源。
  • ParallelSourceFunction:创建并行数据源。
  • RichParallelSourceFunction:创建并行数据源。

(四)数据转换(Transformation)

(五)结果数据接收器(Data sink)

数据经过Flink处理之后,最终结果会写到file、socket、外部系统或者直接打印出来。数据接收器定义在DataStream类下,我们通过addSink()可以来添加一个接收器。同Source,Flink也提供了一些预定义的Data Sink让我们直接使用。

1.写入文本文件

DataStream提供了两个writeAsText重载方法,写入格式会调用写入对象的toString()方法。

  • writeAsText(String path):将DataStream数据写入到指定文件。
  • writeAsText(String path,WriteMode writeMode):将DataStream数据写入到指定文件,可以通过writeMode来指定如果文件已经存在应该采用什么方式,可以指定OVERWRITE或NO_OVERWRITE。

2.写入CSV文件

DataStream提供了三个写入csv文件的重载方法,对于DataStream中的每个Filed,都会调用其对象的toString()方法作为写入格式。writeAsCsv只能用于元组(Tuple)的DataStream。

writeAsCsv(String path,WriteMode writeMode,String rowDelimiter,String fieldDelimiter)
参数说明实例
path 写入文件路径
writeMode 如果写入文件已经存在,采用什么方式处理 WriteMode.NO_OVERWRITE 或WriteMode.OVERWRITE
rowDelimiter 定义行分隔符
fieldDelimiter 定义列分隔符  

DataStream提供了两个简易重载方法:

  • writeAsCsv(String path):使用"\n"作为行分隔符,使用","作为列分隔符。
  • writeAsCsv(String path,WriteMode writeMode):使用"\n"作为行分隔符,使用","作为列分隔符。

3.写入Socket

Flink提供了将DataStream作为字节数组写入Socket的方法,通过SerializationSchema来指定输出格式。

writeToSocket(String hostName,int port,SerializationSchema<T> schema)

4.指定输出格式

DataStream提供了自定义文件输出的类和方法,我们能够自定义对象到字节的转换。

writeUsingOutputFormat(OutputFormat<T> format)

5.结果打印

DataStream提供了print和printToErr打印标准输出/标准错误流。DataStream中的每个元素都会调用其toString()方法作为输出格式,我们也可以指定一个前缀字符来区分不同的输出。

  • print():标准输出
  • print(String sinkIdentifier):指定输出前缀
  • printToErr():标准错误输出
  • printToErr(String sinkIdentifier):指定输出前缀

对于并行度大于1的输出,输出结果也将输出任务的标识符作为前缀。

6.自定义输出器

我们一般会自定义输出器,通过实现SinkFunction接口,然后通过DataStream.addSink(sinkFunction)来指定数据接收器。

addSink(SinkFunction<T> sinkFunction)

注意:对于DataStream中的writeXxx()方法一般都是用于测试使用,因为他们并没有参与chaeckpoint,所以它们只有"at-last-once"也就是至少处理一次语义。

如果想要可靠输出,想要使用"exactly-once"语义准确将结果写入到文件系统中,我们需要使用flink-connector-filesystem。此外,我们也可以通过addSink()自定义输出器来使用Flink的checkpoint来完成"exactl-oncey"语义。

 

标签:DataStream,StreamExecutionEnvironment,读取,Flink,Source,API,String
From: https://www.cnblogs.com/imreW/p/17525346.html

相关文章

  • 让IIS支持.NET Web Api PUT和DELETE请求
    前言    有很长一段时间没有使用过IIS来托管应用了,今天用IIS来托管一个比较老的.NETFx4.6的项目。发布到线上后居然一直调用不同本地却一直是正常的,关键是POST和GET请求都是正常的,只有PUT和DELETE请求是有问题的。经过一番思考忽然想起来了IIS默认情况下拒绝处理PUT和DELETE......
  • Eolink 全新一代「AI+API」协作管理平台,大模型驱动打造 API 研发管理与自动化测试!
    行业首发!Eolink全新一代「AI+API」协作管理平台,实现「AI+API」结合,大模型驱动打造API研发管理与自动化测试全新体验。Eolink「AI+API」为API带来什么?输入语义化指令即可生成API文档内容;在API文档测试页中可一键生成测试请求数据;可实现圈定API文档范围智能生......
  • 什么是SPI,和API有啥区别
    SPI代表服务提供者接口(ServiceProviderInterface),是一种Java编程语言的编程规范。它定义了一组接口或类的规范,供第三方开发人员实现,以向应用程序提供特定的服务或功能。SPI侧重于接口的定义和实现者的开发。API代表应用程序编程接口(ApplicationProgrammingInterface),是一组预定......
  • 「API 生态」Eolink 与 API7 达成战略合作,共同打造 API 治理解决方案
    在当今竞争激烈的市场环境中,企业不断地向数字化转型迈进,API已经成为数字化转型中不可或缺的一环。如何统筹规划、管理保护API早已成为企业研发团队的核心挑战。Eolink和API7支流科技作为国内领先的专业厂商,一直引领着API管理及应用安全领域的发展。面对企业API管理......
  • 记录一个boost1.72和Win32api的冲突
    报错:2>C:\ProgramFiles(x86)\WindowsKits\10\Include\10.0.19041.0\um\fileapi.h(53,1):errorC2116:'boost::interprocess::winapi::CreateDirectoryA':functionparameterlistsdonotmatchbetweendeclarations2>D:\software\boost_1_7......
  • 快手根据ID取商品详情 API 返回值说明
    item_get-根据ID取商品详情公共参数请求地址: https://o0b.cn/anzexi名称类型必须描述keyString是调用key(必须以GET方式拼接在URL中)secretString是调用密钥api_nameString是API接口名称(包括在请求地址中)[item_search,item_get,item_search_shop等]ca......
  • Apisix-linux下的内网安装
    1.背景一般测试环境都是没有外网的,apisix官网写了安装方式,但是感觉描述的并不好,结合自己的实际体验整理了下。APISIX-官网APISIX-GitHub2.离线安装假设现在有两台相同的机器我刚开始没注意到这个问题,A机是centos8,B机是centos7,下载后的rpm文件在B机上就用不了了。A机:拥有......
  • 深入了解API接口调用——从获取淘宝商品数据开始
     API(ApplicationProgrammingInterface)是现代软件开发中常用的一种技术,它允许不同的软件系统进行交互和通信。在本文中,我们将深入探讨如何通过API接口来获取淘宝商品数据。这是一个常见的需求,无论是对于商家还是开发者,都有着重要的意义。第一部分:API接口概述在开始之前,我们首......
  • solidworks api ch02
    openSolidWorks.Interop.sldworksletcmdConnect(swApp:ISldWorks)=swApp.SendMsgToUser$"RevisionNumber:{swApp.RevisionNumber}"swApp.DisplayStatusBartrueswApp.SendMsgToUser$"DisplayStatusBarOk!"letla......
  • VM VirtualBox + CentOS 7 本地化部署yapi服务
    一、安装OracleVMVirtualBoxVirtualBox下载地址:https://www.virtualbox.org/wiki/Downloads按需选择系统版本安装,安装完成后启动。安装VirtualBox插件,双击下载文件即可安装二、创建CentOS虚拟机选择无界面版本,下载地址:http://mirrors.jlu.edu.cn/centos/7.9.......