合流
1、概念
将不同流中的数据汇聚在一起,然后可以进行一个统计等相关操作。
2、基本合流操作
union和connect
- union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并。
package com.pzb.transformation;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
/**
* 演示DataStream-Transformation-合并和连接操作
*/
public class TransformationDemo02 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);
//TODO 2.transformation
DataStream<String> result1 = ds1.union(ds2);//注意union能合并同类型
//ds1.union(ds3);//注意union不可以合并不同类型
ConnectedStreams<String, String> result2 = ds1.connect(ds2);//注意:connet可以连接同类型
ConnectedStreams<String, Long> result3 = ds1.connect(ds3);//注意connet可以连接不同类型
/*
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
OUT map1(IN1 value) throws Exception;
OUT map2(IN2 value) throws Exception;
}
*/
// map 传的是CoMapFunction
SingleOutputStreamOperator<String> result = result3.map(new CoMapFunction<String, Long, String>() {//第一个String是第一个数据流的数据类型,Long为第二个数据流的数据类型,第二个String为连接后的数据类型
@Override
public String map1(String value) throws Exception {//这个map是对ds1进行处理
return "String:" + value;//提取出ds1的数据(value),将其变成返回值
}
@Override
public String map2(Long value) throws Exception {//这个map是对ds3进行处理
return "Long:" + value;//提取出ds2的数据(value),将其变成返回值
}
});
//TODO 3.sink
result1.print();
//result2.print();//注意:connect之后需要做其他的处理,不能直接输出
//result3.print();//注意:connect之后需要做其他的处理,不能直接输出
result.print();
//TODO 4.execute
env.execute();
}
}
合并和连接的区别:
- 合并的数据类型必须相同,连接没有要求
- 连接后的数据不能直接print,只能execution
连接后的数据要想输出,需要进行相应的process操作,里面传的是对应的new CoProcessFunction<>参数。
将不同流中的数据按相同key
值合并:
另外,值得一提的是,ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:
connectedStreams.keyBy(keySelector1, keySelector2);
这里传入两个参数 keySelector1
和 keySelector2
,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的 keyBy 用法完全一致。ConnectedStreams 进行 keyBy 操作,其实就是把两条流中 key 相同的数据放到了一起, 然后针对来源的流再做各自处理,这在一些场景下非常有用。另外,我们也可以在合并之前就将两条流分别进行 keyBy,得到的 KeyedStream 再进行连接(connect)操作,效果是一样的。
package com.peng.union_connect;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
/**
* @author 海绵先生
* @Description TODO
* @date 2022/11/12-14:36
*/
public class KeyedStreamToConnection {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// source
DataStreamSource<Tuple2<String, Integer>> dataStream1 = env.fromElements(new Tuple2<String, Integer>("北京", 1),
new Tuple2<String, Integer>("南京", 2),
new Tuple2<String, Integer>("广东", 3));
DataStreamSource<Tuple2<String, Integer>> dataStream2 = env.fromElements(new Tuple2<String, Integer>("北京", 2),
new Tuple2<String, Integer>("广东", 2),
new Tuple2<String, Integer>("北京", 3));
dataStream1.print("dataStream1:");
dataStream2.print("dataStream2:");
ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectedDataStream = dataStream1.keyBy(data -> data.f0)
.connect(dataStream2.keyBy(data -> data.f0));
connectedDataStream.map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map1(Tuple2<String, Integer> value) throws Exception {
return Tuple2.of("dataStream1的" + value.f0, value.f1);
}
@Override
public Tuple2<String, Integer> map2(Tuple2<String, Integer> value) throws Exception {
return Tuple2.of("dataStream2的" + value.f0, value.f1);
}
}).print("key连接后的数据");
env.execute();
}
}
3、 双流联结
双流联结(Join)是基于时间的合流。
如果我们希望统计固定时间内两条流数据的匹配情况,那就需要设置定时器、自定义触发逻辑来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了两种内置的 join 算子,以及coGroup 算子。