首页 > 其他分享 >Flink中的创建Watermark水位线

Flink中的创建Watermark水位线

时间:2023-02-19 11:44:14浏览次数:51  
标签:WatermarkStrategy Watermark Flink Event 水位 new import public

在Flink中,水位线可大致分为乱序流水位线和有序流水位线。实际开发中用的最多的就是乱序流水位线
在此之前,你已了解Flink在分布式环境下Watermark的传播方式

Flink官方提供的设置水位线的方法有Source之前和Source之后,这里主要介绍Source之后的方法

默认方法

通过assignTimestampsAndWatermarks()方法来设置。
assignTimestampsAndWatermarks()方法主要依赖WatermarkStrategy,通过 WatermarkStrategy 我们可以为每条数据设置时间戳并生成 Watermark 。
assignTimestampsAndWatermarks() 方法源码

其中 WatermarkStrategy 实现了两个接口,分别是TimestampAssignerSupplier<T>设置时间戳和WatermarkGeneratorSupplier<T>生产水位线

基于使用场景的普遍,Flink官方已经封装好了两个方法供我们调用,大致使用方式如下:

dataStreamSource.assignTimestampsAndWatermarks(
                    WatermarkStrategy   
                      //.<T>forBoundedOutOfOrderness(...) // 乱序流    
                        .<T>forMonotonousTimestamps(...) // 有序流
                        .withTimestampAssigner(...)
                 );                
  • forBoundedOutOfOrderness()/forMonotonousTimestamps(...):为具体水位线生产的策略
  • T:为数据流的元素类型
  • withTimestampAssigner(...):为指定事件时间戳

实际用法

package com.peng.time;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;


/**
 * @author 海绵先生
 * @Description TODO 生成简单的乱序流和有序流水位线
 * @date 2022/10/17-16:11
 */
public class WatermarkTest01 {
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Event {
        private String user;
        private String url;
        private Long timestamp;
    }
    public static void main(String[] args) {
        // TODO env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置时间水位线——生成间隔100毫秒
        env.getConfig().setAutoWatermarkInterval(100);

        // TODO source
        DataStreamSource<Event> dataStreamSource = env.fromElements(new Event("Sponge", "./user", 100L),
                new Event("Tom", "./user", 150L),
                new Event("Jrak", "./lookspouce", 300L),
                new Event("Sponge", "./like", 400L));

        // TODO 有序流的水平线生成方法
        //forMonotonousTimestamps是一个泛型方法,所以前面要指定一个泛型
        SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                //forMonotonousTimestamps 是WatermarkStrategy接口里的静态方法,为设置有序流的水位线
                .<Event>forMonotonousTimestamps()// 内部设置的是允许延迟时间为0,源码:super(Duration.ofMillis(0));返回的类型是WatermarkStrategy<T>
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    // 抽取时间戳的逻辑
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));
        /*
        * WatermarkStrategy.forMonotonousTimestamps()里只有一个水位线生成器,没有时间戳提取器
        * 所以还要在调用个withTimestampAssigner
        * */

        // TODO 乱序流生成水位线方法
        SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator1 = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))//Duration.ofSeconds(2)允许迟到2秒
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));
    }
}

自定义水位线

通过实现WatermarkStrategy<T>接口下的WatermarkGenerator<T>TimestampAssigner<T>两个方法

结合实例看看

package com.peng.time;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.metrics.stats.Max;

/**
 * @author 海绵先生
 * @Description TODO    自定义水位线
 * @date 2022/10/22-17:26
 */
public class WatermarkTest02 {
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Event {
        private String user;
        private String url;
        private Long timestamp;
    }

    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 更改默认水位线设置,设置成5000毫秒发射一次
        //env.getConfig().setAutoWatermarkInterval(5000L);

        // source
        DataStreamSource<Event> dataStreamSource = env.fromElements(new Event("Sponge", "./user", 100L),
                new Event("Tom", "./user", 150L),
                new Event("Jrak", "./lookspouce", 300L),
                new Event("Sponge", "./like", 400L));

