首页 > 其他分享 >Flink-状态管理

Flink-状态管理

时间:2022-09-05 22:11:34浏览次数:74  
标签:状态 val 管理 Double Flink value SensorReading 算子

 流式计算分为无状态和有状态两种情况。   无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过 90 度时发出警告。   有状态的计算则会基于多个事件输出结果。以下是一些例子。

  • 所有类型的窗口。例如,计算过去一小时的平均温度,就是有状态的计算。
  • 所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差 20 度以上的温度读数,则发出警告,这是有状态的计算。
  • 流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。
下图展示了无状态流处理和有状态流处理的主要区别。   无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。   有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。   上图中输入数据由黑条表示。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出结果(白条)。有状态 流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个事件之后的结果。 尽管无状态的计算很重要,但是流处理对有状态的计算更感兴趣。事实上,正确地实现有状态的计算比实现无状态的计算难得多。旧的流处理系统并不支持有状态的计算,而新一代的流处理系统则将状态及其正确性视为重中之重。   由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态 可以认为状态就是一个本地变量,可以被任务的业务逻辑访问 Flink会进行状态管理,包括状态一致性、故障处理以及高效存储和访问,以便开发人员可以专注于应用程序的逻辑  

1.有状态的算子和应用程序

Flink 内置的很多算子,数据源 source,数据存储 sink 都是有状态的,流中的数据都是 buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction 会保存设置的定时器信息等等。 在 Flink 中,状态始终与特定算子相关联。总的来说,有两种类型的状态:
  • 算子状态(Operatior State)
  • 键控状态(Keyed State)
 

1.1算子状态(Operatior State)

算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。 Flink 为算子状态提供三种基本数据结构: A.列表状态(List state) 将状态表示为一组数据的列表。 B.联合列表状态(Union list state) 也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。 C.广播状态(Broadcast state) 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。  

1.2键控状态(Keyed State)

键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。   Flink 为键控状态提供以下数据结构:   A.值状态(Value State) 将状态表示为单个的值 ValueState[T]保存单个的值,值的类型为 T。 o get 操作: ValueState.value() o set 操作: ValueState.update(value: T)   B.列表状态(List State) 将状态表示为一组数据的列表 ListState[T]保存一个列表,列表里的元素的数据类型为 T。基本操作如下: o ListState.add(value: T) o ListState.addAll(values: java.util.List[T]) o ListState.get()返回 Iterable[T] o ListState.update(values: java.util.List[T])   C.映射状态(Map State) 将状态表示为一组Key-Value对 MapState[K, V]保存 Key-Value 对。 o MapState.get(key: K) o MapState.put(key: K, value: V) o MapState.contains(key: K) o MapState.remove(key: K)   D.聚合状态(Reducing state & Aggregating State) 将状态表示为一个用于聚合操作的列表 ReducingState[T] AggregatingState[I, O]  

2 键控状态的使用

package com.zhen.flink.api

import java.util

import com.zhen.flink.api.WindowTest.MyReducer
import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector


/**
  * @Author FengZhen
  * @Date 8/24/22 4:02 PM
  * @Description 状态编程
  */
object StateTest {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0.读取数据
    // nc -lk 7777
    val inputStream = env.socketTextStream("localhost", 7777)
    // 1.先转换成样例数据
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )


    //需求:对于温度传感器温度值跳变,超过10度,报警
    val alertStream = dataStream
        .keyBy(_.id)
//        .flatMap( new TempChangeAlert(10.0))
        .flatMapWithState[(String, Double, Double), Double]( {

          case (data: SensorReading, None) => ( List.empty, Some(data.temperature) )
          case (data: SensorReading, lastTemp: Some[Double]) => {
            //跟最新的温度值求差值作比较
            val diff = (data.temperature - lastTemp.get).abs
            if (diff > 10.0)
              (List((data.id, lastTemp.get, data.temperature)), Some(data.temperature))
            else
              (List.empty, Some(data.temperature))
          }
        } )

    alertStream.print()

    env.execute("state test")

  }

}


/**
  * 实现自定义RichFlatMapFunction
  * @param threshold
  */
class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{

  //定义状态,保存上一次的温度值
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(
    new ValueStateDescriptor[Double]("last-temp", classOf[Double])
  )

