首页 > 其他分享 >flink的分流器-sideoutput Flink 有两种常见的 State类型,分别是:Keyed State (键控状态)和Operator State(算子状态)

flink的分流器-sideoutput Flink 有两种常见的 State类型,分别是:Keyed State (键控状态)和Operator State(算子状态)

时间:2024-02-19 19:11:07浏览次数:29  
标签:OutputTag 状态 Flink 分流器 键控 单词 State Operator

flink的分流器-sideoutput Flink 有两种常见的 State类型,分别是:Keyed State (键控状态)和Operator State(算子状态)

为了说明侧输出(sideouptut)的作用,浪尖举个例子,比如现在有一篇文章吧,单词长度不一,但是我们想对单词长度小于5的单词进行wordcount操作,同时又想记录下来哪些单词的长度大于了5,那么我们该如何做呢

比如,Datastream是单词流,那么一般做法(只写了代码模版)是
datastream.filter(word.length>=5); //获取不统计的单词,也即是单词长度大于等于5。
datastream.filter(word.length <5);// 获取需要进行wordcount的单词。

这样数据,然后每次筛选都要保留整个流,然后遍历整个流,显然很浪费性能,假如能够在一个流了多次输出就好了,flink的侧输出提供了这个功能,侧输出的输出(sideoutput)类型可以与主流不同,可以有多个侧输出(sideoutput),每个侧输出不同的类型。

如何使用侧输出。

1.定义OutputTag

在使用侧输出的时候需要先定义一个OutputTag。定义方式,如下:

OutputTag outputTag = newOutputTag(“side-output”) {};

OutputTag有两个构造函数,上面例子构造函数只有一个id参数,还有一个构造函数包括两个参数,id,TypeInformation信息。

OutputTag(String id)
OutputTag(String id, TypeInformation<T>typeInfo)
登录后复制

2.使用特定的函数

要使用侧输出,在处理数据的时候除了要定义相应类型的OutputTag外,还要使用特定的函数主要是有四个:

ProcessFunction

CoProcessFunction 两条输入流

ProcessWindowFunction

ProcessAllWindowFunction

3.示例

/**

  • 以用户自定义FlatMapFunction函数的形式来实现分词器功能,该分词器会将分词封装为(word,1),

  • 同时不接受单词长度大于5的,也即是侧输出都是单词长度大于5的单词。
    */
    public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(
    String value,
    Context ctx,
    Collector<Tuple2<String, Integer>> out) throws Exception {
    // normalize and split the line
    String[] tokens = value.toLowerCase().split("\W+");

    // emit the pairs
    for (String token : tokens) {
    if (token.length() > 5) {
    ctx.output(rejectedWordsTag, token);
    } else if (token.length() > 0) {
    out.collect(new Tuple2<>(token, 1));
    }
    }

    }
    }

Flink State 状态

Flink 有两种常见的 State类型,分别是:

Keyed State (键控状态)

Operator State(算子状态)

1.Keyed State(键控状态)

基于 KeyedStream 上的状态,这个状态是跟特定的Key绑定的。

KeyedStream流上的每一个Key,都对应一个 State。Flink针对Keyed State提供了以下可以保存 State 的数据结构

**※ ValueState:**保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 Key,

因此算子接收到的每个Key都可能对应一个值)。这个值可以通过 update(T)进行更新,通过 T value() 进行检索。

**※ ListState:**保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获取整个列表。还可以通过 update(List) 覆盖当前的列表。

※ ReducingState:保存一个单值,表示添加到状态的所有聚合。接口与ListState 类似,使用 add(T)增加元素,会使用提供的 ReduceFunction 进行聚合

**※ AggregatingState:**保留一个单值,表示添加到状态的所以值得聚合。和ReducingState 相反得是,聚合类型可能与添加到状态得元素得类型不同。接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

**※ FoldingState:**保留一个单值,表示添加到状态的所有值的聚合。与 ReducingState 相反,聚合类型可能与添加到状态的元素类型不同。接口与ListState类型,但使用 add(T) 添加的元素会用指定的FoldFunction 折叠成聚合值。

**※ MapState:**维护了一个添加映射列表。你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 分别检索映射、键和值的可迭代视图。

2.Operator State(算子状态)

