首页 > 其他分享 >【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

时间:2024-01-04 12:37:25浏览次数:28  
标签:java flink State concurrent state apache org



文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、Operator State
  • 1、CheckpointedFunction
  • 2、带状态的 Source Function
  • 3、operator state示例:实现程序异常时自动保存state,当超过重启次数时中断运行
  • 1)、实现
  • 2)、运行结果
  • 3)、hdfs上的checkpoint



本文介绍了Flink State中的operator state基本功能及示例,其中包含详细的验证步骤与验证结果。


本文除了maven依赖外,没有其他依赖。

本文需要hadoop环境,因为模拟checkpoint的时候使用了hdfs。

一、maven依赖

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
    <dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-common</artifactId>
		<version>3.1.4</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-client</artifactId>
		<version>3.1.4</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-hdfs</artifactId>
		<version>3.1.4</version>
	</dependency>
</dependencies>

二、Operator State

用户可以通过实现 CheckpointedFunction 接口来使用 operator state。

1、CheckpointedFunction

CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

进行 checkpoint 时会调用 snapshotState()。 用户自定义函数初始化时会调用 initializeState(),初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复。 因此 initializeState() 不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。

当前 operator state 以 list 的形式存在。这些状态是一个 可序列化 对象的集合 List,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:

  • Even-split redistribution: 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。 比如说,算子 A 的并发读为 1,包含两个元素 element1 和 element2,当并发读增加为 2 时,element1 会被分到并发 0 上,element2 则会被分到并发 1 上。
  • Union redistribution: 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。 Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors.

下面的例子中的 SinkFunction 在 CheckpointedFunction 中进行数据缓存,然后统一发送到下游,这个例子演示了列表状态数据的 event-split redistribution。

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

initializeState 方法接收一个 FunctionInitializationContext 参数,会用来初始化 non-keyed state 的 “容器”。这些容器是一个 ListState 用于在 checkpoint 时保存 non-keyed state 对象。

注意这些状态是如何初始化的,和 keyed state 类系,StateDescriptor 会包括状态名字、以及状态类型相关信息。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

调用不同的获取状态对象的接口,会使用不同的状态分配算法。比如 getUnionListState(descriptor) 会使用 union redistribution 算法, 而 getListState(descriptor) 则简单的使用 even-split redistribution 算法。

当初始化好状态对象后,我们通过 isRestored() 方法判断是否从之前的故障中恢复回来,如果该方法返回 true 则表示从故障中进行恢复,会执行接下来的恢复逻辑。

正如代码所示,BufferingSink 中初始化时,恢复回来的 ListState 的所有元素会添加到一个局部变量中,供下次 snapshotState() 时使用。 然后清空 ListState,再把当前局部变量中的所有元素写入到 checkpoint 中。

另外,我们同样可以在 initializeState() 方法中使用 FunctionInitializationContext 初始化 keyed state。

2、带状态的 Source Function

带状态的数据源比其他的算子需要注意更多东西。为了保证更新状态以及输出的原子性(用于支持 exactly-once 语义),用户需要在发送数据前获取数据源的全局锁。

public static class CounterSource extends RichParallelSourceFunction<Long>  implements CheckpointedFunction {

    /**  current offset for exactly once semantics */
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;
    
    /** 存储 state 的变量. */
    private ListState<Long> state;
     
    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
            "state",
            LongSerializer.INSTANCE));
            
        // 从我们已保存的状态中恢复 offset 到内存中,在进行任务恢复的时候也会调用此初始化状态的方法
        for (Long l : state.get()) {
            offset = l;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        state.clear();
        state.add(offset);
    }
}

希望订阅 checkpoint 成功消息的算子,可以参考 org.apache.flink.api.common.state.CheckpointListener 接口。

3、operator state示例:实现程序异常时自动保存state,当超过重启次数时中断运行

实际生产中,一般不需要自己实现state,除非特殊情况。

本示例仅仅用于展示state的工作过程。

本示例实现功能是当程序出现异常时能自动重启并保存当前的state信息,当超过2次异常后程序中断运行。

该示例肯定是画蛇添足,Flink已经实现了该类,并且在介绍operator state的时候也给出了示例,本示例仅仅是以极其简单的介绍一下operator state的实现。

1)、实现

  • AlanOperatorState.java
import java.util.Iterator;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * @author alanchan 
 * 使用OperatorState中的ListState模拟KafkaSource进行offset维护
 */
public class AlanOperatorState extends RichParallelSourceFunction<String> implements CheckpointedFunction {
	private boolean flag = true;
	private ListState<Long> offsetState = null;
	private Long offset = 0L;

	// 创建ListState
	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>("offsetState", Long.class);
		offsetState = context.getOperatorStateStore().getListState(stateDescriptor);
	}

	// 使用state
	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		while (flag) {
			Iterator<Long> iterator = offsetState.get().iterator();
			// 由于是模拟,该迭代器仅有一条数据
			if (iterator.hasNext()) {
				offset = iterator.next();
			}
			offset += 1;
			int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
			ctx.collect("subTaskId:" + subTaskId + ",当前的offset值为::" + offset);
			Thread.sleep(1000);

			// 模拟异常
			if (offset % 3 == 0) {
				throw new Exception("出现了异常.....");
			}

		}
	}

	// 持久化state, 该方法会定时执行将state状态从内存存入Checkpoint磁盘目录中
	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		offsetState.clear();// 清理内容数据并存入Checkpoint磁盘目录中
		offsetState.add(offset);
	}

	@Override
	public void cancel() {
		flag = false;
	}

}
  • TestOperatorStateDemo.java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestOperatorStateDemo {
	public static void main(String[] args) throws Exception {

		System.setProperty("HADOOP_USER_NAME", "alanchan");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		env.setParallelism(1);
		env.enableCheckpointing(1000);
		// 设置checkpoint点在hdfs上
		env.setStateBackend(new FsStateBackend("hdfs://server1:8020//flinktest/flinkckp"));
		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

		// 重启策略:程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));

		// source
		DataStreamSource<String> ds = env.addSource(new AlanOperatorState()).setParallelism(1);

		// transformation

		// sink
		ds.print();

		// execute
		env.execute();
	}
}

