Apache Flink架构及其工作原理
1、定义:
Apache flink 是一个实时计算框架和分布式处理引擎,用于再无边界和有边界数据流上进行有状态的计算,Flink能在所有的集群环境中运行,并能以内存的速度和任意规模进行计算
2、Apache Flink特性
支持高吞吐、低延迟、高性能的流处理
支持带有事件时间的窗口(Window)操作
支持有状态计算的Exactly-once语义
支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
支持具有反压功能的持续流模型
支持基于轻量级分布式快照(Snapshot)实现的容错
一个运行时同时支持Batch on Streaming处理和Streaming处理
Flink在JVM内部实现了自己的内存管理,避免了出现oom
支持迭代计算
支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
3、Apache Flink组件栈
4、阿里云实时计算支持的抽象层级
5、Flink ApIs
6、SQL进行实时计算的优势
7、Programs& Dataflows
二、Flink实时wordcount
package com.shujia.flink
import org.apache.flink.streaming.api.scala._
object core {
def main(args: Array[String]): Unit = {
/**
* 构建Flink环境
* 不需要指定运行环境,Flink会自动识别
*/
//source
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 手动设置并行度,默认等于当前主机逻辑上最大的CPU核数
env.setParallelism(2)
// 控制上下游数据传递的时间
// env.setBufferTimeout(0)
//tranformation
//通过socket构建数据流 ,nc_lk 8888
val linesDs: DataStream[String] = env.socketTextStream("master01", 8888)
//1、将数据进行切分
val wordDS: DataStream[String] = linesDs.flatMap(word => (word.split(",")))
//将数据转成kv格式,给每个单词后加上数字1
val kvDS: DataStream[(String, Int)] = wordDS.map(kv => (kv, 1))
//3、使用keyBy按照每个单词进行分组3、使用keyBy按照每个单词进行分组
val grpDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => (kv._1))
//4.将数据输出出来
val wordcntDS: DataStream[(String, Int)] = grpDS.sum(1)
wordcntDS.print()
//启动flink任务 :sink
env.execute()
}
}
1、Flink与Spark处理数据的区别
(1)、spark处理数据
在spark处理数据中首先要运行map端,等到map端运行完成才会启动reduce端---MapReduce模型
优点:map端可以进行预聚合,可以减少shuffle过程中传输的数据量,提高运行效率
缺点:如果使用MR模型处理实时数据,延时会比较长,数据不会一瞬间输出出来,而是进行预聚合处理数据过后将数据输出出来;
(2)、Flink处理数据
在Flink任务运行时,上下游任务会同时启动,等数据过来,来一条处理一条--数据流模型
优点:上下游任务可以同时启动,所以数据的延迟非常低
缺点:需要一直占用资源,无法进行“Map端”的预聚合
在Flink中为了提高效率,上游任务在往下游任务发出数据时吗,默认大小为32KB,时间默认为200ms;
在Flink中每一个并行度可以看成一个Task;
shuffle之前叫做上游任务,shuffle之后的叫做下游任务;
2、控制Flink的处理方式:
控制Flink的处理方式:
* BATCH --> 只能用于有界流,类似Spark的批处理方式,只会输出最终的一个结果
* STREAMING --> 既能作用在无界流上,也可作用在有界流上,都是进行逐项处理
* AUTOMATIC --> 自动判断
Source数据源:
Flink 在流处理和批处理上的 source 大概有 4 类:
基于本地集合的 source、
基于文件的 source、
基于网络套接字的 source、
自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。
package com.shujia.flink.source
import org.apache.flink.streaming.api.scala._
object Demo01Source {
def main(args: Array[String]): Unit = {
/**
* 构建Flink环境
*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//1、基于本地集合进行构建,例如List构建DataStream
val listDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5))
// listDS.print()
//2、基于文件的source(做WordCount)
val textDS: DataStream[String] = env.readTextFile("F:\\IdeaProjects\\Flink\\src\\data\\words.txt")
textDS.flatMap(_.split(","))
.map(word=>{(word,1)})
.keyBy(kv=>{(kv._1)})
.sum(1)
.print()
//3、基于网络套接字
val linesDs: DataStream[String] = env.socketTextStream("master01", 8888)
env.execute()
}
}
Flink自定义source-----addsource
addsource要传入一个SourceFunction,所以直接自定义sourcefunction进行进行传入
在自定义Myfunction要进行继承SourceFunction进行实现方法
package com.shujia.flink.source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
object Demo02SourceFunction {
def main(args: Array[String]): Unit = {
/**
* 搭建Flink环境
*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val mysource: DataStream[Int] = env.addSource(new Mysource)
mysource.print()
env.execute()
}
class Mysource extends SourceFunction[Int]{
/**
* 任务运行时只会被执行一次,用于加载数据
* @param ctx:用于上游向下游发送数据
*/
override def run(sourceContext: SourceFunction.SourceContext[Int]): Unit ={
while (true){
sourceContext.collect(1)
Thread.sleep(1000)
}
}
/**
* 当任务结束时会运行,主要用于回收资源
*/
override def cancel(): Unit = {}
}
}
Transformation
Transformation:数据转换的各种操作,
有 Map / FlatMap / Filter /KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,
操作很多,可以将数据转换计算成你想要的数据。
1、常用的transformation
package com.shujia.flink.transformation
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._
object Demo02ScalaApI {
def main(args: Array[String]): Unit = {
//构建Flink环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//进行数据批处理
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
//求每个班男生的数量
val linesDS: DataStream[String] = env.readTextFile("F:\\IdeaProjects\\Flink\\src\\data\\students.txt")
//过滤出男生
/**
* filter算子:用于过滤数据,需要返回一个布尔值
* 如果返回true则保留数据,如果返回false则过滤数据
*/
val fliterDS: DataStream[String] = linesDS.filter(_.split(",")(3).equals("男"))
//将以班级分组后面加上1
/**
* map算子:传入一条数据返回一条
* flatMap算子:传入一条,返回一个数据容器,并进行展开
*/
val mapDS: DataStream[(String, Int)] = fliterDS.map(word => {(word.split(",")(4), 1)})
//按照班级分组
/**
* keyBy:从数据中指定一个“字段”进行分组
*/
val keybyDS: KeyedStream[(String, Int), String] = mapDS.keyBy(word => word._1)
//统计人数
/**
* 使用sum自动进行加和的聚合操作
*/
// val clazzcntDS: DataStream[(String, Int)] = keybyDS.sum(1)
/**
* 使用reduce手动进行聚合操作
*/
val clazzcntDS: DataStream[(String, Int)] = keybyDS.reduce((kv1, kv2) => {
(kv1._1, kv1._2 + kv2._2)
})
//将会最后结果进行输出
clazzcntDS.print()
//执行
env.execute()
}
}
windows窗口
1、处理时间
处理时间是基于当前系统时间进行操作的,以系统时间为准
再调用窗口函数时,需要将 .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))添加在所需要执行DataSteam类型的变量上;
package com.shujia.flink.transformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object Demo04Window {
def main(args: Array[String]): Unit = {
//创建Flink环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 统计最近10s内的单词数量 -- 使用窗口
val linesDS: DataStream[String] = env.readTextFile("F:\\IdeaProjects\\Flink\\src\\data\\students.txt")
// val linesDS: DataStream[String] = env.socketTextStream("master01", 8888)
val windowDS: WindowedStream[(String, Int), String, TimeWindow] = linesDS
.flatMap(_.split(","))
.map(word => (word, 1))
.keyBy(_._1)
/**
* 在分组过后10秒时间进行统计数量,类似于批处理,基于处理时间的滚动窗口,处理完结束程序结束
*/
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
val wordCntDS: DataStream[(String, Int)] = windowDS.sum(1)
wordCntDS.print()
env.execute()
}
}
2、事件时间(Flink特有的,Spark中没有)
这里的事件时间是固定的,对于每个用户设置都是固定的,当事件时间超过这个时间就会触发,并返回数据;
对于时间字符串顺序进入的情况下:
事件时间是对于要处理的数据中存在的与时间有关的,基于这个时间(字符串类型)转化成真正的时间,最后通过调用事件时间将刚转为时间的字符串时间作为时间 ,并设置时间的间隔用作时间,不以系统时间为准;
重要的一点:要将时间的并行度设为1;
对于数据乱序进入后:
(如果数据是乱序进入Flink中,并不是完全按照时间顺序的情况下,如何处理数据轻微乱序问题?)
如果数据乱序的时间超过一个小时也就没有意义了
水位线(WaterMark):将不在固定时间间隔内的数据接收到的时候将水平线进行提前延时处理;使数据不会丢失;
package com.shujia.flink.core
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import java.text.SimpleDateFormat
import java.time.Duration
import java.util.Date
object Demo04EventTime {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 将并行度设置为1 避免数据随机到达每一个并行度 导致无法满足5s的条件
env.setParallelism(1)
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
// 将接受的数据切开,把单词、时间提取出来
/**
* java,2022-11-17 14:21:20
* java,2022-11-17 14:21:21
* java,2022-11-17 14:21:22
* java,2022-11-17 14:21:23
* java,2022-11-17 14:21:24
* java,2022-11-17 14:21:25
* java,2022-11-17 14:21:27
* java,2022-11-17 14:21:28
* java,2022-11-17 14:21:29
* java,2022-11-17 14:21:30
* java,2022-11-17 14:21:31
* java,2022-11-17 14:21:32
* java,2022-11-17 14:21:33
* java,2022-11-17 14:21:34
* java,2022-11-17 14:21:35
*/
val wordAndTimeDS: DataStream[(String, String)] = linesDS.map(line => {
val splits: Array[String] = line.split(",")
val word: String = splits(0)
val timeStr: String = splits(1)
(word, timeStr)
})
// 如果需要使用事件时间则需要告诉Flink哪一列是时间列
// val assDS: DataStream[(String, String)] = wordAndTimeDS.assignAscendingTimestamps(t2 => {
// // 将时间字符串转换成时间戳
// val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// val date: Date = format.parse(t2._2)
// date.getTime
// })
// 如果数据不是完全按照时间顺序进入到Flink,怎么处理数据乱序问题?
// 可以使用Watermark水位线前移的方式来解决数据的轻微乱序问题
val assDS: DataStream[(String, String)] = wordAndTimeDS
.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness[(String, String)](Duration.ofSeconds(5)) // 将水位线前移5s
// 指定用哪一列作为事件时间
.withTimestampAssigner(new SerializableTimestampAssigner[(String, String)] {
override def extractTimestamp(element: (String, String), recordTimestamp: Long): Long = {
// 将时间字符串转换成时间戳
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date: Date = format.parse(element._2)
date.getTime
}
}))
/**
* 统计最近5s内的单词数量 --> 使用时间窗口
*/
/**
* 窗口的触发条件:
* 1、窗口内有数据
* 2、水位线大于等于窗口的触发点
*
* 水位线默认等于当前并行度接收到的最大的事件时间
* 如果前移则需要减去对应的时间
*/
assDS
.map(t2 => (t2._1, 1))
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1)
.print()
env.execute()
}
}
Sink---Flink 将转换计算后的数据发送的地点
Flink 常见的 Sink 大概有如下几类:
写入文件、
打印出来、
写入 socket 、
自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。
SinkToFlie
1、可以使用 .writeAsText("路径")默认会生成同任务并行度数量一样多的文件,目前已经弃用
2、在使用FlinkSink写文件的时候,需要导入Flink-connect-files依赖,定义所需要的路径以及编码格式,接着定义文件滚动的格式
例如:
1)有数据多久切换一个文件
2)没数据多久切换一个文件
3)当文件超过多大时进行文件的切换,单位为字节
package com.shujia.flink.sink
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
import java.time.Duration
object Demo01SinkToFile {
def main(args: Array[String]): Unit = {
//创建Flink环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//source使用网络套接字
val linesDS: DataStream[String] = env.readTextFile("F:\\IdeaProjects\\Flink\\src\\data\\words.txt")
// val linesDS: DataStream[String] = env.socketTextStream("master01", 8888)
// 将输入的数据进行计数
val wordCntDS: DataStream[String] = linesDS
.flatMap(_.split(","))
.map(word => (word, 1))
.keyBy(_._1)
.sum(1)
.map(kv => s"${kv._1}\t${kv._2}")
// 该方法已被弃用,默认会生成同任务并行度数量一样多的文件
// wordCntDS.writeAsText("Flink/data/sink/")
/**
* 使用FileSink写文件,需要导入flink-connector-files该依赖
*/
val fileSink: FileSink[String] = FileSink
.forRowFormat(new Path("F:\\IdeaProjects\\Flink\\src\\data\\fileSink"), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(
/**
* 文件滚动的策略
*/
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofSeconds(10)) // 假如一直有数据多久切换一个文件
.withInactivityInterval(Duration.ofSeconds(10)) // 假如多久没数据切换一个文件
.withMaxPartSize(MemorySize.ofMebiBytes(1024 * 1024 * 128)) // 当文件大小超过该值时就切换文件,单位为字节
.build())
.build()
wordCntDS.sinkTo(fileSink)
env.execute()
}
}
自定义sinkFunction
package com.shujia.flink.sink
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
object Demo02SinkFunction {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
val linesDS: DataStream[String] = env.readTextFile("F:\\IdeaProjects\\Flink\\src\\data\\words.txt")
val wordcntDS: DataStream[String] = linesDS
.flatMap(_.split(","))
.map(kv => {
(kv, 1)
})
.keyBy(kv => {
(kv._1)
})
.sum(1)
.map(kv => s"${kv._1}\t${kv._2}")
wordcntDS.addSink(new MySink)
env.execute()
}
}
class MySink extends SinkFunction[String]{
override def invoke(value: String, context: SinkFunction.Context): Unit =
println(s"自定Sink:$value")
}
设置任务的并行度
设置任务的并行度:
1、env.setParallelism 设置任务全局的并行度
2、提交任务时可以手动指定并行度 命令行通过-p参数设置(推荐)
3、每个算子后也可以设置并行度
(代码中的优先级最高的)
当DS在转换时,如果两个DS中间没有shuffle过程并且并行度一致时可以合并到一个任务中
并行度就决定了任务同一时间能够处理的数据量--吞吐量
最大的并行度决定了最终需要给任务分配多少个槽位
这里测试任务是将任务使用standalone进行上传jar进行编译的
package com.shujia.flink.core
import org.apache.flink.streaming.api.scala._
object Demo03Parallelism {
def main(args: Array[String]): Unit = {
/**
* 设置任务的并行度:
* 1、env.setParallelism 设置任务全局的并行度
* 2、提交任务时可以手动指定并行度 命令行通过-p参数设置(推荐)
* 3、每个算子后也可以设置并行度
* (代码中的优先级最高的)
*
* 当DS在转换时,如果两个DS中间没有shuffle过程并且并行度一致时可以合并到一个任务中
*
* 并行度就决定了任务同一时间能够处理的数据量--吞吐量
* 最大的并行度决定了最终需要给任务分配多少个槽位
*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
val wordsDS: DataStream[String] = linesDS
.flatMap(_.split(","))
.setParallelism(1)
.name("切分出每个单词")
val kvDS: DataStream[(String, Int)] = wordsDS.map(word => (word, 1)).setParallelism(1).name("将每个单词变成KV格式")
/**
* keyBy之后不能设置并行度
*/
val grpDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
val wordCntDS: DataStream[(String, Int)] = grpDS.sum(1).setParallelism(1).name("统计单词数量")
wordCntDS.print().setParallelism(1).name("将单词数量打印")
env.execute()
}
}
标签:DataStream,flink,架构,String,val,Flink,env,Apache
From: https://www.cnblogs.com/tanggf/p/16901250.html