首页 > 其他分享 >全量窗口与增量窗口

全量窗口与增量窗口

时间:2023-04-01 16:12:12浏览次数:45  
标签:flink 窗口 增量 api 全量 org apache import MonitorAggregate

  全量和增量是可以共用的,这种场景还是比较常见的。

  例如,分组后,先对每个组进行增量计算,然后对每个增量计算,做一个全窗口的计算。

  这个时候,就需要互相结合使用了。

 

  

 

 

 

一:窗口函数说明

1:apply与process的区别

  

 

  • apply和process都是处理全量计算,但工作中正常用process。
  • process更加底层,更加强大,有open/close生命周期方法,又可获取RuntimeContext。

 

2.reduce与aggregate的区别

  • reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个 <T>
  • maxBy、minBy、sum这3个底层都是由reduce实现的
  • aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有<T, ACC, R>

 

二:AggregateFunction和ProcessWindowFunction结合使用

1.

  

 

 

 

2.结合 

  在reduce和aggregate中,都有一个可以把增量函数和全量函数结合使用的方法,就是上面图中标红色五角星的。

  对于一个窗口来说,Flink先增量计算,窗口关闭前,将增量计算结果发送给ProcessWindowFunction作为输入再进行处理。

public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)

 

3.示例

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Slf4j
public class OffLineAlarmStrategy implements AlarmStrategy<MonitoringIndex, DataOfflineResult> {
    @Override
    public DataStream<DataOfflineResult> execute(DataStream<MonitoringIndex> dataInterruptTimesStream) {
        // 判读出数据是否离线
        SingleOutputStreamOperator<DataOfflineResult> aggregateDS = dataInterruptTimesStream.assignTimestampsAndWatermarks(WatermarkStrategy.<MonitoringIndex>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((SerializableTimestampAssigner<MonitoringIndex>) (element, recordTimestamp) -> {
                            if (element.getEventTime() > System.currentTimeMillis()) {
                                log.error("=======> the event time is larger than the current time,event time = {}", element.getEventTime());
                                return System.currentTimeMillis();
                            }
                            return element.getEventTime();
                        }))
                .keyBy((KeySelector<MonitoringIndex, Long>) MonitoringIndex::getCustomerId)

                .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(20)))
                .aggregate(new AggregateFunction<MonitoringIndex, MonitorAggregate, MonitorAggregate>() {

                    @Override
                    public MonitorAggregate createAccumulator() {
                        return new MonitorAggregate();
                    }

                    @Override
                    public MonitorAggregate add(MonitoringIndex value, MonitorAggregate accumulator) {
                        accumulator.getSignalSet().add(value.getCustomerId() + "::" + value.getSignalNo() + "::" + value.getDeviceNo());
                        return accumulator;
                    }

                    @Override
                    public MonitorAggregate getResult(MonitorAggregate accumulator) {
                        return accumulator;
                    }

                    @Override
                    public MonitorAggregate merge(MonitorAggregate a, MonitorAggregate b) {
                        return null;
                    }
                }, new ProcessWindowFunction<MonitorAggregate, DataOfflineResult, Long, TimeWindow>() {

                    @Override
                    public void process(Long customerId, ProcessWindowFunction<MonitorAggregate, DataOfflineResult, Long, TimeWindow>.Context context, Iterable<MonitorAggregate> elements, Collector<DataOfflineResult> out) throws Exception {
                        long windowStart = context.window().getStart();
                        long windowEnd = context.window().getEnd();
                        log.info("==============窗口开始时间:{} 窗口结束时间:{}, customerId={}", windowStart, windowEnd, customerId);
                        List<MonitorAggregate> monitoringIndexList = Lists.newArrayList(elements.iterator());
                        Set<String> monitorAggregateSet = new HashSet<>();
                        if (!CollectionUtils.isEmpty(monitoringIndexList)) {
                            for (MonitorAggregate monitorAggregate : monitoringIndexList) {
                                monitorAggregateSet.addAll(monitorAggregate.getSignalSet());
                            }
                        }
                        if (!CollectionUtils.isEmpty(monitorAggregateSet)) {
                            List<SimpleMonitor> simpleMonitorList = new ArrayList<>();
                            for (String signalNoKey : monitorAggregateSet) {
                                String[] splitResult = signalNoKey.split("::");
                                SimpleMonitor simpleMonitor = new SimpleMonitor();
                                simpleMonitor.setCustomerId(Long.parseLong(splitResult[0]));
                                simpleMonitor.setSignalNo(splitResult[1]);
                                simpleMonitor.setDeviceNo(splitResult[2]);
                                simpleMonitorList.add(simpleMonitor);
                            }
                            if (!CollectionUtils.isEmpty(simpleMonitorList)) {
                                List<DataOfflineResult> dataOfflineResults = OneHourOffLineCalculate.getCalculateSignalStatus(customerId, simpleMonitorList, windowStart, windowEnd);
                                if (!CollectionUtils.isEmpty(dataOfflineResults)) {
                                    dataOfflineResults.forEach(out::collect);
                                }
                            }
                        }

                    }
                });return aggregateDS;
    }



}

 

