首页 > 其他分享 >大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析

大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析

时间:2024-09-18 17:25:49浏览次数:12  
标签:状态 01 org flink State import apache 原理 Flink


点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink 并行度
  • Flink 并行度详解
  • Flink 并行度 案例

状态类型

Flink根据是否需要保存中间结果,把计算分为有状态计算和无状态计算。

  • 有状态计算:依赖之前或之后的事件
  • 无状态计算:独立

根据数据结构不同,Flink定义了多种State,应用于不同的场景。

  • ValueState:即类型为T的单值状态,这个状态与对应的Key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过 value() 方法获取状态值
  • ListState:即Key上的状态值为一个列表,可以通过add方法往列表中附加值,也可以通过get()方法返回一个Iterable来遍历状态值
  • ReducingState:这种状态通过用户传入的ReduceFunction,每次调用add方法添加值的时候,会调用ReduceFunction,最后合并到一个单一的状态值。
  • FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态会在未来的Flink版本当中删除)
  • MapState:即状态值为一个Map,用户通过put和putAll方法添加元素

State按照是否有Key划分为:

  • KeyedState
  • OperatorState

案例1 利用State求平均值

实现思路

  • 读数据源
  • 将数据源根据Key分组
  • 按照Key分组策略,对流式数据调用状态化处理:实例化出一个状态实例,随着流式数据的到来更新状态,最后输出结果

编写代码

package icu.wzk;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class FlinkStateTest01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<Long, Long>> data = env
                .fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(1L, 4L),
                        Tuple2.of(1L, 2L)
                );
        KeyedStream<Tuple2<Long, Long>, Long> keyed = data
                .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
                    @Override
                    public Long getKey(Tuple2<Long, Long> value) throws Exception {
                        return value.f0;
                    }
                });
        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMapped = keyed
                .flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    private transient ValueState<Tuple2<Long, Long>> sum;

                    @Override
                    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
                        Tuple2<Long, Long> currentSum = sum.value();
                        if (currentSum == null) {
                            currentSum = Tuple2.of(0L, 0L);
                        }
                        // 更新
                        currentSum.f0 += 1L;
                        currentSum.f1 += value.f1;
                        System.out.println("currentValue: " + currentSum);
                        // 更新状态值
                        sum.update(currentSum);
                        // 如果 count >= 5 清空状态值 重新计算
                        if (currentSum.f0 >= 5) {
                            out.collect(new Tuple2<>(value.f0, currentSum.f1 / currentSum.f0));
                            sum.clear();
                        }
                    }

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                                "average",
                                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
                        );
                        sum = getRuntimeContext().getState(descriptor);
                    }
                });
        flatMapped.print();
        env.execute("Flink State Test");
    }
}

运行结果

大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析_大数据

执行分析

大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析_大数据_02


大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析_分布式_03

Keyed State

表示和Key相关的一种State, 只能用于KeyedStream类型数据集对应的Function和Operator之上,KeyedState是OperatorState的特例,区别在于KeyedState事先按照Key对数据集进行了区分,每个KeyState仅对应一个Operator和Key的组合。

KeyedState可以通过KeyGroups进行管理,主要用于当算子并行度发生变化时,自动重新分布KeyedState数据。在系统运行过程中,一个Keyed算子实例可能运行一个或者多个KeyGroups的Keys。

Operator State

与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的Key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State 支持算子实例并行度发生变化时自动重新分配状态数据。

同时在Flink中KeyedState和OperatorState均具有两种形式,其中一种为托管状态(Managed State)形式,由FlinkRuntime中控制和管理状态数据,并将状态数据转换为内存HashTables或RocksDB的对象存储,然后将这些状态数据通过内部的接口持久话到CheckPoints中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发CheckPoint中,当从CheckPoint恢复任务时,算子自己再返序列化出状态的数据结构。

DataStreamAPI支持使用ManagedState和RawState两种状态形式,在Flink中推荐用户使用ManagedState管理状态数据,主要原因是ManagedState能够更好地支持状态数据的重平衡以及更加完善的内存管理。

状态描述

大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析_java_04

State既然是暴露给用户的,那么就需要有一些属性需要指定:

  • State名称
  • Value Serializer
  • State Type Info

在对应的StateBackend中,会去调用对应的create方法获取到stateDescriptor中的值。
Flink通过StateDescriptor来定义一个状态,这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息,与上面的状态对应,从StateDescriptor派生ValueStateDescriptor、ListStateDescriptor等等

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptot)


标签:状态,01,org,flink,State,import,apache,原理,Flink
From: https://blog.51cto.com/wuzikang/12047335

