首页 > 其他分享 >【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例 - 完整版

【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例 - 完整版

时间:2024-01-04 12:37:01浏览次数:31  
标签:示例 sNo flink enterTime watermark Subway 完整版 userCount



文章目录

  • Flink 系列文章
  • 一、watermark介绍
  • 1、watermark介绍
  • 2、Watermark 策略简介
  • 3、使用 Watermark 策略
  • 4、处理空闲数据源
  • 5、自定义 WatermarkGenerator
  • 1)、自定义周期性 Watermark 生成器
  • 2)、自定义标记 Watermark 生成器
  • 6、Watermark 策略与 Kafka 连接器
  • 7、算子处理 Watermark 的方式
  • 二、maven依赖
  • 三、示例:每5s统计一次地铁进站每个入口人数
  • 1、实现
  • 1)、bean
  • 2)、实现
  • 2、验证
  • 四、示例:处理延迟数据超过能接受的时间,每10s统计一次地铁进站每个入口人数
  • 1、实现
  • 1)、java bean
  • 2)、实现
  • 2、验证
  • 五、示例-Flink 1.13.6版本:kafka数据源,每10s统计一次地铁进站每个入口人数
  • 1、maven依赖
  • 2、实现
  • 1)、java bean
  • 2)、实现
  • 3、验证
  • 1)、验证步骤
  • 2)、验证
  • 六、示例-Flink 1.17.0版本:kafka数据源,每10s统计一次地铁进站每个入口人数
  • 1、maven依赖
  • 2、实现
  • 1)、java bean
  • 2)、实现
  • 3、验证
  • 1)、验证步骤
  • 2)、验证



本文介绍了Flink WaterMark的基本情况、基本使用、kafka数据源的水印和超过最大延迟数据处理的示例,其中包含详细的验证步骤与验证结果。

本文除了maven依赖外,没有其他依赖。

本文需要依赖kafka、nc的环境可用。

一、watermark介绍

1、watermark介绍

watermark就是给数据再额外的加的一个时间列,watermark是个时间戳。

watermark = 数据的事件时间 - 最大允许的延迟时间或乱序时间

watermark = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间

这样可以保证watermark水位线会一直上升(变大),不会下降

窗口计算的触发条件为

  • 1、窗口中有数据
  • 2、watermark>= 窗口的结束时间

2、Watermark 策略简介

使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。

WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。

