首页 > 其他分享 >Flink状态(二)

Flink状态(二)

时间:2024-06-20 16:34:05浏览次数:8  
标签:状态 存储 flink Flink RocksDBStateBackend checkpoint 内存

Flink提供了不同的状态存储方式,并说明了状态如何存和存储在哪里。
状态可以被存储在Jvm的堆和堆外。根据状态存储方式的不同,Flink也能代替应用管理状态,意思是Flink能够进行内存管理(有必要的时候,可能会溢出到硬盘),允许应用保存非常大的状态。默认情况下,在配置文件flink-conf.yaml中为所有Flink作业配置状态存储方式。

然而,默认的状态存储方式配置可以被单独的作业设置覆盖,就像下面那样。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);

使用Data Stream API写的程序经常需要以多种情况保存状态:

  • 在窗口被触发之前, 窗口需要保存或聚合元素
  • 转换算子也许会使用key/value状态接口保存数据
  • 转换算子也许实现CheckpointedFunction接口使本地变量容错。

当checkpointing被激活的时候,一旦发生checkpoint,状态会被保存,这样数据就不会丢失,并且在恢复的时候能够保持数据一致性。状态在内部是怎么表示的,以及当checkpoint的时候,状态怎么样被保存,以及保存到哪里依赖选择的状态存储方式。

Flink提供了三种开箱即用的状态存储方式:

  • MemoryStateBackend 内存存储
  • FsStateBackend 文件系统存储
  • RocksDBStateBackend RocksDB存储

如果没有特殊配置,系统默认使用内存存储方式。

MemoryStateBackend 内存存储

内存存储:在Java堆中保存状态对象。Key/Value状态和窗口算子都会以Hash表的方式保存状态值,触发器等。
当checkpoint的时候,状态存储将会快照状态,将当checkpoint向JobManager发送回执消息时,作为消息的一部分发给JobManager(master),JobManager会将状态存储到堆内存中。

可以配置内存存储使用异步快照。我们也强烈推荐使用异步快照,避免阻塞流处理通道。请注意默认是打开异步快照的。如果想要关闭这个特性,用户可以在实现化MemoryStateBackend对象的时候,给构造函数中相应的boolean参数传false(这应该仅用于调试目的)。

new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);

内存存储有如下限制:

  • 每一个状态大小默认不超过5M。这个值可以在实例化MemoryStateBackend的时候增加
  • 不管配置的最大状态大小是多少,状态大小不能超过akka配置的桢(一次RPC传输的数据)大小(参数: akka.framesize,默认:10M)。
  • 聚合的状态必须适合JobMaanger内存

以下情况推荐使用内存存储

  • 本地开发或调用
  • 只保存少量状态的作业。例如仅仅包含一次一条记录算子(例如:Map,FlatMap,Fliter,....)的作业。对于这样的作业,Kafka Consumer 仅仅需要非常少的状态。

FsStateBackend 文件系统存储

通过配置文件系统的URL(类型,地址,路径)使用文件系统存储。例如"hdfs://namenode:40010/flink/checkpoints"或者"file:///data/flink/checkpoints"
FsStateBackend 将状态数据保存在TaskManager’s 内存中。当checkpoint的时候,将状态数据写到配置的文件系统或目录中。最小的元数据会存储到JobManager内存中(或者在HA模式下,存储到checkpoint元数据中).
FsStateBackend 默认使用异步快照,以避免阻塞流处理。如果想禁止该特性,在实现化FsStateBackend对象的时候,构造函数中应的参数传入false即可。

new FsStateBackend(path, false);

以下情况,推荐使用FsStateBackend

  • 具有大状态,长窗口,大的key/value状态的作业
  • 所有HA模式下

RocksDBStateBackend RocksDB存储

要想使用RocksDB存储,需要配置文件系统的URL(类型,地址,路径)。例如"hdfs://namenode:40010/flink/checkpoints"或者"file:///data/flink/checkpoints"
RocksDBStateBackend 将状态数据保存到RocksDB数据库.RocksDB文件默认会存储到TaskManager的数据目录中。当checkpoint的时候,整个RocksDB数据库将会保存到配置的文件系统或目录中。最小的元数据会存储到JobManager内存中(或者在HA模式下,存储到checkpoint元数据中).

RocksDBStateBackend 总是执行异步快照。

RocksDBStateBackend 具有如下限制:

  • 由于 RocksDB JNI通信使用的API基于byte[],每个key或每个value最大支持2^31字节。
    注意: 在以RocksDB作用存储情况下,使用merge操作的状态(例如:ListState)会默默地将值大小累加到大于2^31字节,当再次读取的时候会失败,这是目前RocksDB JNI的限制。

以下情况,推荐使用RocksDBStateBackend

  • 具有非常大的状态,长窗口,大的key/value状态的作业
  • 所有HA模式下

