flink的分流器-sideoutput Flink 有两种常见的 State类型,分别是:Keyed State (键控状态)和Operator State(算子状态)
为了说明侧输出(sideouptut)的作用,浪尖举个例子,比如现在有一篇文章吧,单词长度不一,但是我们想对单词长度小于5的单词进行wordcount操作,同时又想记录下来哪些单词的长度大于了5,那么我们该如何做呢
比如,Datastream是单词流,那么一般做法(只写了代码模版)是
datastream.filter(word.length>=5); //获取不统计的单词,也即是单词长度大于等于5。
datastream.filter(word.length <5);// 获取需要进行wordcount的单词。
这样数据,然后每次筛选都要保留整个流,然后遍历整个流,显然很浪费性能,假如能够在一个流了多次输出就好了,flink的侧输出提供了这个功能,侧输出的输出(sideoutput)类型可以与主流不同,可以有多个侧输出(sideoutput),每个侧输出不同的类型。
如何使用侧输出。
1.定义OutputTag
在使用侧输出的时候需要先定义一个OutputTag。定义方式,如下:
OutputTag
OutputTag有两个构造函数,上面例子构造函数只有一个id参数,还有一个构造函数包括两个参数,id,TypeInformation信息。
OutputTag(String id)
OutputTag(String id, TypeInformation<T>typeInfo)
登录后复制
2.使用特定的函数
要使用侧输出,在处理数据的时候除了要定义相应类型的OutputTag外,还要使用特定的函数主要是有四个:
ProcessFunction
CoProcessFunction 两条输入流
ProcessWindowFunction
ProcessAllWindowFunction
3.示例
/**
-
以用户自定义FlatMapFunction函数的形式来实现分词器功能,该分词器会将分词封装为(word,1),
-
同时不接受单词长度大于5的,也即是侧输出都是单词长度大于5的单词。
*/
public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;@Override
public void processElement(
String value,
Context ctx,
Collector<Tuple2<String, Integer>> out) throws Exception {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\W+");// emit the pairs
for (String token : tokens) {
if (token.length() > 5) {
ctx.output(rejectedWordsTag, token);
} else if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}}
}
Flink State 状态
Flink 有两种常见的 State类型,分别是:
Keyed State (键控状态)
Operator State(算子状态)
1.Keyed State(键控状态)
基于 KeyedStream 上的状态,这个状态是跟特定的Key绑定的。
KeyedStream流上的每一个Key,都对应一个 State。Flink针对Keyed State提供了以下可以保存 State 的数据结构
**※ ValueState:**保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 Key,
因此算子接收到的每个Key都可能对应一个值)。这个值可以通过 update(T)进行更新,通过 T value() 进行检索。
**※ ListState:**保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List
※ ReducingState:保存一个单值,表示添加到状态的所有聚合。接口与ListState 类似,使用 add(T)增加元素,会使用提供的 ReduceFunction 进行聚合
**※ AggregatingState:**保留一个单值,表示添加到状态的所以值得聚合。和ReducingState 相反得是,聚合类型可能与添加到状态得元素得类型不同。接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
**※ FoldingState:**保留一个单值,表示添加到状态的所有值的聚合。与 ReducingState 相反,聚合类型可能与添加到状态的元素类型不同。接口与ListState类型,但使用 add(T) 添加的元素会用指定的FoldFunction 折叠成聚合值。
**※ MapState:**维护了一个添加映射列表。你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 分别检索映射、键和值的可迭代视图。
2.Operator State(算子状态)
Operator State 与 Key 无关,而是与 Operator 绑定,整个 Operator 只对应一个 State。比如:Flink 中的 Kafka Connector 就使用了 Operator State,它会在每个 Connector 实例中,保存该实例消费 Topic 的所有(partition,offset)映射。
原文链接:https://www.modb.pro/db/107507 标签:OutputTag,状态,Flink,分流器,键控,单词,State,Operator From: https://www.cnblogs.com/sunny3158/p/18021770