  override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {

    //获取上次的温度值
    val lastTemp = lastTempState.value()

    //跟最新的温度值求差值作比较
    val diff = (value.temperature - lastTemp).abs
    if (diff > threshold)
      out.collect( (value.id, lastTemp, value.temperature) )

    //更新状态
    lastTempState.update(value.temperature)
  }



}



/**
  * keyed state测试:必须定义在RichFunction中,因为需要运行时上下文
  */
class MyRichMapper extends RichMapFunction[SensorReading, String]{

  var valueState: ValueState[Double] = _

  lazy val listState: ListState[Int] = getRuntimeContext.getListState(
    new ListStateDescriptor[Int]("listState", classOf[Int])
  )

  lazy val mapState: MapState[String, Double] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, Double]("mapState", classOf[String], classOf[Double])
  )

  lazy val reduceState: ReducingState[SensorReading] = getRuntimeContext.getReducingState(
    new ReducingStateDescriptor[SensorReading]("reduceState", new MyReducer, classOf[SensorReading])
  )

  override def open(parameters: Configuration): Unit = {

    valueState = getRuntimeContext.getState(
      new ValueStateDescriptor[Double]("valueState", classOf[Double])
    )

  }


  override def map(value: SensorReading): String = {

    //状态读写
    val myValue = valueState.value()
    valueState.update(value.temperature)

    listState.add(1)

    val list = new util.ArrayList[Int]()
    list.add(2)
    list.add(3)
    listState.addAll(list)
    listState.update(list)

    mapState.put(value.id, value.temperature)

    //当前聚合完成的值
    reduceState.get()
    //对新value做聚合操作
    reduceState.add(value)

    value.id
  }

}

 

 

 

             

标签:状态,val,管理,Double,Flink,value,SensorReading,算子
From: https://www.cnblogs.com/EnzoDin/p/16659788.html

相关文章

  • 数据湖管理-数据血缘管理
    https://cloud.baidu.com/doc/SUGAR/s/4k6z5xntphttps://support.huaweicloud.com/download_dgc/index.html......
  • 网络填坑之路(6)ethtool - 命令管理以太网卡
    来源:如何使用ethtool命令管理以太网卡作者:MageshMaruthamuthu译者:Xingyu.Wangethtool介绍在配置和显示以太网设备统计数据方面,ethtool提供了与mii-tool相似的性......
  • 磁盘管理、挂载、格式化
    这写命令经常会用到,但是容易忘,今天做一下记录。查看挂载df-lh查看磁盘fdisk-l查看UUIDsudolsblk-f格式化磁盘mkfs.ext4/dev/sdb查看磁盘空间du-......
  • 为什么不用看板来管理日常工作?
    ​了解看板工具的时候,还是之前在公司实施敏捷开的时候关注的。但是后来因为没有好的项目来实践敏捷,我们的敏捷工具用的是Leangoo领歌,看板的方式管理,非常直观,效率也很高,还......
  • 浏览器输入URL发生了什么:DNS解析、TCP握手、HTTP缓存、重定向、服务器状态码、渲染引
    输入地址,浏览器查找域名的IP地址。浏览器向该IP地址的web服务器发送一个HTTP请求,在发送请求之前浏览器和服务器建立TCP的三次握手,判断是否是HTTP缓存,如果是强制......
  • 从零开始配置vim(21)——会话管理
    很多代码编辑器都有这么一个功能,重新进入编辑器之后能恢复上次打开的所有文件,窗口布局,有的甚至是上次设置的一些配置。那么vim是否也可以实现这样的功能呢?答案是肯定的。使......
  • Linux 用户管理
    Linux用户管理查看所有用户信息通过文件/etc/passwd查看所有用户信息:每一行对应于一个用户通过文件/etc/shadow查看所有用户信息:用于记录用户密码通过命令getent查看......
  • 软考-高项-第六章进度管理
    项目管理第十三个过程(进度管理第一个)规划进度管理......
  • Windows磁盘管理
    硬盘配置类型有两种:基本磁盘和动态磁盘  基本磁盘基本磁盘是最常用于Windows的存储类型。使用分区的形式来组织硬盘数据,几乎你见到的电脑大部分都是采用的基本磁盘基......
  • 如何在 RUST 的官方包管理器 CARGO 中发布项目!
    如何在RUST的官方包管理器CARGO中发布项目!介绍货物是系统和语言包管理器锈.大多数Rust开发人员使用这个工具来管理他们的项目,因为货物为您处理许多任务,例如......