首页 > 其他分享 >Flink详解

Flink详解

时间:2023-06-03 10:02:47浏览次数:36  
标签:Flink 批处理 flink 处理 详解 apache 数据

Apache Flink 是一个开源的分布式,高性能,高可用,准确的流处理框架。

分布式:表示flink程序可以运行在很多台机器上,
高性能:表示Flink处理性能比较高
高可用:表示flink支持程序的自动重启机制。
准确的:表示flink可以保证处理数据的准确性。

Flink支持流处理和批处理,虽然我们刚才说了flink是一个流处理框架,但是它也支持批处理。其实对于flink而言,它是一个流处理框架,批处理只是流处理的一个极限特例而已。

image

左边是数据源,从这里面可以看出来,这些数据是实时产生的一些日志,或者是数据库、文件系统、kv存储系统中的数据。
中间是Flink,负责对数据进行处理。
右边是目的地,Flink可以将计算好的数据输出到其它应用中,或者存储系统中。

Flink架构图

image

首先图片最下面表示是flink的一些部署模式,支持local,和集群(standalone,yarn),也支持在云上部署。
往上一层是flink的核心,分布式的流处理引擎。
再往上面是flink的API和类库
主要有两大块API,DataStram API和DataSet API,分别做流处理和批处理。
针对DataStram API这块,支持复杂事件处理,和table操作,其实也是支持SQL操作的。
针对DatasetAPI 这块,支持flinkML机器学习,Gelly图计算,table操作,这块也是支持SQL操作的。
其实从这可以看出来,Flink也是有自己的生态圈的,里面包含了实时计算、离线计算、机器学习、图计算、Table和SQL计算等等
所以说它和Spark还是有点像的,不过它们两个的底层计算引擎是有本质区别的,一会我们会详细分析。

Flink三大核心组件

image

Flink包含三大核心组件
Data Source,数据源(负责接收数据),
Transformations 算子(负责对数据进行处理)
Data Sink 输出组件(负责把计算好的数据输出到其它存储介质中)

Flink的流处理与批处理

接下来我们来分析一下Flink这个计算引擎的核心内容

  • 在大数据处理领域,批处理和流处理一般被认为是两种不同的任务,一个大数据框架一般会被设计为只能处理其中一种任务

例如Storm只支持流处理任务,而MapReduce、Spark只支持批处理任务。Spark Streaming是Spark之上支持流处理任务的子系统,看似是一个特例,其实并不是——Spark Streaming采用了一种microbatch的架构,就是把输入的数据流切分成细粒度的batch,并为每一个batch提交一个批处理的Spark任务,所以Spark Streaming本质上执行的还是批处理任务,和Storm这种流式的数据处理方式是完全不同的。

  • Flink通过灵活的执行引擎,能够同时支持批处理和流处理
    在执行引擎这一层,流处理系统与批处理系统最大的不同在于节点之间的数据传输方式。
    对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理

这就是典型的一条一条处理

而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,
并不会立刻通过网络传输到下一个节点,当缓存写满的时候,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点

这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求
Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输模型
Flink以固定的缓存块为单位进行网络数据传输,用户可以通过缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似前面所说的流处理系统的标准模型,此时系统可以获得最低的处理延迟
如果缓存块的超时值为无限大,则Flink的数据传输方式类似前面所说的批处理系统的标准模型,此时系统可以获得最高的吞吐量

这样就比较灵活了,其实底层还是流式计算模型,批处理只是一个极限特例而已。

image

第一个:一条一条处理
第二个:一批一批处理
第三个:按照缓存块进行处理,缓存块可以无限小,也可以无限大,这样就可以同时支持流处理和批处理了。

接下来我们来对比一下目前大数据领域中的三种实时计算引擎

产品 Storm SparkStreaming Flink
模型 Native Mirco-Batching Native
API 组合式 声明式 声明式
语义 At-least-once Exectly-once Exectly-once
容错机制 Ack Checkpoint Checkpoint
状态管理 基于DStream 基于操作
延时 Low Medium Low
吞吐量 Low High High

