首页 > 其他分享 >Flink 双流联结——窗口联结(Window Join)

Flink 双流联结——窗口联结(Window Join)

时间:2022-09-06 11:13:55浏览次数:53  
标签:join Flink 联结 JoinFunction Tuple2 window Join 窗口

对于两条流的合并,很多情况我们并不是简单地将所有数据放在一起,而是希望根据某个字段的值将它们联结起来,“配对”去做处理。例如用传感器监控火情时,我们需要将大量温度传感器和烟雾传感器采集到的信息,按照传感器ID分组、再将两条流中数据合并起来,如果同时超过设定阈值就要报警。我们发现,这种需求与关系型数据库中表的join操作非常相近。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义状态和TimerService灵活实现各种需求,其实已经能够处理双流合并的大多数场景。基于时间的操作,最基本的当然就是时间窗口了。我们之前已经介绍过Window API的用法,主要是针对单一数据流在某些时间段内的处理计算。那如果我们希望将两条流的数据进行合并、且同样针对某段时间进行处理和统计,Flink为这种场景专门提供了一个窗口联结(window join)算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

1、窗口联结的调用

窗口联结在代码中的实现,首先需要调用DataStream的.join()方法来合并两条流,得到一个JoinedStreams;接着通过.where()和.equalTo()方法指定两条流中联结的key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算。通用调用形式如下:

        stream1.join(stream2)
                .where(<KeySelector>)
                .equalTo(<KeySelector>)
                .window(<WindowAssigner>)
                .apply(<JoinFunction>)

上面代码中.where()的参数是键选择器(KeySelector),用来指定第一条流中的key;而.equalTo()传入的KeySelector则指定了第二条流中的key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。这里.window()传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗口(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply(),没有其他替代的方法。传入的JoinFunction也是一个函数类接口,使用时需要实现内部的.join()方法。这个方法有两个参数,分别表示两条流中成对匹配的数据。JoinFunction在源码中的定义如下:

        public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
            OUT join(IN1 first, IN2 second) throws Exception;
        }

这里需要注意,JoinFunciton并不是真正的“窗口函数”,它只是定义了窗口函数在调用时对匹配数据的具体处理逻辑。当然,既然是窗口计算,在.window()和.apply()之间也可以调用可选API去做一些自定义,比如用.trigger()定义触发器,用.allowedLateness()定义允许延迟时间,等等。

2、窗口联结的处理流程

JoinFunction中的两个参数,分别代表了两条流中的匹配的数据。这里就会有一个问题:什么时候就会匹配好数据,调用.join()方法呢?接下来我们就来介绍一下窗口join的具体处理流程。两条流的数据到来之后,首先会按照key分组、进入对应的窗口中存储;当到达窗口结束时间时,算子会先统计出窗口内两条流的数据的所有组合,也就是对两条流中的数据做一个笛卡尔积(相当于表的交叉连接,cross join),然后进行遍历,把每一对匹配的数据,作为参数(first,second)传入JoinFunction的.join()方法进行计算处理,得到的结果直接输出如图8-8所示。所以窗口中每有一对数据成功联结匹配,JoinFunction的.join()方法就会被调用一次,并输出一个结果。

除了JoinFunction,在.apply()方法中还可以传入FlatJoinFunction,用法非常类似,只是内部需要实现的.join()方法没有返回值。结果的输出是通过收集器(Collector)来实现的,所以对于一对匹配数据可以输出任意条结果。其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:

SELECT *
FROM table1 t1,
     table2 t2
WHERE t1.id = t2.id;

这句SQL中where子句的表达,等价于inner join ... on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。

3、窗口联结实例

在电商网站中,往往需要统计用户不同行为之间的转化,这就需要对不同的行为数据流,按照用户ID进行分组后再合并,以分析它们之间的关联。如果这些是以固定时间周期(比如1小时)来统计的,那我们就可以使用窗口join来实现这样的需求

public class WindowJoinTest {
    public static void main(String[] args) throws Exception {
        //1、获取执行时间
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.1、便于测试,测试环境设置并行度为 1,生产环境记得设置为 kafka topic 的分区数
        env.setParallelism(1);
        //2.1、读取数据 声明水位线
        SingleOutputStreamOperator<Tuple2<String, Long>> stream = env.fromElements(
                Tuple2.of("a", 1000L),
                Tuple2.of("b", 1000L),
                Tuple2.of("a", 2000L),
                Tuple2.of("b", 2000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        }));
        //2.2、读取数据 声明水位线
        SingleOutputStreamOperator<Tuple2<String, Integer>> stream1 = env.fromElements(
                Tuple2.of("a", 3000),
                Tuple2.of("b", 4000),
                Tuple2.of("a", 4500),
                Tuple2.of("b", 5500))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                                return element.f1;
                            }
                        }));
        //进行 join 连接
        stream.join(stream1)
                //连接的条件
                .where(data -> data.f0)
                .equalTo(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> first, Tuple2<String, Integer> second) throws Exception {
                        return first + " -> " + second;
                    }
                }).print();
        env.execute();
    }
}

运行效果

(a,1000) -> (a,3000)
(a,1000) -> (a,4500)
(a,2000) -> (a,3000)
(a,2000) -> (a,4500)
(b,1000) -> (b,4000)
(b,2000) -> (b,4000)

可以看到,窗口的联结是笛卡尔积。

标签:join,Flink,联结,JoinFunction,Tuple2,window,Join,窗口
From: https://www.cnblogs.com/wdh01/p/16650173.html

相关文章

  • Flink-状态管理
     流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过90度时发......
  • sql语法:inner join on, left join on, right join on详细使用方法
    sql语法:innerjoinon,leftjoinon,rightjoinon详细使用方法 innerjoin(等值连接)只返回两个表中联结字段相等的行leftjoin(左联接)返回包括左表中的所有......
  • Flink1.12学习笔记
    一、Flink简介Flink是有状态的流式计算。Flink是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink可以部署在任意地方,Apacheflink是一......
  • Stream流中使用的Fork/Join框架
            Fork/Join简单案例:使用Fork/join计算1-10000的和当一个任务的计算数量大于3000的时候拆分任务,数量小于3000的时候就计算packagecom.tuling.st......
  • 十一、Flink中的时间和窗口之水位线
    水位线在介绍事件时间语义时,提到了“水位线”的概念,已经知道了它其实就是用来度量事件时间的。那么水位线具体有什么含义,又跟数据的时间戳有什么关系呢?接下来就来深入探讨......
  • 十、Flink中的时间和窗口之时间语义
    时间语义“时间”,从理论物理和哲学的角度解释,可能有些玄妙;但对于我们来说,它其实是生活中再熟悉不过的一个概念。一年365天,每天24小时,时间就像缓缓流淌的河,不疾不徐、无休......
  • Flink 合流操作——Union
    应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。Union最简单的合流操作就是直接将多条流合......
  • Java 使用flink读写kafka中的数据(windows下)
    一、启动服务(网上查)1、启动zookeeper2、启动kafka3、启动flink二、写producerpublicvoidkafkaProducer(List<ResultBean>opcValue)throwsException{......
  • prometheus监控flink
    背景很久没写博客了,今天也算完成了一个小测试。由于flink没有监控的平台,只是自己写了python脚本去监控发报警。flink自己的ui界面其实已经有很多的指标可以看了,但是......
  • 使用docker-compose搭建flink集群
    第一步:安装docker和docker-compose并赋予权限第二步:利用docker-compose构建容器1version:"2.1"2services:3jobmanager:4image:flink:1.9.2-scal......