首页 > 其他分享 >flink水位线案例

flink水位线案例

时间:2024-03-30 12:00:31浏览次数:23  
标签:pv String flink value throws 案例 window 水位 public

前言:

        结合上个水位线知识点做出的题目案例给予以下代码作为参考。

例题:

1.创建Flink流处理环境。

//创建流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


env.execute();

2.从“access.txt”文件中获取数据源。

        资源中查询数据源

3.从中过滤出包含有page字段的数据。

//过滤数据,转换数据
        SingleOutputStreamOperator<UserEvent> stream1 = stream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                //value是一个json字符串
                JSONObject jsonObject = JSONObject.parseObject(value);
                JSONObject page = jsonObject.getJSONObject("page");
                JSONObject common = jsonObject.getJSONObject("common");
                //只保留page和common不为空数据
                return page != null && common != null;
            }
        }).map(new MapFunction<String, UserEvent>() {
            @Override
            public UserEvent map(String value) throws Exception {
                //value是一个JSON字符串
                JSONObject jsonObject = JSONObject.parseObject(value);
                //先取uid
                String uid = jsonObject.getJSONObject("common").getString("uid");
                //再取page_id
                String pageId = jsonObject.getJSONObject("page").getString("page_id");
                //取ts
                Long ts = jsonObject.getLong("ts");
                return new UserEvent(uid, pageId, ts);
            }
        });

4.设置时间戳及单调递增水位线。

SingleOutputStreamOperator<UserEvent> stream2 = stream1.assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserEvent>forMonotonousTimestamps()
                        .withTimestampAssigner((event, ts) -> event.getTs())
        );

5.统计每十秒钟的用户访客量pv,以及窗口开始时间和结束时间,并输出至控制台。

SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1001 = stream2.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessAllWindowFunction<UserEvent, Tuple3<String, String, Long>, TimeWindow>() {
                    /**
                     *
                     * @param context 上下文对象 -- 拿到窗口信息
                     * @param iterable 10秒钟累积的数据
                     * @param collector 采集器
                     * @throws Exception
                     */
                    @Override
                    public void process(Context context, Iterable<UserEvent> iterable, Collector<Tuple3<String, String, Long>> collector) throws Exception {
                        TimeWindow window = context.window();//窗口对象
                        long start = window.getStart();//窗口开始时间戳
                        long end = window.getEnd();//窗口结束时间戳
                        long pv = 0L;
                        for (UserEvent event : iterable) {
                            pv++;//统计页面浏览量
                        }
                        //格式话成字符串输出
                        String strStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                        String strEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
                        //采集器输出
                        collector.collect(Tuple3.of(strStart, strEnd, pv));
                    }
                });
        stream1001.print();

6.统计每十秒钟的独立用户访客量uv,以及窗口开始时间和结束时间,并输出至控制台。

SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1002 = stream2.keyBy(value -> value.getUid()).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<UserEvent, Tuple3<String, String, Long>, String, TimeWindow>() {
                    /**
                     *
                     * @param s 键名(组名)
                     * @param context
                     * @param iterable
                     * @param collector
                     * @throws Exception
                     */
                    @Override
                    public void process(String s, Context context, Iterable<UserEvent> iterable, Collector<Tuple3<String, String, Long>> collector) throws Exception {
                        TimeWindow window = context.window();//窗口对象
                        long start = window.getStart();//窗口开始时间戳
                        long end = window.getEnd();//窗口结束时间戳
                        long uv = 1;
                        //格式话成字符串输出
                        String strStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                        String strEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
                        //采集器输出三元组
                        collector.collect(Tuple3.of(strStart, strEnd, 1L));
                    }
                }).keyBy(value -> value.f0).sum(2);
        stream1002.print();

7.根据窗口开始和结束时间,使用join或者union将pv,uv数据进行汇总。

stream1001.join(stream1002)
                .where(value1 -> value1.f0)
                .equalTo(value2 -> value2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
                    @Override
                    public String join(Tuple3<String, String, Long> first, Tuple3<String, String, Long> second) throws Exception {
                        return "开始时间:"+first.f0+"结束时间:"+first.f1+",PV="+first.f2+",UV="+second.f2;
                    }
                }).print();

 8.统计每十秒钟较前十秒PV的变化,如果增加则输出正数,减少则输出负数

