首页 > 其他分享 >Flink-水位线的设置以及传递

Flink-水位线的设置以及传递

时间:2022-11-19 19:00:26浏览次数:54  
标签:Flink Event 传递 水位 env new public WatermarkGenerator

6.2 水位线

6.2.1 概述

  1. 分类
  • 有序流

image.png

  • 无序流 image.png 判断的时间延迟
  1. 延迟时间判定

6.2.2 水位线的设置

  1. 分析

image.png DataStream下的assignTimstampsAndWatermarks方法,返回SingleOutputStreamOperator本质还是个算子,传入的参数是WatermarkStrategy的生成策略

image.png 但是WatermarkStrategy是一个接口

  • 有序流

image.png

因此调用静态方法forMonotonousTimeStamps后new AscendingTimestampsWatermarks返回WatermarkGernerator image.png

AscendingTimestampsWatermarks这个继承自BoundOutOfOrdernessWatermarks

image.png image.png image.png

BoundOutOfOrdernessWatermarks这个类有onEvent和onPeriodicEmit这两方法,因为实现了WatermarkGenerator这个接口

image.png

然后在调用接口中的默认方法withTimestampAssigner得到返回WatermarkStrategy,参数是new SerializableTimestampAssigner的对象,重写extractTimestamp方法,这个方法作用是怎么样从数据里面提取时间戳

image.png

  • 乱序流

image.png 因此调用静态方法forBoundedOutOfOrderness(参数为最大乱序程度,也就是延迟时间)后new BoundOutOfOrdernessWatermarks返回WatermarkGernerator

image.png

BoundOutOfOrdernessWatermarks这个类有onEvent和onPeriodicEmit这两方法,因为实现了WatermarkGenerator这个接口(跟上面一样了)

image.png

后面也跟有序一样,然后在调用接口中的默认方法withTimestampAssigner得到返回WatermarkStrategy

  1. 完整代码
public class WatermarkTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.输入
        SingleOutputStreamOperator<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L))
            
//                //有序流的watermark生成
//                //forMonotonousTimestamps前指定泛型
//                .assignTimestampsAndWatermarks(WatermarkStrategy
//                        .<Event>forMonotonousTimestamps()//得到WatermarkGenerator
//                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {//返回WatermarkStrategy
//                            @Override
//                            //参数是当前传过来的数据element,另一个传出的recordTimestamp是时间戳
//                            public long extractTimestamp(Event element, long recordTimestamp) {
//                                return element.timestamp;
//                            }
//                        })
//                )
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    //forMonotonousTimestamps前指定泛型
                    //forMonotonousTimestamps参数是最大乱序时间
                    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))//得到WatermarkGenerator
                    .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                        @Override
                        public long extractTimestamp(Event element, long recordTimestamp) {
                            return element.timestamp;
                        }
                    })
            );
        env.execute();
    }
}

6.2.3 自定义水位线

  1. 分析

image.png

或者直接new 一个接口WatermarkStrategy重写createWatermarkGenerator的watermark生成器的方法(生成WatermarkGenerator)以及createTimeStampAssigner提取时间戳分配器的方法(生成TimeStampAssigner)创建watermark

image.png

image.png

image.png

image.png

WatermarkGenerator是个接口,有两个方法分别是onEvent方法,主要目标是要发出一个WatermarkOutput,另一个是onperiodicEmit方法,表示周期性的生成,周期性生成时间默认是2秒,env调用getConfig后调用setAutoWatermarkInterval后可以更改周期性生成时间

image.png image.png

WatermarkOutput也是一个接口,调用emitWatermark就能发出一个watermark,

image.png

image.png

除了WatermarkGenerator接口还有TimeStampAssigner也是个接口,里面只有一个方法叫做extractTimestamp,目的是从当前数据提取时间戳,同时也会作为WatermarkGenerator这个接口中onEvent方法中传入的参数eventTimestamp时间戳(见上上上上上上图)

  1. 代码
  • 正常水位线
// 自定义水位线的产生
public class CustomWatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                .print();
        env.execute();
    }
    //内部静态类
    public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
        @Override
        //createTimestampAssigner方法生成TimeStampAssigner
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                //extractTimestamp,目的是从当前数据提取时间戳
                public long extractTimestamp(Event element, long recordTimestamp)
                {
                    return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
                }
            };
        }
        @Override
        //createWatermarkGenerator生成WatermarkGenerator
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomPeriodicGenerator();
        }
    }
    //CustomPeriodicGenerator实现WatermarkGenerator接口,并重写方法
    public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
        private Long delayTime = 5000L; // 延迟时间
        private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
        @Override
        //更新当前时间戳,这边不发送水位线,目的是保存时间戳
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput
                output) {
            // 每来一条数据就调用一次
            maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
        }
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 发射水位线,默认 200ms 调用一次
            //-1毫秒都是为了贴切窗口闭合的时候左闭右开设计
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }
}

  • 断点水位线

