首页 > 其他分享 >【Flink入门修炼】2-2 Flink State 状态

【Flink入门修炼】2-2 Flink State 状态

时间:2024-03-06 19:46:57浏览次数:26  
标签:状态 入门 Flink state Keyed Tuple2 State

  • 什么是状态?状态有什么作用?
  • 如果你来设计,对于一个流式服务,如何根据不断输入的数据计算呢?
  • 又如何做故障恢复呢?

一、为什么要管理状态

流计算不像批计算,数据是持续流入的,而不是一个确定的数据集。在进行计算的时候,不可能把之前已经输入的数据全都保存下来,然后再和新数据合并计算。效率低下不说,内存也扛不住。
另外,如果程序出现故障重启,没有之前计算过的状态保存,那么也就无法再继续计算了。

因此,就需要一个东西来记录各个算子之前已经计算过值的结果,当有新数据来的时候,直接在这个结果上计算更新。这个就是状态

常见的流处理状态功能如下:

  • 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
  • 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
  • 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。
  • 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

二、state 简介

Flink的状态是由算子的子任务来创建和管理的。一个状态更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。
image.png

状态的保存:
需要考虑的问题:

  • container 异常后,状态不丢
  • 状态可能越来越大

因此,状态不能直接放在内存中,以上两点问题都无法保证。
需要有一个外部持久化存储方式,常见的如放到 HDFS 中。(此部分读者感兴趣可自行搜索资料探索)

image.png

一)Managed State 和 Raw State

  • Managed State 是由 Flink 管理的。Flink帮忙存储、恢复和优化。
  • Raw State 是开发者自己管理的,需要自己序列化(较少用到)。

在 Flink 中推荐用户使用Managed State管理状态数据 ,主要原因是 Managed State 能够更好地支持状态数据的重平衡以及更加完善的内存管理。

Managed State Raw State
状态管理方式 Flink Runtime 管理,自动存储,自动恢复,内存管理方式上优化明显 用户自己管理,需要用户自己序列化
状态数据结构 已知的数据结构 value , list ,map flink不知道你存的是什么结构,都转换为二进制字节数据
使用场景 大多数场景适用 需要满足特殊业务,自定义operator时使用,flink满足不了你的需求时候,使用复杂

下文将重点介绍Managed State。

二)Keyed State 和 Operator State

Managed State 又有两种类型:Keyed State 和 Operator State。

keyed state operator state
适用场景 只能应用在 KeyedSteam 上 可以用于所有的算子
State 处理方式 每个 key 对应一个 state,一个 operator 处理多个 key ,会访问相应的多个 state 一个 operator 对应一个 state
并发改变 并发改变时,state随着key在实例间迁移 并发改变时需要你选择分配方式,内置:1.均匀分配 2.所有state合并后再分发给每个实例
访问方式 通过RuntimeContext访问,需要operator是一个richFunction 需要你实现CheckPointedFunction或ListCheckPointed接口
支持数据结构 ValuedState, ListState, Reducing State, Aggregating State, MapState, FoldingState(1.4弃用) 只支持 listState

Keyed State

简单来说,通过 keyBy 分组的就会用到 Keyed State。就是按照分组来的状态。(Keyed State 是Operator State的特例,区别在于 Keyed State 事先按照 key 对数据集进行了分区,每个 Key State 仅对应ー个Operator和 Key 的组合。)
image.png

Keyed State可以通过 Key Groups 进行管理,主要用于当算子并行度发生变化时,自动重新分布Keyed State数据 。分配代码如下:

// KeyGroupRangeAssignment.java
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
		return MathUtils.murmurHash(keyHash) % maxParallelism;
	}

Operator State

Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。
例如 Kafka Connector 中,每一个并行的 Kafka Consumer 都在 Operator State 中维护当前 Consumer 订阅的 partiton 和 offset。
image.png

image.png

在开发中,需要保存的状态也有不同的数据结构,那么 Flink 也提供了相应的类。
如上图所示:

  • ValueState[T] 保存单一变量状态
  • MapState[K, V] 同 java map,保存 kv 型状态
  • ListState[T] 数组类型状态
  • ReducingState[T] 单一状态,将原状态和新状态合并后再更新
  • AggregatingState[IN, OUT] 同样是合并更新,只不过前后数据类型可以不一样

四、实践

实现一个简单的计数窗口。
输入数据是一个元组 Tuple2.of(1L, 3L),把元组的第一个元素当作 key(在示例中都 key 都是 “1”),第二个元素当 value。
该函数将出现的次数以及总和存储在 ValueState 中。 一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。 请注意,我们会为每个不同的 key(元组中第一个元素)保存一个单独的值。

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

四、小结