//另外编写一个方法 调用即可
public static class pvSumFunction extends ProcessAllWindowFunction<UserEvent, String, TimeWindow>{

        private transient ValueState<Long> lastPvValueState;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            //定义描述器对象
            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<Long>(
                    "lastPv", // the state name
//                    TypeInformation.of(new TypeHint<Long>() {}), // 描述类型
                    Types.LONG

            );
            //在这里初始化状态对象
            lastPvValueState = getRuntimeContext().getState(descriptor);
        }

        //处理函数
        @Override
        public void process(Context context, Iterable<UserEvent> iterable, Collector<String> collector) throws Exception {
            //统计当前窗口的pv
            Long pv = 0L;
            for (UserEvent event : iterable) {
                pv++;
            }
            //上一个窗口的pv
            Long lastPv = 0L;
            Long value = lastPvValueState.value();
            if (value!=null){
                lastPv = value;
            }
            //差值
            long chaZhi = pv - lastPv;
            //把当前窗口的pv写回去
            lastPvValueState.update(pv);
            //输出结果(开始时间,结束时间)
            String start = DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss");
            String ebd = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss");
            collector.collect("["+start+"-->"+ebd+")的pv是:"+pv+"-->与上一个10秒的差值是"+chaZhi);
        }
    }

9.统计每个十秒pv的前三值

public static class pvTopNFunction extends ProcessAllWindowFunction<UserEvent, String, TimeWindow>{

        //所有窗口的pv值都存里面
        private transient MapState<Long,Long> pvMapState;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            //初始化
            pvMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>(
                    "pvMapState",
                    Types.LONG,
                    Types.LONG
            ));
        }

        @Override
        public void process(Context context, Iterable<UserEvent> iterable, Collector<String> collector) throws Exception {
            //map是通过键来存取的
            //拿到窗口开始的时间戳
            long start = context.window().getStart();

            //计算本窗口的pv
            Long pv=0L;
            for (UserEvent event : iterable) {
                pv++;
            }
            //获取pvMapState里面的值
            pvMapState.put(start,pv);

            //排序并且获取Top3
            Iterable<Map.Entry<Long, Long>> entries = pvMapState.entries();//取出map的键值对
            //定义一个Map
            Map<Long,Long>pvMap=new HashMap<>();

            for (Map.Entry<Long, Long> entry : entries) {
                pvMap.put(entry.getKey(),entry.getValue());
            }
            //排序(用的是Java里面的stream Api)
            List<Map.Entry<Long, Long>> top3 = pvMap.entrySet().stream()
                    .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))//comparingByValue按值排序,reverseOrder降序排序
                    .limit(3)//取前3条
                    .collect(Collectors.toList());//采集

            //转换为字符串输出
            String rs="前三名是:\n";
            for (Map.Entry<Long, Long> longLongEntry : top3) {
                Long startMilli=longLongEntry.getKey();
                String strStart = DateFormatUtils.format(startMilli, "yyyy-MM-dd HH:mm:ss");
                rs+="开始时间是:"+strStart+",pv是:"+longLongEntry.getValue()+"\n";
            }
            collector.collect(rs);
        }

    }

代码总结:

