0 序言
0.1 缘起
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Sample job demonstrating event-time session windows.
 *
 * <p>The job reads text lines from a local socket, turns each line into a
 * {@code (key, eventTimestamp, 1)} tuple, assigns event-time timestamps and watermarks,
 * keys the stream by the first tuple field and sums the third field per session window.
 *
 * <p>Reference: "Flink in practice: Window API usage examples" (Weixin),
 * https://mp.weixin.qq.com/s/Pwv_pufdzi-WfPvXMID6aw
 *
 * @author johnnyzen
 */
public class SampleWindowJob {
    private static final Logger logger = LoggerFactory.getLogger(SampleWindowJob.class);

    /**
     * Builds and runs the pipeline.
     *
     * @param args command-line arguments (not used)
     * @throws Exception any failure raised while building or executing the job; it is
     *         deliberately not caught here so that it propagates to Flink
     */
    public static void main(String[] args) throws Exception {
        // 1. Job configuration: bind the local web UI to port 18081.
        Configuration jobConfiguration = new Configuration();
        jobConfiguration.setString(RestOptions.BIND_PORT, "18081");

        // 2. Create a local stream execution environment with the web UI enabled
        //    (alternative: StreamExecutionEnvironment.getExecutionEnvironment()).
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(jobConfiguration);
        // Declare that the job works on event time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 3. Source: a text stream read from a socket on localhost:8888
        //    (a job may have more than one source).
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // 4. Stream processing.
        // 4.1 Convert every input line into a (key, eventTimestamp, 1) tuple.
        DataStream<Tuple3<String, Long, Integer>> windowCountDataStream =
                lines.map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(String line) throws Exception {
                        return parseEvent(line);
                    }
                });

        // 4.2 Tell Flink how to obtain the event timestamp of each record.
        //     The extractor is constructed with a bound of 1000 ms, so the watermark it
        //     produces trails the largest timestamp seen so far by one second.
        Time maxOutOfOrderness = Time.milliseconds(1000);
        DataStream<Tuple3<String, Long, Integer>> textWithEventTimeDataStream =
                windowCountDataStream.assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Integer>>(maxOutOfOrderness) {
                            /**
                             * Returns the event timestamp stored in the second tuple field.
                             *
                             * @param event the parsed input record
                             * @return the value of field {@code f1}
                             */
                            @Override
                            public long extractTimestamp(Tuple3<String, Long, Integer> event) {
                                return event.f1;
                            }
                        }
                ).setParallelism(1);

        // 4.3 Group by key (tuple field 0); windows are then evaluated per key.
        KeyedStream<Tuple3<String, Long, Integer>, Tuple> textKeyStream = textWithEventTimeDataStream.keyBy(0);
        textKeyStream.print("textKey: ");

        // 4.4 Session windows with a 5000 ms gap, summing tuple field 2 (the count).
        SingleOutputStreamOperator<Tuple3<String, Long, Integer>> sessionWindowStream =
                textKeyStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(5000L))).sum(2);

        // 4.5 Sink: print the window results (a sink must be attached).
        sessionWindowStream.print("sessionWindow: ").setParallelism(1);

        // 5. Launch. Exceptions are intentionally not wrapped in try/catch: they are
        //    handed to Flink, which decides on restart handling based on them.
        env.execute("StreamWordCount");
    }

    /**
     * Parses one input line of the form {@code "<key> <timestamp>"}.
     *
     * <p>Fields may be separated by any run of whitespace, and leading/trailing whitespace
     * is ignored. Fields after the second one are ignored.
     *
     * @param line the raw line received from the socket
     * @return a tuple of (key, timestamp, 1)
     * @throws IllegalArgumentException if the line is {@code null} or has fewer than two
     *         fields; a non-numeric timestamp surfaces as {@link NumberFormatException}
     */
    private static Tuple3<String, Long, Integer> parseEvent(String line) {
        if (line == null) {
            throw new IllegalArgumentException("Input line must not be null");
        }
        String[] fields = line.trim().split("\\s+");
        if (fields.length < 2) {
            // Fail with the offending input instead of an opaque array-index error.
            throw new IllegalArgumentException(
                    "Expected \"<key> <timestamp>\" but got: \"" + line + "\"");
        }
        return new Tuple3<>(fields[0], Long.valueOf(fields[1]), 1);
    }
}
- flink.version = 1.13.1
- scala.version = 2.12 / 2.11
1 源码分析
1.1 继承关系 : BoundedOutOfOrdernessTimestampExtractor implements AssignerWithPeriodicWatermarks extends TimestampAssigner
public abstract class org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T>
public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T>
public interface TimestampAssigner<T> extends org.apache.flink.api.common.eventtime.TimestampAssigner<T>, Function
1.2 源码分析
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
: flink-streaming-java_2.11-1.13.1.jar
package org.apache.flink.streaming.api.functions.timestamps;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = 1L;

    /** Largest event timestamp observed so far. */
    private long currentMaxTimestamp;

    /** Timestamp carried by the most recently returned watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * Fixed distance, in milliseconds, kept between the largest timestamp seen in the
     * records and the watermark to be emitted (the tolerated out-of-orderness).
     */
    private final long maxOutOfOrderness;

    /**
     * Creates an extractor with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness the bound; must not be negative
     * @throws RuntimeException if the bound is negative
     */
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        long boundMillis = maxOutOfOrderness.toMilliseconds();
        if (boundMillis < 0L) {
            throw new RuntimeException("Tried to set the maximum allowed lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = boundMillis;
        // Offset the start value by the bound so that the first computed watermark is
        // exactly Long.MIN_VALUE instead of underflowing.
        this.currentMaxTimestamp = Long.MIN_VALUE + boundMillis;
    }

    /**
     * @return the configured out-of-orderness bound in milliseconds
     */
    public long getMaxOutOfOrdernessInMillis() {
        return this.maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element. Implemented by the user.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractTimestamp(T element);

    /**
     * Returns the current watermark: the largest timestamp seen so far minus the
     * out-of-orderness bound, never lower than the previously returned watermark.
     *
     * @return a watermark carrying the (monotonically non-decreasing) timestamp
     */
    public final Watermark getCurrentWatermark() {
        long candidate = this.currentMaxTimestamp - this.maxOutOfOrderness;
        // Keeping the larger of the two guarantees the watermark never moves backwards.
        this.lastEmittedWatermark = Math.max(this.lastEmittedWatermark, candidate);
        return new Watermark(this.lastEmittedWatermark);
    }

    /**
     * Obtains the element's timestamp from the user-implemented
     * {@link #extractTimestamp(Object)} and records it as the new maximum if it exceeds
     * the largest timestamp seen so far.
     *
     * @param element the element to extract the timestamp from
     * @param previousElementTimestamp not used by this implementation
     * @return the extracted timestamp
     */
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long extracted = this.extractTimestamp(element);
        this.currentMaxTimestamp = Math.max(this.currentMaxTimestamp, extracted);
        return extracted;
    }
}
- BoundedOutOfOrdernessTimestampExtractor 抽象类实现了 AssignerWithPeriodicWatermarks 接口的 extractTimestamp(T element, long previousElementTimestamp) 及 getCurrentWatermark 方法,同时声明抽象方法 extractTimestamp(T element) 供子类实现。
- BoundedOutOfOrdernessTimestampExtractor 的构造器接收 maxOutOfOrderness 参数,用于指定 element 允许滞后(t - t_w,其中 t 为 element 的 eventTime,t_w 为前一次 watermark 的时间)的最大时间;在计算窗口数据时,如果超过该值则会被忽略。
- BoundedOutOfOrdernessTimestampExtractor 的 extractTimestamp(T element, long previousElementTimestamp) 方法会调用子类实现的 extractTimestamp(T element) 方法抽取时间(t1):
  - 如果该时间(t1)大于 currentMaxTimestamp,则更新 currentMaxTimestamp。
- getCurrentWatermark 方法:
  - 先计算 potentialWM = currentMaxTimestamp - maxOutOfOrderness;
  - 如果 potentialWM >= lastEmittedWatermark,则更新 lastEmittedWatermark(此时 currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness,表示 lastEmittedWatermark 太小、差值超过了 maxOutOfOrderness,因而调大 lastEmittedWatermark);
  - 最后返回 Watermark(lastEmittedWatermark)。