首页 > 其他分享 >Flink State 状态原理解析

Flink State 状态原理解析

时间:2023-12-06 14:14:59浏览次数:40  
标签:状态 Flink checkpoint State key 算子 解析

一、Flink State 概念

State 用于记录 Flink 应用在运行过程中,算子的中间计算结果或者元数据信息。运行中的 Flink 应用如果需要上次计算结果进行处理的,则需要使用状态存储中间计算结果。如 Join、窗口聚合场景。

Flink 应用运行中会保存状态信息到 State 对象实例中,State 对象实例通过 StateBackend 实现将相关数据存储到 FS 文件系统或者 RocksDB 数据库中。在Flink应用运行过程中,通过 checkpoint 快照定期地保存状态数据。并在 Flink 应用重启时加载checkpoint/savepoint 来实现状态的恢复,从而让 Flink 应用继续完成之前的数据计算,实现数据精确一次向下游传递。

分为以下3类:

  • 基于内存的 HeapStateBackend。状态存储在内存中。
  • 基于 HDFS 或 OSS 的 FsStateBackend。状态存储在内存,并在做 cp(checkpoint)时存到远端。
  • 基于 RocksDB 的 RocksDBStateBackend。将对象序列化成二进制存在内存和本地磁盘的 RocksDB 数据中,并在 cp 时存到远端。

HeapStateBackend 和 RocksDBStateBackend 分别对应在 TaskManager 内存模型中的位置:

RocksDBStateBackend 中存储结构:

namespace: 在不同的 namespace 下存在相同名称的状态。

1.1.1 State 状态持久化

通过 Chandy-Lamport 分布式快照算法进行 checkpoint 完成状态数据的持久化。然后在 Flink 应用重启时读取 State 状态数据,进行运行现场的还原。

chekcpoint 分类:

  • 基于内存的全量 checkpoint
  • HDFS 全量 checkpoint
  • RocksDB 全量 checkpoint/增量 checkpoint

1.2 State 基于算子和数据分组的分类

State 可分为 Operator State 和 Keyed State 两类。

  • Operator State(称为 non-keyed state)

常常存在于Source, Sink中。具体实现类例如:

  • BroadcastState

例:Kafka Source 中用 OperatorState 记录 offset。

  • Keyed State

任何类型的 keyed state 都可以有有效期(TTL),所有状态类型都支持单元素的 TTL。 这意味着 List 元素和 Map 映射元素将独立到期。

例:SQL GroupBy/PartitionBy 后的窗口中的数据,每个 key 都有对应的 State。key 与 key 之间的 State 数据不可见。

keyed state 的具体实现类:

  • ValueState
  • MapState
  • ListState
  • AggregatingState
  • ReducingState
  • 。。。。。

Flink State思维导图:

Keyed State Operator State
适用算子类型 只适用于KeyedStream上的算子 可用于所有算子
状态分配 每个Key对应一个状态 一个算子子任务对应一个状态
横向扩展 状态随着keyBy的分组KeyGroup自动在多个算子子任务上迁移 有多种状态重新分配的方式
创建和访问方式 自定义算子(重写RichFunction,通过State 名称从 getRuntimeContext方法创建或获得 State ) 实现 CheckpointedFunction 等接口
支持数据结构 ValueState、ListState、MapState等 ListState、BroadcastState等

二、常见状态相关处理流程

1. Kafka Source 如何存储 OperatorState?

class FlinkKafkaConsumerBase {
 private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates; // state名称:"topic-partition-offset-states"
// 特殊的State类型:Union State 
}


unionOffsetStates这个变量就是 OperatorState类型的。

2. Map算子如何存储需要累计的数据?

  • ValueState/MapState/ListState/......

思考:keyby 后的数据分发与多并行度 subtask 之间的关系是怎样的?

首先,datastream 中数据经过 keyby 之后,会划分到各个 KeyedStream 中。每个 KeyedStream 有自己的 KeyedState(如ValueState/ListState/MapState)。

其次,KeyedStream 中的数据会以 KeyGroup 方式组织在一起。KeyGroup 是 Flink 重新分发 key state 的最小单元。

最后,KeyGroup 中的数据会通过取模最大并行度的方式分散到各个 subtask 中。以下是关键源码:

KeyGroupStreamPartitioner#selectChannel(record)
{
    K key;
    key = keySelector.getKey(record.getInstance().getValue());
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(
            key, maxParallelism, numberOfChannels);
}
--KeyGroupRangeAssignment#assignKeyToParallelOperator()
    {
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }
    --KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup()
      公式:OperatorIndex = keyGroupId * parallelism / maxParallelism
    --KeyGroupRangeAssignment#assignToKeyGroup()
      {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
       }




2.2 修改并行度场景时 State 状态存储的变化

2.3 State 与 Checkpoint 关系