Operator State 与 Key 无关,而是与 Operator 绑定,整个 Operator 只对应一个 State。比如:Flink 中的 Kafka Connector 就使用了 Operator State,它会在每个 Connector 实例中,保存该实例消费 Topic 的所有(partition,offset)映射。

img

原文链接:https://www.modb.pro/db/107507

标签:OutputTag,状态,Flink,分流器,键控,单词,State,Operator
From: https://www.cnblogs.com/sunny3158/p/18021770

相关文章

  • 博客园跳转编辑页面没有重新加载页面 pushState
    博客园前端是用angular写的全局搜索pushState,打断点,可以看到 pushState main.6267e7d35558bee5.is:1gomain.6267e7d35558bee5.js:1setBrowserUrl main.6267e7d35558bee5.js:1 setBrowserUrl(p,I){constQ=this.urlSerializer.serialize(p)......
  • 当创建statefulset资源后,k8s组件如何协作
    当创建statefulset资源后,k8s组件如何协作点击关注......
  • 【漏洞复现】用友NC-Cloud PMCloudDriveProjectStateServlet接口存在JNDI注入漏洞
    阅读须知花果山的技术文章仅供参考,此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等(包括但不限于)进行检测或维护参考,未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失,均由使用者本人负责。本......
  • Caused by: java.lang.IllegalStateException: A unix domain socket connection requ
    Causedby:java.lang.IllegalStateException:Aunixdomainsocketconnectionrequiresepollorkqueueandneitherisavailable出现这个错误,首先确保自己的操作系统是否支持epoll,或者kqueue。如果支持。请导入netty的大库,lettuce中好像缺失了一部分,我怀疑是这是怀疑,......
  • srs(state thread)如何实现协程切换
    417行的宏执行协程A上下文的保存419行_st_vp_schedule在RUNQ中找到一个待执行协程B,恢复协程B的上下文,切换到该协程B执行.协程B执行到io阻塞或者sleep事件,就会重新把协程B缓存起来,并寻找一个待执行协程(假设这里就AB两个协程),恢复协程A的上下文继续执行.完成协程切......
  • Hive连接报错:root is not allowed to impersonate root (state=08S01,code=0)
    问题描述使用hive/bin目录下的hive启动客户端,使用!connectjdbc:hive2://hadoop01:10000连接Hive数据仓库时提示输入用户名和密码,输入数据库的用户名和密码报错:Error:CouldnotopenclienttransportwithJDBCUri:jdbc:hive2://hadoop01:10000:Failedtoopennewsession......
  • Flink之状态编程 值状态(ValueState)列表状态(ListState)映射状态(MapState)归约状态(Reducin
    Flink之状态编程值状态(ValueState)列表状态(ListState)映射状态(MapState)归约状态(ReducingState)聚合状态(AggregatingState)广播状态(BroadcastState)Flink之状态编程一、按键分区状态(KeyedState)1.1、值状态(ValueState)1.1.1、定义1.1.2、使用案例1.2、列表状态(ListState)1.2.1......
  • useState返回的为什么是数组而不是对象?
    首先,const[count,setCount]=useState(0)这种语法是ES6的解构赋值语法。数组在解构赋值时,按照返回的顺序一一解构,并且可以重新命名:constfoo=[1,2,3]const[a,b,c]=foo//a=1,b=2,c=3而对象在解构赋值时,必须和useState函数内部返回的对象的key同名:constfood={......
  • 为什么不能这样使用 Object.assign(state, { visibilityFilter: action.filter })
    为什么不能这样使用Object.assign(state,{visibilityFilter:action.filter})?在Redux的reducer中,直接使用Object.assign(state,{visibilityFilter:action.filter})来修改状态是不推荐的做法。原因如下:纯函数原则:Redux要求reducer必须是一个纯函数,即给定相同的输入(sta......
  • 在K8S中,deploy和Statefulset有何区别?
    在Kubernetes(K8s)中,Deployment和StatefulSet是两种不同类型的控制器对象,它们设计用于管理Pod的生命周期,但在管理和部署的应用程序类型、持久性保证以及Pod标识等方面存在显著区别:Deployment:无状态应用:Deployment主要用于部署无状态服务,即服务实例之间可以相互替换且不需要保留......