文章目录
- Flink 系列文章
- 一、maven依赖
- 二、示例:基于数量的滚动窗口与滑动窗口
- 1、滚动窗口实现地铁进站口人数
- 1)、实现
- 2)、验证步骤
- 2、滑动窗口实现地铁进站口人数
- 1)、实现
- 2)、验证步骤
- 三、示例:会话窗口
- 1、实现
- 2、验证步骤
本文介绍了Flink window基于数量的滚动、滑动窗口和会话窗口示例,其中包含详细的验证步骤与验证结果。
一、maven依赖
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
二、示例:基于数量的滚动窗口与滑动窗口
1、滚动窗口实现地铁进站口人数
实现统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现5次进行统计
本示例仅以面向对象方式实现,一般实现在具体的开发中视情况而定。
1)、实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author alanchan
* 统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现5次进行统计
*/
public class TumblingCountWindowDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// source
// nc
// 数据结构: 入口编号,人数
// 12,50
// 11,28
DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);
// transformation
SingleOutputStreamOperator<Subway> subwayDS = lines.map(new MapFunction<String, Subway>() {
@Override
public Subway map(String value) throws Exception {
String[] arr = value.split(",");
return new Subway(arr[0], Integer.parseInt(arr[1]));
}
});
// KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(Subway::getNo);
KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(new KeySelector<Subway, String>() {
@Override
public String getKey(Subway value) throws Exception {
return value.getNo();
}
});
// 统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现5次进行统计
SingleOutputStreamOperator<Subway> result1 = keyedDS.countWindow(5).sum("userCount");
// sink
result1.print();
// execute
env.execute();
}
}
2)、验证步骤
1、启动nc
nc -lk 9999
2、启动应用程序
3、nc控制台输入
[alanchan@server2 src]$ nc -lk 9999
1,5
1,6
1,2
1,4
2,4
3,6
23,11
1,8
4、查看应用程序控制台输出
通过查看输出结果与预期一致。
2、滑动窗口实现地铁进站口人数
统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现3次进行统计
本示例仅以面向对象方式实现,一般实现在具体的开发中视情况而定。
1)、实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author alanchan
*
*/
public class SlidingCountWindowDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// source
// nc
// 数据结构: 入口编号,人数
// 12,50
// 11,28
DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);
// transformation
SingleOutputStreamOperator<Subway> subwayDS = lines.map(new MapFunction<String, Subway>() {
@Override
public Subway map(String value) throws Exception {
String[] arr = value.split(",");
return new Subway(arr[0], Integer.parseInt(arr[1]));
}
});
// KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(Subway::getNo);
KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(new KeySelector<Subway, String>() {
@Override
public String getKey(Subway value) throws Exception {
return value.getNo();
}
});
// 统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现3次进行统计
// public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
// return window(GlobalWindows.create())
// .evictor(CountEvictor.of(size))
// .trigger(CountTrigger.of(slide));
// }
SingleOutputStreamOperator<Subway> result1 = keyedDS.countWindow(5, 3).sum("userCount");
// sink
result1.print();
// execute
env.execute();
}
}
2)、验证步骤
1、启动nc
nc -lk 9999
2、启动应用程序
3、nc控制台输入
[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
2,3
3,4
1,2
4、查看应用程序控制台输出
通过查看输出结果与预期一致。
三、示例:会话窗口
实现设置会话超时时间为10s,如果上一个窗口有数据,10s内没有数据则触发上个窗口的计算。
1、实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @author alanchan
*
*/
public class TimeSessionWindowsDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// source
DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);
// transformation
SingleOutputStreamOperator<Subway> carDS = lines.map(new MapFunction<String, Subway>() {
@Override
public Subway map(String value) throws Exception {
String[] arr = value.split(",");
return new Subway(arr[0], Integer.parseInt(arr[1]));
}
});
//KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(Subway::getNo);
KeyedStream<Subway, String> keyedDS = carDS.keyBy(new KeySelector<Subway, String>() {
@Override
public String getKey(Subway value) throws Exception {
return value.getNo();
}
});
// 设置会话超时时间为10s,10s内没有数据则触发上个窗口的计算,如果上一个窗口有数据
SingleOutputStreamOperator<Subway> result = keyedDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum("userCount");
// sink
result.print();
// execute
env.execute();
}
}
2、验证步骤
1、启动nc
nc -lk 9999
2、启动应用程序
3、nc控制台输入
[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
2,3
3,4
1,2
4、查看应用程序控制台输出
通过查看输出结果与预期一致。
以上,本文介绍了Flink window基于数量的滚动、滑动窗口和会话窗口示例,其中包含详细的验证步骤与验证结果。
标签:窗口,示例,flink,streaming,api,import,apache,org From: https://blog.51cto.com/alanchan2win/9013132