你可以保存的状态数据量仅仅受限于磁盘剩余空间大小。与将状态保存到内存中的``FsStateBackend `相比,可以保存更大的状态。然而这也意味着能达到的最大吞吐量更小。因为所有从rocksDB读或写入rocksDB都需要经过序列化与反序例化,比那些基于Java堆的存储后端开销更大。

RocksDBStateBackend 是目前唯一提供 增量的checkpoint的存储。

RocksDB的一些指标可以被获取,但是默认没打开,可以在这里找到全部文档说明。

配置状态存储

如果你什么也没配置,默认的状态存储在JobManager内存中。如果你希望为所有作业默认一个其它的存储,你可以在flink-conf.ymal中配置其它的存储。当然,每一个作业也能单独设置存储。

每个作业单独设置存储

下面示例显示StreamExecutionEnvironment 的作业如何设置存储。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

如果你想使用 RocksDBStateBackend ,你就必须在你的Flink项目中添加如下Maven依赖。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

设置默认的状态存储

默认的状态存储能够在flink-conf.yaml文件中配置,参数是state.backend. 值可以选择jobmanager(MemoryStateBackend), filesystem(FsStateBackend),rocksdb(RocksDBStateBackend)三者中的一个,也可以配置实现了接口StateBackendFactory的全类名。例如: RocksDBStateBackend 的实现类 org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.

state.checkpoints.dir参数定义了checkpoint数据和元数据文件存储的位置,你可以在这里发现更详细的checkpoint目录结构说明

配置示例:

# 状态存储
state.backend: filesystem

# checkpoints数据存储目录
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

翻译自: https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html

标签:状态,存储,flink,Flink,RocksDBStateBackend,checkpoint,内存
From: https://www.cnblogs.com/mycodingworld/p/18258934

相关文章

  • Flink 窗口计算
    Flink窗口计算1.背景2.Watermark3.Watermark与Window之间的关系4.Window窗口计算1.背景在当今大数据时代,实时数据处理的需求日益增长,Flink的窗口计算在这一领域中发挥着至关重要的作用。窗口计算使得我们能够将无界的数据流切分成有意义的片段,从而进行......
  • 数据采集与控制> 数字I/O卡 > PXI2371,PXI总线,离散量输入输出卡,每通道可通过跳线实现切换
    数据采集与控制 > 数字I/O卡 > PXI2371/2372/2373本章主要介绍237X的系统组成及基本特性,为用户整体了解237X的相关特性提供参考。产品简介237X板卡是本公司推出的一系列高密度离散量输入输出卡,每通道可通过跳线实现切换电源/开、电源/地、地/开三种输入状态选择,输入通......
  • 「前端+鸿蒙」鸿蒙应用开发-组件状态管理
    在鸿蒙应用开发中,组件状态管理是确保UI与数据同步更新的重要概念。状态(State)是组件内部数据的集合,它可以影响组件的渲染输出。以下是组件状态管理的入门、深入和实战介绍,以及示例代码。组件状态管理-入门入门阶段,你需要了解状态是什么以及如何使用状态来更新UI。定......
  • Vue3 状态管理 - Pinia,超详细讲解!
    前言:哈喽,大家好,我是前端菜鸟的自我修养!今天给大家分享【Vue3状态管理-Pinia】,超详细讲解!并提供具体代码帮助大家深入理解,彻底掌握!原创不易,如果能帮助到带大家,欢迎收藏+关注哦......
  • “detached HEAD” 状态
    当前处于一个“detachedHEAD”状态,这意味着你当前的HEAD(当前检出的提交)没有绑定到任何分支。一般情况下,这种情况出现在你检出一个特定的提交(而不是分支的最新提交)时。在这种状态下,你无法使用常规的gitpush命令,因为你不在任何分支上。因此,Git提示你可以使用特定的命令将当......
  • 设计模式-利用状态机实现订单状态流转控制
    状态机是状态模式的一种应用,相当于上下文角色的一个升级版。在工作流和游戏中有大量使用。如各种工作流引擎,几乎是状态机的子集和实现,封装状态的变化规则。Spring也给我们提供了一个很好的解决方案。在spring中的组件名称就叫StateMachine。状态机简化状态控制的开发过程,让状态机......
  • 设计模式-状态模式
    状态模式状态模式也成为状态机模式,是允许对象在内部状态发生改变时改变它的行为。对象看起来好像改变了它的类,属于行为型模式。角色:上下文角色(Context):定义客户端需要的接口,内部维护一个当前状态实例,并负责具体状态的切换。抽象状态角色(State):定义该状态下的行为,可以有一个或多......
  • 【扩散映射+线性卡尔曼滤波+Koopman算子】一种用于高维非线性随机动力系统状态估计的
     ......
  • 行为型模式-状态模式
    状态模式模式是什么   状态模式是一种行为型设计模式,它允许对象在内部状态发生改变时改变它的行为。在状态模式中,对象的行为是基于当前状态来决定的,对象会根据不同的状态来执行不同的操作。这样可以将复杂的状态逻辑封装在具体的状态类中,使得代码更加可维护、可扩展,并且符......
  • 【状态估计】非线性受控动力系统的线性预测器——Koopman模型预测MPC(Matlab代码实现)
     ......