首页 > 其他分享 >Flink高级特性(2)

Flink高级特性(2)

时间:2023-09-04 10:07:52浏览次数:37  
标签:watermark Flink 高级 特性 并行度 split env new result

watermark 水位线 处理乱序

数据流从数据产生到DataSource,再到具体的算子,中间是有一个过程和时间,有可能会导致数据乱序问题,通过watermark + EventTime来处理。

作用由于网络延迟等原因,一条数据会迟到计算,比如使用event time来划分窗口,我们知道窗口中的数据是计算一段时间的数据,如果一个数据来晚了,它的时间范围已经不属于这个窗口了,则会被丢弃,但他的event time实际上是属于这个窗口的。引入watermark机制则会等待晚到的数据一段时间,等待时间到则触发计算,如果数据延迟很大,通常也会被丢弃或者另外处理。

生成方式

watermark生成方式有两种:

  • 周期性生成watermark【常用】:每隔N秒自动向流中注入一个watermark,由executionConfig.setAutoWatermarkInterval()决定,默认是200毫秒。注意:假如设置3秒延时,使用事件的时间戳,如果有窗口的停止时间等于maxEventTime – 3,那么这个窗口被触发执行,否则一直等待
  • 基于事件生成:基于某些事件触发watermark的生成,每个元素都有机会判断是否生成一个watermark


public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 设置并行度为1 用来演示单并行度下的watermark
        env.setParallelism(1);
        // 设置周期性生成watermark 默认200ms
        env.getConfig().setAutoWatermarkInterval(200);
        // 传输过来的数据格式:10001,19980009908(id,时间戳)
        DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001);
        // 将数据转成Tuple2(id,时间戳)
        SingleOutputStreamOperator<Tuple2<Long, Long>> streamOperator = streamSource.map(new MapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(String values) throws Exception {
                String[] split = values.split(",");
                return new Tuple2<>(Long.valueOf(split[0]), Long.valueOf(split[1]));
            }
        });
        // 分配时间戳和 watermark 设置允许的最大乱序时间范围10秒
        SingleOutputStreamOperator<Tuple2<Long, Long>> watermarkStream = streamOperator.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, Long>>(Time.seconds(10)) {
            // 从数据流中抽取时间戳作为EventTime
            @Override
            public long extractTimestamp(Tuple2<Long, Long> longLongTuple2) {
                System.out.println("key:" + longLongTuple2.f0 + ",eventTime:" + longLongTuple2.f1);
                return longLongTuple2.f1;
            }
        });
        watermarkStream.keyBy(0)
                // 按照消息的EventTime分配窗口 和调用TimeWindow效果一样
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                // 全量聚合 参数1:接收的数据 参数2:输出的数据类型
                .apply(new WindowFunction<Tuple2<Long, Long>, String, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<Long, Long>> input, Collector<String> out) throws Exception {
                        ArrayList<Long> result = new ArrayList<>();
                        // 将时间戳放入队列 便于排序
                        input.forEach(in->{
                            result.add(in.f1);
                        });
                        // 对时间戳排序  小的在前
                        Collections.sort(result);
                        // 将目前window内排序后的数据以及window的开始和结束时间打印出来
                        System.out.println("result:"+ JSON.toJSONString(result)+",windowStart:"+timeWindow.getStart()+",windowEnd:"+timeWindow.getEnd());
                        out.collect(JSON.toJSONString(result));
                    }
                }).print();

        env.execute();
    }


延迟数据的处理方式

三种方式:

  • 直接丢弃【默认】:Flink默认的策略就是对迟到的数据直接丢弃
  • 重新激活已经关闭的窗口并重新计算以修正结果。
  • 将迟到数据收集起来另外处理。


并行度

TaskManager和Slot,TaskManager是从节点,Slot个数一般对应的就是TaskManager的cpu数量,Slot用来执行具体的算子。

Flink高级特性(2)_数据

Flink可以通过四种方式设置并行度,优先级从高到低排序:算子层面>执行环境层面>客户端层面>系统层面

// 算子层面
streamSource.map(new MapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(String values) throws Exception {
                String[] split = values.split(",");
                return new Tuple2<>(Long.valueOf(split[0]), Long.valueOf(split[1]));
            }
        }).setParallelism(2)
