Flink教程(6)-Flink Window 详解
文章目录
Flink Window
实际应用中真实的流数据都是无界的,即源源不断地传入。那么如何处理这种无界的数据呢?从微观的角度出发,我们可以把无限的流数据进行切分,得到一个个局部有限数据集。比如1分钟内的数据、1小时内的数据、1天的数据;前5个数据、前10个数据、前100个数据。这种将无限流划分为有限流的方式在flink中称之为window。Flink通过window机制将无限的流数据分发到具有有限大小的bucket(桶)中进行处理。
window原理与分类
基于时间的Time Window
-
滚动时间窗口(tumbling windows)
按照固定的时间间隔对数据进行切分。时间对齐,窗口跨度固定,没有重叠。
比如下图中的窗口划分,每隔1分钟统计一次这1分钟内的数据之和。第一个窗口[1min,2min)期间,user1这条数据流的和3、第二个窗口[2min,3min)和为4、第三个窗口[3min,4min)和为5,第四个窗口[4min,5min)和为24。
-
滑动时间窗口(sliding windows)
滑动窗口可以看作是固定窗口的更一般化形式。滑动窗口由固定的窗口长度和滑动间隔构成,窗口长度固定,窗口可以重叠。
比如下图中的窗口划分,每隔半分钟(30秒)统计一次1分钟内的数据之和。第一个窗口[1min,2min)期间,user1这条数据流的和3、第二个窗口[1.5min,2.5min)和为3、第三个窗口[2min,3min)和为4,第四个窗口[2.5min,3.5min)和为3。
-
会话窗口(session windows)
类似web应用的session。由一系列事件加上一个指定长度的timeout间隔组成,比如指定的一段时间内没有收到新数据生成一个新的窗口。比如下图中的窗口划分,每隔10秒没有新的数据到达则开启一个会话窗口统计这段时间内数据之和。user1数据流第一个会话窗口内数据和为10、第二个会话窗口内数据和为14;user2数据流第一个会话窗口内数据和为27。
滚动窗口前闭后开,滑动的步子大一点,等于一个窗口大小,就退化为滚动窗口。
基于计数的Count Window
-
滚动计数窗口
按照固定的元素个数(事件数量)对数据进行切分。窗口元素个数跨度固定,没有重叠。
比如下图中,数据流中每到达3个数据统计一次和。user1数据流第一个窗口内的数据和为10、第二个窗口内数据和为17。user2数据流第一个窗口内数据和为10、第二个窗口和为14。
-
滑动计数窗口
由固定的窗口长度(元素个数)和滑动个数隔构成,窗口长度固定,可以重叠。
比如下图中,每隔2个数据统计一次3个数之和。user1数据流第一个窗口内的数据之和为5、第二个窗口内数据和为15、第三个窗口内数据和为17。
3)Global Window 和 Keyed Window
在使用窗口计算时,Flink根据上游数据集是否为KeyedStream类型,对应的Window类型也会有所不同。
-
Keyed Window
上游数据集如果是KeyedStream类型,即keyBy之后的数据,则调用DataStream API 的Window()方法时数据会根据Key在不同的Task中分别聚合,最后得出窗口内针对每个Key分别统计的结果。
-
Global Window
如果上游数据集是Non-Keyed类型,即没有keyBy操作,则调用WindowsAll()方法,所有的数据都会在同一个窗口中汇到一个Task中计算,并得出窗口内的全局统计结果。 -
举例:
// Global Window
dataStream.windowAll(自定义的WindowAssigner)
//KeyedWindow
dataStream.keyBy(_.filed).window(自定义的WindowAssigner)
window api
前文讲述了几种window的分类和概念。本节展示在flink应用程序中如何使用window。
首先要引入flink的依赖包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.2</version>
</dependency>
在flink中通常使用.window()开启一个Keyed Window窗口,用.windowAll()方法开启Global Window。即window()方法必须在keyBy之后才能调用,然后可以基于此window做业务聚合操作。下面以一个统计司机实时上报里程数的例子展示各种window的用法。
实体类DriverMileages
public class DriverMileages {
public String driverId;
/**
* 上报时的时间戳,单位毫秒
*/
public long timestamp;
/**
* 相比上一个时间戳内行驶的里程数,单位米
*/
public double currentMileage;
//省略getter/setter/toString方法
}
1)滚动时间窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(x)))
简写方式:.timeWindow(Time.seconds(x))
需求:每隔5秒钟统计一次这5s内各个司机行驶的总里程数
public class WindowDemo1 {
public static void main(String[] args) throws Exception {
int port = 9001;
String hostname = "192.168.174.136";
String delimiter = "\n";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
//连接socket获取输入数据
DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
//字符串数据转换为实体类
data.map(new MapFunction<String, DriverMileages>() {
@Override
public DriverMileages map(String value) throws Exception {
String[] split = value.split(",");
DriverMileages driverMileages = new DriverMileages();
driverMileages.driverId = split[0];
driverMileages.currentMileage = Double.parseDouble(split[1]);
driverMileages.timestamp = Long.parseLong(split[2]);
return driverMileages;
}
})
.keyBy(DriverMileages::getDriverId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//.timeWindow(Time.seconds(5))
.sum("currentMileage")
.print();
//启动计算任务
environment.execute("window demo1");
}
}
Linux 命令行开启socket监听,输入数据
[root@vm1 ~]# nc -lk 9001
1001,20,1605419689100
1002,25,1605419690100
1001,20,1605419689100
IDEA控制台输出
DriverMileages{driverId='1001', timestamp=1605419689100, mileage=40.0}
DriverMileages{driverId='1002', timestamp=1605419690100, mileage=25.0}
如果不设置窗口,则会从开始统计时,一直累加,不会划分窗口来统计。
2)滑动时间窗口
.window(SlidingProcessingTimeWindows.of(Time.seconds(x), Time.seconds(y)))
简写方式:.timeWindow(Time.seconds(x),Time.seconds(y))
需求:每隔2秒钟统计一次司机5秒内的总里程数
public static void main(String[] args) throws Exception {
int port = 9001;
String hostname = "192.168.174.136";
String delimiter = "\n";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
//连接socket获取输入数据
DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
//字符串数据转换为实体类
data.map(new MapFunction<String, DriverMileages>() {
@Override
public DriverMileages map(String value) throws Exception {
String[] split = value.split(",");
DriverMileages driverMileages = new DriverMileages();
driverMileages.driverId = split[0];
driverMileages.currentMileage = Double.parseDouble(split[1]);
driverMileages.timestamp = Long.parseLong(split[2]);
return driverMileages;
}
})
.keyBy(DriverMileages::getDriverId)
.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2)))
//.timeWindow(Time.seconds(5),Time.seconds(2))
.sum("currentMileage")
.print();
//启动计算任务
environment.execute("window demo1");
}
3)会话窗口
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
需求:若司机每隔10s没有新数据则输出一次统计结果
public static void main(String[] args) throws Exception {
int port = 9001;
String hostname = "192.168.174.136";
String delimiter = "\n";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
//连接socket获取输入数据
DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
//字符串数据转换为实体类
data.map(new MapFunction<String, DriverMileages>() {
@Override
public DriverMileages map(String value) throws Exception {
String[] split = value.split(",");
DriverMileages driverMileages = new DriverMileages();
driverMileages.driverId = split[0];
driverMileages.currentMileage = Double.parseDouble(split[1]);
driverMileages.timestamp = Long.parseLong(split[2]);
return driverMileages;
}
})
.keyBy(DriverMileages::getDriverId)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.sum("currentMileage")
.print();
//启动计算任务
environment.execute("window demo1");
}
数据源源不断地进入flink,如果两个数据之间的时间间隔小于设定的10秒,它们还是属于同一个时间窗口。如果两个数据之间的时间间隔大于10秒了,则前一个窗口关闭,后一个窗口开启。
4)滚动计数窗口
需求:司机每上传3次数据统计一次这3次的总里程
.countWindow(x)
public static void main(String[] args) throws Exception {
int port = 9001;
String hostname = "192.168.174.136";
String delimiter = "\n";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
//连接socket获取输入数据
DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
//字符串数据转换为实体类
data.map(new MapFunction<String, DriverMileages>() {
@Override
public DriverMileages map(String value) throws Exception {
String[] split = value.split(",");
DriverMileages driverMileages = new DriverMileages();
driverMileages.driverId = split[0];
driverMileages.currentMileage = Double.parseDouble(split[1]);
driverMileages.timestamp = Long.parseLong(split[2]);
return driverMileages;
}
})
.keyBy(DriverMileages::getDriverId)
.countWindow(3)
.sum("currentMileage")
.print();
//启动计算任务
environment.execute("window demo1");
}
linux命令行输入如下数据,可见只有3个数据到达时才会触发计算
[root@vm1 ~]# nc -lk 9001
1002,25,1605419690100
1001,20,1605419689100
1002,25,1605419690100
1001,20,1605419689100
1002,25,1605419690100
1001,20,1605419689100
IDEA控制台输出
DriverMileages{driverId='1002', timestamp=1605419690100, mileage=75.0}
DriverMileages{driverId='1001', timestamp=1605419689100, mileage=60.0}
5)滑动计数窗口
需求:司机每上传1次数据统计一次前3次(包含本次)数据的总里程
.countWindow(x,y)
public static void main(String[] args) throws Exception {
int port = 9001;
String hostname = "192.168.174.136";
String delimiter = "\n";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
//连接socket获取输入数据
DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
//字符串数据转换为实体类
data.map(new MapFunction<String, DriverMileages>() {
@Override
public DriverMileages map(String value) throws Exception {
String[] split = value.split(",");
DriverMileages driverMileages = new DriverMileages();
driverMileages.driverId = split[0];
driverMileages.currentMileage = Double.parseDouble(split[1]);
driverMileages.timestamp = Long.parseLong(split[2]);
return driverMileages;
}
})
.keyBy(DriverMileages::getDriverId)
.countWindow(3,1)
.sum("currentMileage")
.print();
//启动计算任务
environment.execute("window demo1");
}
6)全量窗口
.windowAll统计时,不再需要根据key区分数据,而是对全量数据在窗口期内做统计。
全量时间:.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));每5秒钟统计一次
全量计数:.windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(2)));每2个数据统计一次
public class WindowDemo1 {
public static void main(String[] args) throws Exception {
int port = 9001;
String hostname = "192.168.174.136";
String delimiter = "\n";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
//连接socket获取输入数据
DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
//字符串数据转换为实体类
data.map(new MapFunction<String, DriverMileages>() {
@Override
public DriverMileages map(String value) throws Exception {
String[] split = value.split(",");
DriverMileages driverMileages = new DriverMileages();
driverMileages.driverId = split[0];
driverMileages.currentMileage = Double.parseDouble(split[1]);
driverMileages.timestamp = Long.parseLong(split[2]);
return driverMileages;
}
})
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum("currentMileage")
.print();
//启动计算任务
environment.execute("windowAll demo1");
}
}
此时,不管是按时间还是计数方式,输出的是全量数据统计结果,不按司机id区分
[root@vm1 ~]# nc -lk 9001
1001,50,1605419689100
1002,25,1605419690100
DriverMileages{driverId='1001', timestamp=1605419689100, mileage=75.0}
7)简写方式
在上述api的例子中,可以看到Flink为内置的窗口提供了简写方式,不需要调用.window()方法,直接简写即可,比如:
滚动时间窗口.timeWindow(Time.seconds(10))
滑动时间窗口.timeWindow(Time.seconds(10),Time.seconds(2))
滚动计数窗口.countWindow(10)
滑动计数窗口.countWindow(10,2)
简写方式,传一个参数表示滚动窗口,2个参数表示滑动窗口。滑动窗口有2个参数,时间和滑动步长。
滚动窗口有offset参数,比如9:00->10:00 是1小时, 9:05->10:05也是1小时,这里的5分钟就是offset。
简化方式也不能指定offset偏移量,原始方式可以。
会话窗口没有简写方式。
8)WindowAssigner
.window()方法的输入参数实质上是一个WindowAssigner类型,到达窗口的元素被传递给 WindowAssigner,由WindowAssigner负责把每个输入数据分发到正确的window中。
Flink内置了一些通用的WindowAssigner。
TumblingEventTimeWindows:基于EventTime的滚动时间窗口分配器
TumblingProcessingTimeWindows:基于ProcessingTime的滚动时间窗口分配器
SlidingEventTimeWindows:基于EventTime的滑动时间窗口分配器
SlidingProcessingTimeWindows:基于ProcessingTime的滑动时间窗口分配器
EventTimeSessionWindows:基于EventTime的会话窗口分配器
ProcessingTimeSessionWindows:基于ProcessingTime的会话窗口分配器
GlobalWindows:全局窗口分配器,所有数据分配到一个窗口
上面例子中,窗口操作都没有设置时间语义,因此用的是Flink默认的时间语义,即ProcessTime。也可以使用EventTime(事件时间)。事件时间一般结合watermark使用,具体使用方式在讲解watermark时给出。
9)开窗函数(window function)
开窗函数定义了对window中收集的数据如何处理,可以分为两类。
-
增量聚合函数
来一条数据计算一次,比如ReduceFunction和AggregationFunction。性能比全量聚合高,但无法获取窗口元数据。ReduceFunction函数:使用reduce函数自行实现求和功能。
public class WindowFunctionDemo1 { public static void main(String[] args) throws Exception { int port = 9001; String hostname = "192.168.174.136"; String delimiter = "\n"; StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); //连接socket获取输入数据 DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter); //字符串数据转换为实体类 data.map(new MapFunction<String, DriverMileages>() { @Override public DriverMileages map(String value) throws Exception { String[] split = value.split(","); DriverMileages driverMileages = new DriverMileages(); driverMileages.driverId = split[0]; driverMileages.currentMileage = Double.parseDouble(split[1]); driverMileages.timestamp = Long.parseLong(split[2]); return driverMileages; } }) .keyBy(DriverMileages::getDriverId) .timeWindow(Time.seconds(5)) .reduce(new ReduceFunction<DriverMileages>() { @Override public DriverMileages reduce(DriverMileages t1, DriverMileages t2) throws Exception { DriverMileages newItem = new DriverMileages(); newItem.driverId = t1.driverId; newItem.currentMileage = t1.getCurrentMileage() + t2.getCurrentMileage(); return newItem; } }) .print(); //启动计算任务 environment.execute("window demo1");
AggregationFunction函数,需求:使用aggregation函数实现求和功能。Aggregation可以看作是ReduceFunction的通用版本。
Aggregation支持三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT),可以使用自定义实体类。
public static void main(String[] args) throws Exception { int port = 9001; String hostname = "192.168.174.136"; String delimiter = "\n"; StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); //连接socket获取输入数据 DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter); //字符串数据转换为实体类 data.map(new MapFunction<String, DriverMileages>() { @Override public DriverMileages map(String value) throws Exception { String[] split = value.split(","); DriverMileages driverMileages = new DriverMileages(); driverMileages.driverId = split[0]; driverMileages.currentMileage = Double.parseDouble(split[1]); driverMileages.timestamp = Long.parseLong(split[2]); return driverMileages; } }) .keyBy(DriverMileages::getDriverId) .timeWindow(Time.seconds(5)) .aggregate(new AggregateFunction<DriverMileages, DriverMileages, DriverMileages>() { /** * 创建累加器保存中间状态 * @return */ @Override public DriverMileages createAccumulator() { return new DriverMileages(); } /** * 将元素添加到累加器并返回新的累加器 * @param value * @param accumulator * @return */ @Override public DriverMileages add(DriverMileages value, DriverMileages accumulator) { DriverMileages newItem = new DriverMileages(); newItem.currentMileage = value.currentMileage + accumulator.currentMileage; newItem.driverId = value.driverId; return newItem; } /** * 从累加器提取结果 * @param accumulator * @return */ @Override public DriverMileages getResult(DriverMileages accumulator) { return accumulator; } /** * 合并累加器 * @param a * @param b * @return */ @Override public DriverMileages merge(DriverMileages a, DriverMileages b) { DriverMileages newItem = new DriverMileages(); newItem.currentMileage = a.currentMileage + b.currentMileage; newItem.driverId = a.driverId; return newItem; } }) .print(); //启动计算任务 environment.execute("window demo1"); }
-
全量窗口函数
先把窗口期内的所有数据收集起来,等到计算的时候一次性处理所有数据,比如ProcessWindowFunction。由于先要缓存窗口内的所有元素,因此性能略低,但是可以拿到窗口元数据。
ProcessWindowFunction:实现司机里程数求和功能,并输出详细的window信息。
public static void main(String[] args) throws Exception { int port = 9001; String hostname = "192.168.174.136"; String delimiter = "\n"; StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); //连接socket获取输入数据 DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter); //字符串数据转换为实体类 data.map(new MapFunction<String, DriverMileages>() { @Override public DriverMileages map(String value) throws Exception { String[] split = value.split(","); DriverMileages driverMileages = new DriverMileages(); driverMileages.driverId = split[0]; driverMileages.currentMileage = Double.parseDouble(split[1]); driverMileages.timestamp = Long.parseLong(split[2]); return driverMileages; } }) .keyBy(DriverMileages::getDriverId) .timeWindow(Time.seconds(5)) .process(new ProcessWindowFunction<DriverMileages, Object, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<DriverMileages> elements, Collector<Object> out) throws Exception { Iterator<DriverMileages> iterator = elements.iterator(); long total = 0; int count = 0;//统计窗口内元素个数 String key = ""; while (iterator.hasNext()) { DriverMileages next = iterator.next(); total += next.currentMileage; count++; key = next.driverId; } SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); out.collect("key:" + key + ",window[" + sdf.format(context.window().getStart()) + "," + sdf.format(context.window().getEnd()) + "),count:" + count + ",sum is:" + total); } }) .print(); //启动计算任务 environment.execute("window demo1"); }
linux命令行输入
[root@vm1 ~]# nc -lk 9001 1002,25,1605419690100 1002,25,1605419690100 1001,70,1605419689100
IDEA控制台输出
key:1002,window[2020-11-15 21:20:20.000,2020-11-15 21:20:25.000),count:2,sum is:50 key:1001,window[2020-11-15 21:20:40.000,2020-11-15 21:20:45.000),count:1,sum is:70
如果上游数据是windowedStream,则可以直接使用apply方法。process方法更底层一些,keyBy后就可以直接调用process()。
public static void main(String[] args) throws Exception { int port = 9001; String hostname = "192.168.174.136"; String delimiter = "\n"; StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); //连接socket获取输入数据 DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter); //字符串数据转换为实体类 data.map(new MapFunction<String, DriverMileages>() { @Override public DriverMileages map(String value) throws Exception { String[] split = value.split(","); DriverMileages driverMileages = new DriverMileages(); driverMileages.driverId = split[0]; driverMileages.currentMileage = Double.parseDouble(split[1]); driverMileages.timestamp = Long.parseLong(split[2]); return driverMileages; } }) .keyBy(DriverMileages::getDriverId) .timeWindow(Time.seconds(5)) .apply(new WindowFunction<DriverMileages, Object, String, TimeWindow>() { @Override public void apply(String s, TimeWindow window, Iterable<DriverMileages> input, Collector<Object> out) throws Exception { Iterator<DriverMileages> iterator = input.iterator(); long total = 0; int count = 0; while (iterator.hasNext()) { DriverMileages next = iterator.next(); total += next.currentMileage; count++; } SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); out.collect("key:" + s + ",window[" + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd()) + "],count:" + count + ",sum is:" + total); } }) .print(); //启动计算任务 environment.execute("window demo1"); }
10)案例:自定义数据源,每10秒钟统计一下窗口内所有输入数据的最大值
public class MyDataSource implements SourceFunction<Integer> {
private boolean isRunning = true;
/**
* run方法里编写数据产生逻辑
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
SecureRandom secureRandom = new SecureRandom();
while (isRunning) {
int i = secureRandom.nextInt(1000);
ctx.collect(i);
System.out.println("source time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")) + ",value:" + i);
Thread.sleep(1000);//1秒钟生成一次数据
}
}
@Override
public void cancel() {
isRunning = false;
}
}
public class WindowDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> dataStreamSource = env.addSource(new MyDataSource());
dataStreamSource
.timeWindowAll(Time.seconds(10))
.apply(new AllWindowFunction<Integer, Object, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Integer> values, Collector<Object> out) throws Exception {
List<Integer> items = new ArrayList<>();
Iterator<Integer> iterator = values.iterator();
while (iterator.hasNext()) {
items.add(iterator.next());
}
Collections.sort(items);
Integer max = items.get(items.size() - 1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
out.collect("window[" + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd()) + "),max:" + max);
}
})
.print();
env.execute("window demo");
}
}
Flink中窗口时间的划分与数据无关,是系统定义好了的。Flink先按照自然时间将 window 划分,比如定义的 window 大小是 5秒,那么 1 分钟内会把 window 划分为如下的区间[左闭右开):
[00:00:00 00:00:05)
[00:00:05 00:00:10)
[00:00:10 00:00:15)
...
[00:00:55 00:01:00)
运行上述window api相关的demo,便会发现此特征。比如设置的window时间是5秒,则每次都是在05,10,15…60这些时间点触发窗口计算。并不是真正意义上的"每隔"5秒,如果数据达到时间很接近窗口边界,那么很快就会触发。
Flink中的时间语义
前文讲解window时我们提到了设置窗口时间,那么这个时间到底是什么时间呢?事件发生的时间还是数据被处理的时间呢?这就取决于在flink应用程序中设置的时间语义了。
在flink中,有3中不同的时间语义
- event time:事件创建的时间。比如用户点击按钮触发时。
- ingestion time:数据进入flink的时间。即经过一系列的网络传输,进入flink source的时间。
- processing time:flink执行operator操作时本地系统时间。从source进来到分配到TaskManager中的slot处理也是耗时的,比如数据重分区,因此存在理论上的时间差。即理论上processing time晚于ingestion time。
3种时间语义中Flink默认使用ProcessTime,实际业务开发中一般更关心事件发生的时间,即event time。
事件时间event time是数据本身携带的时间,需要从业务数据中提取。比如日志数据中,一般都会自带timestamp业务字段。
使用EventTime要在应用程序中显式告诉flink如何从输入数据中提取事件时间戳,否则应用程序启动报错。
相比Event Time ,使用Ingestion Time 时应用程序无法处理任何乱序事件或延迟数据,当然程序中也不必指定如何生成watermark,会自动分配时间戳和生成水印。
flink中设置流的时间特性:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
如果不设置时间特性,flink默认使用processing time:这是数据一致性和处理性能之间权衡的结果。
原文链接:https://download.csdn.net/blog/column/10645207/119920873 标签:窗口,String,Flink,window,Window,详解,split,driverMileages,DriverMileages From: https://www.cnblogs.com/sunny3158/p/18026066