-
前言:
迟到数据,是指在watermark之后到来的数据,事件时间在水位线之前。所以只有在事件时间语义下,讨论迟到数据的处理才有意义。对于乱序流,可以设置一个延迟时间;对于窗口计算,可以设置窗口的允许延迟时间;另外可以将迟到数据输出到Side Outputs。
-
Trigger:
Trigger决定窗口调用窗口函数的时间,抽象类Trigger含有的方法: 1. onElement() called for each element that is added to a window. 2. onEventTime() called when a registered event-time timer fires. 3. onProcessingTime() called when a registered processing-time timer fires. 4. onMerge() elevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows. 5. clear() performs any action needed upon removal of the corresponding window.
-
前三个方法返回TriggerResult,对应下列枚举,决定window操作:
CONTINUE(false, false), FIRE_AND_PURGE(true, true), FIRE(true, false), PURGE(false, true);
-
处理迟到数据:
在watermark之后到来的数据,事件时间在水位线之前。
-
设置水位线延迟时间:
watermark标记了event time的进展,是整个应用的全局逻辑时钟。 水位线会随着数据在任务间流动,从而给每个任务指明当前的事件时间。 生成水位线需要设置TimestampAssigner(分配事件时间的时间戳)和WatermarkGenerator(生成水位线的方法,on events 或者 periodically)。 当设置水位线的延迟后,所有定时器就都会按照延迟后的水位线来触发,注意一般情况不应该把延迟设置得太大,否则会大幅度降低流处理的实时性,视需求一般设在毫秒~秒级。
-
如 处理无序流常用:
WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.f0);
-
BoundedOutOfOrdernessWatermarks源码:
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> { /** The maximum timestamp encountered so far. */ private long maxTimestamp; /** The maximum out-of-orderness that this watermark generator assumes. */ private final long outOfOrdernessMillis; /** * Creates a new watermark generator with the given out-of-orderness bound. * * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps. */ public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) { checkNotNull(maxOutOfOrderness, "maxOutOfOrderness"); checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative"); this.outOfOrdernessMillis = maxOutOfOrderness.toMillis(); // start so that our lowest watermark would be Long.MIN_VALUE. this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1; } // ------------------------------------------------------------------------ @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { maxTimestamp = Math.max(maxTimestamp, eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1)); } }
-
允许窗口处理迟到数据:
对于窗口计算,如果水位线已经到了窗口结束时间,默认窗口就会关闭,那么迟到数据就要被丢弃,因此可以设置延迟时间,允许继续处理迟到数据的。 默认情况下延迟时间为0,若设置延迟时间后,watermark超过窗口结束时间戳,但未超过 延迟后的时间戳,迟到数据仍然可添加到窗口中,触发计算。 中间过程可视为,在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果;然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。逐步修正计算结果,最终得到准确的统计值。 注:对于GlobalWindows,不会存在迟到数据,因为全局窗口的结束时间戳为Long.MAX_VALUE. DataStream<T> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .<windowed transformation>(<window function>);
-
将迟到数据放入侧输出流:
窗口后关闭,仍然有迟到数据,则用侧输出流来收集关窗后的迟到数据,保证数据不丢失。 因为窗口已经真正关闭,只能将之前的窗口计算结果保存下来,然后获取侧输出流中的迟到数据,判断数据所属的窗口,手动对结果进行合并更新
-
实操:
pojo对象: public class Event { public String user; public String url; public long timestamp; public Event() { } public Event(String user, String url, Long timestamp) { this.user = user; this.url = url; this.timestamp = timestamp; } @Override public int hashCode() { return super.hashCode(); } public String getUser() { return user; } public void setUser(String user) { this.user = user; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public Long getTimestamp() { return timestamp; } public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } @Override public String toString() { return "Event{" + "user='" + user + '\'' + ", url='" + url + '\'' + ", timestamp=" + new Timestamp(timestamp) + '}'; } }
-
核心代码
(1) 从kakfa抽取数据,并处理为Event数据流 (2) 设置watermark (3) window窗口操作 (4) 输出到控制台
public class LaterDataTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 env.setParallelism(1); //读取kafka Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.42.102:9092,192.168.42.103:9092,192.168.42.104:9092"); properties.setProperty("group.id", "consumer-group"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest"); DataStream<Event> stream = env .addSource(new FlinkKafkaConsumer<String>( "clicks", new SimpleStringSchema(), properties )) .flatMap(new FlatMapFunction<String, Event>() { @Override public void flatMap(String s, Collector<Event> collector) throws Exception { String[] split = s.split(","); collector.collect(new Event(split[0], split[1], Long.parseLong(split[2]))); } }); //水位线 乱序流 需要设置Duration和timestampAssigner SingleOutputStreamOperator<Event> watermarks = stream.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness( Duration.ofSeconds(2)).withTimestampAssigner( new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } })); watermarks.print("input"); //定义一个输出标签 OutputTag<Event> late = new OutputTag<Event>("late") { }; //设置延迟时间 SingleOutputStreamOperator<UrlViewCount> aggregate = watermarks .keyBy(new KeySelector<Event, String>() { @Override public String getKey(Event value) throws Exception { return value.url; } }) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.minutes(1)) .sideOutputLateData(late) .aggregate(new AggregateFunction<Event, Long, Long>() { @Override public Long createAccumulator() { return 0L; } @Override public Long add(Event value, Long accumulator) { return accumulator + 1; } @Override public Long getResult(Long accumulator) { return accumulator; } @Override public Long merge(Long a, Long b) { return null; } } , new ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception { long start = context.window().getStart(); long end = context.window().getEnd(); Long next = elements.iterator().next(); long currentWatermark = context.currentWatermark(); out.collect(new UrlViewCount(s + " 水位线" + currentWatermark, next, start, end)); } } ); //输出 aggregate.print("result"); aggregate.getSideOutput(late).print("late"); env.execute(); } }
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){ }; DataStream<T> input = ...; SingleOutputStreamOperator<T> result = input .keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .sideOutputLateData(lateOutputTag) .<windowed transformation>(<window function>); DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
-
迟到数据触发窗口计算重复结果处理:
late firings更新计算结果后,数据流中将包含同一窗口计算的多个结果,需要对重复数据进行删除
-
备注:
文章转载自[但行益事莫问前程]的博客,链接:https://copyfuture.com/blogs-details/202205230509048677