public class UvPvDemo1 {
    public static void main(String[] args) throws Exception {
        //创建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置平行度
        env.setParallelism(5);
        //把处理方式设置为批处理模式
//        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        //读取文件
        DataStreamSource<String> stream = env.readTextFile("D:\\workidea\\Flink1001\\input\\access.txt");
        //过滤数据,转换数据
        SingleOutputStreamOperator<UserEvent> stream1 = stream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                //value是一个json字符串
                JSONObject jsonObject = JSONObject.parseObject(value);
                JSONObject page = jsonObject.getJSONObject("page");
                JSONObject common = jsonObject.getJSONObject("common");
                //只保留page和common不为空数据
                return page != null && common != null;
            }
        }).map(new MapFunction<String, UserEvent>() {
            @Override
            public UserEvent map(String value) throws Exception {
                //value是一个JSON字符串
                JSONObject jsonObject = JSONObject.parseObject(value);
                //先取uid
                String uid = jsonObject.getJSONObject("common").getString("uid");
                //再取page_id
                String pageId = jsonObject.getJSONObject("page").getString("page_id");
                //取ts
                Long ts = jsonObject.getLong("ts");
                return new UserEvent(uid, pageId, ts);
            }
        });

        //设置水位线策略
        SingleOutputStreamOperator<UserEvent> stream2 = stream1.assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserEvent>forMonotonousTimestamps()
                        .withTimestampAssigner((event, ts) -> event.getTs())
        );

        //开窗(统计每十秒钟的用户访客量pv,以及窗口开始时间和结束时间,并输出至控制台)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1001 = stream2.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessAllWindowFunction<UserEvent, Tuple3<String, String, Long>, TimeWindow>() {
                    /**
                     *
                     * @param context 上下文对象 -- 拿到窗口信息
                     * @param iterable 10秒钟累积的数据
                     * @param collector 采集器
                     * @throws Exception
                     */
                    @Override
                    public void process(Context context, Iterable<UserEvent> iterable, Collector<Tuple3<String, String, Long>> collector) throws Exception {
                        TimeWindow window = context.window();//窗口对象
                        long start = window.getStart();//窗口开始时间戳
                        long end = window.getEnd();//窗口结束时间戳
                        long pv = 0L;
                        for (UserEvent event : iterable) {
                            pv++;//统计页面浏览量
                        }
                        //格式话成字符串输出
                        String strStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                        String strEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
                        //采集器输出
                        collector.collect(Tuple3.of(strStart, strEnd, pv));
                    }
                });
//        stream1001.print();

        //(开始时间,结束数据,1)-》分组聚合(开始时间,结束时间,uv总和)
        //(统计每十秒钟的独立用户访客量uv,以及窗口开始时间和结束时间,并输出至控制台)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1002 = stream2.keyBy(value -> value.getUid()).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<UserEvent, Tuple3<String, String, Long>, String, TimeWindow>() {
                    /**
                     *
                     * @param s 键名(组名)
                     * @param context
                     * @param iterable
                     * @param collector
                     * @throws Exception
                     */
                    @Override
                    public void process(String s, Context context, Iterable<UserEvent> iterable, Collector<Tuple3<String, String, Long>> collector) throws Exception {
                        TimeWindow window = context.window();//窗口对象
                        long start = window.getStart();//窗口开始时间戳
                        long end = window.getEnd();//窗口结束时间戳
                        long uv = 1;
                        //格式话成字符串输出
                        String strStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                        String strEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
                        //采集器输出三元组
                        collector.collect(Tuple3.of(strStart, strEnd, 1L));
                    }
                }).keyBy(value -> value.f0).sum(2);
//        stream1002.print();
//
        //(根据窗口开始和结束时间,使用join或者union将pv,uv数据进行汇总)
        stream1001.join(stream1002)
                .where(value1 -> value1.f0)
                .equalTo(value2 -> value2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
                    @Override
                    public String join(Tuple3<String, String, Long> first, Tuple3<String, String, Long> second) throws Exception {
                        return "开始时间:"+first.f0+"结束时间:"+first.f1+",PV="+first.f2+",UV="+second.f2;
                    }
                }).print();



        env.execute();
    }
}
public class UvPvDemo2 {
    public static void main(String[] args) throws Exception {
        //创建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置平行度
        env.setParallelism(1);
        //把处理方式设置为批处理模式
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        //读取文件
        DataStreamSource<String> stream = env.readTextFile("D:\\workidea\\Flink1001\\input\\access.txt");
        //过滤数据,转换数据
        SingleOutputStreamOperator<UserEvent> stream1 = stream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                //value是一个json字符串
                JSONObject jsonObject = JSONObject.parseObject(value);
                JSONObject page = jsonObject.getJSONObject("page");
                JSONObject common = jsonObject.getJSONObject("common");
                //只保留page和common不为空数据
                return page != null && common != null;
            }
        }).map(new MapFunction<String, UserEvent>() {
            @Override
            public UserEvent map(String value) throws Exception {
                //value是一个JSON字符串
                JSONObject jsonObject = JSONObject.parseObject(value);
                //先取uid
                String uid = jsonObject.getJSONObject("common").getString("uid");
                //再取page_id
                String pageId = jsonObject.getJSONObject("page").getString("page_id");
                //取ts
                Long ts = jsonObject.getLong("ts");
                return new UserEvent(uid, pageId, ts);
            }
        });

        //设置水位线策略
        SingleOutputStreamOperator<UserEvent> stream2 = stream1.assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserEvent>forMonotonousTimestamps()
                        .withTimestampAssigner((event, ts) -> event.getTs())
        );

        //统计每十秒钟较前十秒PV的变化,如果增加则输出正数,减少则输出负数
        stream2
                //滚动窗口
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                //滑动窗口
//                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                //会话窗口,需要指定会话中断间隔时间
//                .windowAll(EventTimeSessionWindows.withGap(Time.seconds(3)))
                .process(new pvTopNFunction()).print();

