首页 > 其他分享 >flink版本: 1.14.6 flink水位生成以及基于水位触发窗口的计算

flink版本: 1.14.6 flink水位生成以及基于水位触发窗口的计算

时间:2024-06-20 17:12:05浏览次数:12  
标签:1.14 api flink 水位 org apache import

Flink是间断性(punctuate)或者周期性(periodic)生成水位线的
1. 定义和用途
* punctuate:为每条消息都尝试生成watermark,这提供了更细粒度的控制,但增加了不必要的计算开销
* periodic:周期性的生成watermark,可以通过env.getConfig().setAutoWatermarkInterval(1 * 1000L)设置周期间隔,默认是200ms。这样更加高效,并且对于大多数场景已经够用
2. 社区趋势
* punctuate已经过期,
* periodic:通常被作为推荐的方式来生成watermark

点击查看代码
package flink.shangguigu_test;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class Tublingwindow_pv {
    private static final String KAFKA_SERVER = " 10.210.44.10:9092,10.210.44.33:9092,10.210.44.17:9092";
    private static final String KAFKA_TOPIC = "flink_test";
    private static final String KAFKA_GROUP_ID = "flink_test_consumer_group_id_001";


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(1L); //每个一分钟分配一条水位线,默认是200毫秒 设置为1,是为了让每一条数据都生成watermark,便于分析结果数据


        env.enableCheckpointing(5* 1000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3*1000,5* 1000));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop-ha/user/flink/cluster_yarn/checkpoints");










        DataStream<String> input = env.socketTextStream("127.0.0.1", 8085).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value != null && value != "";
            }
        });

        SingleOutputStreamOperator<String> assign_event_time = input
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                                    @Override
                                    public long extractTimestamp(String element, long recordTimestamp) {
                                        JSONObject jsonObject = JSON.parseObject(element);
                                        Long operate_time = jsonObject.getLong("operate_time");
                                        return operate_time * 1000L;
                                    }
                                })
                );


        SingleOutputStreamOperator<Tuple2<String,Long>> process = assign_event_time.process(new ProcessFunction<String, Tuple2<String,Long>>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Tuple2<String,Long>> out) throws Exception {

               Long watermark =  ctx.timerService().currentWatermark();
                out.collect(Tuple2.of("时间戳:" + JSON.parseObject(value).getLong("operate_time") +
                        " 事件时间:" + ctx.timestamp() +
                        " 上一条水位:" + ctx.timerService().currentWatermark() +
                        " 当前水位:" + (ctx.timestamp() - 2000 -1 >  watermark ? ctx.timestamp() - 2000 -1 : watermark)
                        ,1L)
                );

            }


        });


        SingleOutputStreamOperator<String> aggregate = process.windowAll(TumblingEventTimeWindows.of(Time.seconds(2))).trigger(EventTimeTrigger.create()).aggregate(new AggregateFunction<Tuple2<String, Long>, Tuple2<StringBuilder,Long>, String>() {
            @Override
            public Tuple2<StringBuilder, Long> createAccumulator() {
                return Tuple2.of(new StringBuilder(),0L);
            }

            @Override
            public Tuple2<StringBuilder, Long> add(Tuple2<String, Long> value, Tuple2<StringBuilder, Long> accumulator) {
                return Tuple2.of(accumulator.f0.append(value.f0).append("-->\n"),value.f1 + accumulator.f1);
            }

            @Override
            public String getResult(Tuple2<StringBuilder, Long> accumulator) {
                return accumulator.f0 + "==" + accumulator.f1;
            }

            @Override
            public Tuple2<StringBuilder, Long> merge(Tuple2<StringBuilder, Long> a, Tuple2<StringBuilder, Long> b) {
                return null;
            }
        });



        aggregate.print();
        env.execute();


    }
}

样例数据

{"UUID":"","operate_time":"1718718279"}

{"UUID":"","operate_time":"1718718279"}

{"UUID":"","operate_time":"1718718280"}

{"UUID":"","operate_time":"1718718280"}

{"UUID":"","operate_time":"1718718281"}

{"UUID":"","operate_time":"1718718285"}

{"UUID":"","operate_time":"1718718280"}

{"UUID":"","operate_time":"1718718279"}

执行后的结果:

时间戳:1718718260 事件时间:1718718260000 上一条水位:-9223372036854775808 当前水位:1718718257999-->
==1
时间戳:1718718279 事件时间:1718718279000 上一条水位:1718718257999 当前水位:1718718276999-->
时间戳:1718718279 事件时间:1718718279000 上一条水位:1718718276999 当前水位:1718718276999-->
==2
时间戳:1718718280 事件时间:1718718280000 上一条水位:1718718276999 当前水位:1718718277999-->
时间戳:1718718280 事件时间:1718718280000 上一条水位:1718718277999 当前水位:1718718277999-->
时间戳:1718718281 事件时间:1718718281000 上一条水位:1718718277999 当前水位:1718718278999-->
==3
(时间戳:1718718285事件时间:1718718285000 水位:1718718278999 当前水位:1718718282999,1)