WatermarkStrategy 接口如下:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {

    // ------------------------------------------------------------------------
    //  实现者需要实现的方法.
    // ------------------------------------------------------------------------
    /** 实例化根据此策略生成水印的WatermarkGenerator. */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

    /**
     * 实例化{@link TimestampAssigner},用于根据此策略分配时间戳.
     */
    @Override
    default TimestampAssigner<T> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
		//默认情况下,这是{@link RecordTimestampAssigner},用于记录来自具有有效时间戳的源的情况,例如来自Kafka。
        return new RecordTimestampAssigner<>();
    }

    @Experimental
    default WatermarkAlignmentParams getAlignmentParameters() {
        return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;
    }

    // ------------------------------------------------------------------------
    //  用于丰富基础水印策略的生成器方法
    // ------------------------------------------------------------------------
    /**
     * 创建一个新的{@code WatermarkStrategy},该策略包装此策略,但使用给定的{@link TimestampAssigner}(通过{@linkTimestampassignerSupplier})
     *
     * <pre>
     * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
     *   .forMonotonousTimestamps()
     *   .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
     * }</pre>
     */
    default WatermarkStrategy<T> withTimestampAssigner(
            TimestampAssignerSupplier<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
    }

    /**
     * 创建一个新的{@code WatermarkStrategy},该策略包装此策略,但使用给定的{@link SerializableTimestampAssigner}。
     *
     * <pre>
     * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy
     *   .<CustomObject>forMonotonousTimestamps()
     *   .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
     * }</pre>
     */
    default WatermarkStrategy<T> withTimestampAssigner(
            SerializableTimestampAssigner<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        return new WatermarkStrategyWithTimestampAssigner<>(
                this, TimestampAssignerSupplier.of(timestampAssigner));
    }

    /**
     *创建一个新的丰富的{@link WatermarkStrategy},该策略还可以在创建的{@linkWatermarkGenerator}中进行空闲检测。
     *
     * <p>Idleness can be important if some partitions have little data and might not have events
     * during some periods. Without idleness, these streams can stall the overall event time
     * progress of the application.
     */
    default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
        checkNotNull(idleTimeout, "idleTimeout");
        checkArgument(
                !(idleTimeout.isZero() || idleTimeout.isNegative()),
                "idleTimeout must be greater than zero");
        return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
    }

    /**
     * 创建一个新的{@link WatermarkStrategy},用于配置来自同一水印组中其他源/任务/分区的最大水印漂移。
     * 该组可能包含完全独立的来源(例如File和Kafka)。
     *
     * @param watermarkGroup A group of sources to align watermarks
     * @param maxAllowedWatermarkDrift Maximal drift, before we pause consuming from the
     *     source/task/partition
     */
    @Experimental
    default WatermarkStrategy<T> withWatermarkAlignment(
            String watermarkGroup, Duration maxAllowedWatermarkDrift) {
        return withWatermarkAlignment(
                watermarkGroup,
                maxAllowedWatermarkDrift,
                WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL);
    }

    /**
     * 创建一个新的{@link WatermarkStrategy},用于配置来自同一水印组中其他源/任务/分区的最大水印漂移。
     * 该组可能包含完全独立的来源(例如File和Kafka)。
     *
     * @param watermarkGroup A group of sources to align watermarks
     * @param maxAllowedWatermarkDrift Maximal drift, before we pause consuming from the
     *     source/task/partition
     * @param updateInterval How often tasks should notify coordinator about the current watermark
     *     and how often the coordinator should announce the maximal aligned watermark.
     */
    @Experimental
    default WatermarkStrategy<T> withWatermarkAlignment(
            String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        return new WatermarksWithWatermarkAlignment<T>(
                this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval);
    }

    // ------------------------------------------------------------------------
    //  常用水印策略的方便方法
    // ------------------------------------------------------------------------
    /**
     * 为时间戳单调递增的情况创建水印策略。
     * @see AscendingTimestampsWatermarks
     */
    static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
        return (ctx) -> new AscendingTimestampsWatermarks<>();
    }

    /**
     * 为记录无序的情况创建水印策略,但可以设置事件无序程度的上限
     * @see BoundedOutOfOrdernessWatermarks
     */
    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
        return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
    }

    /** 基于现有的{@link WatermarkGeneratorSupplier}创建水印策略. */
    static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
        return generatorSupplier::createWatermarkGenerator;
    }

    /**
     * 创建一个根本不生成水印的水印策略。这在进行纯处理基于时间的流处理的场景中可能很有用。
     */
    static <T> WatermarkStrategy<T> noWatermarks() {
        return (ctx) -> new NoWatermarksGenerator<>();
    }
}

通常情况下,不用实现此接口,而是可以使用 WatermarkStrategy 工具类中通用的 watermark 策略,或者可以使用这个工具类将自定义的 TimestampAssigner 与 WatermarkGenerator 进行绑定。

例如,想要使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,那么可以按照如下方式实现:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);
//其中 TimestampAssigner 的设置与否是可选的,大多数情况下,可以不用去特别指定。例如,当使用 Kafka 或 Kinesis 数据源时,你可以直接从 Kafka/Kinesis 数据源记录中获取到时间戳。

3、使用 Watermark 策略

WatermarkStrategy 可以在 Flink 应用程序中的两处使用:

  • 第一种是直接在数据源上使用,相比第二种会更好。因为数据源可以利用 watermark 生成逻辑中有关分片/分区(shards/partitions/splits)的信息。使用这种方式,数据源通常可以更精准地跟踪 watermark,整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy 意味着必须使用特定数据源接口,参考下文的kafka部分,以及有关每个分区的 watermark 是如何生成以及工作的。
  • 第二种是直接在非数据源的操作之后使用,仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

