首页 > 其他分享 >【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(2) - 基本使用和超过最大延迟数据处理

【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(2) - 基本使用和超过最大延迟数据处理

时间:2023-12-28 13:38:18浏览次数:35  
标签:格式化 示例 sNo flink enterTime Subway 延迟 userCount




文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、示例:每5s统计一次地铁进站每个入口人数
  • 1、实现
  • 1)、bean
  • 2)、实现
  • 2、验证
  • 三、示例:处理延迟数据超过能接受的时间,每10s统计一次地铁进站每个入口人数
  • 1、实现
  • 1)、java bean
  • 2)、实现
  • 2、验证



本文介绍了Flink WaterMark的基本用法以及超过最大延迟允许时间的数据处理示例,其中包含详细的验证步骤与验证结果。

本文除了maven依赖外,没有其他依赖。

一、maven依赖

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
</dependencies>

二、示例:每5s统计一次地铁进站每个入口人数

每5s统计一次地铁进站每个入口人数,最多接受延迟3s的数据

数据结构:进站口、人数和进入时间

1、实现

1)、bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
	private Long enterTime;
}

2)、实现

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan 
 * 每10s统计一次地铁进站每个入口人数
 */
public class WatermarkDemo {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
			private boolean flag = true;

			@Override
			public void run(SourceContext<Subway> ctx) throws Exception {
				Random random = new Random();
				while (flag) {
					String sNo = "No"+random.nextInt(3);
					int userCount = random.nextInt(100);
					// 模拟延迟数据
					long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
					Subway subway = new Subway(sNo, userCount, eventTime);
					System.err.println(subway + " ,格式化后时间 " + df.format(subway.getEnterTime()));
					
					ctx.collect(subway);
					Thread.sleep(1000);
				}
			}

			@Override
			public void cancel() {
				flag = false;
			}
		});

		// transformation
		// 设置watermark= 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
		SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS
				.assignTimestampsAndWatermarks(WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定最大允许的延迟时间
				.withTimestampAssigner((subway, timestamp) -> subway.getEnterTime()));// 指定eventtime事件时间列

		// 计算窗口
		SingleOutputStreamOperator<Subway> result = subwayWithWatermark
				.keyBy(Subway::getSNo)
				.window(TumblingEventTimeWindows.of(Time.seconds(5)))
				.sum("userCount");

		// sink
		result.print();

		// execute
		env.execute();
	}

}

2、验证

输出结果,按照预期计算出了结果

Subway(sNo=No0, userCount=7, enterTime=1689148937760) ,格式化后时间 16:02:17
Subway(sNo=No1, userCount=93, enterTime=1689148933782) ,格式化后时间 16:02:13
Subway(sNo=No2, userCount=21, enterTime=1689148940783) ,格式化后时间 16:02:20
10> Subway(sNo=No1, userCount=93, enterTime=1689148933782)
Subway(sNo=No0, userCount=7, enterTime=1689148936784) ,格式化后时间 16:02:16
Subway(sNo=No0, userCount=53, enterTime=1689148944788) ,格式化后时间 16:02:24
10> Subway(sNo=No0, userCount=14, enterTime=1689148937760)
Subway(sNo=No2, userCount=66, enterTime=1689148944790) ,格式化后时间 16:02:24
Subway(sNo=No2, userCount=97, enterTime=1689148944803) ,格式化后时间 16:02:24
Subway(sNo=No2, userCount=79, enterTime=1689148946807) ,格式化后时间 16:02:26
Subway(sNo=No2, userCount=83, enterTime=1689148945821) ,格式化后时间 16:02:25
Subway(sNo=No0, userCount=84, enterTime=1689148941836) ,格式化后时间 16:02:21
Subway(sNo=No2, userCount=50, enterTime=1689148947852) ,格式化后时间 16:02:27
Subway(sNo=No1, userCount=10, enterTime=1689148942864) ,格式化后时间 16:02:22
Subway(sNo=No0, userCount=20, enterTime=1689148944866) ,格式化后时间 16:02:24
Subway(sNo=No2, userCount=1, enterTime=1689148945877) ,格式化后时间 16:02:25
Subway(sNo=No0, userCount=62, enterTime=1689148953888) ,格式化后时间 16:02:33
3> Subway(sNo=No2, userCount=184, enterTime=1689148940783)
10> Subway(sNo=No0, userCount=157, enterTime=1689148944788)
3> Subway(sNo=No2, userCount=213, enterTime=1689148946807)
10> Subway(sNo=No1, userCount=10, enterTime=1689148942864)
。。。。。。

