流式计算分为无状态和有状态两种情况。 无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过 90 度时发出警告。 有状态的计算则会基于多个事件输出结果。以下是一些例子。
- 所有类型的窗口。例如,计算过去一小时的平均温度,就是有状态的计算。
- 所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差 20 度以上的温度读数,则发出警告,这是有状态的计算。
- 流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。
1.有状态的算子和应用程序
Flink 内置的很多算子,数据源 source,数据存储 sink 都是有状态的,流中的数据都是 buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction 会保存设置的定时器信息等等。 在 Flink 中,状态始终与特定算子相关联。总的来说,有两种类型的状态:- 算子状态(Operatior State)
- 键控状态(Keyed State)
1.1算子状态(Operatior State)
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。 Flink 为算子状态提供三种基本数据结构: A.列表状态(List state) 将状态表示为一组数据的列表。 B.联合列表状态(Union list state) 也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。 C.广播状态(Broadcast state) 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。1.2键控状态(Keyed State)
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。 Flink 为键控状态提供以下数据结构: A.值状态(Value State) 将状态表示为单个的值 ValueState[T]保存单个的值,值的类型为 T。 o get 操作: ValueState.value() o set 操作: ValueState.update(value: T) B.列表状态(List State) 将状态表示为一组数据的列表 ListState[T]保存一个列表,列表里的元素的数据类型为 T。基本操作如下: o ListState.add(value: T) o ListState.addAll(values: java.util.List[T]) o ListState.get()返回 Iterable[T] o ListState.update(values: java.util.List[T]) C.映射状态(Map State) 将状态表示为一组Key-Value对 MapState[K, V]保存 Key-Value 对。 o MapState.get(key: K) o MapState.put(key: K, value: V) o MapState.contains(key: K) o MapState.remove(key: K) D.聚合状态(Reducing state & Aggregating State) 将状态表示为一个用于聚合操作的列表 ReducingState[T] AggregatingState[I, O]2 键控状态的使用
package com.zhen.flink.api import java.util import com.zhen.flink.api.WindowTest.MyReducer import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction} import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector /** * @Author FengZhen * @Date 8/24/22 4:02 PM * @Description 状态编程 */ object StateTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 0.读取数据 // nc -lk 7777 val inputStream = env.socketTextStream("localhost", 7777) // 1.先转换成样例数据 val dataStream: DataStream[SensorReading] = inputStream .map( data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) } ) //需求:对于温度传感器温度值跳变,超过10度,报警 val alertStream = dataStream .keyBy(_.id) // .flatMap( new TempChangeAlert(10.0)) .flatMapWithState[(String, Double, Double), Double]( { case (data: SensorReading, None) => ( List.empty, Some(data.temperature) ) case (data: SensorReading, lastTemp: Some[Double]) => { //跟最新的温度值求差值作比较 val diff = (data.temperature - lastTemp.get).abs if (diff > 10.0) (List((data.id, lastTemp.get, data.temperature)), Some(data.temperature)) else (List.empty, Some(data.temperature)) } } ) alertStream.print() env.execute("state test") } } /** * 实现自定义RichFlatMapFunction * @param threshold */ class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{ //定义状态,保存上一次的温度值 lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("last-temp", classOf[Double]) ) override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = { //获取上次的温度值 val lastTemp = lastTempState.value() //跟最新的温度值求差值作比较 val diff = (value.temperature - lastTemp).abs if (diff > threshold) out.collect( (value.id, lastTemp, value.temperature) ) //更新状态 lastTempState.update(value.temperature) } } /** * keyed state测试:必须定义在RichFunction中,因为需要运行时上下文 */ class MyRichMapper extends RichMapFunction[SensorReading, String]{ var valueState: ValueState[Double] = _ lazy val listState: ListState[Int] = getRuntimeContext.getListState( new ListStateDescriptor[Int]("listState", classOf[Int]) ) lazy val mapState: MapState[String, Double] = getRuntimeContext.getMapState( new MapStateDescriptor[String, Double]("mapState", classOf[String], classOf[Double]) ) lazy val reduceState: ReducingState[SensorReading] = getRuntimeContext.getReducingState( new ReducingStateDescriptor[SensorReading]("reduceState", new MyReducer, classOf[SensorReading]) ) override def open(parameters: Configuration): Unit = { valueState = getRuntimeContext.getState( new ValueStateDescriptor[Double]("valueState", classOf[Double]) ) } override def map(value: SensorReading): String = { //状态读写 val myValue = valueState.value() valueState.update(value.temperature) listState.add(1) val list = new util.ArrayList[Int]() list.add(2) list.add(3) listState.addAll(list) listState.update(list) mapState.put(value.id, value.temperature) //当前聚合完成的值 reduceState.get() //对新value做聚合操作 reduceState.add(value) value.id } }
标签:状态,val,管理,Double,Flink,value,SensorReading,算子 From: https://www.cnblogs.com/EnzoDin/p/16659788.html