2)、运行结果

subTaskId:0,当前的offset值为::1
subTaskId:0,当前的offset值为::2
subTaskId:0,当前的offset值为::3
subTaskId:0,当前的offset值为::4
subTaskId:0,当前的offset值为::5
subTaskId:0,当前的offset值为::6
subTaskId:0,当前的offset值为::7
subTaskId:0,当前的offset值为::8
subTaskId:0,当前的offset值为::9
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
	at java.util.concurrent.CompletableFuture.uniApply(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
	at akka.dispatch.OnComplete.internal(Future.scala:300)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
	
。。。。。。

	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=3000)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
	
。。。。。。

	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	... 4 more
Caused by: java.lang.Exception: 出现了异常.....
	at org.datastreamapi.state.AlanOperatorState.run(AlanOperatorState.java:46)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)

3)、hdfs上的checkpoint

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state_kafka

以上,本文介绍了Flink State中的operator state基本功能及示例,其中包含详细的验证步骤与验证结果。


标签:java,flink,State,concurrent,state,apache,org
From: https://blog.51cto.com/alanchan2win/9098714

相关文章

  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(1) - Keyed Sta
    文章目录Flink系列文章一、maven依赖二、KeyedState1、KeyedState介绍及示例2、KeyedState状态有效期(TTL)1)、过期数据的清理2)、全量快照时进行清理3)、增量数据清理4)、在RocksDB压缩时清理3、keyedstate示例:实现地铁站哪个进站口人数最多1)、javabean2)、实现3)、验证本文......
  • 【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延
    文章目录Flink系列文章一、watermark介绍1、watermark介绍2、Watermark策略简介3、使用Watermark策略4、处理空闲数据源5、自定义WatermarkGenerator1)、自定义周期性Watermark生成器2)、自定义标记Watermark生成器6、Watermark策略与Kafka连接器7、算子处理Watermark......
  • Mapped Statements collection does not contain value for
    前倾概要:在测试Springabtch分区的过程中,我在本地使用mybatis-plus的时候出现了下面的问题:org.mybatis.spring.MyBatisSystemException:nestedexceptionisorg.apache.ibatis.exceptions.PersistenceException:###Errorqueryingdatabase.Cause:java.lang.IllegalArgumen......
  • Flink侧输出流解析
    在实时数据处理领域,ApacheFlink已成为一个不可或缺的工具。它以其高吞吐量和低延迟处理能力而闻名。而在Flink的众多特性中,侧输出流(SideOutputs)提供了一种灵活的方式来处理复杂的数据流。本文将探讨如何在Flink的ScalaAPI中有效使用侧输出流。1.侧输出流的基本概念侧......
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • 【Flink系列二十一】深入理解 JVM的类型加载约束,解决 Flink 类型加载冲突问题的通用方
    classByteArrayDeserializerisnotaninstanceoforg.apache.kafka.common.serialization.DeserializerDebuggingClassloading类似的XcannotbecasttoXexceptions如何理解这类异常?这类异常可以归纳为类型异常,按个人有限经验,现象分为两种常见情况:类型赋值检查:不能......
  • flink中的setStreamTimeCharacteristic 指定为EventTime的source需要自己定义event ti
    flink中的setStreamTimeCharacteristicTimeCharacteristic   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 此处可以取以下三类值:EventTime事件时间,事件(Event)本身的时间,即数据流中事件实际发生的时间,通常使用事件发生时的时间戳来描述,这些......
  • Flink mysql-cdc连接器参数
    一、背景通过Flink同步mysql到iceberg中,任务一直在运行中,但是在目标表看不到数据。经排查发现jobmanager一直在做切片,日志如下:2023-12-2816:58:36.251[snapshot-splitting]INFOcom.ververica.cdc.connectors.mysql.source.assigners.ChunkSplitter[]-ChunkSplitterhas......
  • 【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延
    文章目录Flink系列文章一、maven依赖二、示例-Flink1.13.6版本:kafka数据源,每10s统计一次地铁进站每个入口人数1、maven依赖2、实现1)、javabean2)、实现3、验证1)、验证步骤2)、验证三、示例-Flink1.17.0版本:kafka数据源,每10s统计一次地铁进站每个入口人数1、maven依赖2、实现1)、j......
  • 【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延
    文章目录Flink系列文章一、maven依赖二、示例:每5s统计一次地铁进站每个入口人数1、实现1)、bean2)、实现2、验证三、示例:处理延迟数据超过能接受的时间,每10s统计一次地铁进站每个入口人数1、实现1)、javabean2)、实现2、验证本文介绍了FlinkWaterMark的基本用法以及超过最大延迟允......