首页 > 编程语言 >Flink:状态编程

Flink:状态编程

时间:2023-01-01 21:46:18浏览次数:47  
标签:状态 Exception 编程 Flink throws public new event

Flink 中的状态

在流处理中,数据是连续不断到来的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果,也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。

有状态算子

算子任务可以分为无状态和有状态。

无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果,如 map、filter、flatMap。

有状态的算子任务,除当前数据之外,还需要一些其他数据(状态)来得到计算结果,如聚合算子和窗口算子。

有状态算子处理过程

  1. 算子任务接到上游发来的数据。
  2. 获取当前状态。
  3. 根据业务逻辑进行计算,更新状态。
  4. 得到计算结果,输出发送到下游任务。

状态管理

Flink 将状态直接保存在内存中来保证性能,并通过分布式扩展来提高吞吐量。可以认为是子任务实例上的一个本地变量,能够被任务的业务逻辑访问和修改。

  • 状态的访问权限。同一个分区(也就是slot)上执行的任务实例可能会包含多个 Key 的数据,它们同时访问和更改本地变量,就会导致计算结果错误。
  • 容错性。状态只保存在内存中是不够的,需要将它持久化,这样发生故障后可以恢复状态。
  • 分布式应用的横向扩展性。数据量增大时,应该对计算资源扩容,调大并行度。

状态的分类

  • 托管状态:由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现。
    • 算子状态:状态作用范围限定为当前的算子任务实例。
    • 按键分区状态:状态是根据输入流中定义的键来维护和访问的。
  • 原始状态:自定义的,Flink 不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当做最原始的字节数组来存储。

按键分区状态

支持的结构类型

  • 值状态:状态中就保存一个值。
public interface ValueState<T> extends State {
    // 获取当前状态
    T value() throws IOException;
    // 对状态进行更新
    void update(T var1) throws IOException;
}

为了让运行时上下文清楚到底是哪个状态,需要一个状态描述器 ValueStateDescriptor 来提供状态的基本信息。

  • 列表状态:将需要保存的数据,以列表的形式组织起来。
public interface ListState<T> extends MergingState<T, Iterable<T>> {
    // 传入一个列表values,直接对状态进行覆盖
    void update(List<T> var1) throws Exception;
    // 向列表中添加多个元素,以列表values形式传入。
    void addAll(List<T> var1) throws Exception;
}

列表状态的状态描述器是 ListStateDescriptor

  • 映射状态:把一些键值对作为状态整体保存起来。
public interface MapState<UK, UV> extends State {
    // 传入一个Key,查询对应的Value值
    UV get(UK var1) throws Exception;
    // 传入一个键值对,更新Key对应的Value值
    void put(UK var1, UV var2) throws Exception;
    // 将传入的映射map中所有键值对全部添加到映射状态中
    void putAll(Map<UK, UV> var1) throws Exception;
    // 将指定Key的键值对删除
    void remove(UK var1) throws Exception;
    // 判断是否存在指定Key
    boolean contains(UK var1) throws Exception;
    // 获取映射状态中所有的键值对
    Iterable<Entry<UK, UV>> entries() throws Exception;
    // 获取映射状态中所有的键
    Iterable<UK> keys() throws Exception;
    // 获取映射状态中所有的值
    Iterable<UV> values() throws Exception;
    // 获取映射状态的迭代器
    Iterator<Entry<UK, UV>> iterator() throws Exception;
    // 判断映射状态是否为空
    boolean isEmpty() throws Exception;
}
  • 归约状态:将归约聚合之后的值作为状态保存下来。
// 这个接口保存的是一个聚合值,调用add方法时是将新数据和之前的状态进行归约,并且用结果来更新状态。
public interface ReducingState<T> extends MergingState<T, T> {
}
// 第二个参数就是定义了归约聚合逻辑的ReduceFunction,另外两个参数则是状态的名称和类型。
public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {}
  • 聚合状态:是一个值,用来保存添加进来的所有数据的聚合结果。
// 聚合逻辑是由在描述器中传入一个更加一般化的聚合函数
public AggregatingStateDescriptor(String name, AggregateFunction<IN, ACC, OUT> aggFunction, Class<ACC> stateType) {}

代码示例

SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event event, long l) {
                return event.timestamp;
            }
        })
);

