Flink DataStream Transform(三)
环境变量
import org.apache.flink.api.scala.ExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment;//批处理运行上下文环境
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
val streamenv = StreamExecutionEnvironment.getExecutionEnvironment//流处理运行上下文环境
map filter flatMap
import org.apache.flink.streaming.api.scala.createTypeInformation
streamenv.fromCollection(Array("1,2","3,4","5,6"))
.flatMap(x=>x.split(",")).map(x=>(x,1)).filter(x=>x._2.toInt>=1).print()
keyby min max
//样例数据key都是一样的,注意结果打印
streamenv.fromCollection(Array((1,3),(1,2),(1,1))).keyBy(x=>x._1).sum(1).print()
streamenv.fromCollection(Array((1,3),(1,2),(1,1))).keyBy(x=>x._1).min(1).print()
streamenv.fromCollection(Array((1,1),(1,2),(1,3))).keyBy(x=>x._1).max(1).print()
streamenv.fromCollection(Array((1,1),(1,2),(1,3))).keyBy(x=>x._1).minBy(1).print()
streamenv.fromCollection(Array((1,3),(1,2),(1,1))).keyBy(x=>x._1).maxBy(1).print()
streamenv.fromCollection(Array((1,(3,2)),(1,(6,5)),(2,(4,7)))).keyBy(x=>x._1)
.reduce((x,y)=>(x._1,(x._2._1+y._2._1,x._2._2+y._2._2))).print()
val data1 = streamenv.fromCollection(Array((1,1),(1,2),(1,3)))
val data2 = streamenv.fromCollection(Array((2,1),(2,2),(2,3)))
data2.union(data1).print()
connect map flatMap
val data1 = streamenv.fromCollection(Array((1),(2),(3)))
val data2 = streamenv.fromCollection(Array((2,1),(2,2),(2,3)))
data1.connect(data2).map((x=>x+1),(x=>(x._1+1,x._2))).print()
data1.connect(data2).flatMap((x=>Array(x)),(x=>Array(x._1,x._2))).print()
分区
//随机分配
streamenv.fromCollection(Array((1),(2),(3))).shuffle.print()
//对输入流进行重分区
streamenv.fromCollection(Array((1),(2),(3))).rebalance.print()
//接收方任务的数量是发送方任务数量的倍数或者相反 rescale转换更有效
streamenv.fromCollection(Array((1),(2),(3))).rescale.print()
rebalance()会在所有发送任务与所有接收任务之间创建通信通道,而rescale()则只创建从每个任务到下游操作的某些任务的通道。
streamenv.fromCollection(Array((1),(2),(3))).broadcast.print() //将数据广播给下游
streamenv.fromCollection(Array((1),(2),(3))).global.print() //并行度会变成1
自定义分区
import org.apache.flink.api.common.functions.Partitioner
class MyPartitioner extends Partitioner[Int]{//自定义分区
override def partition(word: Int, num: Int): Int = {
if(word>=2) 0 else 1
}
}
streamenv.fromCollection(Array((1),(2),(3))).partitionCustom(new MyPartitioner,x => x ).print()
并行度
import org.apache.flink.streaming.api.scala.createTypeInformation
streamenv.setParallelism(1)//设置全局的并行度
streamenv.fromCollection(Array((1),(2),(3))).setParallelism(2)//每一个算子 都可以有并行度
flink可实现接口汇总(暂作为了解,后面详解)
MapFunction:实现自定义的map函数,就是map中的表达式可以抽成一个class。个人感觉还不如直接写表达式
FilterFunction:实现自定义的filter函数
RichFunction:普通函数没有办法在执行之前进行变量初始化。RichFunction可以(上一章其实有样例的,后面也会有)
ProcessFunction:功能最强大的接口,可以获取上下文环境等信息(先记一下,后续详细看,加强版本的richfunction)
ProcessFunction、KeyedProcessFunction、CoProcessFunction、
ProcessJoinFunction、BroadcastProcessFunction、KeyedBroadcastProcessFunction、
ProcessWindowFunction、ProcessAllWindowFunction
水位线和watermark
数据是源源不断的,处理数据的时候只能一次处理一段时间的数据。
这时候就需要一个字段告诉程序当前时间,数据会有延迟,所以就需要在时间基础上延迟。
这就是水位线和watermark的简单理解
//水位线:指示时间进展的标记延迟一段时间关闭
//周期性的产生标记/比较之前的标记/整体延迟等待 当前是10s记录的是0
//任务中比较之前的标记(以最慢的为主) 水位线是由上游任务广播到下游任务的
import org.apache.flink.streaming.api.scala.createTypeInformation
val data = streamenv.fromCollection(Array((1,1000),(2,2001),(3,9002))).map(x=>(x._1,x._2,1)).keyBy(x=>x._1)
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.util.Collector
import java.time.Duration
import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
streamenv.getConfig.setAutoWatermarkInterval(100)//设置小的延迟处理小大批量的数据
//有序watermark生成
data.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(
new SerializableTimestampAssigner[Tuple3[Int,Int,Int]]{
override def extractTimestamp(t: Tuple3[Int,Int,Int], l: Long): Long = t._2.toLong//获取时间字段位置
}))
//乱序程度可以确定的
data.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(2))
.withTimestampAssigner(new SerializableTimestampAssigner[Tuple3[Int,Int,Int]](){
override def extractTimestamp(t: Tuple3[Int,Int,Int], l: Long): Long = t._2.toLong//获取时间字段位置
}))
ProcessFunction 处理 侧输出流处理
数据延迟有水位线处理,但更延迟的数据我想单独处理怎么办?侧输出流
import org.apache.flink.streaming.api.scala.OutputTag
val latetag = new OutputTag[String]("late")
class SplitProcessFunction(latetag:OutputTag[String]) extends ProcessFunction[String,String] {
override def processElement(input: String, ctx: ProcessFunction[String, String]#Context,
out: Collector[String]): Unit = {
if(input.toLong>1000){
out.collect(input)
}else{
ctx.output(latetag,input)
}
}
}
val data = streamenv.fromCollection(Array((1,1000),(2,2001),(3,9002))).map(x=>(x._1,x._2,1)).keyBy(x=>x._1)
val success = data.map(x=>(x._2.toString)).process(new SplitProcessFunction(latetag))
val fail = success.getSideOutput(latetag).print()
状态
数据源源不断过来,这时候需要暂存一些中间数据,flink提供了一些数据结构,这就是状态
// flink中的状态 算子状态(算子状态的作用范围限定为算子任务)
// 键控状态(keyby 以后根据输入数据流中定义的key来维护和访问)
// 算子状态(liststate、unionliststate、broadcaststate)
// 键控状态(对每一个key都保存了一份状态,相同key可以访问状态)
// (valuestate、liststate、mapstate、reducestate&Aggregatingstate)
import org.apache.flink.api.common.state.MapState//键控状态 样例
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor,
ValueStateDescriptor,ValueState,MapState,MapStateDescriptor,ReducingState}
import org.apache.flink.api.common.functions.{AggregateFunction, RichFlatMapFunction, RichMapFunction}
class MyMapper extends RichMapFunction[String,String] {
var valuestate:ValueState[String] = _
var liststate:ListState[String] = _
var mapstate:MapState[String,String] = _
var reducestate:ReducingState[String] = _//reducestate = getRuntimeContext.getReducingState(
new ReducingStateDescriptor[String]("reducestate",,classOf[String]))
override def open(parameters: Configuration): Unit ={
valuestate = getRuntimeContext.getState(new ValueStateDescriptor[String]("valuestate",classOf[String]))
liststate = getRuntimeContext.getListState(new ListStateDescriptor[String]("liststate",classOf[String]))
mapstate = getRuntimeContext.getMapState(new MapStateDescriptor[String,String]
("mapstate",classOf[String],classOf[String]))
}
override def map(in: String): String = {
valuestate.value().substring(0,1)
valuestate.update("1")
liststate.add("1")
liststate.get()//获取到列表
liststate.update(null)
mapstate.put("1","1")
mapstate.remove("1")
reducestate.get()
reducestate.add("1")
"1"
}
}
状态后端
上面的状态读写数据以后是需要存储的,可以存储在内存外部系统等一些存储系统中。
这就涉及到了一些存储系统和存储策略。这就是状态后端
状态后端:状态的存储、序列化、checkpoint
MemoryStateBackend:内存级别的状态后端,存储再taskManager的JVM堆上,
checkpoint存储在jobmanager的内存中,读取写入比较快,但是不稳定
FsStateBackend:存储再taskManager的JVM堆上,checkpoint存储在jobmanager的文件系统中,内存状态的限制
RocksDBStateBackend:所有状态序列化后,存储再本地的DB中
checkpoint一致性检查
//保存所有任务处理完相同输入数据的时候 kafka下标6执行完成以后 都保存这个状态
//检查点和数据处理分离开 不暂定整个应用(再某一个数据后面插入一个标记位 再这个数据处理完成以后进行checkpoint)
//jobmanager 通知所有source进行检查点的保存,source进行check保存以后,广播告诉下游checkpointid及下标
//聚合任务收到聚合数据下标(等到数据下标过来的时候等到其他聚合任务收到下标数据的时候共同保存)
//这时候来的其他数据缓存 最大的时间消耗是向远程同步
状态后端保存策略
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
sourceTestEnv.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE)
//source添加完成 到所有节点完成时间
sourceTestEnv.getCheckpointConfig.setCheckpointTimeout(60*1000l)
//最大允许的checkpoint
sourceTestEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
//保证checkpoint之间的间隔 延迟checkpoint发生的时间 设置这个上面同时checkpoint就是1
sourceTestEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(1)
//checkpoint 失败多少次
sourceTestEnv.getCheckpointConfig.setTolerableCheckpointFailureNumber(10)
//checkpoint重启策略
//参数:尝试重启次数和尝试重启时间间隔
sourceTestEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000))
//参数:失败率 时间间隔 两次重启时间间隔
sourceTestEnv.setRestartStrategy(RestartStrategies.failureRateRestart(5,
org.apache.flink.api.common.time.Time.minutes(5),
org.apache.flink.api.common.time.Time.seconds(10)))
状态查询
QueryableStateClient
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-queryable-state-clientjava_2.12</artifactid>
<version>1.7.1</version>
</dependency>
有状态的flatMap
import org.apache.flink.api.common.functions.{RichFlatMapFunction}
class MyFlatMap(tmp:Double) extends RichFlatMapFunction[String,(String,Double,Double)] {
var valuestate:ValueState[String] = _
override def open(parameters: Configuration): Unit = {
valuestate = getRuntimeContext.getState(new ValueStateDescriptor[String]("valuestate",classOf[String]))
}
override def flatMap(in: String, collector: Collector[(String, Double, Double)]): Unit = {
if(tmp < valuestate.value().toDouble){
collector.collect(valuestate.value(),tmp,valuestate.value().toDouble)
valuestate.update(tmp.toString)
}
}
}
import scala.collection.immutable.List.empty
val data = streamenv.fromCollection(Array((1,1000),(2,2001),(3,9002))).map(x=>(x._1,x._2,1)).keyBy(x=>x._1)
data.map(x=>(x,1)).keyBy(x=>x._1)//必须是再keyby以后
.flatMapWithState[(String,Int),String] = {//输出类型,中间结果的类型
case ((x,y),None)=>(empty,Some(x))//data输入的数据 None输出的类型
case ((x,y),last:Some[String])=>(Tuple2(last.get,y),Some(x))//第一个值换成上一个的值
}
水位线的高级处理
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
//通过水位线可以简单利用水位线,但是如果我们需要获取当前水位线等操作是无法实现的,这时候就需要使用ProcessFunction
val data = streamenv.fromCollection(Array((1,1000),(2,2001),(3,9002))).map(x=>(x._1,x._2,1)).keyBy(x=>x._1)
data.process(new MyKeyedProcessFunction).print()
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
//key的类型 输入类型 输出类型 添加定时器
class MyKeyedProcessFunction extends KeyedProcessFunction[Int,(Int,Int,Int),String]{
var lastvalue:ValueState[String] = _
var lastts:ValueState[Long] = _
override def open(parameters: Configuration): Unit = {
lastvalue = getRuntimeContext.getState(new ValueStateDescriptor[String]("lastvalue",classOf[String]))
lastts = getRuntimeContext.getState(new ValueStateDescriptor[Long]("lastts",classOf[Long]))
}
override def processElement(input: (Int,Int,Int), ctx: KeyedProcessFunction[Int, (Int,Int,Int), String]#Context, collector: Collector[String]): Unit = {
// ctx.output(empty) //侧输出流 ctx.timerService().currentWatermark()//当前的水位线 ctx.timerService().registerEventTimeTimer(ctx.timestamp()+10*1000)
if(lastvalue.value()>input.toString() && lastts.value()==0)
ctx.timerService().registerEventTimeTimer(ctx.timestamp()+10*1000)//注册当前时间戳的定时器
else if(lastvalue.value()<input.toString() && lastts.value()!=0){
ctx.timerService().deleteEventTimeTimer(lastts.value()+10*1000)//注册当前时间戳的定时器
lastts.clear()
}
lastts.update(input.toString().toLong)
lastvalue.update(input.toString())
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Int, (Int,Int,Int), String]#OnTimerContext, out: Collector[String]): Unit = {
//多个定时器通过if else 判断
System.out.println("一直上升")
}
}
窗口
flink只能处理一段时间内的数据,这个一段时间就可以用窗口来切分
//数据延迟可以通过:可以启动多个窗口(桶)、数据往多个窗口里面扔
//窗口的分类:1.按照驱动类型分类(时间窗口和计数窗口)
// 2.分配数据类型分类(滚动窗口[当前窗口的大小,没有重迭]、
// 滑动窗口[窗口大小,滑动的距离,会重迭]、会话窗口[设置会话超时时间]、
// 全局窗口[全局有效 不会触发计算])
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows//事件时间处理窗口
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows//事件时间处理窗口
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows//事件时间处理窗口
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.assignersTumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
val sourceTestEnv = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
val windowdata = sourceTestEnv.fromElements[String]
("1,1663828406000","2,1663828416000","3,1663828426000","4,1663828436000")
windowdata.map(x=>(x,1)).keyBy(x=>x._1).window(SlidingEventTimeWindows.of(Time.minutes(10),Time.minutes(5)))
windowdata.map(x=>(x.split(",").apply(0),x.split(",").apply(1))).keyBy(x=>x._1)//按照用户分区
.window(SlidingEventTimeWindows.of(Time.minutes(10),Time.minutes(5)))//滑动事件窗口 1窗口大小 2滑动距离
// .window(TumblingEventTimeWindows.of(Time.minutes(5)))//滚动事件窗口 滚动大小
// .window(EventTimeSessionWindows.withGap(Time.seconds(2)))//时间时间会话窗口
// .countWindow(2000,500)//一个参数滚动窗口 两个参数滑动窗口
windowdata.map(x=>(x.split(",").apply(0),x.split(",").apply(1)))//不keyby的操作
.windowAll(EventTimeSessionWindows.withGap(Time.seconds(2)))//并行度是1
watermark window 联合使用
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}//事件时间处理窗口
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
val sourceTestEnv = StreamExecutionEnvironment.getExecutionEnvironment
//
val watermark_window_data = sourceTestEnv.fromElements[String]
("1,1663828406000","2,1663828416000","3,1663828426000","4,1663828436000")
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala.OutputTag
val watermark_window_data_latetag = new OutputTag[(String, Int)]("late")
sourceTestEnv.getConfig.setAutoWatermarkInterval(100)
val result = watermark_window_data.assignTimestampsAndWatermarks(
WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(
new SerializableTimestampAssigner[String](){
override def extractTimestamp(t: String, l: Long): Long = t.split(",").apply(3).toLong
})).map(x=>(x,1)).keyBy(x=>x._1)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.minutes(10))//处理迟到的数据 迟到的数据立刻输出
.sideOutputLateData(watermark_window_data_latetag)
.reduce((x,y)=>(x._1,x._2+y._2))
result.getSideOutput(watermark_window_data_latetag).print()
其他概念
保存点 savepoint 有计划的时间备份/更新应用程序/版本迁移/暂停和重启应用/资源的重新分配
data.uid("保存的id名称")
状态一致性 数据不能丢,也不能重复计算 AT-MOST-ONCE(最多一次) AT-LEAST-ONCE(至少一次) EXACTLY-ONCE(精确一次)
EXACTLY-ONCE如何保证: 内部checkpoint source可以重新读取数据 sink保证不会重复写入(幂等写入/事务写入)
幂等写入(写一次和写多次结果是一样的hashmap) 事务写入(原子性 等到checkpoint完成以后 再写入到外部系统 (预写日志 效率不高))
两阶段提交:每个checkpoint和sink任务都会启动一个事务 实现了EXACTLY-ONCE 需要实现支持外部的接口(TwoPhaseCommitSinkFunction)
两阶段提交对外部系统有要求:事务支持/checkpoint间隔时间内必须开启一个事务/完成以前事务是等待提交状态/能够恢复事务/提交事务是幂等操作
flink_kafka 保证状态一致性 :
标签:DataStream,flink,String,import,Flink,Transform,._,apache,org
From: https://www.cnblogs.com/wuxiaolong4/p/16777015.html