使用 WatermarkStrategy 去获取流并生成带有时间戳的元素和 watermark 的新流时,如果原始流已经具有时间戳或 watermark,则新指定的时间戳分配器将覆盖原有的时间戳和 watermark。

4、处理空闲数据源

如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。称这类数据源为空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。

为了解决这个问题,可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。

WatermarkStrategy 为此提供了一个工具接口:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));

5、自定义 WatermarkGenerator

可以针对每个事件去生成 watermark。但是由于每个 watermark 都会在下游做一些计算,因此过多的 watermark 会降低程序性能。

TimestampAssigner 是一个可以从事件数据中提取时间戳字段的简单函数,但是 WatermarkGenerator 的编写相对就要复杂一些了,将在接下来的两小节中介绍如何实现此接口。

WatermarkGenerator 接口代码如下:

/**
 * {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
 *
 * <p><b>注意:</b>  WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks} 
 * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark。
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 周期性的调用,也许会生成新的 watermark,也许不会。
     *
     * <p>调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。
     */
    void onPeriodicEmit(WatermarkOutput output);
}

watermark 的生成方式本质上是有两种:周期性生成和标记生成。

  • 周期性生成器通常通过 onEvent() 观察传入的事件数据,然后在框架调用 onPeriodicEmit() 时发出 watermark。
  • 标记生成器将查看 onEvent() 中的事件数据,并等待检查在流中携带 watermark 的特殊标记事件或打点数据。当获取到这些事件数据时,它将立即发出 watermark。通常情况下,标记生成器不会通过 onPeriodicEmit() 发出 watermark。

1)、自定义周期性 Watermark 生成器

周期性生成器会观察流事件数据并定期生成 watermark(其生成可能取决于流数据,或者完全基于处理时间)。

生成 watermark 的时间间隔(每 n 毫秒)可以通过 ExecutionConfig.setAutoWatermarkInterval(…) 指定。每次都会调用生成器的 onPeriodicEmit() 方法,如果返回的 watermark 非空且值大于前一个 watermark,则将发出新的 watermark。

如下是两个使用周期性 watermark 生成器的简单示例。

Flink 已经附带了 BoundedOutOfOrdernessWatermarks,它实现了 WatermarkGenerator,其工作原理与下面的 BoundedOutOfOrdernessGenerator 相似。可以在这里参阅如何使用它的内容。

/**
 * 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。
 * 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxOutOfOrderness = 3500; // 3.5 秒
    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }

}

/**
 * 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxTimeLag = 5000; // 5 秒

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // 处理时间场景下不需要实现
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}

2)、自定义标记 Watermark 生成器

标记 watermark 生成器观察流事件数据并在获取到带有 watermark 信息的特殊事件元素时发出 watermark。

如下是实现标记生成器的方法,当事件带有某个指定标记时,该生成器就会发出 watermark:

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // onEvent 中已经实现
    }
}

6、Watermark 策略与 Kafka 连接器

当使用 Apache Kafka 连接器作为数据源时,每个 Kafka 分区可能有一个简单的事件时间模式(递增的时间戳或有界无序)。然而,当使用 Kafka 数据源时,多个分区常常并行使用,因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式(这是 Kafka 消费客户端所固有的)。

在这种情况下,你可以使用 Flink 中可识别 Kafka 分区的 watermark 生成机制。使用此特性,将在 Kafka 消费端内部针对每个 Kafka 分区生成 watermark,并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。

例如,如果每个 Kafka 分区中的事件时间戳严格递增,则使用时间戳单调递增按分区生成的 watermark 将生成完美的全局 watermark。

注意,在示例中未使用 TimestampAssigner,而是使用了 Kafka 记录自身的时间戳。

  • Flink 1.13.6版本使用示例
//kafka数据源示例,没有使用withTimestampAssigner
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));

DataStream<MyType> stream = env.addSource(kafkaSource);

//非kafka数据源示例,使用了withTimestampAssigner
WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);
  • Flink 1.13.6版本使用示例
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("my-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> stream = env.fromSource(kafkaSource, 
									WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), 
									"mySource");

7、算子处理 Watermark 的方式

一般情况下,在将 watermark 转发到下游之前,需要算子对其进行触发的事件完全进行处理。

例如,WindowOperator 将首先计算该 watermark 触发的所有窗口数据,当且仅当由此 watermark 触发计算进而生成的所有数据被转发到下游之后,其才会被发送到下游。换句话说,由于此 watermark 的出现而产生的所有数据元素都将在此 watermark 之前发出。

相同的规则也适用于 TwoInputStreamOperator。但是,在这种情况下,算子当前的 watermark 会取其两个输入的最小值。

二、maven依赖

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
</dependencies>

三、示例:每5s统计一次地铁进站每个入口人数

每5s统计一次地铁进站每个入口人数,最多接受延迟3s的数据

数据结构:进站口、人数和进入时间

1、实现

1)、bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
	private Long enterTime;
}

2)、实现

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan 
 * 每10s统计一次地铁进站每个入口人数
 */