相关文章

  • 大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置
    点一下关注吧!!!非常感谢!!持续更新!!!目前已经更新到了:Hadoop(已更完)HDFS(已更完)MapReduce(已更完)Hive(已更完)Flume(已更完)Sqoop(已更完)Zookeeper(已更完)HBase(已更完)Redis(已更完)Kafka(已更完)Spark(已更完)Flink(正在更新!)章节内容上节我们完成了如下的内容:FlinkTimeWatermarkJava代码实例测试简单介......
  • TPS628501QDRLRQ1开关稳压器原装现货PDF数据手册引脚图功能框图
    TPS628501-Q1的说明TPS62850x-Q1是引脚对引脚1A、2A(持续)和3A(峰值)易用型高效同步直流/直流降压转换器系列。这些器件基于峰值电流模式控制拓扑,这些器件专为信息娱乐系统和高级驾驶辅助系统等汽车应用而设计。低阻开关可支持高达2A的持续输出电流和3A的峰值电流。TPS628......
  • 2024-03-01 Windows MySQL5.7.27绿色版安装
    背景MySQL是常用数据库,其中版本已经有很多了,安装方式也有很多,联网装、安装包等。不仅安装麻烦,卸载也很麻烦。因此笔者一般都是使用绿色版安装,安装过程自己很清晰,每一步都知道自己做了什么,卸载时也很容易,自己安装的时候做了什么,卸载的时候删除什么就行了。版本MySQL版本很多,而......
  • AGC015D题解
    简要题意给定一个区间\([l,r]\),从中选出若干整数按位或,求可能出现的数的方案数。数据范围:\(1\lel\ler\le2^{60}\)。思路首先对于\([l,r]\)里的数全都满足条件,然后因为是按位或,所以\(l,r\)二进制下的一段前缀就与答案无关可以先去掉。现在我们只需要考虑比\(r\)还要......
  • 前后端分离Vue3+SpringBoot房屋租赁系统(编号:49930163)
    目录功能和开发技术介绍具体实现截图开发核心技术介绍:技术创新点vue3和vue2的区别:核心代码部分展示非功能需求分析系统开发流程系统运行步骤软件测试源码获取功能和开发技术介绍本系统操作无需详细的操作文档,只需要用户简单的进行操作就可以掌握操作流程,购买古装操......
  • [强网杯2019]supersqli--Web安全进阶系列
    [强网杯2019]supersqli--Web安全进阶系列使用引号判断是否存在sql注入报错,可能存在sql注入,注入payload,判断列数,结果为不存在4列?inject=1'orderby4--q换2试试,正常显示,说明存在2列输出结果?inject=1'orderby2--q尝试使用联合注入失败,并且限制了select|update......
  • 合宙Air201模组LuatOS:点点鼠标就搞定的FOTA远程升级,你知道吗?
     你是不是也经常遇到小伙伴吐槽:开发是个苦差事!做项目倒还好,就怕遇到项目升级,那简直让人头大。。。如果你也有这种困惑,就多了解一下合宙的开发工具,简单实用又高效,甚至只需点点鼠标!本期,我们来学习合宙Air201的实用示例——FOTA远程升级 FOTA远程升级 编辑合宙Air201资产定位模组—......
  • 合宙Air201模组LuatOS:PWRKEY控制,一键解决解决关机难问题
    不知不觉间,我们已经发布拉期课程:helloworld初体验,点灯、远程控制、定位和扩展功能,你学的怎么样?很多伙伴表示已经有点上瘾啦!合宙Air201,如同我们一路升级打怪的得力法器,让开发愈发得心应手。本期,我们将进一步学习合宙Air201应用示例——PWRKEY控制关机功能 PWRKEY控制关机功能 编......
  • 合宙Air201模组LuatOS扩展功能:温湿度传感器篇!
     通过前面几期的学习,同学们的学习热情越来越高。合宙Air201模组除了支持3种定位方式外,还具有丰富的扩展功能,比如:通过外扩BTB链接方案,最多可支持21个IO接口:SPI、I2C、UART等多种接口全部支持。本期,我们将学习合宙Air201的扩展应用之一——I2C驱动AHT10温湿度传感器Air201驱动AHT10......
  • 视频美颜SDK与直播美颜工具的实现原理与优化方案
    本篇文章,小编将为大家详细讲解视频美颜SDK的实现原理,并提出优化方案。 一、视频美颜SDK的实现原理1.图像采集与处理2.人脸识别与关键点检测3.美颜滤镜与特效处理4.实时性与低延迟二、直播美颜工具的实现原理直播美颜工具与视频美颜SDK的原理相似,但其核心区别在于需要面对更多的实......