//        stream2.print();


        env.execute();
    }

    public static class pvSumFunction extends ProcessAllWindowFunction<UserEvent, String, TimeWindow>{

        private transient ValueState<Long> lastPvValueState;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            //定义描述器对象
            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<Long>(
                    "lastPv", // the state name
//                    TypeInformation.of(new TypeHint<Long>() {}), // 描述类型
                    Types.LONG

            );
            //在这里初始化状态对象
            lastPvValueState = getRuntimeContext().getState(descriptor);
        }

        //处理函数
        @Override
        public void process(Context context, Iterable<UserEvent> iterable, Collector<String> collector) throws Exception {
            //统计当前窗口的pv
            Long pv = 0L;
            for (UserEvent event : iterable) {
                pv++;
            }
            //上一个窗口的pv
            Long lastPv = 0L;
            Long value = lastPvValueState.value();
            if (value!=null){
                lastPv = value;
            }
            //差值
            long chaZhi = pv - lastPv;
            //把当前窗口的pv写回去
            lastPvValueState.update(pv);
            //输出结果(开始时间,结束时间)
            String start = DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss");
            String ebd = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss");
            collector.collect("["+start+"-->"+ebd+")的pv是:"+pv+"-->与上一个10秒的差值是"+chaZhi);
        }
    }

    public static class pvTopNFunction extends ProcessAllWindowFunction<UserEvent, String, TimeWindow>{

        //所有窗口的pv值都存里面
        private transient MapState<Long,Long> pvMapState;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            //初始化
            pvMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>(
                    "pvMapState",
                    Types.LONG,
                    Types.LONG
            ));
        }

        @Override
        public void process(Context context, Iterable<UserEvent> iterable, Collector<String> collector) throws Exception {
            //map是通过键来存取的
            //拿到窗口开始的时间戳
            long start = context.window().getStart();

            //计算本窗口的pv
            Long pv=0L;
            for (UserEvent event : iterable) {
                pv++;
            }
            //获取pvMapState里面的值
            pvMapState.put(start,pv);

            //排序并且获取Top3
            Iterable<Map.Entry<Long, Long>> entries = pvMapState.entries();//取出map的键值对
            //定义一个Map
            Map<Long,Long>pvMap=new HashMap<>();

            for (Map.Entry<Long, Long> entry : entries) {
                pvMap.put(entry.getKey(),entry.getValue());
            }
            //排序(用的是Java里面的stream Api)
            List<Map.Entry<Long, Long>> top3 = pvMap.entrySet().stream()
                    .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))//comparingByValue按值排序,reverseOrder降序排序
                    .limit(3)//取前3条
                    .collect(Collectors.toList());//采集

            //转换为字符串输出
            String rs="前三名是:\n";
            for (Map.Entry<Long, Long> longLongEntry : top3) {
                Long startMilli=longLongEntry.getKey();
                String strStart = DateFormatUtils.format(startMilli, "yyyy-MM-dd HH:mm:ss");
                rs+="开始时间是:"+strStart+",pv是:"+longLongEntry.getValue()+"\n";
            }
            collector.collect(rs);
        }

    }
}