三、示例:处理延迟数据超过能接受的时间,每10s统计一次地铁进站每个入口人数

每10s统计一次地铁进站每个入口人数,最多接受延迟3s的数据,超过可接受范围则另外接收。

一般而言,针对延迟超过计算窗口的数据处理方式不同,视具体的情况而定。

本示例仅仅是打印出来。

数据结构:进站口、人数和进入时间

1、实现

1)、java bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
	private Long enterTime;

}

2)、实现

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.scala.OutputTag;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 *
 */
public class WatermarkLatenessDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
			private boolean flag = true;

			@Override
			public void run(SourceContext<Subway> ctx) throws Exception {
				Random random = new Random();
				while (flag) {
					String sNo = "No" + random.nextInt(3);
					int userCount = random.nextInt(100);
					// 模拟延迟数据
					long eventTime = System.currentTimeMillis() - random.nextInt(20) * 1000;
					Subway subway = new Subway(sNo, userCount, eventTime);
					System.err.println(subway + " ,格式化后时间 " + df.format(subway.getEnterTime()));

					ctx.collect(subway);
					Thread.sleep(1000);
				}
			}

			@Override
			public void cancel() {
				flag = false;
			}
		});

		// transformation
		// 设置最大允许延迟时间3s
		SingleOutputStreamOperator<Subway> orderDSWithWatermark = subwayDS.assignTimestampsAndWatermarks(
				WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((subway, timestamp) -> subway.getEnterTime())// 指定事件时间列
		);

		// 接收延迟超过允许范围内的数据,供其他方式处理
		OutputTag<Subway> latenessData = new OutputTag<Subway>("seriousLateData", TypeInformation.of(Subway.class));

		SingleOutputStreamOperator<Subway> result1 = orderDSWithWatermark.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10)))
				.allowedLateness(Time.seconds(3)).sideOutputLateData(latenessData).sum("userCount");

		DataStream<Subway> result2 = result1.getSideOutput(latenessData);

		// sink
		result1.print("延迟(含正常)在计算窗口内数据");
		result2.print("延迟不在计算窗口内数据");

		// execute
		env.execute();

	}

}

2、验证

验证比较简单,就是由系统自己生成数据,然后观察应用程序的控制台的输出是否与预期一致,经验证,本示例与预期一致。

