首页 > 其他分享 >Flink的状态

Flink的状态

时间:2023-01-09 15:56:43浏览次数:62  
标签:状态 Flink flink value api org apache import

State-理解原理即可

Flink中状态的自动管理

之前写的Flink代码中其实已经做好了状态自动管理,如

发送hello ,得出(hello,1)

再发送hello ,得出(hello,2)

说明Flink已经自动的将当前数据和历史状态/历史结果进行了聚合,做到了状态的自动管理

在实际开发中绝大多数情况下,我们直接使用自动管理即可

一些特殊情况才会使用手动的状态管理!---后面项目中会使用!

所以这里得先学习state状态如何手动管理!

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Desc 演示DataStream-Source-基于Socket
 */
public class SourceDemo03_Socket {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> lines = env.socketTextStream("node1", 9999);


        //TODO 2.transformation
        /*SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(word);
                }
            }
        });

        words.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value,1);
            }
        });*/

        //注意:下面的操作将上面的2步合成了1步,直接切割单词并记为1返回
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);

        //TODO 3.sink
        result.print();

        //TODO 4.execute
        env.execute();
    }
}

无状态计算和有状态计算

  • 无状态计算,不需要考虑历史值, 如map

hello --> (hello,1)

hello --> (hello,1)

  • 有状态计算,需要考虑历史值,如:sum

hello , (hello,1)

hello , (hello,2)

状态分类

  • State
    • ManagerState--开发中推荐使用 : Fink自动管理/优化,支持多种数据结构
      - KeyState--只能在keyedStream上使用,支持多种数据结构
      - OperatorState--一般用在Source上,支持ListState
    • RawState--完全由用户自己管理,只支持byte[],只能在自定义Operator上使用
      • OperatorState

分类详细图解:

代码演示-ManagerState-keyState

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Desc 使用KeyState中的ValueState获取流数据中的最大值/实际中可以使用maxBy即可
 */
public class StateDemo01_KeyState {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L)
        );

        //TODO 2.transformation
        //需求:求各个城市的value最大值
        //实际中使用maxBy即可
        DataStream<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0).maxBy(1);

        //学习时可以使用KeyState中的ValueState来实现maxBy的底层
        DataStream<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0).map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
            //-1.定义一个状态用来存放最大值
            private ValueState<Long> maxValueState;

            //-2.状态初始化
            @Override
            public void open(Configuration parameters) throws Exception {
                //创建状态描述器
                ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("maxValueState", Long.class);
                //根据状态描述器获取/初始化状态
                maxValueState = getRuntimeContext().getState(stateDescriptor);
            }

            //-3.使用状态
            @Override
            public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                Long currentValue = value.f1;
                //获取状态
                Long historyValue = maxValueState.value();
                //判断状态
                if (historyValue == null || currentValue > historyValue) {
                    historyValue = currentValue;
                    //更新状态
                    maxValueState.update(historyValue);
                    return Tuple3.of(value.f0, currentValue, historyValue);
                } else {
                    return Tuple3.of(value.f0, currentValue, historyValue);
                }
            }
        });


        //TODO 3.sink
        //result1.print();
        //4> (北京,6)
        //1> (上海,8)
        result2.print();
        //1> (上海,xxx,8)
        //4> (北京,xxx,6)

        //TODO 4.execute
        env.execute();
    }
}

标签:状态,Flink,flink,value,api,org,apache,import
From: https://www.cnblogs.com/Mr-Sponge/p/17037271.html

相关文章

  • Flink设置Source数据源
    流处理说明有边界的流boundedstream:批数据无边界的流unboundedstream:真正的流数据Source基于集合packagecom.pzb.source;importorg.apache.flink.api.co......
  • Flink的高级应用watermake理论
    Time/Watermarker时间分类EventTime的重要性和Watermarker的引入代码演示-开发版-掌握https://ci.apache.org/projects/flink/flink-docs-release-1.12/de......
  • Flink的转换方法
    流处理说明Transformation基本操作map/flatMap/filter/keyBy/sum/reduce...和之前学习的Scala/Spark里面的一样的意思map方法、flatmap方法、keyBy方法、reduce方法m......
  • Flink处理函数
    ProcessFlink提供了8个不同的处理函数:(1)ProcessFunction最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。(2)KeyedProcessFunction对流按键分区......
  • 基于Flink CDC的现代数据栈 (Modern Data Stack)实现
    ......
  • Linux 防火墙状态
    1.查看防火墙状态:active(running)即是开启状态:systemctlstatusfirewalld2.查看已开发端口命令:firewall-cmd--list-all3.新增防火墙开放端口:firewall-cmd--zone=......
  • C 语言实现简单有限状态机
    简介常说的状态机是有限状态机FSM,是表示有限个状态以及在这些状态之间的转移和动作等行为的数学计算模型。三个特征:状态总数(state)是有限的。任一时刻,只处在一种状态......
  • 浅谈PHP设计模式的状态模式
    简介:状态模式,属于行为型的设计模式。当一个对象的内在状态发生改变时,允许改变其行为,这个对象看起来像是改变了其类。适用场景:控制一个对象的状态改变过于复杂时,把状态......
  • 有限自动状态机(Finite State Machine)
    有限状态自动机是拥有有限数量的状态,并且每个状态可以变换其他状态的数学模型。Afinite-statemachine(FSM)orfinite-stateautomaton(FSA,plural:automata),fin......
  • 设计模式-状态变化-State、Memento
    状态模式上述代码缺点:如果增加一个Stat,要加很多ifelse改进方法Memento备忘录模式如果快照多了比如50次redo/需要用到一些复杂的技术,比如序列化......