        dataStreamSource.assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();

        env.execute();

    }
    public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {

        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            // 该方法下要返回一个 WatermarkGenerator 的实例类
            return new CustomPeriodicGenerator();
        }

        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    // 指定事件时间戳
                    return element.timestamp;
                }
            };
        }
    }

    public static class CustomPeriodicGenerator implements WatermarkGenerator<Event>{

        private Long delayTime = 5000L;//设置延迟时间 ms
        private Long maxTs = Long.MIN_VALUE + delayTime + 1L;//观察到的最大时间戳,先设个初始值

        //每来一条数据,就会调用一次onEvent()方法
        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// output 为发射水位线对象
            maxTs = Math.max(event.timestamp,maxTs); //更新最大时间戳
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 发射水位线,默认200ms调用一次
            output.emitWatermark(new Watermark(maxTs - delayTime -1L));
        }
    }
}

/*
* 水位线是由上游广播到下游,下游的并行度会同时接收到上游水位线
*
* */

另外,通过上面的WatermarkGenerator实现类我们可以看到,生产水位线的方法有两种,一种为周期性的,一种为逐个式的。逐个式的生产为来一条就生产一个水位线,只需要将onPeriodicEmit方法里的 output.emitWatermark放到 onEvent 中就行了。

标签:WatermarkStrategy,Watermark,Flink,Event,水位,new,import,public
From: https://www.cnblogs.com/Mr-Sponge/p/17134453.html

相关文章

  • Flink CDC 监听 Postgresql表的变化
    前言最近看文章说如何把Postgresql的数据同步给别的数据源,可以利用它的WAL,具体怎么操作没有说,我自己找到一篇文章https://www.cnblogs.com/xiongmozhou/p/14817641.html......
  • 智慧水利雨量水位报警站
    智慧水利雨量水位报警站产品简介雨量水位报警站由水位探测器、雨量传感器、报警灯、扩音器、太阳能板和采集传输控制器组成。实时采集水位等级,三个水位探测器对应3个水位等......
  • Flink On Yarn集群搭建
    一、环境准备:1.1jdk1.8、yarn集群环境 1.2下载Flink1.15.21.3解压到/opt/soft/1.4下载 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar并放在/opt/soft/flink1.15......
  • flink -windows下面运行
    1.官方下载地址  https://archive.apache.org/dist/flink/下载并解压到指定目录 2. 解压,到bin文件夹下面建两个文件     flink.bat::###############......
  • dinky-binlog-kafka-flinksql流程处理
    准确阶段:mysql:开启mysql日志kafka:需检查服务是否正常maxwell:这里采用19版本,过新的版本对java版本要求高,我这里是java8maxwell-1.19.0maxwell操作:cd/root/tar_temp/maxwell-......
  • Flink学习笔记目录
    FlinkDataStreamAPI学习笔记连接数据库大数据中各种框架的连接器(Spark,Flink,MongoDB,Kafka,Hive,Hbase等)......
  • Flink-On-Yarn部署
    FlinkOnYarn集群部署1.集群配置安装Yarn-Flink前置环境需要hadoop集群,至少三台,组件布局如下:组件masterslave1slave2IP192.168.2.21192.168.2.22192.168......
  • Flink 积压问题排查
    Flink作业运行时,最常见的问题就是积压问题,当作业出现积压时,如何才能快速定位到积压原因,并针对性解决呢?积压的发现通过我们会通过配置作业的积压报警来及时发现作用的积......
  • 【Flink】详解JobGraph
    ​ 【Flink】详解JobGraph大家好,我们的gzh是朝阳三只大明白,满满全是干货,分享近期的学习知识以及个人总结(包括读研和IT),跪求一波关注,希望和大家一起努力、进步!!概述JobGraph......
  • 袋鼠云 ChunJun(FlinkX)
    Hi,我是ChunJun,一个有趣好用的开源项目-OSCHINA-中文开源技术交流社区https://www.oschina.net/news/206630ChunJun首页、文档和下载-基于Flink的数据集成工具-......