标签:flink,窗口,增量,api,全量,org,apache,import,MonitorAggregate
From: https://www.cnblogs.com/juncaoit/p/17278775.html

相关文章

  • GNOME 窗口添加最大化、最小化按钮
    1、安装工具使用终端命令安装优化工具yuminstallgnome-tweak-tool 2、配置gnome-tweak-tool安装完毕后,在应用程序的“工具”中找到“优化”程序打开。然后选择“窗口标题栏”,将里面的“最大化”、“最小化”选项打开即可。  转载:https://www.likecs.com/show-308......
  • C# 当前进程是否有控制台窗口
    WPF应用程序,在VS的项目属性中,可以设置输出类型:那我们在代码中,如何判断应用的类型呢。有没有控制台?是否Windows应用程序还是控制台应用程序?Kernel32下函数GetConsoleWindow可以解决这个问题:[DllImport("kernel32.dll")]privatestaticexternIntPtrGetConsoleWindow();1......
  • 如果用awt在指定窗口画画
    packagecom.xiangwen;importjavax.imageio.ImageIO;importjavax.swing.*;importjava.awt.*;importjava.awt.image.BufferedImage;importjava.io.IOException;importjava.util.Timer;importjava.util.TimerTask;publicclassTestPain{publicstaticv......
  • 窗口函数 oracle_11g
    窗口函数oracle_11g数据库中的窗口函数也叫分析函数,顾名思义,窗口函数可用于一些复杂的统计分析计算,另外,窗口函数还具有优越的性能表现,可以节约时间和资源,因此窗口函数经常用于数据仓库和大型报表应用中。窗口函数的结构窗口函数由四部分组成,分别是分析函数名、分区子句、排......
  • selenium之关闭窗口,指定窗口大小,前进,后退,刷新等操作
    关闭窗口1、仅关闭当前窗口(Tab页),其他窗口不退出关闭用户当前正在使用的Web浏览器窗口,即WebDriver当前正在访问的窗口。.close()方法既不需要任何参数,也无任何返回值。driver.close()相当于浏览器中每个Tab页中的叉叉。2、关闭所有的浏览器窗口(WebDriver初始化的整个浏览器进程)同于......
  • 获取浏览器窗口尺寸及监听浏览器变化
    原JS获取:窗口可视高度:window.innerWidth窗口可视宽度:window.innerHeight窗口文档高度:document.body.clientWidth窗口文档宽度:document.body.clientHeightJQuery获取:窗口可视高度:$(window).height()窗口文档高度:$(window).height()窗口body高度:$(document.body).height()窗口文......
  • CAD命令行怎么恢复到初始状态?CAD命令行窗口恢复步骤
    CAD制图过程中,为了提高绘图效率经常会用到各种命令,很多命令信息及操作提示会在CAD命令行中显示。可当不小心改变了命令行的状态时,该怎么办呢?下面就和小编来了解一下CAD命令行怎么恢复到初始状态吧!CAD命令行窗口恢复初始步骤:1、启动浩辰CAD软件后,调用OP命令,即可打开【选项】对话......
  • 用C#调用Windows API向指定窗口发送按键消息
    用C#调用WindowsAPI向指定窗口发送一、调用WindowsAPI。C#下调用WindowsAPI方法如下:1、引入命名空间:usingSystem.Runtime.InteropServices;2、引用需要使用的方法,格式:[DllImport("DLL文件")]方法的声明;[DllImport("user32.dll")]privatestaticexternboolShow......
  • C# 当前进程是否有控制台窗口
    WPF应用程序,在VS的项目属性中,可以设置输出类型:那我们在代码中,如何判断应用的类型呢。有没有控制台?是否Windows应用程序还是控制台应用程序?Kernel32下函数GetConsoleWindow可以解决这个问题:[DllImport("kernel32.dll")]privatestaticexternIntPtrGetConsoleWindow();......
  • 增量查数据库
    Person:IhopethatIcanincrementallyquerythedatabasetoobtaindata.Iwillstoretheresultsofeachdatabasequeryinredisandrecordthetimeaoft......