// 执行环境层面
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

客户端层面:提交任务时通过-p 指定并行度

系统层面:通过设置flink-conf.yaml文件中的parallelism.default属性来设置默认并行度

标签:watermark,Flink,高级,特性,并行度,split,env,new,result
From: https://blog.51cto.com/u_13589027/7345798

相关文章

  • Android并发编程高级面试题汇总(含详细解析 十八)
    Android并发编程高级面试题汇总最全最细面试题讲解持续更新中......
  • 33、Flink之hive介绍与简单示例
    文章目录Flink系列文章一、Table&SQLConnectors示例:ApacheHive1、支持的Hive版本2、依赖项1)、使用Flink提供的Hivejar2)、用户定义的依赖项3)、移动plannerjar包3、Maven依赖4、连接到Hive5、DDL&DML本文介绍了ApacheHive连接器的使用,以具体的示例演示了通过java和......
  • MySQL的优化,三大范式和事务的四大特性
    优化1.对查询进行优化,要尽量避免全表扫描,首先应考虑在where及orderby涉及的列上建立索引。2.应尽量避免在where子句中对字段进行null值判断,否则将导致引擎放弃使用索引而进行全表扫描3.应尽量避免在where子句中使用notin或or或!=或<>操作符,否则将引擎放......
  • java面向对象高级(根据青空的霞光总结)
    #面向对象高级(青空)基本类型包装类前置:虽然java是面向对象的语言,但是基本类型不是面向对象的,如果想要让基本类型也能像面向对象的形式进行表达,就可以是用包装类包装类实际上就是将我们的基本数据类型,封装成一个类(运用了封装的思想)类型:byte->Byteboolean->Booleans......
  • Flink教程:并行度
    Flink的并行度介绍一下?概述Flink的并行度(Parallelism)是指在Flink作业中并行执行任务的程度。它决定了作业中任务的数量以及任务之间的数据划分和分配方式。并行度是一个重要的概念,对于实现高吞吐量和低延迟的流处理非常关键。在Flink中,有两个级别的并行度可以进行配置:作业级别并行......
  • flink教程:Flink的架构包含哪些?介绍下技术架构和运行架构
    Flink的架构包含哪些?介绍下技术架构和运行架构Flink架构分为技术架构和运行架构两部分。技术架构如下图为Flink技术架构:Flink作为流批一体的分布式计算引擎,必须提供面向开发人员的API层,同时还需要跟外部数据存储进行交互,需要连接器,作业开发、测试完毕后,需要提交集群执行,需要......
  • flink教程:flink的有界、无界数据流、流批一体、容错能力等概念
    能否详细解释一下其中的数据流、流批一体、容错能力等概念?概述数据流:所有产生的数据都天然带有时间概念,把事件按照时间顺序排列起来,就形成了一个事件流,也被称作数据流。流批一体:首先必须先明白什么是有界数据和无界数据有界数据,就是在一个确定的时间范围内的数据流,有开始,......
  • flink教程:Flink 和 Spark Streaming的区别?
    Flink和SparkStreaming的区别?Flink和SparkSreaming最大的区别在于:Flink是标准的实时处理引擎,基于事件驱动,以流为核心,而SparkStreaming的RDD实际是一组小批次的RDD集合,是微批(Micro-Batch)的模型,以批为核心。概述下面我们介绍两个框架的主要区别:1.架构模型SparkStreamin......
  • flink基础:什么是Flink?
    什么是Flink?描述一下Flink是一个以流为核心的高可用、高性能的分布式计算引擎。具备流批一体,高吞吐、低延迟,容错能力,大规模复杂计算等特点,在数据流上提供数据分发、通信等功能。ApacheFlink是一个开源的流式处理和批处理框架,旨在处理高吞吐量和低延迟的大规模数据流。它提供了......
  • 函数高级
    函数默认参数、占位参数,函数重载1#include<iostream>2usingnamespacestd;34//1、函数默认参数5//如果传入数据,使用传入的数据,没有则用默认的6//函数声明与实现只能由一个有默认参数7intfunc(inta,intb=20,intc=30)8{9returna+b+c;10}1......