首页 > 其他分享 >flink中窗口和水位线-基于DataStream API

flink中窗口和水位线-基于DataStream API

时间:2024-10-16 23:17:23浏览次数:9  
标签:10 DataStream 09 17 flink API 2021 time id

文章目录

前言

在之前的文章中,有在FllinkSQL来实现窗口和水位线—flink中水位线和窗口的工作原理,这次使用DataStream API的方式来实现窗口和水位线,主要代码来自尚硅谷的课件资料。

Watermark的代码

1.WatermarkStrategy. forBoundedOutOfOrderness()方法来实现延迟时间,在该代码中设置的延迟时间是2s,
2.withTimestampAssigner()指定时间戳分配器,从数据中提取,该代码是如果"operate_time"不为空,则用"operate_time"生成时间戳,否则使用"create_time"。

//TODO 4.提取事件时间生成Watermark

        SingleOutputStreamOperator<JSONObject> jsonObjWithWmDS = jsonObjDS.assignTimestampsAndWatermarks (WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness (Duration.ofSeconds (2)).withTimestampAssigner (new SerializableTimestampAssigner<JSONObject> () {
            @Override
            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                String operateTime = element.getString ("operate_time");

                if (operateTime != null) {
                    return DateFromatUtil.toTs (operateTime, true);
                } else {
                    return DateFromatUtil.toTs (element.getString ("create_time"), true);
                }
            }
        }));

Window的代码

windowAll(TumblingEventTimeWindows) 为滚动事件时间窗口,调用它的静态方法.of(),窗口事件为10s,
ReduceFunction是进行增量的聚合,每来一个数据就做一次聚合;到窗口需要触发计算时,AllWindowFunction进行全量的聚合,直接将增量聚合函数的结果拿来当作了Iterable类型的输入。

     //TODO 7.开窗、聚合
        SingleOutputStreamOperator<CartAddUuBean> resultDS = cartAddDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<CartAddUuBean> () {
                    @Override
                    public CartAddUuBean reduce(CartAddUuBean value1, CartAddUuBean value2) throws Exception {
                        value1.setCartAddUuCt(value1.getCartAddUuCt() + value2.getCartAddUuCt());
                        return value1;
                    }
                }, new AllWindowFunction<CartAddUuBean, CartAddUuBean, TimeWindow> () {
                    @Override
                    public void apply(TimeWindow window, Iterable<CartAddUuBean> values, Collector<CartAddUuBean> out) throws Exception {
                        CartAddUuBean next = values.iterator().next();

                        next.setEdt(DateFromatUtil.toYmdHms(window.getEnd()));
                        next.setStt(DateFromatUtil.toYmdHms(window.getStart()));
                        next.setTs(System.currentTimeMillis());

                        out.collect(next);
                    }
                });

实践

输入的数据

{"id":100924,"user_id":"94","sku_id":16,"cart_price":4488.00,"sku_num":2,"img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg","sku_name":"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机","is_checked":null,"create_time":"2020-06-14 09:29:10","operate_time":"2021-10-17 09:28:52","is_ordered":1,"order_time":"2021-10-17 09:28:58","source_type":"2401","source_id":null}


{"id":100924,"user_id":"92","sku_id":16,"cart_price":4488.00,"sku_num":2,"img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg","sku_name":"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机","is_checked":null,"create_time":"2020-06-14 09:29:20","operate_time":"2021-10-17 09:28:55","is_ordered":1,"order_time":"2021-10-17 09:28:58","source_type":"2401","source_id":null}


{"id":100924,"user_id":"90","sku_id":16,"cart_price":4488.00,"sku_num":2,"img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg","sku_name":"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机","is_checked":null,"create_time":"2020-06-14 09:29:30","operate_time":"2021-10-17 09:29:00","is_ordered":1,"order_time":"2021-10-17 09:28:58","source_type":"2401","source_id":null}


{"id":100924,"user_id":"91","sku_id":16,"cart_price":4488.00,"sku_num":2,"img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg","sku_name":"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机","is_checked":null,"create_time":"2020-06-14 09:29:30","operate_time":"2021-10-17 09:29:01","is_ordered":1,"order_time":"2021-10-17 09:28:58","source_type":"2401","source_id":null}

{"id":100924,"user_id":"91","sku_id":16,"cart_price":4488.00,"sku_num":2,"img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg","sku_name":"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机","is_checked":null,"create_time":"2020-06-14 09:29:30","operate_time":"2021-10-17 09:29:02","is_ordered":1,"order_time":"2021-10-17 09:28:58","source_type":"2401","source_id":null}

输出的数据