本节我们介绍了 Flink 状态,是用于流式计算中中间数据存储和故障恢复的。
Flink 状态分为 Raw State 和 Manage State,其中 Manage State 中又包含 Keyed State 和 Operator State。最重要的是 Keyed State 要重点理解和掌握。
在编程开发过程中,针对不同的数据结构,Flink 提供了对应的 State 类。并提供了一个 state demo 代码供学习。


参考文章:
七、Flink入门--状态管理_flink流式任务如何保证7*24小时运行-CSDN博客
Flink状态管理详解:Keyed State和Operator List State深度解析
爆肝 3 月,3w 字、15 章节详解 Flink 状态管理!(建议收藏)-腾讯云开发者社区-腾讯云(较详细)
Flink 笔记二 Flink的State--状态原理及原理剖析_flink key state是每个key对应一个state还是每个分区对应一个state-CSDN博客(源码剖析)
Flink 状态管理详解(State TTL、Operator state、Keyed state)-腾讯云开发者社区-腾讯云
Flink 源码阅读笔记(10)- State 管理

标签:状态,入门,Flink,state,Keyed,Tuple2,State
From: https://www.cnblogs.com/shuofxz/p/18057385

相关文章

  • Flink AggregatingState 实例
    FlinkAggregatingState实例AggregatingState介绍AggregatingState需要和AggregateFunction配合使用add()方法添加一个元素,触发AggregateFunction计算get()获取State的值需求:计算每个设备10秒内的平均温度importorg.apache.flink.api.common.eventtime.SerializableTimesta......
  • StatefulSet是怎样实现的
    StatefulSet是Kubernetes中用于管理有状态应用的标准实现。与Deployment不同,StatefulSet为每个Pod提供了一个唯一的、稳定的网络标识符,并且Pod的启动和停止顺序是受控的。这使得StatefulSet非常适合于需要持久化存储、稳定网络标识符或有序部署、扩展、删除和终止的应用场景。Sta......
  • Salesforce入门级认证!App Builder备考指南
    AppBuilder认证适用于具有在Lightning平台上开发自定义应用程序的经验的个人,备考者通常需要有6个月到1年在Lightning平台或类似技术平台上构建应用程序的经验。AppBuilder认证对备考者的要求AppBuilder认证验证了备考者在数据建模与管理、流程自动化、用户界面、应用开发......
  • flink 提交yarn 命令 flink run -m yarn-cluster
    flink提交yarn命令flinkrun-myarn-cluster文章目录Flink集群搭建和使用local本地测试flink集群搭建1、standallonecluster提交任务--将代码打包2.flinkonyarn只需要部署一个节点flink启动方式1、yarn-session2、直接提交任务到yarnFlink集群搭建和使用local本地......
  • 基于preparedStatement对数据的增删改查,以及全自动遍历
    1packagecom.atsyc.api.preparedstatement;23/*4*使用preparedStatement进行t_user表的增删改查动作5*/67importcom.mysql.cj.xdevapi.PreparableStatement;8importorg.junit.Test;910importjava.sql.*;11importjava.util.*;......
  • Gorm简单入门
    Gorm简单入门介绍简单的数据库连接和CRUD内容0.基本配置goget-ugorm.io/gormgoget-ugorm.io/driver/mysql1.连接数据库packagemainimport( "time" "gorm.io/driver/mysql" "gorm.io/gorm")typeUserstruct{ IDuint Namestring......
  • 网络安全入门(持续更新...)
    第零章网络安全概述网络安全是什么网络安全基本要素(CIA)机密性(Confidentiality):确保一些重要信息/敏感数据不会被未授权访问(不会被窃取);完整性(Integrity):确保数据在传输过程中不会被篡改;可用性(Availability):确保已授权人员可以正常获取数据;网络安全关心什么网络通信安全:......
  • 从0开始入门智能知识库和星火大模型,打造AI客服。
    介绍FastWikiFastWiki是一个高性能、基于最新技术栈的知识库系统,旨在为大规模信息检索和智能搜索提供解决方案。它采用微软SemanticKernel进行深度学习和自然语言处理,在后端使用MasaFramework,前端采用MasaBlazor框架,实现了一个高效、易用、可扩展的智能向量搜索平台。其目标是帮......
  • GDB调试入门笔记
    目录What?WhyHow安装GDB安装命令查看是否安装成功调试简单的程序预备一个程序调试使用breakinfolistnextprintstepWhat?GDB是什么?全称GNUsymbolicdebugger百度百科的解释:程序调试工具UNIX及UNIX-like下的调试工具。或许,各位比较喜欢那种图形界面方式的,像VC、BCB等IDE的调试......
  • 基于preparedStatement方式优化
    1packagecom.atsyc.api.preparedstatement;23/*4*使用预编译statement完成用户登录5*6*TODO:7*防止注入攻击,演示preparedstatement完成用户登录8*/910importjava.sql.*;11importjava.util.Scanner;1213publicclassPSUserLogi......