分布式快照 Checkpoint 的概念,定期将 State 持久化到 外部存储系统(HDFS/OSS) 上。用户可以通过实现 CheckpointedFunction 接口来使用 operator state。通过 barrier 来对齐 checkpoint,等待 State 持久化完成(此过程参数不同也可能是异步的)。

常见 State 与 CP 相关的问题

  • State 状态过大。现象为多个算子或单个算子多个 subtask 做 checkpoint 慢,可导致 CP 对齐时间长,严重时会导致 CP 超时。
  • 数据倾斜导致某个 subtask 处理不及时。现象为单个算子少数几个 subtask 做 checkpoint 慢,导致 CP 对齐时间长。严重时会导致 CP 超时。
  • 大作业(并行度搞)频繁做 CP,会频繁上传小文件,导致 HDFS 集群小文件过多。

常用解决措施:调大托管内存大小。

三、参考文档:

作者:京东物流 吴云涛

来源:京东云开发者社区 自猿其说Tech 转载请注明来源

标签:状态,Flink,checkpoint,State,key,算子,解析
From: https://www.cnblogs.com/Jcloud/p/17879251.html

相关文章

  • foxy与galactic解析rosbag的不同之处
    前言foxy和galactic版本在rosbag2_storage这个包的调整有点大(头文件及接口的命名空间),下面的代码仅供参考使用foxy#include"db3_reader.h"#include<pcl/common/transforms.h>#include<pcl/point_types.h>#include<pcl_conversions/pcl_conversions.h>#include<rosba......
  • Unity Transform接口的几个常用方法解析_unity基础开发教程
    UnityTransform接口常用方法解析1.Transform.position2.Transform.right、Transform.forward、Transform.up3.Transform.Rotate4.Transform.Translate在Unity中,Transform类是游戏对象位置、旋转和缩放的表示。在日常开发中我们回经常用到Transform接口的几个常用方法,这些方......
  • c++ json的解析和QT中json的操作
    1.下载jsoncpp源码2.首先建议jsoncpp源码编译成动态库https://www.bilibili.com/video/BV1pb4y1W7ZZhttps://www.bilibili.com/video/BV1Ra4y1e7gL (1)用QT的Cmake工具 (2)用Visualstudio a.工具打开jsoncpp源码,在CMakeLists.txt右键->jsoncpp的CMak......
  • Linux 用户管理:解析用户与组概念,掌握用户/组管理技巧
    在Linux操作系统中,用户管理是系统管理员日常工作中不可或缺的一部分。有效的用户管理有助于确保系统的安全性、可靠性和可维护性。本文将深入探讨Linux中用户与组的概念,以及如何有效地进行用户和组的管理。用户与组的概念在Linux系统中,每个用户都有一个唯一的用户标识符(UserID,......
  • 实例讲解Python 解析JSON实现主机管理
    本文分享自华为云社区《Python解析JSON实现主机管理》,作者:LyShark。JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式,它以易于阅读和编写的文本形式表示数据。JSON是一种独立于编程语言的数据格式,因此在不同的编程语言中都有对应的解析器和生成器。JSON格式的设计......
  • 深度解析C#中LinkedList<T>的存储结构
    本文承接前面的3篇有关C#的数据结构分析的文章,对于C#有关数据结构分析还有一篇就要暂时结束了,这个系列主要从Array、List、Dictionary、LinkedList、 SortedSet等5中不同类型进行介绍和分析。废话不多说,接下来我们来最后看一下这个系列的最后一种数据类型"链表"。提到链......
  • 钉钉员工组织资料实时同步至飞书的应用解析
    如何实现应用之间的同步?随着企业应用的日益增多,在帮助企业提供办公效率的同时,也增加了对这些应用的运维成本。有没有一种好的办法,实现saas应用之间的桥梁搭建,自动化地完成不同应用之间的数据流转呢?答案是有的,这里推荐一款应用连接器,是Restcloud推出的AppLink平台,这个产品可以通过......
  • java.lang.IllegalStateException: Expected BEGIN_OBJECT but was STRING at line 1
    java.lang.IllegalStateException:ExpectedBEGIN_OBJECTbutwasSTRINGatline1column2path$packagecom.example.core.mydemo.scooterOrderSms;importcom.alibaba.fastjson.JSON;importcom.example.core.mydemo.json2.GsonUtils;importcom.google.gson.Gso......
  • CF1198B Welfare State 题解
    题意:有一个长度为$n$的序列$a$,给定$q$次操作,每次操作为以下两种之一:$1$.$1$$p$$x$:$a_p=x$$2$.$2$$x$:$a_i$$=$$max$$($$a_i$,$x$$)$$(1\lei\len)$求经过$q$次操作后的序列$a$。思路:$a_i$的最终值只受......
  • Python 解析JSON实现主机管理
    JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式,它以易于阅读和编写的文本形式表示数据。JSON是一种独立于编程语言的数据格式,因此在不同的编程语言中都有对应的解析器和生成器。JSON格式的设计目标是易于理解、支持复杂数据结构和具有良好的可扩展性。JSON数据是......