public class WatermarkDemo {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
			private boolean flag = true;

			@Override
			public void run(SourceContext<Subway> ctx) throws Exception {
				Random random = new Random();
				while (flag) {
					String sNo = "No"+random.nextInt(3);
					int userCount = random.nextInt(100);
					// 模拟延迟数据
					long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
					Subway subway = new Subway(sNo, userCount, eventTime);
					System.err.println(subway + " ,格式化后时间 " + df.format(subway.getEnterTime()));
					
					ctx.collect(subway);
					Thread.sleep(1000);
				}
			}

			@Override
			public void cancel() {
				flag = false;
			}
		});

		// transformation
		// 设置watermark= 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
		SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS
				.assignTimestampsAndWatermarks(WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定最大允许的延迟时间
				.withTimestampAssigner((subway, timestamp) -> subway.getEnterTime()));// 指定eventtime事件时间列

		// 计算窗口
		SingleOutputStreamOperator<Subway> result = subwayWithWatermark
				.keyBy(Subway::getSNo)
				.window(TumblingEventTimeWindows.of(Time.seconds(5)))
				.sum("userCount");

		// sink
		result.print();

		// execute
		env.execute();
	}

}

2、验证

输出结果,按照预期计算出了结果

Subway(sNo=No0, userCount=7, enterTime=1689148937760) ,格式化后时间 16:02:17
Subway(sNo=No1, userCount=93, enterTime=1689148933782) ,格式化后时间 16:02:13
Subway(sNo=No2, userCount=21, enterTime=1689148940783) ,格式化后时间 16:02:20
10> Subway(sNo=No1, userCount=93, enterTime=1689148933782)
Subway(sNo=No0, userCount=7, enterTime=1689148936784) ,格式化后时间 16:02:16
Subway(sNo=No0, userCount=53, enterTime=1689148944788) ,格式化后时间 16:02:24
10> Subway(sNo=No0, userCount=14, enterTime=1689148937760)
Subway(sNo=No2, userCount=66, enterTime=1689148944790) ,格式化后时间 16:02:24
Subway(sNo=No2, userCount=97, enterTime=1689148944803) ,格式化后时间 16:02:24
Subway(sNo=No2, userCount=79, enterTime=1689148946807) ,格式化后时间 16:02:26
Subway(sNo=No2, userCount=83, enterTime=1689148945821) ,格式化后时间 16:02:25
Subway(sNo=No0, userCount=84, enterTime=1689148941836) ,格式化后时间 16:02:21
Subway(sNo=No2, userCount=50, enterTime=1689148947852) ,格式化后时间 16:02:27
Subway(sNo=No1, userCount=10, enterTime=1689148942864) ,格式化后时间 16:02:22
Subway(sNo=No0, userCount=20, enterTime=1689148944866) ,格式化后时间 16:02:24
Subway(sNo=No2, userCount=1, enterTime=1689148945877) ,格式化后时间 16:02:25
Subway(sNo=No0, userCount=62, enterTime=1689148953888) ,格式化后时间 16:02:33
3> Subway(sNo=No2, userCount=184, enterTime=1689148940783)
10> Subway(sNo=No0, userCount=157, enterTime=1689148944788)
3> Subway(sNo=No2, userCount=213, enterTime=1689148946807)
10> Subway(sNo=No1, userCount=10, enterTime=1689148942864)
。。。。。。