标签:pv,String,flink,value,throws,案例,window,水位,public
From: https://blog.csdn.net/2301_78959404/article/details/137169228

相关文章

  • flink水位线
    一、什么是水位线    在Flink中,用来衡量事件时间进展的标记,就被称为“水位线(Watermark)”。    水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来......
  • node.js 入门案例 安装教程
    前言Node.js是一个基于ChromeJavaScript运行时建立的一个平台。Node.js是一个事件驱动I/O服务端JavaScript环境,基于Google的V8引擎,V8引擎执行Javascript的速度非常快,性能非常好。可以让JavaScript在服务器端运行。它具有轻量级、高效、事件驱动、非阻塞I/O等特点,被广泛应......
  • 兼容模式下导致数值类型发生隐式转换,SQL在生产上无法正常使用案例
    兼容模式下导致数值类型发生隐式转换,SQL在生产上无法正常使用案例本文出处:https://www.modb.pro/db/403148基于MogDB版本V2.0.1问题现象厂商研发描述InsertSQL在生产上无法执行,而测试环境中同版本的数据库该SQL可以正常运行。检查SQL后,发现是很简单的insertinto......
  • 抽象类、案例
    抽象类必须使用abstract修饰:   修饰符 abstract class 类名{}抽象方法:就是抽象类中定义的子类必须完成的功能的基本要求。没有方法体,只有方法签名,必须abstract修饰例子:父类:packageabstract_class;publicabstractclassAnimal{publicabstractv......
  • JavaSE_方法method 定义时的注意事项 案例分析
    在定义方法时,需要注意以下几个重要事项:1.**方法定义的位置**:方法不能定义在另一个方法内部。2.**方法名及参数列表**:确保方法名的拼写正确,并且参数列表也要准确无误。参数列表包括参数的类型、顺序和数量,这些都必须与方法调用时的实参匹配。3.**返回值类型**:如果方法声明......
  • JavaSE If执行流程 案例分析
      在一个程序执行的过程中,各条语句的执行顺序对程序的结果是有直接影响的。所以,   我们必须清楚每条语句的执行流程。而且,很多时候我们要通过控制语句的执行顺序   来实现我们要完成的功能。      顺序结构(最常用):代码从上往下一行一行执行publicc......
  • C++精品小案例:C++中的多态性及其实现、模板元编程及其在C++中的应用
    1.C++中的多态性及其实现多态性是面向对象编程的三大特性之一,它允许使用父类类型的指针或引用来指向子类对象,并通过这个父类类型的指针或引用来调用实际子类的成员函数。这样,就可以在运行时确定应该调用哪个具体的函数实现,从而实现一个接口多种形态。多态性主要通过虚函数来......
  • KingbaseES V8R3集群运维案例之---failover切换后新主库启动过程
    案例说明:KingbaseESV8R3集群failover切换后,在生产环境中,新主库启动过程中可能会有业务访问,出现‘系统只读’的问题。如下图所示:适用版本:KingbaseESV8R3一、问题分析1、如下所示,failover切换过程:1)在master节点执行failover_stream.sh脚本执行failover切换。2)ping网关地......
  • KingbaseES V8R6集群运维案例之---PGPASSWORD变量导致esrep用户连接主库失败
    案例说明:KingbaseESV8R6集群,在备库执行clone时,esrep用户认证失败,导致clone失败。适用版本:KingbaseESV8R6一、问题现象如下所示,在执行备库clone是,esrep认证失败:备库sys_log日志:(esrep用户认证失败)二、问题分析对于KingbaseESV8R6集群,esrep的用户通过~/.encpwd建立认证(......
  • KingbaseES数据库案例之---输出数据库日志到syslog服务器
    案例说明:生产中心需对数据库日志建立审计,需要将数据库服务器的日志发送到日志服务器集中存储并建立审计。适用版本:KingbaseESV8R3/R6案例主机架构:node201192.168.1.201#数据库主机、syslog客户端node202192.168.1.202#syslog服务器一、构建syslog服务器S......