Subway(sNo=No1, userCount=44, enterTime=1689152768384) ,格式化后时间 17:06:08
Subway(sNo=No1, userCount=24, enterTime=1689152764407) ,格式化后时间 17:06:04
Subway(sNo=No1, userCount=29, enterTime=1689152774418) ,格式化后时间 17:06:14
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=68, enterTime=1689152768384)
Subway(sNo=No1, userCount=83, enterTime=1689152767430) ,格式化后时间 17:06:07
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=151, enterTime=1689152768384)
Subway(sNo=No0, userCount=19, enterTime=1689152759435) ,格式化后时间 17:05:59
延迟不在计算窗口内数据:10> Subway(sNo=No0, userCount=19, enterTime=1689152759435)
Subway(sNo=No1, userCount=32, enterTime=1689152760443) ,格式化后时间 17:06:00
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=183, enterTime=1689152768384)
Subway(sNo=No0, userCount=56, enterTime=1689152775459) ,格式化后时间 17:06:15
Subway(sNo=No1, userCount=12, enterTime=1689152778472) ,格式化后时间 17:06:18
Subway(sNo=No1, userCount=26, enterTime=1689152776475) ,格式化后时间 17:06:16
Subway(sNo=No0, userCount=49, enterTime=1689152772488) ,格式化后时间 17:06:12
Subway(sNo=No2, userCount=93, enterTime=1689152767500) ,格式化后时间 17:06:07
延迟不在计算窗口内数据:3> Subway(sNo=No2, userCount=93, enterTime=1689152767500)
Subway(sNo=No1, userCount=26, enterTime=1689152779502) ,格式化后时间 17:06:19
Subway(sNo=No2, userCount=63, enterTime=1689152782512) ,格式化后时间 17:06:22
Subway(sNo=No1, userCount=73, enterTime=1689152775526) ,格式化后时间 17:06:15
Subway(sNo=No0, userCount=46, enterTime=1689152783539) ,格式化后时间 17:06:23
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No0, userCount=105, enterTime=1689152775459)
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=166, enterTime=1689152774418)
Subway(sNo=No2, userCount=72, enterTime=1689152779552) ,格式化后时间 17:06:19
延迟(含正常)在计算窗口内数据:3> Subway(sNo=No2, userCount=72, enterTime=1689152779552)
Subway(sNo=No0, userCount=43, enterTime=1689152774553) ,格式化后时间 17:06:14
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No0, userCount=148, enterTime=1689152775459)
Subway(sNo=No2, userCount=1, enterTime=1689152781567) ,格式化后时间 17:06:21
Subway(sNo=No2, userCount=49, enterTime=1689152775582) ,格式化后时间 17:06:15
延迟(含正常)在计算窗口内数据:3> Subway(sNo=No2, userCount=121, enterTime=1689152779552)
Subway(sNo=No1, userCount=4, enterTime=1689152782591) ,格式化后时间 17:06:22
Subway(sNo=No2, userCount=79, enterTime=1689152778604) ,格式化后时间 17:06:18
延迟(含正常)在计算窗口内数据:3> Subway(sNo=No2, userCount=200, enterTime=1689152779552)
Subway(sNo=No2, userCount=11, enterTime=1689152794608) ,格式化后时间 17:06:34
延迟(含正常)在计算窗口内数据:3> Subway(sNo=No2, userCount=64, enterTime=1689152782512)
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=4, enterTime=1689152782591)
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No0, userCount=46, enterTime=1689152783539)
Subway(sNo=No1, userCount=47, enterTime=1689152784620) ,格式化后时间 17:06:24
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=51, enterTime=1689152782591)
Subway(sNo=No0, userCount=12, enterTime=1689152788634) ,格式化后时间 17:06:28
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No0, userCount=58, enterTime=1689152783539)
Subway(sNo=No0, userCount=71, enterTime=1689152790634) ,格式化后时间 17:06:30
Subway(sNo=No0, userCount=83, enterTime=1689152799635) ,格式化后时间 17:06:39
Subway(sNo=No0, userCount=11, enterTime=1689152799649) ,格式化后时间 17:06:39
Subway(sNo=No1, userCount=77, enterTime=1689152786650) ,格式化后时间 17:06:26
延迟不在计算窗口内数据:10> Subway(sNo=No1, userCount=77, enterTime=1689152786650)
Subway(sNo=No0, userCount=6, enterTime=1689152802662) ,格式化后时间 17:06:42
Subway(sNo=No1, userCount=87, enterTime=1689152791668) ,格式化后时间 17:06:31

以上,本文介绍了Flink WaterMark的基本用法以及超过最大延迟允许时间的数据处理示例,其中包含详细的验证步骤与验证结果。

标签:格式化,示例,sNo,flink,enterTime,Subway,延迟,userCount
From: https://blog.51cto.com/alanchan2win/9013122

相关文章