四、示例:处理延迟数据超过能接受的时间,每10s统计一次地铁进站每个入口人数

每10s统计一次地铁进站每个入口人数,最多接受延迟3s的数据,超过可接受范围则另外接收。

一般而言,针对延迟超过计算窗口的数据处理方式不同,视具体的情况而定。

本示例仅仅是打印出来。

数据结构:进站口、人数和进入时间

1、实现

1)、java bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
	private Long enterTime;

}

2)、实现

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.scala.OutputTag;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 *
 */
public class WatermarkLatenessDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
			private boolean flag = true;

			@Override
			public void run(SourceContext<Subway> ctx) throws Exception {
				Random random = new Random();
				while (flag) {
					String sNo = "No" + random.nextInt(3);
					int userCount = random.nextInt(100);
					// 模拟延迟数据
					long eventTime = System.currentTimeMillis() - random.nextInt(20) * 1000;
					Subway subway = new Subway(sNo, userCount, eventTime);
					System.err.println(subway + " ,格式化后时间 " + df.format(subway.getEnterTime()));

					ctx.collect(subway);
					Thread.sleep(1000);
				}
			}

			@Override
			public void cancel() {
				flag = false;
			}
		});

		// transformation
		// 设置最大允许延迟时间3s
		SingleOutputStreamOperator<Subway> orderDSWithWatermark = subwayDS.assignTimestampsAndWatermarks(
				WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((subway, timestamp) -> subway.getEnterTime())// 指定事件时间列
		);

		// 接收延迟超过允许范围内的数据,供其他方式处理
		OutputTag<Subway> latenessData = new OutputTag<Subway>("seriousLateData", TypeInformation.of(Subway.class));

		SingleOutputStreamOperator<Subway> result1 = orderDSWithWatermark.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10)))
				.allowedLateness(Time.seconds(3)).sideOutputLateData(latenessData).sum("userCount");

		DataStream<Subway> result2 = result1.getSideOutput(latenessData);

		// sink
		result1.print("延迟(含正常)在计算窗口内数据");
		result2.print("延迟不在计算窗口内数据");

		// execute
		env.execute();

	}

}

2、验证

验证比较简单,就是由系统自己生成数据,然后观察应用程序的控制台的输出是否与预期一致,经验证,本示例与预期一致。