stream.keyBy(event -> event.user).flatMap(new MyFlatMap()).print();
// 用于Keyed State测试
public static class MyFlatMap extends RichFlatMapFunction<Event, String>{
    // 定义状态
    ValueState<Event> myValueState;
    ListState<Event> myListState;
    MapState<String, Long> myMapState;
    ReducingState<Event> myReducingState;
    AggregatingState<Event, String> myAggregatingState;

    @Override
    public void open(Configuration parameters) throws Exception {
        myValueState = getRuntimeContext().getState(new ValueStateDescriptor<Event>("valueState", Event.class));
        myListState = getRuntimeContext().getListState(new ListStateDescriptor<Event>("listState", Event.class));
        myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("mapState", String.class, Long.class));
        myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Event>("reducingState",
                new ReduceFunction<Event>() {
                    @Override
                    public Event reduce(Event event, Event t1) throws Exception {
                        return new Event(event.user, event.url, t1.timestamp);
                    }
                },
                Event.class));
        myAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Event, Long, String>("aggregatingState",
                new AggregateFunction<Event, Long, String>() {
                    @Override
                    public Long createAccumulator() {
                        return 0L;
                    }

                    @Override
                    public Long add(Event event, Long aLong) {
                        return aLong + 1;
                    }

                    @Override
                    public String getResult(Long aLong) {
                        return "count:" + aLong;
                    }

                    @Override
                    public Long merge(Long aLong, Long acc1) {
                        return aLong + acc1;
                    }
                },
                Long.class));
    }

    @Override
    public void flatMap(Event event, Collector<String> collector) throws Exception {
        // 访问和更新状态
        myValueState.update(event);
        myListState.add(event);
        myMapState.put(event.user, myMapState.get(event.user) == null ? 1: myMapState.get(event.user) + 1);
        System.out.println(event.user + myMapState.get(event.user));
        myReducingState.add(event);
        System.out.println(event.user + myReducingState.get());
        myAggregatingState.add(event);
        System.out.println(myAggregatingState.get());
    }
}

状态生存时间

在实际应用中,很多状态会随着时间的推移而增长,如果不加以限制,最终会导致存储空间的耗尽。

可以在代码中调用 clear() 方法来清除状态,但是有时候逻辑要求不能这么做,这时就需要配置一个状态生存时间,当状态存在时间超过这个值就将它清除。

状态创建的时候,设置失效时间 = 当前时间 + TTL,之后如果有对状态的访问或修改,可以再对失效时间进行更新。配置状态的 TTL 时,需要创建一个 StateTtlConfig 配置对象,然后调用状态描述器的 enableTimeToLive() 方法启动 TTL 功能。

ValueStateDescriptor<Event> valueStateDescriptor = new ValueStateDescriptor<>("valueState", Event.class);
// 配置状态TTL
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
        .build();
valueStateDescriptor.enableTimeToLive(ttlConfig);
myValueState = getRuntimeContext().getState(valueStateDescriptor);

算子状态

特点

算子状态就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 Key 无关,所以不同 Key 的数据只要被分发到同一个并行子任务,就会访问到同一个算子状态。

当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

类型

  • 列表状态:将状态表示为一组数据的列表。与 Keyed State 中的列表状态的区别是,在算子状态的上下文中,不会按键分别处理状态,所以每一个并行子任务上只会保留一个“列表”。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。
  • 联合列表状态:将状态表示为一组数据的列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。
  • 广播状态:有时我们希望算子并行子任务都保特同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样。

代码示例

因为不存在 Key,所有数据发往哪个分区是不能预测的。所以当发生故障重启之后,我们不能保证某个数据跟之前一样,进入同一个并行子任务、访问同一个状态。所以需要自行设计状态的快照保存和恢复逻辑。

  • CheckpointedFunction 接口
public interface CheckpointedFunction {
    // 保存状态快照到检查点,调用这个方法
    void snapshotState(FunctionSnapshotContext var1) throws Exception;
    // 初始化状态时调用这个方法,也会在恢复状态时调用
    void initializeState(FunctionInitializationContext var1) throws Exception;
}
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event event, long l) {
                return event.timestamp;
            }
        })
);

// 批量缓存输出
stream.addSink(new BufferSink(10));
public static class BufferSink implements SinkFunction<Event>, CheckpointedFunction {
    // 定义当前类的属性,批量
    private final int threshold;

    public BufferSink(int threshold) {
        this.threshold = threshold;
    }

    private List<Event> bufferedElements;

    // 定义一个算子状态
    private ListState<Event> checkPointedState;