解释:
Native:表示来一条数据处理一条数据
Mirco-Batch:表示划分小批,一小批一小批的处理数据
组合式:表示是基础API,例如实现一个求和操作都需要写代码实现,比较麻烦,代码量会比较多。
声明式:表示提供的是封装后的高阶函数,例如filter、count等函数,可以直接使用,比较方便,代码量比较少。

实时计算框架如何选择

  1. 需要关注流数据是否需要进行状态管理
  2. 消息语义是否有特殊要求At-least-once或者Exectly-once
  3. 小型独立的项目,需要低延迟的场景,建议使用Storm
  4. 如果项目中已经使用了Spark,并且秒级别的实时处理可以满足需求,建议使用SparkStreaming
  5. 要求消息语义为Exectly-once,数据量较大,要求高吞吐低延迟,需要进行状态管理,建议选择Flink

Flink快速上手使用

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.11.1</version>
</dependency>

scala使用1.11版本

在开发Flink程序之前,我们先来梳理一下开发一个Flink程序的步骤

  1. 获得一个执行环境
  2. 加载/创建 初始化数据
  3. 指定操作数据的transaction算子
  4. 指定数据目的地
  5. 调用execute()触发执行程序

注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序
和Spark类似,Spark中是必须要有action算子才会真正执行。

Streaming WordCount

需求:通过socket实时产生一些单词,使用flink实时接收数据,对指定时间窗口内(例如:2秒)的数据进行聚合统计,并且把时间窗口内计算的结果打印出来

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * 需求:通过Socket实时产生一些单词,
  * 使用Flink实时接收数据
  * 对指定时间窗口内(例如:2秒)的数据进行聚合统计
  * 并且把时间窗口内计算的结果打印出来
  */
object SocketWindowWordCountScala {
  /**
    * 注意:在执行代码之前,需要先在bigdata01机器上开启socket,端口为9999
    *
    * @param args
    */
  def main(args: Array[String]): Unit = {
    //获取运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //连接socket获取输入数据
    val text = env.socketTextStream("bigdata01", 9999)
    //处理数据
    //注意:必须要添加这一行隐式转换的代码,否则下面的flatMap方法会报错
    import org.apache.flink.api.scala._
    val wordCount = text.flatMap(_.split(" ")) //将每一行数据根据空格切分单词
      .map((_, 1)) //每一个单词转换为tuple2的形式(单词,1)
      //.keyBy(0)//根据tuple2中的第一列进行分组
      .keyBy(tup => tup._1) //官方推荐使用keyselector选择器选择数据
      .timeWindow(Time.seconds(2)) //时间窗口为2秒,表示每隔2秒钟计算一次接收到的数
      .sum(1) // 使用sum或者reduce都可
    //使用一个线程执行打印操作
    wordCount.print().setParallelism(1)
    //执行程序
    env.execute("SocketWindowWordCountScala")
  }

}

在bigdata01上面开启socket

[root@bigdata01 ~]# nc -l 9999
hello you hello me

idea控制台可以看到如下效果

(me,1)
(you,1)
(hello,2)

Batch WordCount

需求:统计指定文件中单词出现的总次数
下面来开发Flink的批处理代码

resources下需要有hdfs-site.xml配置文件

<configuration>
    <property>
        <name>dfs.client.use.datanode.hostname</name>
        <value>true</value>
    </property>
</configuration>
import org.apache.flink.api.scala.ExecutionEnvironment

/**
  * 需求:统计指定文件中单词出现的总次数
  */
object BatchWordCountScala {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    val inputPath = "hdfs://bigdata01:9000/hello.txt"
    val outPath = "hdfs://bigdata01:9000/flink-out"
    //读取文件中的数据
    val text = env.readTextFile(inputPath)
    //处理数据
    import org.apache.flink.api.scala._
    val wordCount = text.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .setParallelism(1)
    //将结果数据保存到文件中
    wordCount.writeAsCsv(outPath, "\n", " ")
    //执行程序
    env.execute("BatchWordCountScala")
  }

}

注意:这里面执行setParallelism(1)设置并行度为1是为了将所有数据写到一个文件里面,我们查看结果的时候比较方便

执行成功之后到hdfs上查看结果

hello 2
me 1
you 1

流处理Streaming
执行环境:StreamExecutionEnvironment
数据类型:DataStream