Subway(sNo=No1, userCount=44, enterTime=1689152768384) ,格式化后时间 17:06:08
Subway(sNo=No1, userCount=24, enterTime=1689152764407) ,格式化后时间 17:06:04
Subway(sNo=No1, userCount=29, enterTime=1689152774418) ,格式化后时间 17:06:14
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=68, enterTime=1689152768384)
Subway(sNo=No1, userCount=83, enterTime=1689152767430) ,格式化后时间 17:06:07
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=151, enterTime=1689152768384)
Subway(sNo=No0, userCount=19, enterTime=1689152759435) ,格式化后时间 17:05:59
延迟不在计算窗口内数据:10> Subway(sNo=No0, userCount=19, enterTime=1689152759435)
Subway(sNo=No1, userCount=32, enterTime=1689152760443) ,格式化后时间 17:06:00
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=183, enterTime=1689152768384)
Subway(sNo=No0, userCount=56, enterTime=1689152775459) ,格式化后时间 17:06:15
Subway(sNo=No1, userCount=12, enterTime=1689152778472) ,格式化后时间 17:06:18
Subway(sNo=No1, userCount=26, enterTime=1689152776475) ,格式化后时间 17:06:16
Subway(sNo=No0, userCount=49, enterTime=1689152772488) ,格式化后时间 17:06:12
Subway(sNo=No2, userCount=93, enterTime=1689152767500) ,格式化后时间 17:06:07
延迟不在计算窗口内数据:3> Subway(sNo=No2, userCount=93, enterTime=1689152767500)
Subway(sNo=No1, userCount=26, enterTime=1689152779502) ,格式化后时间 17:06:19
Subway(sNo=No2, userCount=63, enterTime=1689152782512) ,格式化后时间 17:06:22
Subway(sNo=No1, userCount=73, enterTime=1689152775526) ,格式化后时间 17:06:15
Subway(sNo=No0, userCount=46, enterTime=1689152783539) ,格式化后时间 17:06:23
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No0, userCount=105, enterTime=1689152775459)
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=166, enterTime=1689152774418)
Subway(sNo=No2, userCount=72, enterTime=1689152779552) ,格式化后时间 17:06:19
延迟(含正常)在计算窗口内数据:3> Subway(sNo=No2, userCount=72, enterTime=1689152779552)
Subway(sNo=No0, userCount=43, enterTime=1689152774553) ,格式化后时间 17:06:14
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No0, userCount=148, enterTime=1689152775459)
Subway(sNo=No2, userCount=1, enterTime=1689152781567) ,格式化后时间 17:06:21
Subway(sNo=No2, userCount=49, enterTime=1689152775582) ,格式化后时间 17:06:15
延迟(含正常)在计算窗口内数据:3> Subway(sNo=No2, userCount=121, enterTime=1689152779552)
Subway(sNo=No1, userCount=4, enterTime=1689152782591) ,格式化后时间 17:06:22
Subway(sNo=No2, userCount=79, enterTime=1689152778604) ,格式化后时间 17:06:18
延迟(含正常)在计算窗口内数据:3> Subway(sNo=No2, userCount=200, enterTime=1689152779552)
Subway(sNo=No2, userCount=11, enterTime=1689152794608) ,格式化后时间 17:06:34
延迟(含正常)在计算窗口内数据:3> Subway(sNo=No2, userCount=64, enterTime=1689152782512)
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=4, enterTime=1689152782591)
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No0, userCount=46, enterTime=1689152783539)
Subway(sNo=No1, userCount=47, enterTime=1689152784620) ,格式化后时间 17:06:24
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No1, userCount=51, enterTime=1689152782591)
Subway(sNo=No0, userCount=12, enterTime=1689152788634) ,格式化后时间 17:06:28
延迟(含正常)在计算窗口内数据:10> Subway(sNo=No0, userCount=58, enterTime=1689152783539)
Subway(sNo=No0, userCount=71, enterTime=1689152790634) ,格式化后时间 17:06:30
Subway(sNo=No0, userCount=83, enterTime=1689152799635) ,格式化后时间 17:06:39
Subway(sNo=No0, userCount=11, enterTime=1689152799649) ,格式化后时间 17:06:39
Subway(sNo=No1, userCount=77, enterTime=1689152786650) ,格式化后时间 17:06:26
延迟不在计算窗口内数据:10> Subway(sNo=No1, userCount=77, enterTime=1689152786650)
Subway(sNo=No0, userCount=6, enterTime=1689152802662) ,格式化后时间 17:06:42
Subway(sNo=No1, userCount=87, enterTime=1689152791668) ,格式化后时间 17:06:31

五、示例-Flink 1.13.6版本:kafka数据源,每10s统计一次地铁进站每个入口人数

注意该示例中,发送数据时不需要eventtime,flink会以kafka发送数据的时间戳为eventtime,也不需要withTimestampAssigner指定eventtime,详见实现源码。

每10s统计一次地铁进站每个入口人数,最多接受延迟3s的数据

数据结构:进站口、人数

1、maven依赖

<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.13.6</flink.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients_2.11</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-scala_2.11</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-scala_2.11</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_2.11</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<!-- flink连接器 -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-sql-connector-kafka_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka</artifactId>
		<version>${flink.version}</version>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
	</dependency>

	<!-- 日志 -->
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-log4j12</artifactId>
		<version>1.7.7</version>
		<scope>runtime</scope>
	</dependency>
	<dependency>
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.17</version>
		<scope>runtime</scope>
	</dependency>

</dependencies>

2、实现

1)、java bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
}

2)、实现

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author alanchan
 *
 */