    @Override
    public void invoke(Event value, Context context) throws Exception {
        // 缓存到列表
        bufferedElements.add(value);
        // 判断是否到达阈值,如果达到,就批量写入
        if (bufferedElements.size() == threshold) {
            // 用打印到控制台模拟写入外部系统
            for (Event event: bufferedElements) {
                System.out.println(event);
            }
            System.out.println("========输出完毕========");
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        // 清空状态
        checkPointedState.clear();

        // 对状态进行持久化,复制缓存的列表到列表状态
        for (Event event: bufferedElements) {
            checkPointedState.add(event);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        // 定义算子状态
        ListStateDescriptor<Event> stateDescriptor = new ListStateDescriptor<>("buffered", Event.class);

        checkPointedState = functionInitializationContext.getOperatorStateStore().getListState(stateDescriptor);

        // 如果从故障恢复,需要将ListState中的所有元素复制到列表中
        if (functionInitializationContext.isRestored()) {
            for (Event event: checkPointedState.get()) {
                bufferedElements.add(event);
            }
        }
    }
}

广播状态

特点

将状态广播出去,所有并行子任务的状态都是相同的,并行度调整时只要直接复制就可以了。

由于配置或者规则数据是全局有效的,我们需要把它广播给所有的并行子任务。而子任务需要把它作为一个算子状态保存起来,以保证故障恢复后处理结果是一致的。这时的状态,就 是一个典型的广播状态。

在代码上,可以直接调用 DataStream 的 broadcast() 方法,传入一个“映射状态描述器”说明状态的名称和类型,就可以得到一个“广播流”,进而将要处理的数据流与这条广播流进行连接,就会得到“广播连接流”。注意广播状态只能用在广播连接流中。

代码示例

电商应用中,往往需要判断用户先后发生的行为的“组合模式”,比如“登录-下单”,检测出这些连续的行为进行统计,就可以了解平台的运用状况以及用户的行为习惯。

// 用户的行为数据流
DataStreamSource<Action> actionStream = env.fromElements(
        new Action("Alice", "login"),
        new Action("Alice", "pay"),
        new Action("Bob", "login"),
        new Action("Bob", "order")
);

// 行为模式流,基于它构建广播流
DataStreamSource<Pattern> patternStream = env.fromElements(
        new Pattern("login", "pay"),
        new Pattern("login", "order")
);

// 定义广播状态描述器
MapStateDescriptor<Void, Pattern> descriptor = new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class));
BroadcastStream<Pattern> broadcastStream = patternStream.broadcast(descriptor);

// 连接两条流进行处理
SingleOutputStreamOperator<Tuple2<String, Pattern>> matches = actionStream.keyBy(data -> data.userId)
        .connect(broadcastStream)
        .process(new PatternDetector());

matches.print();
// 实现自定义的KeyedBroadcastProcessFunction
public static class PatternDetector extends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>> {
    // 定义一个keyedState,保存上一次用户行为
    ValueState<String> preActionState;

    @Override
    public void open(Configuration parameters) throws Exception {
        preActionState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-Action", String.class));
    }

    @Override
    public void processElement(Action action, KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>.ReadOnlyContext readOnlyContext, Collector<Tuple2<String, Pattern>> collector) throws Exception {
        // 从广播状态中获取匹配模式
        ReadOnlyBroadcastState<Void, Pattern> patternState = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
        Pattern pattern = patternState.get(null);

        // 拿到上一次行为
        String preAction = preActionState.value();

        // 判断是否匹配
        if (preAction != null && pattern != null) {
            if (pattern.action1.equals(preAction) && pattern.action2.equals(action.action)) {
                collector.collect(new Tuple2<>(readOnlyContext.getCurrentKey(), pattern));
            }
        }

        // 更新状态
        preActionState.update(action.action);
    }

    @Override
    public void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>.Context context, Collector<Tuple2<String, Pattern>> collector) throws Exception {
        // 从上下文中获取广播状态,并用当前数据更新状态
        BroadcastState<Void, Pattern> patternState = context.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
        patternState.put(null, pattern);
    }
}
// 定义用户行为事件和模式的POJO类
public static class Action {
    public String userId;
    public String action;

    public Action() {
    }

    public Action(String userId, String action) {
        this.userId = userId;
        this.action = action;
    }

    @Override
    public String toString() {
        return "Action{" +
                "userId='" + userId + '\'' +
                ", action='" + action + '\'' +
                '}';
    }
}

public static class Pattern {
    public String action1;
    public String action2;

    public Pattern() {
    }

