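Table of Contents
- Flink series articles
- I. Maven dependencies
- II. Example: count subway entries per entrance every 5 s
- 1. Implementation
- 1) Bean
- 2) Implementation
- 2. Verification
- III. Example: handling data later than the acceptable delay, counting subway entries per entrance every 10 s
- 1. Implementation
- 1) Java bean
- 2) Implementation
- 2. Verification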
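This article introduces the basic usage of Flink watermarks. It also shows how to handle data that arrives later than the maximum allowed delay. Both examples include detailed verification steps and results.
Apart from the Maven dependencies, the examples have no other dependencies.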
I. Maven dependencies
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>
<dependencies>
    <!-- Since Flink 1.15, flink-clients and flink-streaming-java are published without a Scala suffix -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
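II. Example: count subway entries per entrance every 5 s
Every 5 s, count the passengers entering the subway at each entrance, accepting records that are up to 3 s late (out of order).
Data structure: entrance number, passenger count, and entry time.
1. Implementation
1) Bean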
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
    // entrance number, e.g. No0
    private String sNo;
    // number of passengers entering
    private Integer userCount;
    // entry time (event time, epoch milliseconds)
    private Long enterTime;
}
2) Implementation
import java.time.Duration;
import java.util.Random;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 * Every 5 s, count the subway entries at each entrance
 */
public class WatermarkDemo {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
            // volatile: cancel() is called from a different thread than run()
            private volatile boolean flag = true;

            @Override
            public void run(SourceContext<Subway> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String sNo = "No" + random.nextInt(3);
                    int userCount = random.nextInt(100);
                    // simulate delayed data: the event time lies 0-9 s in the past
                    long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
                    Subway subway = new Subway(sNo, userCount, eventTime);
                    System.err.println(subway + ", formatted time: " + df.format(subway.getEnterTime()));
                    ctx.collect(subway);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        // transformation
        // watermark = largest event time seen so far - maximum allowed delay (out-of-orderness) - 1 ms
        SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // maximum allowed delay
                        .withTimestampAssigner((subway, timestamp) -> subway.getEnterTime())); // event-time field

        // window computation
        SingleOutputStreamOperator<Subway> result = subwayWithWatermark
                .keyBy(Subway::getSNo)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("userCount");

        // sink
        result.print();
        // execute
        env.execute();
    }
}
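The timing in the output below is easier to follow with the arithmetic behind it. The following sketch reproduces that arithmetic in plain Java, with no Flink dependency. It computes which 5 s tumbling window an event time falls into. It computes the watermark a 3 s bounded-out-of-orderness strategy produces (largest event time seen - 3 s - 1 ms). It also decides when a window fires, which happens once its last millisecond (end - 1) is at or below the watermark. This is a simplified model, not Flink code. The class name WatermarkMath is hypothetical. The model assumes a zero window offset and non-negative timestamps. It also assumes the watermark updates right after each record, whereas Flink actually emits it periodically.

```java
/**
 * Hypothetical helper: a plain-Java model of bounded-out-of-orderness watermarks
 * and tumbling event-time windows (zero offset, non-negative timestamps).
 */
public class WatermarkMath {

    /** Start of the tumbling window that contains the timestamp. */
    public static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    /** Watermark = largest event time seen so far - allowed out-of-orderness - 1 ms. */
    public static long watermark(long maxEventTime, long outOfOrdernessMs) {
        return maxEventTime - outOfOrdernessMs - 1;
    }

    /** A window [start, start + size) fires once the watermark reaches start + size - 1. */
    public static boolean fires(long windowStart, long windowSizeMs, long currentWatermark) {
        return windowStart + windowSizeMs - 1 <= currentWatermark;
    }

    public static void main(String[] args) {
        long size = 5_000L;  // TumblingEventTimeWindows.of(Time.seconds(5))
        long bound = 3_000L; // forBoundedOutOfOrderness(Duration.ofSeconds(3))
        // First three event times from the log below, in arrival order
        long[] arrivals = {1689148937760L, 1689148933782L, 1689148940783L};
        long maxSeen = Long.MIN_VALUE;
        for (long t : arrivals) {
            maxSeen = Math.max(maxSeen, t);
            long wm = watermark(maxSeen, bound);
            long start = windowStart(t, size);
            System.out.printf("event=%d window=[%d, %d) watermark=%d%n", t, start, start + size, wm);
        }
        long currentWm = watermark(maxSeen, bound);
        // No1's window [..30000, ..35000) versus No0's window [..35000, ..40000)
        System.out.println("window starting 1689148930000 fires: " + fires(1689148930000L, size, currentWm));
        System.out.println("window starting 1689148935000 fires: " + fires(1689148935000L, size, currentWm));
    }
}
```

After these three records the watermark is 1689148937782. The window holding the No1 record can fire, but the window holding the two No0 records cannot yet. This matches the log, where only No1's result (93) is printed at that point.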
2. Verification
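The output below shows that the results are computed as expected. Take the No1 record at 16:02:13. It falls into the window [16:02:10, 16:02:15), which fires with 93 as soon as the 16:02:20.783 record raises the watermark to 16:02:17.782 (largest event time - 3 s - 1 ms). Likewise, No2's window [16:02:20, 16:02:25) contains 21 + 66 + 97 = 184. That window fires only after the 16:02:33 record arrives.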
Subway(sNo=No0, userCount=7, enterTime=1689148937760), formatted time: 16:02:17
Subway(sNo=No1, userCount=93, enterTime=1689148933782), formatted time: 16:02:13
Subway(sNo=No2, userCount=21, enterTime=1689148940783), formatted time: 16:02:20
10> Subway(sNo=No1, userCount=93, enterTime=1689148933782)
Subway(sNo=No0, userCount=7, enterTime=1689148936784), formatted time: 16:02:16
Subway(sNo=No0, userCount=53, enterTime=1689148944788), formatted time: 16:02:24
10> Subway(sNo=No0, userCount=14, enterTime=1689148937760)
Subway(sNo=No2, userCount=66, enterTime=1689148944790), formatted time: 16:02:24
Subway(sNo=No2, userCount=97, enterTime=1689148944803), formatted time: 16:02:24
Subway(sNo=No2, userCount=79, enterTime=1689148946807), formatted time: 16:02:26
Subway(sNo=No2, userCount=83, enterTime=1689148945821), formatted time: 16:02:25
Subway(sNo=No0, userCount=84, enterTime=1689148941836), formatted time: 16:02:21
Subway(sNo=No2, userCount=50, enterTime=1689148947852), formatted time: 16:02:27
Subway(sNo=No1, userCount=10, enterTime=1689148942864), formatted time: 16:02:22
Subway(sNo=No0, userCount=20, enterTime=1689148944866), formatted time: 16:02:24
Subway(sNo=No2, userCount=1, enterTime=1689148945877), formatted time: 16:02:25
Subway(sNo=No0, userCount=62, enterTime=1689148953888), formatted time: 16:02:33
3> Subway(sNo=No2, userCount=184, enterTime=1689148940783)
10> Subway(sNo=No0, userCount=157, enterTime=1689148944788)
3> Subway(sNo=No2, userCount=213, enterTime=1689148946807)
10> Subway(sNo=No1, userCount=10, enterTime=1689148942864)
...
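To cross-check the printed sums, the hypothetical sketch below replays the 15 records logged above offline. It sums userCount per entrance and per 5 s window, the same grouping that keyBy(Subway::getSNo) with TumblingEventTimeWindows.of(Time.seconds(5)) and sum("userCount") performs. The replay deliberately ignores watermarks and lateness. It agrees with Flink here only because none of these records arrived after its window had already fired. The class name WindowSumReplay and the "entrance@windowStart" key format are illustrative assumptions.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical offline replay of the logged records: sum userCount per (entrance, 5 s window). */
public class WindowSumReplay {

    // Records copied from the first example's log: entrance, userCount, enterTime (ms)
    public static final String[] KEYS = {
        "No0", "No1", "No2", "No0", "No0", "No2", "No2", "No2",
        "No2", "No0", "No2", "No1", "No0", "No2", "No0"};
    public static final int[] COUNTS = {7, 93, 21, 7, 53, 66, 97, 79, 83, 84, 50, 10, 20, 1, 62};
    public static final long[] TIMES = {
        1689148937760L, 1689148933782L, 1689148940783L, 1689148936784L, 1689148944788L,
        1689148944790L, 1689148944803L, 1689148946807L, 1689148945821L, 1689148941836L,
        1689148947852L, 1689148942864L, 1689148944866L, 1689148945877L, 1689148953888L};

    /** Returns "entrance@windowStart" -> summed userCount, sorted by key. */
    public static Map<String, Integer> sumPerKeyAndWindow(
            String[] keys, int[] counts, long[] times, long windowSizeMs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            long start = times[i] - times[i] % windowSizeMs; // zero-offset tumbling window
            sums.merge(keys[i] + "@" + start, counts[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        sumPerKeyAndWindow(KEYS, COUNTS, TIMES, 5_000L)
            .forEach((key, sum) -> System.out.println(key + " -> " + sum));
    }
}
```

The replay yields No2@1689148940000 -> 184, No0@1689148940000 -> 157 and No2@1689148945000 -> 213, the same sums the job printed. It also yields No0@1689148950000 -> 62, which does not appear in the log excerpt because that window had not fired yet.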
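III. Example: handling data later than the acceptable delay, counting subway entries per entrance every 10 s
Every 10 s, count the passengers entering at each entrance. The watermark tolerates 3 s of out-of-orderness. After a window has fired, late records are still accepted for another 3 s (allowedLateness), and each one makes the window fire again with an updated sum. Records that arrive even later are collected separately through a side output.
How to handle data that misses its window depends on the specific use case.
This example simply prints it.
Data structure: entrance number, passenger count, and entry time.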
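A few comparisons are enough to model the three possible fates of a record in this example. The hypothetical plain-Java sketch below (class LatenessClassifier, not part of Flink) is a simplified model of how an event-time tumbling window with allowedLateness treats a record. If the last millisecond of the record's window (end - 1) is still above the current watermark, the record is on time. If the window has already fired but its last millisecond plus allowedLateness is still above the watermark, the record is added and the window fires again. Otherwise the record goes to the side output. The sketch assumes zero-offset windows and takes the current watermark as an input.

```java
/** Hypothetical model of how a record is treated by an event-time tumbling window with allowedLateness. */
public class LatenessClassifier {

    public enum Fate { ON_TIME, LATE_REFIRES_WINDOW, SIDE_OUTPUT }

    public static Fate classify(long eventTime, long currentWatermark,
                                long windowSizeMs, long allowedLatenessMs) {
        long windowStart = eventTime - (eventTime % windowSizeMs); // zero-offset tumbling window
        long windowMaxTs = windowStart + windowSizeMs - 1;         // last millisecond of the window
        if (windowMaxTs > currentWatermark) {
            return Fate.ON_TIME;              // window has not fired yet
        }
        if (windowMaxTs + allowedLatenessMs > currentWatermark) {
            return Fate.LATE_REFIRES_WINDOW;  // window state still kept: add the record, fire again
        }
        return Fate.SIDE_OUTPUT;              // too late: routed by sideOutputLateData(...)
    }

    public static void main(String[] args) {
        long size = 10_000L;    // TumblingEventTimeWindows.of(Time.seconds(10))
        long lateness = 3_000L; // allowedLateness(Time.seconds(3))
        // Watermark after the 17:06:14.418 record: 1689152774418 - 3000 - 1
        long wm = 1689152771417L;
        System.out.println(classify(1689152767430L, wm, size, lateness)); // No1 record at 17:06:07
        System.out.println(classify(1689152759435L, wm, size, lateness)); // No0 record at 17:05:59
    }
}
```

With the watermark at 1689152771417, the two calls return LATE_REFIRES_WINDOW and SIDE_OUTPUT. This matches the verification log of this example, where No1's window fires again with 151 and the No0 record shows up in the side output.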
1. Implementation
1) Java bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
    // entrance number, e.g. No0
    private String sNo;
    // number of passengers entering
    private Integer userCount;
    // entry time (event time, epoch milliseconds)
    private Long enterTime;
}
2) Implementation
import java.time.Duration;
import java.util.Random;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
// Java API OutputTag (the original imported the Scala API's org.apache.flink.streaming.api.scala.OutputTag)
import org.apache.flink.util.OutputTag;

/**
 * @author alanchan
 * Every 10 s, count subway entries per entrance; records later than the allowed lateness go to a side output
 */
public class WatermarkLatenessDemo {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
            // volatile: cancel() is called from a different thread than run()
            private volatile boolean flag = true;

            @Override
            public void run(SourceContext<Subway> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String sNo = "No" + random.nextInt(3);
                    int userCount = random.nextInt(100);
                    // simulate delayed data: the event time lies 0-19 s in the past
                    long eventTime = System.currentTimeMillis() - random.nextInt(20) * 1000;
                    Subway subway = new Subway(sNo, userCount, eventTime);
                    System.err.println(subway + ", formatted time: " + df.format(subway.getEnterTime()));
                    ctx.collect(subway);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        // transformation
        // maximum allowed out-of-orderness: 3 s
        SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((subway, timestamp) -> subway.getEnterTime())); // event-time field

        // collects records that arrive after the allowed lateness, so they can be handled separately
        OutputTag<Subway> latenessData = new OutputTag<Subway>("seriousLateData", TypeInformation.of(Subway.class));
        SingleOutputStreamOperator<Subway> result1 = subwayWithWatermark
                .keyBy(Subway::getSNo)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(3))   // keep window state 3 s past the watermark
                .sideOutputLateData(latenessData)   // anything later goes to the side output
                .sum("userCount");
        DataStream<Subway> result2 = result1.getSideOutput(latenessData);

        // sink
        result1.print("windowed (on-time + allowed late)");
        result2.print("too late (side output)");
        // execute
        env.execute();
    }
}
2. Verification
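Verification is simple: the job generates its own data, and you check whether the console output matches expectations. In the run below it does. No1's window [17:06:00, 17:06:10) first fires with 68 (44 + 24) once the 17:06:14 record pushes the watermark to 17:06:11.417 (largest event time - 3 s - 1 ms). The late No1 records at 17:06:07 (83) and 17:06:00 (32) arrive before the watermark reaches 17:06:12.999, the window's last millisecond plus 3 s of allowed lateness. So the window fires again, first with 151 and then with 183. The No0 record at 17:05:59 belongs to [17:05:50, 17:06:00), whose lateness deadline has already passed, so it goes to the side output.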
Subway(sNo=No1, userCount=44, enterTime=1689152768384), formatted time: 17:06:08
Subway(sNo=No1, userCount=24, enterTime=1689152764407), formatted time: 17:06:04
Subway(sNo=No1, userCount=29, enterTime=1689152774418), formatted time: 17:06:14
windowed (on-time + allowed late):10> Subway(sNo=No1, userCount=68, enterTime=1689152768384)
Subway(sNo=No1, userCount=83, enterTime=1689152767430), formatted time: 17:06:07
windowed (on-time + allowed late):10> Subway(sNo=No1, userCount=151, enterTime=1689152768384)
Subway(sNo=No0, userCount=19, enterTime=1689152759435), formatted time: 17:05:59
too late (side output):10> Subway(sNo=No0, userCount=19, enterTime=1689152759435)
Subway(sNo=No1, userCount=32, enterTime=1689152760443), formatted time: 17:06:00
windowed (on-time + allowed late):10> Subway(sNo=No1, userCount=183, enterTime=1689152768384)
Subway(sNo=No0, userCount=56, enterTime=1689152775459), formatted time: 17:06:15
Subway(sNo=No1, userCount=12, enterTime=1689152778472), formatted time: 17:06:18
Subway(sNo=No1, userCount=26, enterTime=1689152776475), formatted time: 17:06:16
Subway(sNo=No0, userCount=49, enterTime=1689152772488), formatted time: 17:06:12
Subway(sNo=No2, userCount=93, enterTime=1689152767500), formatted time: 17:06:07
too late (side output):3> Subway(sNo=No2, userCount=93, enterTime=1689152767500)
Subway(sNo=No1, userCount=26, enterTime=1689152779502), formatted time: 17:06:19
Subway(sNo=No2, userCount=63, enterTime=1689152782512), formatted time: 17:06:22
Subway(sNo=No1, userCount=73, enterTime=1689152775526), formatted time: 17:06:15
Subway(sNo=No0, userCount=46, enterTime=1689152783539), formatted time: 17:06:23
windowed (on-time + allowed late):10> Subway(sNo=No0, userCount=105, enterTime=1689152775459)
windowed (on-time + allowed late):10> Subway(sNo=No1, userCount=166, enterTime=1689152774418)
Subway(sNo=No2, userCount=72, enterTime=1689152779552), formatted time: 17:06:19
windowed (on-time + allowed late):3> Subway(sNo=No2, userCount=72, enterTime=1689152779552)
Subway(sNo=No0, userCount=43, enterTime=1689152774553), formatted time: 17:06:14
windowed (on-time + allowed late):10> Subway(sNo=No0, userCount=148, enterTime=1689152775459)
Subway(sNo=No2, userCount=1, enterTime=1689152781567), formatted time: 17:06:21
Subway(sNo=No2, userCount=49, enterTime=1689152775582), formatted time: 17:06:15
windowed (on-time + allowed late):3> Subway(sNo=No2, userCount=121, enterTime=1689152779552)
Subway(sNo=No1, userCount=4, enterTime=1689152782591), formatted time: 17:06:22
Subway(sNo=No2, userCount=79, enterTime=1689152778604), formatted time: 17:06:18
windowed (on-time + allowed late):3> Subway(sNo=No2, userCount=200, enterTime=1689152779552)
Subway(sNo=No2, userCount=11, enterTime=1689152794608), formatted time: 17:06:34
windowed (on-time + allowed late):3> Subway(sNo=No2, userCount=64, enterTime=1689152782512)
windowed (on-time + allowed late):10> Subway(sNo=No1, userCount=4, enterTime=1689152782591)
windowed (on-time + allowed late):10> Subway(sNo=No0, userCount=46, enterTime=1689152783539)
Subway(sNo=No1, userCount=47, enterTime=1689152784620), formatted time: 17:06:24
windowed (on-time + allowed late):10> Subway(sNo=No1, userCount=51, enterTime=1689152782591)
Subway(sNo=No0, userCount=12, enterTime=1689152788634), formatted time: 17:06:28
windowed (on-time + allowed late):10> Subway(sNo=No0, userCount=58, enterTime=1689152783539)
Subway(sNo=No0, userCount=71, enterTime=1689152790634), formatted time: 17:06:30
Subway(sNo=No0, userCount=83, enterTime=1689152799635), formatted time: 17:06:39
Subway(sNo=No0, userCount=11, enterTime=1689152799649), formatted time: 17:06:39
Subway(sNo=No1, userCount=77, enterTime=1689152786650), formatted time: 17:06:26
too late (side output):10> Subway(sNo=No1, userCount=77, enterTime=1689152786650)
Subway(sNo=No0, userCount=6, enterTime=1689152802662), formatted time: 17:06:42
Subway(sNo=No1, userCount=87, enterTime=1689152791668), formatted time: 17:06:31
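In summary, this article introduced the basic usage of Flink watermarks. It also showed how to handle data that arrives later than the maximum allowed delay, with detailed verification steps and results for both examples.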
From: https://blog.51cto.com/alanchan2win/9013122