Incremental and full-window computation can be combined, and this is a fairly common scenario: after keying the stream, each group is first aggregated incrementally, and that incremental result is then handed to a full-window function for further processing. In cases like this the two kinds of window functions need to be used together.
I: Notes on window functions
1. The difference between apply and process
- Both apply and process perform full-window computation, but in practice process is what is normally used.
- process is lower-level and more powerful: it has the open/close lifecycle methods and can access the RuntimeContext (see the sketch below).
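To make the second bullet concrete, here is a minimal sketch, not from the original post, of a ProcessWindowFunction that uses the open()/close() lifecycle and the RuntimeContext. It assumes a Flink 1.x release (matching the example later in this post) where RichFunction.open(Configuration) is the lifecycle hook; the Long/String input, output and key types are purely illustrative.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Minimal sketch (hypothetical types): ProcessWindowFunction is a rich function, so it gets
// the open()/close() lifecycle and getRuntimeContext() in addition to the window Context.
public class CountPerWindow extends ProcessWindowFunction<Long, String, String, TimeWindow> {

    private transient int subtaskIndex;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Lifecycle hook: runs once per parallel instance, e.g. to open connections or load caches.
        subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public void process(String key, Context ctx, Iterable<Long> elements, Collector<String> out) {
        long count = 0L;
        for (Long ignored : elements) {
            count++;
        }
        out.collect("subtask=" + subtaskIndex + ", key=" + key
                + ", window=[" + ctx.window().getStart() + "," + ctx.window().getEnd() + "), count=" + count);
    }

    @Override
    public void close() throws Exception {
        // Lifecycle hook: release any resources acquired in open().
    }
}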
2. The difference between reduce and aggregate
- reduce takes two inputs of the same type and produces an output of that same type, so it has a single type parameter <T>.
- maxBy, minBy and sum are all implemented on top of reduce.
- aggregate allows the input type, the intermediate accumulator type and the output type to all differ, so its type parameters are <T, ACC, R> (a small sketch follows this list).
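As a quick illustration of why the three type parameters matter (my own sketch, not from the original post): computing a window average needs an accumulator (sum, count) and a result (Double) that differ from the input type (here assumed to be plain Long readings), which a ReduceFunction<T> cannot express.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// AggregateFunction<IN, ACC, OUT>: input Long, accumulator (sum, count), output Double.
public class AverageAggregate implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L);                       // (sum, count)
    }

    @Override
    public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
        return Tuple2.of(acc.f0 + value, acc.f1 + 1);   // incremental update per element
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);     // needed for merging (session) windows
    }
}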
II: Using AggregateFunction and ProcessWindowFunction together
1. Method signature
Both reduce and aggregate provide an overload that combines an incremental function with a full-window function. The aggregate variant has the following signature:

public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<T, ACC, V> aggFunction,
        ProcessWindowFunction<V, R, K, W> windowFunction)

2. How they combine
For a given window, Flink first performs the incremental computation; when the window is about to fire, the incremental result is passed to the ProcessWindowFunction as its input for the final full-window processing (see the minimal sketch below; section 3 then shows a full example).
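Before the production example in section 3, here is a minimal sketch of the pattern with hypothetical types: the AggregateFunction keeps only a running count per key, and the ProcessWindowFunction attaches the window start/end when the window fires. The Tuple2<String, Long> input and the helper class names are assumptions for illustration, and the sketch assumes timestamps and watermarks are assigned upstream.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Minimal sketch (hypothetical types and names): incremental count per key, then a
// full-window step that enriches the count with window metadata.
public class CombinedWindowSketch {

    public static SingleOutputStreamOperator<String> countPerWindow(DataStream<Tuple2<String, Long>> input) {
        return input
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(10)))
                // Incremental part: only a Long counter is kept per window, not all raw elements.
                .aggregate(new CountAgg(), new WindowEnrich());
    }

    // AggregateFunction<IN, ACC, OUT>: counts elements incrementally.
    private static class CountAgg implements AggregateFunction<Tuple2<String, Long>, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Tuple2<String, Long> value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    // ProcessWindowFunction<IN, OUT, KEY, W>: receives the pre-aggregated count
    // and adds window metadata when the window fires.
    private static class WindowEnrich extends ProcessWindowFunction<Long, String, String, TimeWindow> {
        @Override
        public void process(String key, Context ctx, Iterable<Long> counts, Collector<String> out) {
            Long count = counts.iterator().next(); // exactly one element: the aggregate result
            out.collect("key=" + key + ", window=[" + ctx.window().getStart()
                    + "," + ctx.window().getEnd() + "), count=" + count);
        }
    }
}

Because the count is pre-aggregated incrementally, the Iterable handed to process() contains a single element, and Flink does not have to buffer every raw record for the window.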
3. Example
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Slf4j
public class OffLineAlarmStrategy implements AlarmStrategy<MonitoringIndex, DataOfflineResult> {

    @Override
    public DataStream<DataOfflineResult> execute(DataStream<MonitoringIndex> dataInterruptTimesStream) {
        // Determine whether the monitored data has gone offline
        SingleOutputStreamOperator<DataOfflineResult> aggregateDS = dataInterruptTimesStream
                .assignTimestampsAndWatermarks(WatermarkStrategy.<MonitoringIndex>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((SerializableTimestampAssigner<MonitoringIndex>) (element, recordTimestamp) -> {
                            // Guard against event times from the future: fall back to the current time
                            if (element.getEventTime() > System.currentTimeMillis()) {
                                log.error("=======> the event time is larger than the current time, event time = {}", element.getEventTime());
                                return System.currentTimeMillis();
                            }
                            return element.getEventTime();
                        }))
                .keyBy((KeySelector<MonitoringIndex, Long>) MonitoringIndex::getCustomerId)
                .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(20)))
                .aggregate(new AggregateFunction<MonitoringIndex, MonitorAggregate, MonitorAggregate>() {
                    @Override
                    public MonitorAggregate createAccumulator() {
                        return new MonitorAggregate();
                    }

                    @Override
                    public MonitorAggregate add(MonitoringIndex value, MonitorAggregate accumulator) {
                        // Incremental step: collect the distinct customerId::signalNo::deviceNo keys seen in the window
                        accumulator.getSignalSet().add(value.getCustomerId() + "::" + value.getSignalNo() + "::" + value.getDeviceNo());
                        return accumulator;
                    }

                    @Override
                    public MonitorAggregate getResult(MonitorAggregate accumulator) {
                        return accumulator;
                    }

                    @Override
                    public MonitorAggregate merge(MonitorAggregate a, MonitorAggregate b) {
                        // merge() is only invoked for merging window assigners (e.g. session windows);
                        // combine the two accumulators instead of returning null
                        a.getSignalSet().addAll(b.getSignalSet());
                        return a;
                    }
                }, new ProcessWindowFunction<MonitorAggregate, DataOfflineResult, Long, TimeWindow>() {
                    @Override
                    public void process(Long customerId,
                                        ProcessWindowFunction<MonitorAggregate, DataOfflineResult, Long, TimeWindow>.Context context,
                                        Iterable<MonitorAggregate> elements,
                                        Collector<DataOfflineResult> out) throws Exception {
                        long windowStart = context.window().getStart();
                        long windowEnd = context.window().getEnd();
                        log.info("============== window start: {}, window end: {}, customerId={}", windowStart, windowEnd, customerId);

                        // elements holds the pre-aggregated result produced by the AggregateFunction
                        List<MonitorAggregate> monitoringIndexList = Lists.newArrayList(elements.iterator());
                        Set<String> monitorAggregateSet = new HashSet<>();
                        if (!CollectionUtils.isEmpty(monitoringIndexList)) {
                            for (MonitorAggregate monitorAggregate : monitoringIndexList) {
                                monitorAggregateSet.addAll(monitorAggregate.getSignalSet());
                            }
                        }
                        if (!CollectionUtils.isEmpty(monitorAggregateSet)) {
                            List<SimpleMonitor> simpleMonitorList = new ArrayList<>();
                            for (String signalNoKey : monitorAggregateSet) {
                                String[] splitResult = signalNoKey.split("::");
                                SimpleMonitor simpleMonitor = new SimpleMonitor();
                                simpleMonitor.setCustomerId(Long.parseLong(splitResult[0]));
                                simpleMonitor.setSignalNo(splitResult[1]);
                                simpleMonitor.setDeviceNo(splitResult[2]);
                                simpleMonitorList.add(simpleMonitor);
                            }
                            if (!CollectionUtils.isEmpty(simpleMonitorList)) {
                                List<DataOfflineResult> dataOfflineResults = OneHourOffLineCalculate.getCalculateSignalStatus(customerId, simpleMonitorList, windowStart, windowEnd);
                                if (!CollectionUtils.isEmpty(dataOfflineResults)) {
                                    dataOfflineResults.forEach(out::collect);
                                }
                            }
                        }
                    }
                });
        return aggregateDS;
    }
}