    public Pattern(String action1, String action2) {
        this.action1 = action1;
        this.action2 = action2;
    }

    @Override
    public String toString() {
        return "Pattern{" +
                "action1='" + action1 + '\'' +
                ", action2='" + action2 + '\'' +
                '}';
    }
}

状态持久化和状态后端

Flink 对状态进行持久化的方式,就是将当前所有分布式状态迸行“快照”保存,写入一个检查点或者保存点,保存到外部存储系统中。具体的存储介质,一般是分布式文件系统。

检查点

有状态流应用中的检查点,其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态。

默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的 enableCheckpointin() 方法就可以开启检查点。

状态后端

检查点的保存离不开 JobManager 和 TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由 JobManager 向所有 TaskManager 发出触发检查点的命令,TaskManger 收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中,完成之后向 JobManager 返回确认信息。这个过程是分布式的, JobManger 收到所有 TaskManager 的返回信息后,就会确认当前检查点成功保存。

状态后端

在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端。

状态后端主要负责两件事:一是本地的状态管理,二是将检查点写入远程的持久化存储。

  • 哈希表状态后端:哈希表状态后端在内部会直接把状态当作对象,保存在 TaskManager 的 JVM 堆上。将本地状态全部放入内存中,这样可以获得最快的读写速度,使计算性能达到最佳。但是代价是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业,对所有高可用性设置也是有效的。
  • 内嵌 RocksDB 状态后端:RocksDB 是一种内嵌的键值存储介质,可以把数据持久化到本地硬盘。数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。

标签:状态,Exception,编程,Flink,throws,public,new,event
From: https://www.cnblogs.com/fireonfire/p/17018655.html

相关文章

  • php面向对象(OOP)编程
    大多数类都有一种称为构造函数的特殊方法。当创建一个对象时,它将自动调用构造函数,也就是使用new这个关键字来实例化对象的时候自动调用构造方法。构造函数的声明与其它操......
  • Pygame游戏编程
    1、篮球自动弹跳importsysimportpygamepygame.init()size=width,height=640,480screen=pygame.display.set_mode(size)color=(0,0,0)ball=pygame.image.load("ball3.jpg......
  • fix协议介绍17-查询订单状态报告(QuoteStateReport)
    FIX.5.0SP2MessageQuoteStatusReport [type'AI']<QuotStatRpt>Thequotestatusreportmessageisused:•astheresponsetoaQuoteStatusRequestmessage......
  • 学习ASP.NET Core Blazor编程系列十八——文件上传(中)
    学习ASP.NETCoreBlazor编程系列文章之目录学习ASP.NETCoreBlazor编程系列一——综述学习ASP.NETCoreBlazor编程系列二——第一个Blazor应用程序(上)学习A......
  • E - Don't Isolate Elements(DP,状态设计)
    E-Don'tIsolateElements题意​ 给出一个01矩阵,长为\(n\),宽为\(m\)。现在你可以进行一个操作:任选一行,将其该行上的0变1,1变0。请问最少需要多少次操作,可以使得整......
  • 极客编程python入门-序列化
    序列化我们把变量从内存中变成可存储或传输的过程称之为序列化,在Python中叫pickling,在其他语言中也被称之为serialization,marshalling,flattening等等,都是一个意思。序列化......
  • python编程 ——从入门到实践——第四章,操作列表
    1、遍历列表——for循环的基本形式magicians=['alice','david','carolina']forainmagicians:#for循环会读取列表中的第一个字符串,然后和a对应,再打印,然后再读取第......
  • 网络程序设计 实验3 多人聊天室 流式套接字 多线程编程
    实验3多人聊天室实验目的:通过流式套接字编程,及多线程编程,实现简单的多人聊天室。开发语言与工具:VC实验要求:1.使用MFC编程。2.利用流式套接字编程及多线程编程。3......
  • Flink Shuffle 3.0: Vision, Roadmap and Progress
    摘要:本文整理自阿里云高级技术专家宋辛童(五藏),在FFA2022核心技术专场的分享。本篇内容主要分为五个部分:FlinkShuffle的演进流批融合云原生自适应Shuffle3.0一、Flin......
  • FFA 2022 主会场 Keynote:Flink Towards Streaming Data Warehouse
    摘要:本文整理自ApacheFlink中文社区发起人、阿里巴巴开源大数据平台负责人王峰(莫问),在FlinkForwardAsia2022主会场的分享。本篇内容主要分为四个部分:实时流计算全球......