【Flink从入门到精通 02】DataStream API
在之前的文章中,我们介绍了Flink的安装部署、基础概念,今天我们来一起学习Flink的核心之一DataStream API。
01 分布式流处理基础
上图中,我们将整个代码分为了三个部分,即分布式流处理的基本模型:
- Source
- Transformation
- Sink
从而,我们可以给出Flink编程框架:
// 1. 获取运行环境
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 加载数据源
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
// 3. 数据处理操作
DataStream<Person> adults = flintstones.filter(new
FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
// 4. 写出到Sink
adults.print();
// 5. 提交任务执行
env.execute();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
02 Flink DataStream API概览
对于编程模型,这里我们重点关注第二步Transformations,对DataStream的处理。下图中,给出了DataStream到各个Stream流的转换过程。
首先来对各个Stream之间的转换算子做一个总结。
DataStream --> DataStream
fliter算子
对每条记录执行过滤函数,返回为true的结果。
SingleOutputStreamOperator<String> filter = kafkaDStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
if (s.contains("actions")) {
return true;
}
return false;
}
});
filter.print();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
map算子
一个输入对应一个输出,对每条记录执行MapFunction。
SingleOutputStreamOperator<String> map = kafkaDStream.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return "大数据干货杨" + s;
}
});
map.print();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
flatMap算子
一个输入对应多个输出。
SingleOutputStreamOperator<String> flatMap = kafkaDStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] split = s.split("},");
for (String s1 : split) {
collector.collect(s1);
}
}
});
flatMap.print();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
DataStream --> KeyedStream
keyBy算子
按key对数据进行分组,相同key的数据被分配到同一个分区,内部使用的是hashcode。
下列情况的数据类型无法作为key:
- POJO类型没有重写hashcode方法并且依赖Object.hashcode()实现
- 任何类型的数组
dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);
- 1
- 2
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = jsonDS.map(new MapFunction<JSONObject, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(JSONObject jsonObject) throws Exception {
return new Tuple2<>(jsonObject.getJSONObject("common").getString("mid"), 1);
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
}).sum(1);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
KeyedStream --> DataStream
Reduce算子
只有keyedStream可以执行reduce,将上次reduce的值和当前值聚合并提交新的结果
注意,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果
SingleOutputStreamOperator<JSONObject> reduce = jsonDS.keyBy(new KeySelector<JSONObject, String>() {
@Override
public String getKey(JSONObject value) throws Exception {
return value.getJSONObject("common").getString("mid");
}
}).reduce(new ReduceFunction<JSONObject>() {
@Override
public JSONObject reduce(JSONObject value1, JSONObject value2) throws Exception {
Integer num = Integer.parseInt(value1.getJSONObject("common").getString("is_new")) +
Integer.parseInt(value2.getJSONObject("common").getString("is_new"));
value1.getJSONObject("common").put("is_new", num);
return value1;
}
});
reduce.print();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
KeyedStream --> WindowedStream
window算子
在上一篇《Flink流式处理概念》的文章中,我们介绍了窗口的概念,即逻辑上将数据流划分成一个个的“桶”,对桶进行数据计算。
窗口定义在keyedStream上,窗口将每个key的数据按照某种规则进行分组,如最近5秒到达的数据。
下面只给出了window算子的使用样例,具体的使用方法会在后续文章中给出。
SingleOutputStreamOperator<JSONObject> reduceWindow = jsonDS.keyBy(new KeySelector<JSONObject, String>() {
@Override
public String getKey(JSONObject value) throws Exception {
return value.getJSONObject("common").getString("mid");
}
}).window(TumblingEventTimeWindows.of(Time.seconds(10))) // 指定滚动窗口,窗口大小10s
.reduce(new ReduceFunction<JSONObject>() {
@Override
public JSONObject reduce(JSONObject value1, JSONObject value2) throws Exception {
Integer num = Integer.parseInt(value1.getJSONObject("common").getString("is_new")) +
Integer.parseInt(value2.getJSONObject("common").getString("is_new"));
value1.getJSONObject("common").put("is_new", num);
return value1;
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
DataStream --> AllWindowedStream
windowAll算子
窗口定义在通用DataStream上,对整个流中的事件按照某种规则分组,没有并行操作,所有的数据会被聚合到一个任务中。
SingleOutputStreamOperator<JSONObject> reduceWindowAll = jsonDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.reduce(new ReduceFunction<JSONObject>() {
@Override
public JSONObject reduce(JSONObject value1, JSONObject value2) throws Exception {
Integer num = Integer.parseInt(value1.getJSONObject("common").getString("is_new")) +
Integer.parseInt(value2.getJSONObject("common").getString("is_new"));
value1.getJSONObject("common").put("is_new", num);
return value1;
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
WindowedStream/AllWindowedStream --> DataStream
apply算子
对窗口应用一个通用函数。
windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, window>(){
public void apply(Tuple tuple,Window window,Iterable<Tuple2<String,Integer>> values, Collector<Integer> out) throws Exception(){
int sum=0;
for(value t : values){
sum += t.f1;
}
out.collect(new Integer(sum));
}
});
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Tuple, window>(){
public void apply(Tuple tuple,Window window,Iterable<Tuple2<String,Integer>> values, Collector<Integer> out) throws Exception(){
int sum=0;
for(value t : values){
sum += t.f1;
}
out.collect(new Integer(sum));
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
WindowReduce算子
对窗口应用一个reduce函数,并将结果返回,调用方法桶KeyedStream的reduce算子。
windowedStream.reduce(new ReduceFunction<Tuple2<String,Integer>>(){
public Tuple2<String,Integer> reduce(Tuple2<String,Integer> value1,Tuple2<String,Integer> value2) throws Exception{
return Tuple2<String,Integer>(value1.f0,value.f1+value2.f1);
}
});
- 1
- 2
- 3
- 4
- 5
DataStream* --> DataStream
union算子
创建一个新的DataStream,包含多个流的所有元素,要求流的数据类型相同。
dataStream.union(otherStream1,OtherStream2,...);
- 1
DataStream,DataStream --> DataStream
join算子
将两个DataStream根据给定的key进行join。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply(new JObFunction(){...});
- 1
- 2
- 3
- 4
CoGroup算子
在一个窗口内将两个DataStream按照给定的key进行组合
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply(new CoGroupFunction(){...});
- 1
- 2
- 3
- 4
KeyedStream,KeyedStream --> DataStream
Interval Join算子
将两个KeyedStream在一个给定时间间隔内进行join
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2),Time.milliseconds(2))
.upperBoundExclusive(true)
.lowerBoundExclusive(true)
.process(new IntervalJoinFunction(){...});
- 1
- 2
- 3
- 4
- 5
DataStream,DataStream --> ConnectedStream
connect算子
连接两个保持类型的数据流,两个数据流被connect后,只是被放在了同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
- 1
- 2
- 3
- 4
ConnectedStream --> DataStream
CoMap,CoFlatMap算子
类似map、flatMap,不过可以对两个流分别应用不同的处理逻辑
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>(){
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, Boolean>(){
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
DataStream --> IterativeStream -->ConnectedStream
Iterate算子
通过将一个运算符的输出重定向到某个先前的运算符,在流中创建一个“反馈”循环。 这对于定义不断更新模型的算法特别有用。 下面的代码从一个流开始,不断地应用迭代体。 大于0的元素被送回反馈通道,其余元素向下游转发
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(...);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception{
return value>0;
}
});
iteration.colseWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception{
return value = 0;
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
03 物理分组
Flink程序可以对每个算子设置并行度
使用物理分组可以对数据进行更细粒度的分区,常用的物理分组如下所示:
Custom Partitioning
使用用户自定义的分区函数来选择每个数据的目标。
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
- 1
- 2
Random Partitioning
随机均匀分区,即随机分组。
dataStream.shuffle();
- 1
Rescaling
本地的轮流分配。
对于需要从源的每个并行实例输出到多个映射器的子集以分配负载但不希望 rebalance() 会导致完全重新平衡的情况,这个算子很有用。
同时只需要本地数据传输,而不是通过网络传输数据,具体取决于其他配置值,例如 TaskManager 的插槽数。
上游操作向其发送元素的下游操作子集取决于上游和下游操作的并行度。例如,如果上游操作的并行度为 2,下游操作的并行度为 6,则一个上游操作会将元素分配给三个下游操作,而另一个上游操作将分配给其他三个下游操作。另一方面,如果下游操作的并行度为 2,而上游操作的并行度为 6,则三个上游操作将分配给一个下游操作,而其他三个上游操作将分配给另一个下游操作。
在不同并行度不是彼此的倍数的情况下,一个或多个下游操作将具有来自上游操作的不同数量的输入。
jsonDS.keyBy(new KeySelector<JSONObject, String>() {
@Override
public String getKey(JSONObject value) throws Exception {
return value.getJSONObject("common").getString("mid");
}
}).map(new MapFunction<JSONObject, JSONObject>() {
@Override
public JSONObject map(JSONObject value) throws Exception {
return null;
}
}).setParallelism(6).rescale();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
Broadcasting
广播每一个元素到所有的分区,即将每个元素都会发送到所有分区。
dataStream.broadcast();
- 1
04 类型系统
Flink支持的数据类型如下:
类型 | 说明 |
---|---|
基本类型 | Java基本类型(包装类)以及void、String、Date、BigDecimal、BigInteger |
复合类型 | Tuple和Scala case class(不支持null)、ROW、POJO |
辅助、集合类型 | Option、Either、List、Map等 |
上述类型的数组 | |
其他类型 | 自定义类型 |
05 任务链和资源组
任务链:将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。
Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作),还提供了对链接更细粒度控制的 API ,这些API只能在 DataStream 转换操作后才能被调用,因为它们只对前一次数据转换生效。
资源组:一个资源组对应着 Flink 中的一个 slot 槽,可以根据需要手动地将各个算子隔离到不同的 slot 中
Start New Chain
someStream.filter(...).map(...).startNewChain().map(...);
- 1
Disable Chaining
不开启链接操作。
someStream.map(...).disableChaining();
- 1
Set Slot Sharing Group
设置操作的槽共享组。 Flink 会将具有相同 slot 共享组的操作放在同一个 slot 中,而将没有 slot 共享组的操作保留在其他 slot 中。 这可用于隔离插槽。
如果所有输入操作都在同一个槽共享组中,则槽共享组继承自输入操作。 默认槽共享组的名称是“default”,可以通过调用 slotSharingGroup(“default”) 将操作显式放入该组。
someStream.filter(...).slotSharingGroup("name");
- 1
关于Flink的DataStream API就介绍到这里了,学习过程中还是需要自己去写代码才能领悟其中的精髓,文章中的代码可以去flink-learning的项目中获取,希望能给你带来帮助。