public class KafkaWatermarkDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		// source
		// 准备kafka连接参数
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "server1:9092");
		props.setProperty("group.id", "flink");
		props.setProperty("auto.offset.reset", "latest");
		props.setProperty("flink.partition-discovery.interval-millis", "5000");
		props.setProperty("enable.auto.commit", "true");
		props.setProperty("auto.commit.interval.ms", "2000");
		// 使用连接参数创建FlinkKafkaConsumer/kafkaSource
		FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("t_kafkasource", new SimpleStringSchema(), props);
		// 使用kafkaSource
		DataStream<Subway> subwayDS = env.addSource(kafkaSource).map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");

				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// transformation
		// 设置watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
		SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS
		.assignTimestampsAndWatermarks(WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定最大允许的延迟时间
		);

		// 计算窗口
		SingleOutputStreamOperator<Subway> result = subwayWithWatermark.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum("userCount");

		// sink
		result.print();

		// execute
		env.execute();

	}

}

3、验证

1)、验证步骤

1、启动kafka,创建topic
2、启动应用程序
3、在kafka命令行中输入符合格式要求的数据
4、观察应用程序的控制台输出

2)、验证

启动kafka、创建topic、启动应用程序不再赘述,如果不清楚的参考本人kafka专栏。
1、在kafka命令控制台输入数据

kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource

[alanchan@server1 onekeystart]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
>1,2
>1,3
>1,4
>1,5
>1,6
>1,7
>2,4
>2,6
>2,8
>3,6

2、观察应用程序中的控制台输出

7> Subway(sNo=1, userCount=20)
4> Subway(sNo=2, userCount=18)
7> Subway(sNo=1, userCount=7)

六、示例-Flink 1.17.0版本:kafka数据源,每10s统计一次地铁进站每个入口人数

注意该示例中,发送数据时不需要eventtime,flink会以kafka发送数据的时间戳为eventtime,也不需要withTimestampAssigner指定eventtime,详见实现源码。

每10s统计一次地铁进站每个入口人数,最多接受延迟3s的数据

数据结构:进站口、人数

1、maven依赖

<!-- flink连接器 -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-sql-connector-kafka_2.12</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka</artifactId>
	<version>${flink.version}</version>
</dependency>

2、实现

1)、java bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
}

2)、实现

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.datastreamapi.watermarker.bean.Subway;

/**
 * @author alanchan
 *
 */
public class TestKafkaWatermarkDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// 1、env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 2、 source
		KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
				.setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
				.setTopics("alan_kafkasource")
				.setGroupId("flink_kafka")
				.setStartingOffsets(OffsetsInitializer.earliest())
				.setValueOnlyDeserializer(new SimpleStringSchema())
				.build();

		// 3、 transformation
		// 设置watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
		// 使用kafkaSource
		DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "Kafka Source");

		DataStream<Subway> subwayDS = kafkaDS.map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");
				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// 计算窗口
		SingleOutputStreamOperator<Subway> result = subwayDS.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum("userCount");

		// 4、 sink
		result.print();

		// 5、execute
		env.execute();

	}

}

3、验证

1)、验证步骤

1、启动kafka,创建topic
2、启动应用程序
3、在kafka命令行中输入符合格式要求的数据
4、观察应用程序的控制台输出

2)、验证

启动kafka、创建topic、启动应用程序不再赘述,如果不清楚的参考本人kafka专栏。

1、在kafka命令控制台输入数据

kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource

[alanchan@server1 onekeystart]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
>1,2
>1,3
>1,4
>1,5
>1,6
>1,7
>2,4
>2,6
>2,8
>3,6

2、观察应用程序中的控制台输出

7> Subway(sNo=1, userCount=20)
4> Subway(sNo=2, userCount=18)
7> Subway(sNo=1, userCount=7)

以上,本文介绍了Flink WaterMark的基本情况、基本使用、kafka数据源的水印和超过最大延迟数据处理的示例,其中包含详细的验证步骤与验证结果。


标签:示例,sNo,flink,enterTime,watermark,Subway,完整版,userCount
From: https://blog.51cto.com/alanchan2win/9098733

