首页 > 其他分享 >Flink(八):DataStream API (五) Join

Flink(八):DataStream API (五) Join

时间:2025-01-17 14:29:29浏览次数:3  
标签:... DataStream 元素 join timestamp Flink API flink 窗口

1. Window Join

Window join 作用在两个流中有相同 key 且处于相同窗口的元素上。这些窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction 或 FlatJoinFunction,用户可以用它们输出符合 join 要求的结果。常见的用例可以总结为以下代码

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>);

语义上有一些值得注意的地方:

  • 从两个流中创建成对的元素与 inner-join 类似,即一个流中的元素在与另一个流中对应的元素完成 join 之前不会被输出。
  • 完成 join 的元素会将他们的 timestamp 设为对应窗口中允许的最大 timestamp。比如一个边界为 [5, 10) 窗口中的元素在 join 之后的 timestamp 为 9。

1.1 滚动 Window Join

使用滚动 window join 时,所有 key 相同且共享一个滚动窗口的元素会被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。因为这个行为与 inner join 类似,所以一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出!

如图所示,定义了一个大小为 2 毫秒的滚动窗口,即形成了边界为 [0,1], [2,3], ... 的窗口。图中展示了如何将每个窗口中的元素组合成对,组合的结果将被传递给 JoinFunction。注意,滚动窗口 [6,7] 将不会输出任何数据,因为绿色流当中没有数据可以与橙色流的 ⑥ 和 ⑦ 配对。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

1.2 滑动 Window Join

当使用滑动 window join 时,所有 key 相同且处于同一个滑动窗口的元素将被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。当前滑动窗口内,如果一个流中的元素没有与另一个流中的元素组合起来,它就不会被输出!注意,在某个滑动窗口中被 join 的元素不一定会在其他滑动窗口中被 join。

本例中定义了长度为两毫秒,滑动距离为一毫秒的滑动窗口,生成的窗口实例区间为 [-1, 0],[0,1],[1,2],[2,3], …。 X 轴下方是每个滑动窗口中被 join 后传递给 JoinFunction 的元素。图中可以看到橙色 ② 与绿色 ③ 在窗口 [2,3] 中 join,但没有与窗口 [1,2] 中任何元素 join。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

1.3 会话 Window Join

使用会话 window join 时,所有 key 相同且组合后符合会话要求的元素将被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。这个操作同样是 inner join,所以如果一个会话窗口中只含有某一个流的元素,这个窗口将不会产生输出!

这里我们定义了一个间隔为至少一毫秒的会话窗口。图中总共有三个会话,前两者中两个流都有元素,它们被 join 并传递给 JoinFunction。而第三个会话中,绿流没有任何元素,所以 ⑧ 和 ⑨ 没有被 join!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

2. Interval Join

Interval join 组合元素的条件为:两个流(我们暂时称为 A 和 B)中 key 相同且 B 中元素的 timestamp 处于 A 中元素 timestamp 的一定范围内。这个条件可以更加正式地表示为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] 或 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里的 a 和 b 为 A 和 B 中共享相同 key 的元素。上界和下界可正可负,只要下界永远小于等于上界即可。 Interval join 目前仅执行 inner join。当一对元素被传递给 ProcessJoinFunction,他们的 timestamp 会从两个元素的 timestamp 中取最大值 (timestamp 可以通过 ProcessJoinFunction.Context 访问)。Interval join 目前仅支持 event time。

上例中,我们 join 了橙色和绿色两个流,join 的条件是:以 -2 毫秒为下界、+1 毫秒为上界。 默认情况下,上下界也被包括在区间内,但 .lowerBoundExclusive() 和 .upperBoundExclusive() 可以将它们排除在外。

图中三角形所表示的条件也可以写成更加正式的表达式:orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String>(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });

标签:...,DataStream,元素,join,timestamp,Flink,API,flink,窗口
From: https://blog.csdn.net/weixin_41914554/article/details/145186085