在onevent根据条件触发,onPeriodicEmit这个方法中就不用做了

    public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
        @Override
        public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
        // 只有在遇到特定的 itemId 时,才发出水位线
            if (r.user.equals("Mary")) {
                output.emitWatermark(new Watermark(r.timestamp - 1));
            }
        }
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
        }
    }

  • 在自定义数据源中发送水位线

使用 collectWithTimestamp 方法将数据发送出去,原来直接out.collect()的

image.png

参数是当前数据还有当前数据的时间戳,跟水位线生成中extractTimestamp(Event element, long recordTimestamp)这个类似,也是一个数据是什么,一个时间戳是啥

然后发送水位线,用emitWatermark方法生成

public class CustomWatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSourceWithWatermark()).print();
        env.execute();
    }
    // 泛型是数据源中的类型
    public static class ClickSourceWithWatermark implements SourceFunction<Event>
    {
        private boolean running = true;
        @Override
        public void run(SourceFunction.SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();
            String[] userArr = {"Mary", "Bob", "Alice"};
            String[] urlArr = {"./home", "./cart", "./prod?id=1"};
            while (running) {
                long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
                String username = userArr[random.nextInt(userArr.length)];
                String url = urlArr[random.nextInt(urlArr.length)];
                Event event = new Event(username, url, currTs);
                // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段
                sourceContext.collectWithTimestamp(event, event.timestamp);
                // 发送水位线
                sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                Thread.sleep(1000L);
            }
        }
        @Override
        public void cancel() {
            running = false;
        }
    }
}

6.2.4 水位线的传递

针对多个分区,上游需要告诉下游水位线情况,采用的是广播的方式给所有下游子任务

但是上游如果也是并行的,向下传输的水位线可能有多个,以上游发过来最小的时钟为准,并且下游会有一个分区专门保存上游发过来的水位线最小的数据

image.png

image.png

标签:Flink,Event,传递,水位,env,new,public,WatermarkGenerator
From: https://blog.51cto.com/u_15316078/5870609

相关文章

  • vue-router参数传递
    通过query传递   使用$route获取query对象   ......
  • TransmittableThreadLocal传递ServletRequestAttributes对象在主线程和线程池,避坑指南
    关于HttpServletRequest对象在主线程和线程池传递过程的问题一,针对一般对象,解决主线程和线程池内线程对象解决方案是用阿里的插件TransmittableThreadLocal使用案例(1)将线程......
  • 76:参数的传递_不可变对象含可变子对象_内存分析
    ###传递不可变对象包含的子对象是可变的情况#传递不可变对象时。不可变对象里面包含的子对象是可变的。则方法内修改了这个可变对象,源对象也发生了变化。a=(10,20,......
  • Flink基于State做千万用户的pv
    需求:记录每天某一页面下所有用户的访问次数和第一次访问的时间解法:redis做缓存,每天一个map,设置ttl,用户访问次数做累积,过滤完先存到redis,sink的时候读redis,查出这个用户的总......
  • Flink广播变量
    应用场景实时更新配置,例如:任务在统计3个页面的uv,又要统计另外三个页面的uv,那我是不是可以通过配置的方式,快速实现类似需求实时加载维表,例如:kafka里用户购买的订单信息的binl......
  • Flink/Spark中ETL的简单模版
    我们往往可以忽略外界的干扰因素,避免焦虑,专心做自己想做的事情,反正焦虑又解决不了问题引言使用flink或者spark的时候,写好固定的模版很重要,对于一下etl的实时任务,只需要执行......
  • Apache Flink架构及其工作原理
    ApacheFlink架构及其工作原理1、定义:Apacheflink是一个实时计算框架和分布式处理引擎,用于再无边界和有边界数据流上进行有状态的计算,Flink能在所有的集群环境中运行,......
  • 在macbook m1上调试flink1.14.3
    前置条件1:首先先用homebrew安装一下flink1.14.3版本,安装完成后,/usr/local/Celler/apache-flink/1.14.3是主路径。可以看看有没有类似的文件夹来确定有没有安装上。前置条......
  • Android 中通过Intent传递类对象,通过实现Serializable和Parcelable接口两种方式传递对
    方式一:通过实现Serializable接口传递对象用一个小的Demo去理解,通过实现Serializable接口传递对象。效果图:具体讲解在代码注释中已经写出先创建一个对象:packagecom.exampl......
  • 初识Flink简单介绍
    Flink是实时计算框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。有界流和无界流都是基于Datastream这个Flink的编程模型。Flink自己管理内存机制,批......