Preface
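In a previous article, "How watermarks and windows work in Flink", I implemented windows and watermarks with Flink SQL. This time I implement them with the DataStream API. Most of the code comes from the Atguigu (尚硅谷) course materials.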
Watermark code
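1. WatermarkStrategy.forBoundedOutOfOrderness() sets the maximum out-of-orderness, that is, how long the watermark lags behind the data. Here it is 2 s.
2. withTimestampAssigner() sets the timestamp assigner, which extracts the event time from each record. If "operate_time" is not null, it is used to produce the timestamp. Otherwise "create_time" is used.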
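The timestamp rule above can be tried outside Flink. The sketch below is a plain-Java approximation and rests on three assumptions.

- The project's DateFromatUtil is not shown in the source, so `toTs` here is a hypothetical stand-in.
- It assumes "yyyy-MM-dd HH:mm:ss" strings in the Asia/Shanghai time zone.
- A `Map<String, String>` replaces `JSONObject`.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

public class TimestampExtractionSketch {
    // Assumption: times are "yyyy-MM-dd HH:mm:ss" local times in Asia/Shanghai
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");

    // Hypothetical stand-in for DateFromatUtil.toTs(str, true): date-time string -> epoch millis
    static long toTs(String dateTime) {
        return LocalDateTime.parse(dateTime, FMT).atZone(ZONE).toInstant().toEpochMilli();
    }

    // Same fallback rule as extractTimestamp(): prefer operate_time, otherwise create_time
    static long extractTimestamp(Map<String, String> record) {
        String operateTime = record.get("operate_time");
        if (operateTime != null) {
            return toTs(operateTime);
        }
        return toTs(record.get("create_time"));
    }

    public static void main(String[] args) {
        Map<String, String> updated = new HashMap<>();
        updated.put("create_time", "2020-06-14 09:29:10");
        updated.put("operate_time", "2021-10-17 09:28:52");

        Map<String, String> neverUpdated = new HashMap<>();
        neverUpdated.put("create_time", "2020-06-14 09:29:10"); // no operate_time key

        System.out.println(extractTimestamp(updated) == toTs("2021-10-17 09:28:52"));      // operate_time used
        System.out.println(extractTimestamp(neverUpdated) == toTs("2020-06-14 09:29:10")); // falls back to create_time
    }
}
```

Both lines print `true`. A record that was never updated is placed on the event-time axis by its creation time.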
//TODO 4. Extract the event time and generate watermarks
SingleOutputStreamOperator<JSONObject> jsonObjWithWmDS = jsonObjDS.assignTimestampsAndWatermarks(
        // Tolerate up to 2 s of out-of-order data
        WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        // Prefer the update time; fall back to the creation time
                        String operateTime = element.getString("operate_time");
                        if (operateTime != null) {
                            return DateFromatUtil.toTs(operateTime, true);
                        } else {
                            return DateFromatUtil.toTs(element.getString("create_time"), true);
                        }
                    }
                }));
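To see what forBoundedOutOfOrderness(Duration.ofSeconds(2)) actually emits, here is a minimal sketch of the watermark logic without Flink. To our understanding, it follows Flink's built-in BoundedOutOfOrdernessWatermarks: remember the largest timestamp seen so far, and emit that value minus the bound minus 1 ms.

The class name and the millisecond values are illustrative only. Also, real Flink emits the watermark periodically (every 200 ms by default), not right after each record.

```java
public class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the first watermark computation cannot underflow
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called for every record: only the largest timestamp seen so far matters
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Watermark = max timestamp - allowed out-of-orderness - 1 ms
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(2000);
        long[] timestamps = {10_000L, 15_000L, 12_000L, 20_000L};
        for (long ts : timestamps) {
            wm.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

The watermarks printed are 7999, 12999, 12999 and 17999. The late record at 12000 does not move the watermark backwards.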
Window code
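windowAll(TumblingEventTimeWindows) opens a tumbling event-time window over the whole (non-keyed) stream. The window assigner is created with the static method .of(), and the window size is 10 s.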
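Which window a record lands in depends only on its timestamp. The sketch below reproduces the window-start formula used by Flink's TimeWindow.getWindowStartWithOffset, with offset 0 as in TumblingEventTimeWindows.of(size). The toTs/fmt helpers and the Asia/Shanghai time zone are assumptions for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TumblingWindowSketch {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ZoneId ZONE = ZoneId.of("Asia/Shanghai"); // assumed time zone

    static long toTs(String dateTime) {
        return LocalDateTime.parse(dateTime, FMT).atZone(ZONE).toInstant().toEpochMilli();
    }

    static String fmt(long ts) {
        return Instant.ofEpochMilli(ts).atZone(ZONE).format(FMT);
    }

    // Window start for a timestamp: timestamp - (timestamp - offset + size) % size
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    public static void main(String[] args) {
        long size = 10_000L; // Time.seconds(10)
        String[] times = {"2021-10-17 09:28:52", "2021-10-17 09:28:55", "2021-10-17 09:29:00", "2021-10-17 09:29:02"};
        for (String t : times) {
            long start = windowStart(toTs(t), 0L, size);
            // Windows are [start, start + size): the end is exclusive
            System.out.println(t + " -> [" + fmt(start) + ", " + fmt(start + size) + ")");
        }
    }
}
```

Windows are aligned to the epoch. The UTC+8 offset is a whole multiple of 10 s, so local windows still start at :50, :00, and so on. The first two times map to [09:28:50, 09:29:00), and the last two map to [09:29:00, 09:29:10).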
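The ReduceFunction performs incremental aggregation: each incoming record is folded into the running result as soon as it arrives. When the window fires, the AllWindowFunction is called once. Its Iterable input does not hold all of the window's records, only the single result of the incremental aggregation. That is why the code simply takes values.iterator().next() and adds the window start, window end and a timestamp to it.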
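The split between the incremental ReduceFunction and the final AllWindowFunction can be simulated without Flink. In this sketch:

- `Bean` is a hypothetical, trimmed-down stand-in for CartAddUuBean.
- Each record is assumed to carry cartAddUuCt = 1.
- The window bounds are passed in as strings.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReducePlusWindowFunctionSketch {
    // Hypothetical simplified stand-in for CartAddUuBean
    static class Bean {
        String stt;
        String edt;
        long cartAddUuCt;

        Bean(long cartAddUuCt) {
            this.cartAddUuCt = cartAddUuCt;
        }
    }

    // Incremental step, like the ReduceFunction: runs once per incoming record
    static Bean reduce(Bean acc, Bean in) {
        acc.cartAddUuCt += in.cartAddUuCt;
        return acc;
    }

    // Final step, like AllWindowFunction.apply: runs once when the window fires;
    // the Iterable contains only the single pre-aggregated element
    static Bean apply(String windowStart, String windowEnd, Iterable<Bean> values) {
        Bean next = values.iterator().next();
        next.stt = windowStart;
        next.edt = windowEnd;
        return next;
    }

    public static void main(String[] args) {
        List<Bean> arrivals = Arrays.asList(new Bean(1), new Bean(1));
        Bean acc = null;
        for (Bean b : arrivals) {
            acc = (acc == null) ? b : reduce(acc, b); // the first record becomes the state
        }
        Bean result = apply("2021-10-17 09:28:50", "2021-10-17 09:29:00", Collections.singletonList(acc));
        System.out.println(result.stt + " " + result.edt + " " + result.cartAddUuCt);
    }
}
```

The design keeps only one object of state per window, no matter how many records arrive. The window function is still available to attach window metadata that the reduce step cannot see.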
//TODO 7. Open the window and aggregate
SingleOutputStreamOperator<CartAddUuBean> resultDS = cartAddDS
        // 10 s tumbling event-time window over the whole (non-keyed) stream
        .windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
        .reduce(new ReduceFunction<CartAddUuBean>() {
            // Incremental step: runs for every record, summing the counts
            @Override
            public CartAddUuBean reduce(CartAddUuBean value1, CartAddUuBean value2) throws Exception {
                value1.setCartAddUuCt(value1.getCartAddUuCt() + value2.getCartAddUuCt());
                return value1;
            }
        }, new AllWindowFunction<CartAddUuBean, CartAddUuBean, TimeWindow>() {
            // Final step: runs once per window; values holds only the reduced result
            @Override
            public void apply(TimeWindow window, Iterable<CartAddUuBean> values, Collector<CartAddUuBean> out) throws Exception {
                CartAddUuBean next = values.iterator().next();
                next.setEdt(DateFromatUtil.toYmdHms(window.getEnd()));
                next.setStt(DateFromatUtil.toYmdHms(window.getStart()));
                next.setTs(System.currentTimeMillis()); // processing time at which the result is emitted
                out.collect(next);
            }
        });
Hands-on test
Input data
{"id":100924,"user_id":"94","sku_id":16,"cart_price":4488.00,"sku_num":2,"img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg","sku_name":"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机","is_checked":null,"create_time":"2020-06-14 09:29:10","operate_time":"2021-10-17 09:28:52","is_ordered":1,"order_time":"2021-10-17 09:28:58","source_type":"2401","source_id":null}
{"id":100924,"user_id":"92","sku_id":16,"cart_price":4488.00,"sku_num":2,"img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg","sku_name":"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机","is_checked":null,"create_time":"2020-06-14 09:29:20","operate_time":"2021-10-17 09:28:55","is_ordered":1,"order_time":"2021-10-17 09:28:58","source_type":"2401","source_id":null}
{"id":100924,"user_id":"90","sku_id":16,"cart_price":4488.00,"sku_num":2,"img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg","sku_name":"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机","is_checked":null,"create_time":"2020-06-14 09:29:30","operate_time":"2021-10-17 09:29:00","is_ordered":1,"order_time":"2021-10-17 09:28:58","source_type":"2401","source_id":null}
{"id":100924,"user_id":"91","sku_id":16,"cart_price":4488.00,"sku_num":2,"img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg","sku_name":"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机","is_checked":null,"create_time":"2020-06-14 09:29:30","operate_time":"2021-10-17 09:29:01","is_ordered":1,"order_time":"2021-10-17 09:28:58","source_type":"2401","source_id":null}
{"id":100924,"user_id":"91","sku_id":16,"cart_price":4488.00,"sku_num":2,"img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg","sku_name":"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机","is_checked":null,"create_time":"2020-06-14 09:29:30","operate_time":"2021-10-17 09:29:02","is_ordered":1,"order_time":"2021-10-17 09:28:58","source_type":"2401","source_id":null}
Output data
*****:2> CartAddUuBean(stt=2021-10-17 09:28:50, edt=2021-10-17 09:29:00, cartAddUuCt=2, ts=1729082773882)
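The output shows that the records with operate_time 2021-10-17 09:28:52 and 2021-10-17 09:28:55 fall into the window [2021-10-17 09:28:50, 2021-10-17 09:29:00), so the count is 2. The record at 09:29:00 belongs to the next window, because each window includes its start and excludes its end.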
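The window only closes and fires when the record with "operate_time":"2021-10-17 09:29:02" arrives.

- That record raises the watermark to 09:29:02 - 2 s - 1 ms = 09:28:59.999.
- This equals the window's last timestamp (end - 1 ms = 09:28:59.999), so the window fires.
- The record at 09:29:01 only lifted the watermark to 09:28:58.999, which is not enough.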
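The firing point can be checked by replaying the operate_time values of the five input records. This sketch makes several simplifying assumptions:

- There is a single source subtask.
- The watermark is updated right after every record, whereas real Flink emits it periodically.
- Times are in the Asia/Shanghai zone.
- The window fires when watermark >= window.maxTimestamp(), where maxTimestamp() is the window end minus 1 ms.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class WindowFiringReplay {
    // Assumption: event times are local times in Asia/Shanghai
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");

    static long toTs(String dateTime) {
        return LocalDateTime.parse(dateTime, FMT).atZone(ZONE).toInstant().toEpochMilli();
    }

    // Index of the first record after which the window ending at windowEnd fires, or -1 if it never does
    static int firingIndex(String[] eventTimes, long windowEnd, long outOfOrdernessMillis) {
        long windowMaxTimestamp = windowEnd - 1;                // last millisecond inside the window
        long maxTs = Long.MIN_VALUE + outOfOrdernessMillis + 1; // underflow-safe starting value
        for (int i = 0; i < eventTimes.length; i++) {
            maxTs = Math.max(maxTs, toTs(eventTimes[i]));
            long watermark = maxTs - outOfOrdernessMillis - 1;  // bounded out-of-orderness watermark
            if (watermark >= windowMaxTimestamp) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        String[] operateTimes = {
            "2021-10-17 09:28:52", "2021-10-17 09:28:55", "2021-10-17 09:29:00",
            "2021-10-17 09:29:01", "2021-10-17 09:29:02"
        };
        int i = firingIndex(operateTimes, toTs("2021-10-17 09:29:00"), 2000L);
        System.out.println("window fires after operate_time " + operateTimes[i]);
    }
}
```

It reports the 09:29:02 record, which matches the behavior observed above.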