批处理Batch
执行环境:ExecutionEnvironment
数据类型:DataSet

标签:Flink,批处理,flink,处理,详解,apache,数据
From: https://www.cnblogs.com/strongmore/p/17370219.html

相关文章

  • Flink安装部署
    Flink集群安装部署Flink支持多种安装部署方式StandaloneONYARNMesos、Kubernetes、AWS…这些安装方式我们主要讲一下standalone和onyarn。如果是一个独立环境的话,可能会用到standalone集群模式。在生产环境下一般还是用onyarn这种模式比较多,因为这样可以综合利用集群......
  • Sqoop详解
    Sqoop下载及安装Sqoop目前有两大版本,Sqoop1和Sqoop2,这两个版本都是一直在维护者的,所以使用哪个版本都可以。这两个版本我都用过,还是感觉Sqoop1用起来比较方便,使用Sqoop1的时候可以将具体的命令全部都写到脚本中,这样看起来是比较清晰的,但是有一个弊端,就是在操作MySQL的时候,MySQL......
  • 树状数组详解——本质上就是空间换时间,可以解决大部分基于区间上的更新以及求和问题
     943.区间和查询-Immutable 中文 English 给一个整数数组nums,求出下标从i到j的元素和(i≤j),i跟j对应的元素也包括在内。 样例样例1输入:nums=[-2,0,3,-5,2,-1]sumRange(0,2)sumRange(2,5)sumRange(0,5)输出:1-1-3解释:sumRange(0,2)->(-2......
  • Apollo配置中心管理后台的详解
    上篇【Apollo配置中心源码编译及搭建】搭建了Apollo。这篇来看看怎么使用Apollo管理后台。     Apollo(阿波罗)是携程框架部门研发的开源配置管理中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性。Apoll......
  • 2014.4.25.12.51_context_2014.4.25_Android种的Context详解
    Android中Context详解----你所不知道的Context一、Context相关类的继承关系2二、什么时候创建Context实例5从上可知一下三点,即:1、它描述的是一个应用程序环境的信息,即上下文。2、该类是一个抽象(abstractclass)类,Android提供了该抽象类的具体实现类(后面我们会讲到是Co......
  • 实时数据治理—当Atlas遇见Flink
    Atlas是Hadoop的数据治理和元数据框架。Atlas是一组可扩展和可扩展的核心基础治理服务,使企业能够有效,高效地满足Hadoop中的合规性要求,并允许与整个企业数据生态系统集成。ApacheAtlas为组织提供了开放的元数据管理和治理功能,以建立其数据资产的目录,对这些资产进行分类和治理,并为数......
  • IOS上架流程详解,包含审核避坑指南!
    ​准备开发者账号完工的项目上架步骤一、创建AppID二、创建证书请求文件(CSR文件)三、创建发布证书(CER)四、创建ProvisioningProfiles配置文件(PP文件)五、在AppStore创建应用六、打包上架一、创建AppID1.打开苹果开发者网,点击“Account”登录会员中心 ​......
  • IOS上架流程详解,包含审核避坑指南!
    ​准备开发者账号完工的项目上架步骤一、创建AppID二、创建证书请求文件(CSR文件)三、创建发布证书(CER)四、创建ProvisioningProfiles配置文件(PP文件)五、在AppStore创建应用六、打包上架一、创建AppID1.打开苹果开发者网,点击“Account”登录会员中心 ​......
  • 详解Oracle用户解锁命令的两则实现方法
    在安装完Oracle10g之后,想打开sql*plus来学习,然后按照书上的步骤用scott用户来连接数据库,可输了好几次都提示一个错误。error:theaccountislocked然后上网查了一下之后发现这个用户被锁定了,至于它为什么被锁定,可能是下面几个原因。1.尝试多次登录未成功.(可能密码不正确)2.此用......
  • FLink写入Clickhouse优化
    一、背景ck因为有合并文件操作,适合批量写入。如单条插入则速度太慢二、Flink写入ck优化改为分批插入,代码如下DataStream<Row>stream=...stream.addSink(JdbcSink.sink("INSERTINTOmytable(col1,col2)VALUES(?,?)",(ps,row)->{ps.setString(1,row.ge......