分流
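将一条数据流拆分成完全独立的两条甚至多条流,即基于一个 DataStream 得到多个完全平等的子 DataStream。在 Flink 中通常借助处理函数(ProcessFunction)的侧输出流(side output)来实现:先用 OutputTag 定义输出标签,在 processElement() 中通过 context.output() 把数据发送到对应标签的侧输出流,未被分出的数据仍通过 collector 输出到主流,最后用 getSideOutput() 取出各条侧输出流。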
// Event-time click stream: the event time of each record is its own Event.timestamp field.
SingleOutputStreamOperator<Event> stream = env
        .addSource(new ClickSource())
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((click, previousTs) -> click.timestamp));

// Side-output tags, one per user that gets its own sub-stream.
OutputTag<Tuple3<String, String, Long>> marryTag = new OutputTag<Tuple3<String, String, Long>>("Marry") {};
OutputTag<Tuple3<String, String, Long>> bobTag = new OutputTag<Tuple3<String, String, Long>>("Bob") {};

// Route "Marry" and "Bob" events to their side outputs as (user, url, timestamp) tuples;
// every other event is forwarded unchanged on the main output.
SingleOutputStreamOperator<Event> processStream = stream.process(new ProcessFunction<Event, Event>() {
    @Override
    public void processElement(Event click, Context ctx, Collector<Event> out) throws Exception {
        switch (click.user) {
            case "Marry":
                ctx.output(marryTag, Tuple3.of(click.user, click.url, click.timestamp));
                break;
            case "Bob":
                ctx.output(bobTag, Tuple3.of(click.user, click.url, click.timestamp));
                break;
            default:
                out.collect(click);
                break;
        }
    }
});

// Print the main stream and both side streams, each with its own prefix.
processStream.print("else");
processStream.getSideOutput(marryTag).print("Marry");
processStream.getSideOutput(bobTag).print("Bob");
基本合流操作
联合 Union
联合操作要求流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,且数据类型不变。
// union() accepts several streams at once (varargs), so more than two streams can be merged in one call;
// every stream passed in must have the same element type as stream1. The "..." is a placeholder, not real code.
stream1.union(stream2, stream3, ...)
连接 Connect
连接流
为了处理更加灵活,连接操作允许两条流的数据类型不同。连接得到的 ConnectedStreams 可以看成是两条流形式上的“统一”:它们被放在了同一个流中,但内部仍各自保持原有的数据形式不变,彼此之间相互独立。要想得到新的 DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,例如 CoMapFunction 或 CoProcessFunction,用来说明对于不同来源、不同类型的数据分别怎样进行处理转换,并得到统一的输出类型。
// Two sources whose element types differ: Integer on one side, Long on the other.
DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);
DataStreamSource<Long> stream2 = env.fromElements(4L, 5L, 6L);
// connect() keeps both element types; the CoMapFunction turns each side into a common String output.
stream1.connect(stream2)
        .map(new CoMapFunction<Integer, Long, String>() {
            // Invoked for each element of the first (Integer) input.
            @Override
            public String map1(Integer fromFirst) throws Exception {
                return "Integer:".concat(fromFirst.toString());
            }

            // Invoked for each element of the second (Long) input.
            @Override
            public String map2(Long fromSecond) throws Exception {
                return "Long:".concat(fromSecond.toString());
            }
        })
        .print();
对账案例
// Payment log reported by the app: (order id, "app", event timestamp); event time is read from f2.
SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env
        .fromElements(
                Tuple3.of("order-1", "app", 1000L),
                Tuple3.of("order-2", "app", 2000L),
                Tuple3.of("order-3", "app", 3500L))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((appEvent, previousTs) -> appEvent.f2));

// Payment log reported by the third-party platform: (order id, "third-part", status, event timestamp);
// event time is read from f3.
SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdStream = env
        .fromElements(
                Tuple4.of("order-1", "third-part", "success", 3000L),
                Tuple4.of("order-3", "third-part", "success", 4000L))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((thirdEvent, previousTs) -> thirdEvent.f3));

// Key both sides by order id (f0) so events of the same order share keyed state, then let
// OrderMatchResult decide whether each order appears in both streams.
appStream
        .keyBy(appEvent -> appEvent.f0)
        .connect(thirdStream.keyBy(thirdEvent -> thirdEvent.f0))
        .process(new OrderMatchResult())
        .print();
// 自定义实现CoProcessFunction
public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String> {
// 定义状态变量,用来保存已经到达的事件
private ValueState<Tuple3<String, String, Long>> appEventState;
private ValueState<Tuple4<String, String, String, Long>> thirdEventState;
@Override
public void open(Configuration parameters) throws Exception {
appEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
);
thirdEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple4<String, String, String, Long>>("third-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG))
);
}
@Override
public void processElement1(Tuple3<String, String, Long> stringStringLongTuple3, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context context, Collector<String> collector) throws Exception {
// 来的是app事件,看另一条流中是否来过
if (thirdEventState.value() != null) {
collector.collect("对账成功:" + stringStringLongTuple3 + " " + thirdEventState.value());
// 清空状态
thirdEventState.clear();
} else {
// 更新状态
appEventState.update(stringStringLongTuple3);
// 定义注册一个5s后的定时器,开始等待另一条流的事件
context.timerService().registerEventTimeTimer(stringStringLongTuple3.f2 + 5000L);
}
}
@Override
public void processElement2(Tuple4<String, String, String, Long> stringStringStringLongTuple4, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context context, Collector<String> collector) throws Exception {
// 来的是third事件,看另一条流中是否来过
if (appEventState.value() != null) {
collector.collect("对账成功:" + appEventState.value() + " " + stringStringStringLongTuple4);
// 清空状态
appEventState.clear();
} else {
// 更新状态
thirdEventState.update(stringStringStringLongTuple4);
// 定义注册一个定时器,开始等待另一条流的事件
context.timerService().registerEventTimeTimer(stringStringStringLongTuple4.f3);
}
}
@Override
public void onTimer(long timestamp, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
// 定时器触发,判断状态,如果某个状态不为空,说明另一条流中的事件未来
if (appEventState.value() != null) {
out.collect("对账失败:" + appEventState.value() + " " + "第三方支付平台信息未到");
}
if (thirdEventState.value() != null) {
out.collect("对账失败:" + thirdEventState.value() + " " + "app信息未到");
}
appEventState.clear();
thirdEventState.clear();
}
}
双流联结 Join
窗口联结
Flink 为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键的数据放在窗口中进行配对处理。
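首先需要调用 DataStream 的 join() 方法来合并两条流,得到一个 JoinedStreams;然后通过 where() 和 equalTo() 方法分别指定两条流中联结的 key;再通过 window() 开窗;最后调用 apply() 传入联结窗口函数(JoinFunction)进行处理计算。对于同一窗口中 key 相同的数据,JoinFunction 会对两条流中每一对匹配的数据各调用一次(相当于笛卡尔积);只有两条流在该窗口中都有这个 key 的数据时才会输出,类似于内连接。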
// Left input: (key, timestamp) pairs; event time is read from f1.
SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env.fromElements(
        Tuple2.of("a", 1000L),
        Tuple2.of("b", 1000L),
        Tuple2.of("a", 2000L),
        Tuple2.of("b", 2000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        })
);
// Right input: (key, timestamp) pairs whose timestamp field f1 is an Integer.
SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = env.fromElements(
        Tuple2.of("a", 3000),
        Tuple2.of("b", 4000),
        Tuple2.of("a", 4500),
        Tuple2.of("b", 5500)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                // The Integer is unboxed and widened to long.
                return element.f1;
            }
        })
);
// Join the two streams on f0 inside 5-second tumbling event-time windows: within a window, every
// left element of a key is paired with every right element of the same key.
// The output type is declared as String (it was Object) so the result stream carries precise
// type information instead of a generic Object type.
stream1.join(stream2)
        .where(left -> left.f0)
        .equalTo(right -> right.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String>() {
            @Override
            public String join(Tuple2<String, Long> left, Tuple2<String, Integer> right) throws Exception {
                return left + "->" + right;
            }
        }).print();
间隔联结
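在有些场景下,我们要处理的时间间隔可能并不是固定的。间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。这段间隔由两个时间偏移量确定,分别叫作间隔的“下界”(lowerBound)和“上界”(upperBound):对于第一条流中时间戳为 $a$ 的数据,如果另一条流中同一 key 的数据时间戳 $b$ 满足 $a + \text{lowerBound} \le b \le a + \text{upperBound}$,二者就能匹配。例如下面代码中的 between(Time.seconds(-5), Time.seconds(5)) 表示匹配订单时间前后 5 秒内同一用户的点击事件。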
// Order stream: (user, order timestamp); event time is read from f1.
// Both streams are keyed by user name below, so the names must match the click stream's
// Event.user exactly. ("Marry" never matched the click user "Mary", so that order could not join.)
SingleOutputStreamOperator<Tuple2<String, Long>> orderStream = env.fromElements(
        Tuple2.of("Mary", 1000L),
        Tuple2.of("Alice", 5000L),
        Tuple2.of("Bob", 10000L),
        Tuple2.of("Alice", 20000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> order, long recordTimestamp) {
                return order.f1;
            }
        })
);
// Click stream; event time is the Event.timestamp field.
SingleOutputStreamOperator<Event> clickStream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 4000L),
        new Event("Alice", "./cart", 9000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event click, long recordTimestamp) {
                return click.timestamp;
            }
        })
);
// For every order with timestamp t, join the same user's clicks whose timestamps fall in
// [t - 5s, t + 5s]. Each matching (order, click) pair is emitted as "click->order".
orderStream.keyBy(order -> order.f0)
        .intervalJoin(clickStream.keyBy(click -> click.user))
        .between(Time.seconds(-5), Time.seconds(5))
        .process(new ProcessJoinFunction<Tuple2<String, Long>, Event, String>() {
            @Override
            public void processElement(Tuple2<String, Long> order, Event click, Context ctx, Collector<String> out) throws Exception {
                out.collect(click + "->" + order);
            }
        }).print();
窗口同组联结
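与窗口联结不同的是,调用 apply() 方法定义具体操作时,传入的是一个 CoGroupFunction 函数类接口。它不是对每一对匹配数据各调用一次,而是对每个窗口中的每个 key 调用一次,直接拿到两条流中该 key 的全部数据(两个 Iterable),因此除了内连接,还可以实现左外连接、右外连接等更灵活的操作。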
// Left input: (key, timestamp) pairs; event time is read from f1.
SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env
        .fromElements(
                Tuple2.of("a", 1000L),
                Tuple2.of("b", 1000L),
                Tuple2.of("a", 2000L),
                Tuple2.of("b", 2000L))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((pair, previousTs) -> pair.f1));
// Right input: (key, timestamp) pairs; the Integer timestamp in f1 is widened to long.
SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = env
        .fromElements(
                Tuple2.of("a", 3000),
                Tuple2.of("b", 4000),
                Tuple2.of("a", 4500),
                Tuple2.of("b", 5500))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((pair, previousTs) -> pair.f1));
// Co-group on f0 inside 5-second tumbling event-time windows; coGroup() receives the elements
// of both inputs as two Iterables and emits them as one "left->right" string.
stream1.coGroup(stream2)
        .where(left -> left.f0)
        .equalTo(right -> right.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String>() {
            @Override
            public void coGroup(Iterable<Tuple2<String, Long>> lefts, Iterable<Tuple2<String, Integer>> rights, Collector<String> out) throws Exception {
                out.collect(lefts + "->" + rights);
            }
        })
        .print();
标签:转换,Flink,多流,long,event,Tuple2,Override,new,public
From: https://www.cnblogs.com/fireonfire/p/17016135.html