首页 > 其他分享 >5分钟了解Flink状态管理

5分钟了解Flink状态管理

时间:2024-02-11 11:01:31浏览次数:36  
标签:状态 存储 Flink private 分钟 算子 退款

什么叫做Flink的有状态计算呢?说白了就是将之前的中间结果暂时存储起来,等待后续的事件数据过来后,可以使用之前的中间结果继续计算。本文主要介绍Flink状态计算和管理、代码示例。

1、有状态的计算

什么是Flink的有状态的计算。在流式计算过程中将算子的中间结果保存在内存或者文件系统中,等下一个事件进入算子后可以从之前的状态中获取中间结果,以便计算当前的结果,从而无需每次都基于全部的原始数据来统计结果,极大地提升了系统性能。

每一个具有一定复杂度的流计算应用都是有状态的,任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接受的事件或者中间结果。

2、状态管理

Flink如何管理状态?主要就是:本地访问和存储、容错性(可以自动按一定的时间间隔产生快照,并且在任务失败后进行恢复)。

状态(State)操作是指需要把当前数据和历史计算结果进行累加计算,即当前数据的处理需要使用之前的数据或中间结果。

例如,对数据流中的实时单词进行计数,每当接收到新的单词时,需要将当前单词数量累加到之前的结果中。这里单词的数量就是状态,对单词数量的更新就是状态的更新。如下图:

状态的计算模型,如下图:

如下图,Source、map()、keyBy()/window()/apply()算子的并行度为2,Sink算子的并行度为1。keyBy()/window()/apply()算子是有状态的,并且map()与keyBy()/window()/apply()算子之间通过网络进行数据分发。

Flink应用程序的状态访问都在本地进行,这样有助于提高吞吐量和降低延迟。通常情况下,Flink应用程序都是将状态存储在JVM堆内存中,但如果状态数据太大,也可以选择将其以结构化数据格式存储在高速磁盘中。

通过状态快照,Flink能够提供可容错的、精确一次的计算语义。Flink应用程序在执行时会获取并存储分布式Pipeline(流处理管道)中整体的状态,它会将数据源中消费数据的偏移量记录下来,并将整个作业图中算子获取到该数据(记录的偏移量对应的数据)时的状态记录并存储下来。

当发生故障时,Flink作业会恢复上次存储的状态,重置数据源从状态中记录的上次消费的偏移量,开始重新进行消费处理。而且状态快照在执行时会异步获取状态并存储,并不会阻塞正在进行的数据处理逻辑。这个机制跟Kafka等消息中间件的消费方式很像。

Flink需要知道状态,以便可以使用Checkpoint和Savepoint来保证容错(下一篇会继续介绍)。状态还允许重新调整Flink应用程序,这意味着Flink负责在并行实例之间重新分配状态。

3、Keyed State

Keyed State是Flink提供的按照Key进行分区的状态机制。

在通过keyBy()分组的KeyedStream上使用,对每个Key的数据进行状态存储和管理,状态是跟每个Key绑定的,即每个Key对应一个状态对象。

Keyed State支持的状态数据类型如下:ValueState<T>、ListState<T>、ReducingState<T>、AggregatingState<IN, OUT>、MapState<UK, UV>。下文以ValueState为例,介绍如何使用。

4、状态管理示例和代码

我们来模拟这样一个场景:如果某个用户1分钟内连续两次退款,第二次则发出告警。

模拟订单对象:

public class OrderBO {
    /**
     * ID
     */
    private Integer id;
    /***
     * 订单标题
     */
    private String title;
    /**
     * 订单金额
     */
    private Integer amount;
    /**
     * 订单状态:1-已支付,2-已退款
     */
    private Integer state;
    /**
     * 用户ID
     */
    private String userId;
}

利用状态管理,处理告警逻辑:

/**
* 告警处理逻辑
**/
private static class AlarmLogic extends KeyedProcessFunction<String, OrderBO, OrderBO> {
    // 是否已经出现退款的标记
    private ValueState<Boolean> flagState;
    // 定时器,时间到了会清掉状态
    private ValueState<Long> timerState;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(OrderBO value, KeyedProcessFunction<String, OrderBO, OrderBO>.Context context, Collector<OrderBO> collector) throws Exception {
        Boolean refundFlag = flagState.value();

        // 如果已经退款过一次了,如果再出现退款则发射给下个算子,然后清理掉定时器。状态2代表退款
        if (refundFlag != null && refundFlag) {
            if (value.getState() == 2) {
                collector.collect(value);
            }
            cleanUp(context);
        } else {
            // 如果第一次出现退款,则写入状态,同时开启定时器。状态2代表退款
            if (value.getState() == 2) {
                flagState.update(true);
                long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
                context.timerService().registerProcessingTimeTimer(timer);
                timerState.update(timer);
            }
        }
    }

    /**
     * 定时器到了之后,清理状态值
     */
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, OrderBO, OrderBO>.OnTimerContext ctx, Collector<OrderBO> out) throws Exception {
        timerState.clear();
        flagState.clear();
    }

    /**
     * 手动清理状态值
     *
     * @param ctx
     * @throws Exception
     */
    private void cleanUp(Context ctx) throws Exception {
        Long timer = timerState.value();
        ctx.timerService().deleteProcessingTimeTimer(timer);

        timerState.clear();
        flagState.clear();
    }
}

模式生成数据和主流程代码:

public static void main(String[] args) throws Exception {
    // 1、执行环境创建
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // 2、读取Socket数据端口。实际根据具体业务对接数据来源
    DataStreamSource<String> orderStream = environment.socketTextStream("localhost", 9527);
    // 3、数据读取个切割方式
    SingleOutputStreamOperator<OrderBO> resultDataStream = orderStream
            .flatMap(new CleanDataAnd2Order()) // 清洗和处理数据
            .keyBy(x -> x.getUserId()) // 分区
            .process(new AlarmLogic()); // 处理告警逻辑

    // 4、打印分析结果
    resultDataStream.print("告警===>");
    // 5、环境启动
    environment.execute("OrderAlarmApp");
}

模拟数据:

模拟场景:某个用户1分钟内连续两次退款,第二次发出告警。
示例数据:
1,aaa,100,1,user1
2,bbb,200,1,user2
3,ccc,300,2,user1
4,ddd,400,2,user1

5,ddd,400,2,user1
6,bbb,200,2,user2
7,bbb,400,2,user2

完整代码地址:https://github.com/yclxiao/flink-blog/blob/7eb84d18aa71d8f2023d6158796de34d331b9b3f/src/main/java/top/mangod/flinkblog/demo005/OrderAlarmApp.java#L43

总结:本文主要介绍了Flink的状态和状态管理,以及Demo和相关代码,希望对你有帮助!

本篇完结!感谢你的阅读,欢迎点赞 关注 收藏 私信!!!

原文链接:http://www.mangod.top/articles/2023/08/13/1691889230463.htmlhttps://mp.weixin.qq.com/s/7I6ZYuk9V19QltYSRAXpBQ

标签:状态,存储,Flink,private,分钟,算子,退款
From: https://www.cnblogs.com/mangod/p/18013234

相关文章

  • 一次打通FlinkCDC同步Mysql数据
    业务痛点离开了业务谈技术都是耍流氓。我们来聊聊基于业务的痛点,衍生出来的多种数据同步方案。业务中常见的需要数据同步的场景1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发,但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者,某些历......
  • 10分钟入门Flink--架构和原理
    相信你读完上一节的《10分钟入门Flink--了解Flink》对Flink已经有初步了解了。这是继第一节之后的Flink入门系列的第二篇,本篇主要内容是是:了解Flink运行模式、Flink调度原理、Flink分区、Flink安装。1、运行模式Flink有多种运行模式,可以运行在一台机器上,称为本地(单机)模式;也可以......
  • 10分钟入门Flink--了解Flink
    Flink入门系列文章主要是为了给想学习Flink的你建立一个大体上的框架,助力快速上手Flink。学习Flink最有效的方式是先入门了解框架和概念,然后边写代码边实践,然后再把官网看一遍。Flink入门分为四篇,第一篇是《了解Flink》,第二篇《架构和原理》,第三篇是《DataStream》,第四篇是《Tabl......
  • 10分钟入门Flink--安装
    本文介绍Flink的安装步骤,主要是Flink的独立部署模式,它不依赖其他平台。文中内容分为4块:前置准备、Flink本地模式搭建、FlinkStandalone搭建、FlinkStandalongHA搭建。演示使用的Flink版本是1.15.4,官方文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/......
  • 在K8S中,Pod可能位于的状态有什么?
    在Kubernetes(K8s)中,Pod可能处于以下几种状态:Pending:Pod已经被集群接受,但至少有一个容器镜像尚未创建。这个阶段包括调度Pod到节点的时间、下载容器镜像时间以及等待其他初始化条件满足的过程。ContainerCreating:这是一个过渡状态,表示kubelet正在为Pod创建容器,这包括从镜......
  • 了解策略模式和状态模式,并理解二者差异
    策略模式和状态模式的代码结构非常相似,其UML类图更是一致,容易让人困惑。究其原因,是没有理解两种模式的设计目的,以至于明明设计了状态模式的代码结构,仍以策略模式的形式使用这些代码。策略模式策略模式比较简单,分析应用类,将类中用于完成特定任务的不同操作抽离成一组独立的类,称之......
  • 【Flink入门修炼】1-3 Flink WordCount 入门实现
    本篇文章将带大家运行Flink最简单的程序WordCount。先实践后理论,对其基本输入输出、编程代码有初步了解,后续篇章再对Flink的各种概念和架构进行介绍。下面将从创建项目开始,介绍如何创建出一个Flink项目;然后从DataStream流处理和FlinkSQL执行两种方式来带大家学习Word......
  • Flink CDC实时同步PG数据库到Kafka
    一、安装规划操作系统服务器IP主机名硬件配置CentOS7.6192.168.80.131hadoop01内存:2GB,CPU:2核,硬盘:100GBCentOS7.6192.168.80.132hadoop02内存:2GB,CPU:2核,硬盘:100GBCentOS7.6192.168.80.133hadoop03内存:2GB,CPU:2核,硬盘:100GB......
  • powercfg是一个Windows操作系统中的命令行工具,用于管理和配置电源设置。通过使用power
    powercfg是一个Windows操作系统中的命令行工具,用于管理和配置电源设置。通过使用powercfg命令,用户和系统管理员可以查询、更改、导出、导入电源计划设置,检查电池状态,以及分析系统能耗情况等。这个工具非常有用,尤其是在需要优化电池使用时间、调整电源计划以提高性能或节能时。为......
  • 【Flink】使用CoProcessFunction完成实时对账、基于时间的双流join
    【Flink】使用CoProcessFunction完成实时对账、基于时间的双流join文章目录零处理函数回顾一CoProcessFunction的使用1CoProcessFunction使用2实时对账(1)使用离线数据源(批处理)(2)使用高自定义数据源(流处理)二基于时间的双流Join1基于间隔的Join(1)正向join(2)反向join2......