相关文章

  • RestAPI实现聚合
    API语法聚合条件与query条件同级别,因此需要使用request.source()来指定聚合条件。聚合的结果解析:@OverridepublicMap<String,List<String>>filters(RequestParamsparams){try{//1.准备RequestSearchRequestrequest=newSearchRequest("h......
  • 老照片修复API——老照片修复、老照片上色,修复划痕、斑点
    老照片修复:让记忆重现光彩随着数字化技术的飞速发展,老照片的修复已经不再是一个费时费力的任务。许多人怀念过去的时光,希望能够将珍贵的老照片修复,保存那些珍贵的记忆。今天,我们的网站提供了一款高效的老照片修复API,致力于让用户轻松修复老照片,恢复过去的美好瞬间。传统......
  • [Babel] Intro Babel - 05. API
    APIs关于babel里面的APIs主要位于@babel/core这个依赖里面,你可以在官网左下角的ToolingPackages分类下找到这个依赖包。这里顺便介绍一下每一种依赖包的作用:@babel/parser:是Babel的解析器,用于将源代码转换为AST。@babel/core:Babel的核心包,它提供了Babel的......
  • 【微服务】使用 Apifox、Postman 测试 Dubbo 服务,Apache Dubbo OpenAPI 即将发布
    ApacheDubboOpenAPI简介1.1设计背景在微服务体系中,RPC服务的文档管理、测试、调用协作一直都是影响研发效能的关键一环,这些难题通常是由于RPC的特性所决定的:RPC服务的定义方式、RPC协议格式不一,缺少放之宇宙而皆准的统一规范。长期以来,ApacheDubbo的开发者们也面临同......
  • 使用Python爬虫获取1688网站item_get_company API接口的公司档案信息
    一、引言在当今的商业环境中,获取供应商的详细信息对于采购决策、市场分析和供应链管理至关重要。1688作为中国领先的B2B电子商务平台,提供了丰富的供应商档案信息。通过使用1688的item_get_companyAPI接口,我们可以方便地获取这些信息。本文将详细介绍如何使用Python爬虫来调用该A......
  • php禁止跨域调用api(来自文心快码)
    在PHP中,禁止跨域调用API通常涉及到设置正确的HTTP响应头,以告知浏览器不允许来自不同源的请求。跨域资源共享(CORS)是一个W3C标准,它允许服务器放宽同源策略(SOP),从而允许某些跨站请求。要禁止跨域调用,你需要配置服务器以拒绝这些请求。以下是一些在PHP中禁止跨域调用的方法:1.使用.hta......
  • 深入探索Vue.js 3中基于Composition API的动态组件开发
    在前端开发中,组件是构建用户界面的基础,而Vue.js作为一种流行的前端框架,也提供了灵活强大的组件机制。在本文中,我们将深入探索基于Vue.js3的CompositionAPI,开发一个动态组件加载的技术方案。这项技术对于那些需要高可维护性和按需加载的应用来说尤其重要。什么是动态组件加......
  • 一步一步教你打造实用API接口
    在当今的软件开发领域,API(应用程序编程接口)已经成为不同系统之间数据交互的基石。一个设计良好、功能实用的API接口不仅能够提升系统的可扩展性和灵活性,还能极大地提高开发效率。本文将详细指导你如何一步一步地打造一个实用的API接口。一、明确API接口的需求与目标在动手之前,首......
  • 【答题系统可参考】php 禁止api被跨域调用
    在PHP中,防止API被跨域调用可以通过设置适当的HTTP响应头来实现。跨域资源共享(CORS,Cross-OriginResourceSharing)机制允许或拒绝来自不同源的请求。为了禁止跨域调用,你可以在你的PHP脚本中设置 Access-Control-Allow-Origin 头为 null 或者不设置这个头。下面是一个......
  • API Parrot:破解没有公开接口的网站
    在数字化浪潮中,开发者们常常面临一个挑战:如何高效地自动化、集成或抓取那些没有公开API的网站数据?今天,我要给大家介绍一款强大的工具——APIParrot,它绝对是开发者们的新利器!APIParrotAPIParrot是一款专门设计用于反向工程任何网站HTTPAPI的工具。它为开发者提供了一站......