相关文章

  • 构建高效外卖配送系统:技术要点与实际代码示例
    随着外卖服务需求的不断增长,构建一个智能化、高效的外卖配送系统成为餐饮业务成功的关键。在本文中,我们将重新审视外卖配送系统,着重思考技术架构,并提供一些实际代码示例,以展示系统中一些先进的技术要点。技术架构设计一个现代的外卖配送系统应该具备以下关键特性:实时配送调度、智能......
  • 构建高效外卖配送系统:技术要点与示例代码
    随着外卖服务的普及,构建一个高效的外卖配送系统成为餐饮业务成功的关键。在这篇文章中,我们将探讨外卖配送系统的关键技术要点,并提供一些示例代码,演示其中的一些实现方法。1.订单处理与管理在外卖配送系统中,订单处理是一个核心环节。以下是一个简化的订单类的示例代码,用Python语言......
  • 汽车之家车型车系配置参数采集示例
    汽车之家是一个提供车型信息的网站,如果您想采集车型、车系和配置参数等信息,可以使用网络抓取技术。以下是一个简单的示例,使用Python语言和BeautifulSoup库进行汽车之家车型车系配置参数的基本数据采集。请注意,这个示例只是一个入门级的例子,实际情况可能需要更多的处理和细化。i......
  • netty: LengthFieldBasedFrameDecoder的用法示例
    一、服务器端启动类:packagecn.edu.tju;importio.netty.bootstrap.ServerBootstrap;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketCh......
  • 【C++】STL 容器 - stack 堆栈容器 ① ( stack 堆栈容器特点 | stack 堆栈容器与 dequ
    文章目录一、stack堆栈容器简介1、stack堆栈容器引入2、stack堆栈容器特点3、stack堆栈容器与deque双端数组容器对比二、代码示例-stack堆栈容器简单示例1、代码示例2、执行结果一、stack堆栈容器简介1、stack堆栈容器引入C++语言中的STL标准模板库中的stac......
  • 上传到东萍象棋网大师对局的UBB示例【重要提示:同一盘棋千万不要书写重复字段内容】
    [DhtmlXQ][DhtmlXQ_ver]www_dpxq_com[/DhtmlXQ_ver][DhtmlXQ_title]北京刘永富和北京左治[/DhtmlXQ_title][DhtmlXQ_event]鑫聚缘大奖赛[/DhtmlXQ_event][DhtmlXQ_date]2023/12/30[/DhtmlXQ_date][DhtmlXQ_init]800,600[/DhtmlXQ_init][DhtmlXQ_result]和局[/DhtmlXQ_......
  • JavaScript圆形转多边形经纬度数组算法及示例
    前言在地理信息系统(GIS)和地图应用中,有时需要将圆形区域表示为多边形的经纬度数组对象。本文将介绍如何使用JavaScript实现圆形转多边形经纬度数组的算法,并提供一个示例来演示其用法。概述圆形转多边形经纬度数组的算法的目标是将给定的圆形区域表示为多边形的经纬度数组对象。这......
  • Python NumPy 生成随机数的方法及示例
    ​ NumPy是一个强大的库,用于数值计算,包括生成各种随机数。可以使用random.rand()、random.randn()、random.randint()、random.uniform()、random.normal()和random.seed()函数方法生成随机数。本文介绍生成随机数的方法,以及相关的示例代码。1、numpy.random.rand()numpy.ra......
  • 二进制、位运算和掩码运算、如何取某几位掩码,小白鼠测试示例
    1.二进制二进制是一种基于两个数字0和1的数制系统。它可以表示两种状态,即开和关。所有输入电脑的任何信息最终都要转化为二进制。目前通用的是ASCII码。最基本的单位为bit。在计算机科学中,二进制是最常用的数制系统,因为计算机内部的所有数据都是以二进制形式存储和处理的。在二......
  • Flink侧输出流解析
    在实时数据处理领域,ApacheFlink已成为一个不可或缺的工具。它以其高吞吐量和低延迟处理能力而闻名。而在Flink的众多特性中,侧输出流(SideOutputs)提供了一种灵活的方式来处理复杂的数据流。本文将探讨如何在Flink的ScalaAPI中有效使用侧输出流。1.侧输出流的基本概念侧......