*****:2> CartAddUuBean(stt=2021-10-17 09:28:50, edt=2021-10-17 09:29:00, cartAddUuCt=2, ts=1729082773882)

由输出数据可知,operate_time为2021-10-17 09:28:52,2021-10-17 09:28:55 在【2021-10-17 09:28:50, 2021-10-17 09:29:00)的窗口里,统计数为2
等到"operate_time":"2021-10-17 09:29:02"的数据到来时,窗口才会关闭计算。

标签:10,DataStream,09,17,flink,API,2021,time,id
From: https://blog.csdn.net/weixin_44437669/article/details/142992521

相关文章

  • 具有GPS功能的监控设备接入as-v1000视频监控平台后,上层应用软件通过as-v1000视频监控
    目录一、什么是设备GPS轨迹二、设备GPS轨迹的应用场景        1、车辆追踪        2、个人定位        3、物流运输        4、资产安全        5、数据分析三、查询设备GPS轨迹的作用        1、实时监控与追踪 ......
  • 【FastAPI】入门基础
    FastAPI介绍和安装FastAPI是一个基于Python3.6+版本的异步WEB应用框架,使用Python类型注解构建webAPI。它的主要特点如下:高性能:与NodeJS和Go相当。编码快:将开发功能的速度提高2~3倍。Bug少:减少大约40%的由开发人员导致的错误。直观:强大的编辑器支持......
  • Java算法竞赛之HashMap常用API--哈西表!
    在Java算法竞赛中,HashMap是一个非常重要的数据结构,它提供了许多有用的API来方便地进行键值对的存储、检索和更新。除了getOrDefault方法外,HashMap还有其他一些常用的API。以下是一些主要的HashMapAPI及其在算法竞赛中的常见用法:put(Kkey,Vvalue)作用:将指定的键与值放入H......
  • [数据集成/数据同步] 基于数据库增量日志的数据同步方案 : Flink CDC/Debezium/DataX/
    1概述简述:CDC/增量数据同步CDC的全称是ChangeDataCapture(增量数据捕获)在广义的概念上,只要能捕获数据变更的技术,我们都可以称为CDC。我们目前通常描述的CDC技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC的技术实现方案基于查询......
  • Python应用指南:利用高德地图API获取公交可达圈
    参考文章:城市公交可达圈绘制方法(一)-知乎(zhihu.com)本篇文章我们聚焦于通过公共交通出行方式(包括公交、地铁、公交+地铁的组合)来获取一定时间内可以到达的范围。为了实现这一目标,我们将使用高德地图API中的公交到达圈功能,对城市某一点的公交可达圈进行详细分析。通过这一......
  • RESTful API常用的HTTP请求方法
    RESTfulAPI常用的HTTP请求方法1.GET获取资源在RESTfulAPI中,一般用来获取数据,例如列表,详情等。对应CRUD中的R,即查找操作。请求参数通常在URL中传递(如查询字符串)。2.POST用途:创建新资源。特点:可以提交数据到服务器进行处理,通常会改变服务器的状态或数据。例如提交表单信息......
  • API和SDK的区别
    API和SDK有以下区别:定义与功能:API(应用程序编程接口):是一组定义了软件组件之间交互规范的接口,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而无需访问源码或理解内部工作机制的细节。它就像是一个“门”,规定了进入和获取特定功能或数据的方式。例如,第......
  • APP视频活体检测API代码示例-动作活体检测开发
    视频活体检测技术是一种用于判断摄像头前的对象是否为真实活人的方法。传统的面部识别技术虽然能够有效地识别面部特征,但在面对照片、视频甚至是高精度的3D面具时,却显得力不从心。视频活体检测技术通过分析面部的微表情、皮肤纹理以及光线反射等多维度信息,能够有效地区分真实......
  • 2024年顶级免费货币转换API推荐
    如果您与国际客户合作,那么您需要掌握最新的货币信息。然而,汇率每天都在迅速变化,因此很难获得准确的价值。通过使用 货币转换器API,您可以毫不费力地获得实时货币汇率。此外,它也非常容易集成到您的网络应用程序中。您只需编写几行代码即可。通过这种方式,API可以帮助您节省时......
  • 淘宝/天猫API接口:商品详情的实时管理
    在当今数字化时代,电商平台已成为人们日常生活中不可或缺的一部分。作为国内电商领域的佼佼者,淘宝和天猫凭借其庞大的商品资源和便捷的购物体验,赢得了亿万消费者的青睐。为了进一步推动电商生态的发展,淘宝和天猫开放平台推出了商品详情API,为开发者和合作伙伴提供了一扇通往丰富......