Flink generates watermarks either punctuated (per record) or periodically (periodic).
1. Definition and usage
* punctuated: tries to generate a watermark for every record. This gives finer-grained control, but adds computational overhead that is rarely necessary.
* periodic: generates watermarks at a fixed interval, configurable via env.getConfig().setAutoWatermarkInterval(1 * 1000L); the default is 200 ms. This is more efficient and is sufficient for most scenarios.
2. Community trend
* punctuated: the legacy AssignerWithPunctuatedWatermarks interface has been deprecated.
* periodic: generally the recommended way to generate watermarks.
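Both styles survive in the unified WatermarkGenerator interface (Flink 1.11+) that replaced the deprecated assigners: onEvent is the punctuated hook and onPeriodicEmit the periodic one. A minimal sketch (the class name HybridWatermarkGenerator is made up for illustration):

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class HybridWatermarkGenerator<T> implements WatermarkGenerator<T> {
    // +1 so that maxTimestamp - 1 cannot underflow before the first event arrives
    private long maxTimestamp = Long.MIN_VALUE + 1;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        // punctuated style: uncomment to emit a watermark for every record
        // output.emitWatermark(new Watermark(maxTimestamp - 1));
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // periodic style: called every autoWatermarkInterval (default 200 ms)
        output.emitWatermark(new Watermark(maxTimestamp - 1));
    }
}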
package flink.shangguigu_test;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.util.Collector;

import java.time.Duration;
public class Tublingwindow_pv {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // watermark emission interval in ms; the default is 200 ms. Set to 1 ms here so that
        // a watermark is generated after virtually every record, which makes the output easier to analyze
        env.getConfig().setAutoWatermarkInterval(1L);
        env.enableCheckpointing(5 * 1000L, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5 * 1000L)); // 3 restart attempts, 5 s apart
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop-ha/user/flink/cluster_yarn/checkpoints");
        DataStream<String> input = env.socketTextStream("127.0.0.1", 8085).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                // value != "" compares references, not content; use isEmpty() instead
                return value != null && !value.trim().isEmpty();
            }
        });
        SingleOutputStreamOperator<String> assign_event_time = input
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                // periodic generator: watermark = max event time seen - 2 s - 1 ms
                                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                                    @Override
                                    public long extractTimestamp(String element, long recordTimestamp) {
                                        JSONObject jsonObject = JSON.parseObject(element);
                                        Long operate_time = jsonObject.getLong("operate_time");
                                        // operate_time is in seconds; Flink expects milliseconds
                                        return operate_time * 1000L;
                                    }
                                })
                );
        SingleOutputStreamOperator<Tuple2<String, Long>> process = assign_event_time.process(new ProcessFunction<String, Tuple2<String, Long>>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                // currentWatermark() returns the watermark emitted BEFORE this record was processed
                long watermark = ctx.timerService().currentWatermark();
                // the watermark this record produces: event time - 2000 ms - 1 ms, never regressing
                long newWatermark = Math.max(watermark, ctx.timestamp() - 2000 - 1);
                out.collect(Tuple2.of("timestamp: " + JSON.parseObject(value).getLong("operate_time") +
                        " event time: " + ctx.timestamp() +
                        " previous watermark: " + watermark +
                        " current watermark: " + newWatermark
                        , 1L)
                );
            }
        });
        SingleOutputStreamOperator<String> aggregate = process
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
                .trigger(EventTimeTrigger.create())
                .aggregate(new AggregateFunction<Tuple2<String, Long>, Tuple2<StringBuilder, Long>, String>() {
                    @Override
                    public Tuple2<StringBuilder, Long> createAccumulator() {
                        return Tuple2.of(new StringBuilder(), 0L);
                    }

                    @Override
                    public Tuple2<StringBuilder, Long> add(Tuple2<String, Long> value, Tuple2<StringBuilder, Long> accumulator) {
                        return Tuple2.of(accumulator.f0.append(value.f0).append("-->\n"), value.f1 + accumulator.f1);
                    }

                    @Override
                    public String getResult(Tuple2<StringBuilder, Long> accumulator) {
                        return accumulator.f0 + "==" + accumulator.f1;
                    }

                    @Override
                    public Tuple2<StringBuilder, Long> merge(Tuple2<StringBuilder, Long> a, Tuple2<StringBuilder, Long> b) {
                        // only invoked for merging windows (e.g. session windows); implemented instead of returning null
                        return Tuple2.of(a.f0.append(b.f0), a.f1 + b.f1);
                    }
                });
        aggregate.print();
        env.execute();
    }
}
Sample data
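To reproduce the run locally, start a socket listener before launching the job (for example nc -lk 8085 on Linux; the exact flags depend on your netcat variant), then paste the records below one line at a time: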
{"UUID":"","operate_time":"1718718279"}
{"UUID":"","operate_time":"1718718279"}
{"UUID":"","operate_time":"1718718280"}
{"UUID":"","operate_time":"1718718280"}
{"UUID":"","operate_time":"1718718281"}
{"UUID":"","operate_time":"1718718285"}
{"UUID":"","operate_time":"1718718280"}
{"UUID":"","operate_time":"1718718279"}
Output:
timestamp: 1718718260 event time: 1718718260000 previous watermark: -9223372036854775808 current watermark: 1718718257999-->
==1
timestamp: 1718718279 event time: 1718718279000 previous watermark: 1718718257999 current watermark: 1718718276999-->
timestamp: 1718718279 event time: 1718718279000 previous watermark: 1718718276999 current watermark: 1718718276999-->
==2
timestamp: 1718718280 event time: 1718718280000 previous watermark: 1718718276999 current watermark: 1718718277999-->
timestamp: 1718718280 event time: 1718718280000 previous watermark: 1718718277999 current watermark: 1718718277999-->
timestamp: 1718718281 event time: 1718718281000 previous watermark: 1718718277999 current watermark: 1718718278999-->
==3
(timestamp: 1718718285 event time: 1718718285000 previous watermark: 1718718278999 current watermark: 1718718282999,1)
Conclusions:
Flink's watermark is initialized to Long.MIN_VALUE (-9223372036854775808). In the run above, an automatically generated watermark only took effect once it was greater than 0: if the first event_time yields a computed watermark of, say, -2001, that value is not retained, and the second record still prints -9223372036854775808 as the previous watermark.
Flink works in milliseconds throughout; when the event_time extracted from the data is in seconds (as operate_time is here), it must be converted to milliseconds.
The watermark is computed as: current maximum event timestamp - allowed out-of-orderness - 1 ms.
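Checked against the run above, with an allowed out-of-orderness of 2 s:

long maxSeenTimestamp = 1718718281000L;                  // largest event time seen so far, in ms
long outOfOrderness = 2000L;                             // Duration.ofSeconds(2)
long watermark = maxSeenTimestamp - outOfOrderness - 1;  // = 1718718278999, matching the output above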
In the code above, watermarks are assigned with forBoundedOutOfOrderness, which generates watermarks periodically; the generation interval can be set via env.getConfig().setAutoWatermarkInterval(200L) and defaults to 200 ms.
Watermarks travel through Flink as their own stream elements. ctx.timerService().currentWatermark() returns the previous watermark value, which is why the latest watermark has to be computed from the newest event_time. Watermarks increase monotonically: if the newly computed value is smaller than the current watermark it is discarded; otherwise the logical clock advances to it.
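The update rule in one line (variable names are illustrative):

currentWatermark = Math.max(currentWatermark, newlyComputedWatermark);  // never moves backwards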
Window closing and firing are decided by the latest watermark: once the watermark passes the window's end time, the window fires.
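This matches the behavior of EventTimeTrigger, which fires once the watermark reaches window.maxTimestamp(), i.e. the window end minus 1 ms. Replaying the run above:

long windowEnd = 1718718282000L;                   // the 2 s window [1718718280000, 1718718282000)
long currentWatermark = 1718718282999L;            // produced by the record with event time 1718718285000
boolean fire = windowEnd - 1 <= currentWatermark;  // true: this is the window printed as "==3"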