结论:
flink的初始化水位是Long.min_value = -9223372036854775808,后序自动生成的watermark必须是大于0的,比如你设置第一条的event_time,计算出来的watermark=-2001,这个值不会被保留,也就是你第二条数据打印出来的上一条watermark还是-9223372036854775808
Flink中使用的都是毫秒,抽取数据中的event_time(如果是秒)时要转换成毫秒
水位线的计算方式是:按照当前最大时间戳-允许乱序的时间-1
在上述代码i中,watermark分配使用的是forBoundedOutOfOrderness,这个函数是周期性分配watermark,我们可以通过env.getConfig().setAutoWatermarkInterval(200L)设置水位周期性生成的时间间隔,默认是200ms。
watermark作为单独的流数据在flink中流转,ctx.timerService().currentWatermark()打印的是上一个水位值,所以我们要根据最新的event_time计算最新的水位值。水位值单调递增的,如果计算出来最新的水位值比当前的水位值小,则抛弃,否则更新逻辑时钟的水位值。
窗口的关闭和计算是根据最新的水位值判断的。当watermark大于窗口的关闭时间则触发窗口的执行。

标签:1.14,api,flink,水位,org,apache,import
From: https://www.cnblogs.com/datadevelop/p/18258987

相关文章

  • flink 如果是有序流,还需要 forMonotonousTimestamps吗
    如果数据是有序的,即数据完全按照时间发生的顺序到达,那么在flink中,虽然理论上不需要额外的Watermark策略来标识数据的有序性,但使用forMonotonousTimestamps策略仍然有其必要性。以下是详细解释:水位的作用即使数据完全有序,flink的窗口计算仍然需要watermark来触发。watermark提......
  • Flink状态(一)
    key状态和算子状态key状态key状态总是与key有关,只能被用于keyedStream类型的函数与算子。你可以认为key状态是一种被分区的算子状态,每一个key有一个状态分区。每一个key状态逻辑上由<parellel-operator-instance,key>唯一确定,由于每一个key只分布在key算子的多个并发实例中的一......
  • Flink状态(二)
    Flink提供了不同的状态存储方式,并说明了状态如何存和存储在哪里。状态可以被存储在Jvm的堆和堆外。根据状态存储方式的不同,Flink也能代替应用管理状态,意思是Flink能够进行内存管理(有必要的时候,可能会溢出到硬盘),允许应用保存非常大的状态。默认情况下,在配置文件flink-conf.yam......
  • Flink 窗口计算
    Flink窗口计算1.背景2.Watermark3.Watermark与Window之间的关系4.Window窗口计算1.背景在当今大数据时代,实时数据处理的需求日益增长,Flink的窗口计算在这一领域中发挥着至关重要的作用。窗口计算使得我们能够将无界的数据流切分成有意义的片段,从而进行......
  • Flink - [08] 状态一致性
    题记部分 一、什么是状态一致性  有状态的流处理,内部每个算子任务都可以有自己的状态。对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确、一条数据也不应该丢失,也不应该重复计算,在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正......
  • Flink1.17.0-报错: java.lang.NoSuchMethodError: org.apache.kafka.clients.admin.De
    背景:启动Flink的sql-client.sh,创建Kafka的source端表,然后查询Kafka的数据时报错。报错信息:2024-06-1816:10:12org.apache.flink.util.FlinkException:GlobalfailuretriggeredbyOperatorCoordinatorfor'Source:kafka_rmc_cust_analog_u[1]'(operatorbc764cd8ddf7a0c......
  • Flink - [07] 容错机制
    题记部分 一、一致性检查点  Flink故障恢复机制的核心,就是应用状态的一致性检查点。有状态流应用的一致性检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。 二、从检查点恢复状态  在......
  • Flink - [06] 状态管理
    题记部分 一、Flink中的状态由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态。可以认为状态就是一个本地变量,可以被任务的业务逻辑访问。Flink会进行状态管理,包括状态一致性、故障处理以及高效存储和访问,以便开发人员可以专注于应用程序的逻辑在Flin......
  • Flink - [05] 时间语义 & Watermark
    题记部分 一、时间语义Flink中的时间语义分为以下,(1)EventTime:事件创建的时间(2)IngestionTime:数据进入Flink的时间(3)ProcessingTime:执行操作算子的本地系统事件,与机器相关 哪种时间语义更重要?不同的时间语义有不同的应用场合,我们往往更关心事件时间(Event Time)某些......
  • Flink - [03] API
    使用scala编写flinkapi从不同的数据源(源端)读取数据,并进行无界流/有界流的数据处理,最终将处理好的数据sink到对应的目标端 一、maven配置<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.or......