首页 > 其他分享 >Flink (九):DataStream API (六) Process Function

Flink (九):DataStream API (六) Process Function

时间:2025-01-17 15:31:52浏览次数:3  
标签:... DataStream 定时器 Process ctx state API 时间 apache

1. ProcessFunction

ProcessFunction 是一种底层的流处理操作,基于它用户可以访问(无环)流应用程序的所有基本构建块

  • 事件(流元素)
  • 状态(容错,一致性,仅在 keyed stream 上)
  • 定时器(事件时间和处理时间,仅在 keyed stream 上)

可以将 ProcessFunction 视为一种可以访问 keyed state 和定时器的 FlatMapFunction。Flink 为收到的输入流中的每个事件都调用该函数来进行处理。对于容错,与其它有状态的函数类似,ProcessFunction 可以通过 RuntimeContext 访问 Flink 的keyed state。定时器允许应用程序对处理时间和 事件时间中的更改做出反应。 每次调用 processElement(...) 时参数中都会提供一个 Context 对象,该对象可以访问元素的事件时间戳和 TimerService。 

TimerService 可用于为将来特定的事件时间/处理时间注册回调。 特定事件时间的 onTimer(...) 回调函数会在当前对齐的 watermark 超过所注册的时间戳时调用。 特定处理时间的 onTimer(...) 回调函数则会在系统物理时间超过所注册的时间戳时调用。 在该调用期间,所有状态会被再次绑定到创建定时器时的键上,从而允许定时器操作与之对应的 keyed state。如果想要访问 keyed state 和定时器,需要在 keyed stream 上使用 ProcessFunction

stream.keyBy(...).process(new MyProcessFunction());

2. 底层 Join

为了在两个输入上实现底层操作,应用程序可以使用 CoProcessFunction 或 KeyedCoProcessFunction。 这些函数绑定两个不同的输入,从两个不同的输入中获取元素并分别调用 processElement1(...) 和 processElement2(...) 进行处理。

实现底层 join 一般需要遵循以下模式:

  • 为一个输入(或两者)创建状态对象。
  • 从某个输入接收元素时更新状态。
  • 从另一个输入接收元素时,查询状态并生成 join 结果。

例如,你可能会将客户数据与金融交易进行 join,同时想要保留客户数据的状态。如果你希望即使在出现乱序事件时仍然可以得到完整且确定的 join 结果,你可以通过注册一个定时器在客户数据流的 watermark 已经超过当前这条金融交易记录时计算和发送 join 结果。

在下面的例子中,KeyedProcessFunction 维护每个键的计数,并且每次超过一分钟(事件时间)没有更新时输出一次键/计数对。

  • 计数,键和最后修改时间存储在 ValueState 中,它由键隐式限定范围。
  • 对于每条记录,KeyedProcessFunction 递增计数器并设置最后修改时间。
  • 对于每条记录,该函数还会注册了一个一分钟后(事件时间)的回调函数。
  • 在每次回调时,它会根据注册的时间和最后修改时间进行比较,如果正好差一分钟则 输出键/计数对(即,在该分钟内没有进一步更新)

这个简单的例子本身可以用会话窗口(session window)实现, 这里我们使用 KeyedProcessFunction 来展示使用它的基本模式。

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;


// 源数据流
DataStream<Tuple2<String, String>> stream = ...;

// 使用 process function 来处理一个 Keyed Stream 
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(value -> value.f0)
    .process(new CountWithTimeoutFunction());

/**
 * 在状态中保存的数据类型
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * 用来维护数量和超时的 ProcessFunction 实现
 */
public class CountWithTimeoutFunction 
        extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {

    /** 由 process function 管理的状态 */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(OpenContext openContext) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(
            Tuple2<String, String> value, 
            Context ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // 获得当前的数量
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // 更新状态中的数量
        current.count++;

        // 将状态中的最后修改时间改为记录的事件时间
        current.lastModified = ctx.timestamp();

        // 将更新后的状态写回
        state.update(current);

        // 注册一个 60s 之后的事件时间回调 
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(
            long timestamp, 
            OnTimerContext ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // 获得注册该回调时使用的键对应的状态
        CountWithTimestamp result = state.value();

        // 检查当前回调时否是最新的回调还是后续注册了新的回调
        if (timestamp == result.lastModified + 60000) {
            // 超时后发送状态
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}

在 Flink 1.4.0 之前,在调用处理时间定时器时,ProcessFunction.onTimer() 方法将当前的处理时间设置为事件时间的时间戳。此行为非常不明显,用户可能不会注意到。 然而,这样做是有害的,因为处理时间的时间戳是不确定的,并且和 watermark 不一致。此外,用户依赖于此错误的时间戳来实现逻辑很有可能导致非预期的错误。 因此,我们决定对其进行修复。在 1.4.0 后,使用此错误的事件时间时间戳的 Flink 作业将失败,用户应将其作业更正为正确的逻辑。

3. KeyedProcessFunction

KeyedProcessFunction 是 ProcessFunction 的一个扩展, 可以在其 onTimer(...) 方法中访问定时器的键。

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
    K key = ctx.getCurrentKey();
    // ...
}

4. Timers

两种定时器(处理时间定时器和事件时间定时器)都在 TimerService 内部维护,并排队等待执行。对于相同的键和时间戳,TimerService 会删除重复的定时器,即每个键和时间戳最多有一个定时器。如果为同一时间戳注册了多个定时器,则只调用一次 onTimer() 方法。Flink 会同步 onTimer() 和 processElement() 的调用,因此用户不必担心状态的并发修改。

4.1 Fault Tolerance

定时器支持容错,它会和应用程序的状态一起进行 checkpoint。当进行故障恢复或从保存点启动应用程序时,定时器也会被恢复。当应用程序从故障中恢复或从保存点启动时,可能会发生这种情况。即:在恢复之前就应该触发的处理时间定时器会立即触发。

除了使用基于 RocksDB backend 的增量 snapshots 并使用基于 Heap 的定时器的情况外,Flink 总是会异步执行计算器的快照操作。 大量定时器会增加 checkpoint 的时间,因为定时器是需要 checkpoint 的状态的一部分。

4.2 Timer Coalescing

由于 Flink 中每个键和时间戳只保存一个定时器,因此可以通过降低定时器的精度来合并它们,从而减少定时器的数量。

对于精度为 1 秒(事件或处理时间)的定时器,可以将目标时间向下舍入为整秒。定时器最多会提前 1 秒,但不迟于要求的毫秒精度。 这样,每个键在每秒内最多有一个定时器。

long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
ctx.timerService().registerProcessingTimeTimer(coalescedTime);

由于事件时间定时器仅在 watermark 到来时才触发,因此还可以将下一个 watermark 到达前的定时器与当前定时器合并:

long coalescedTime = ctx.timerService().currentWatermark() + 1;
ctx.timerService().registerEventTimeTimer(coalescedTime);

定时器也可以按照以下方式被停止或者删除:

停止处理时间定时器:

long timestampOfTimerToStop = ...;
ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);

停止事件时间定时器:

long timestampOfTimerToStop = ...;
ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);

如果没有注册给定时间戳的定时器,则停止定时器不会产生影响。

标签:...,DataStream,定时器,Process,ctx,state,API,时间,apache
From: https://blog.csdn.net/weixin_41914554/article/details/145206216

相关文章

  • 对接美团外卖霸王餐api接口需要考虑那些因素
    美团外卖霸王餐是美团平台上推出的一种营销活动,用户可以通过参与此活动以极低的价格甚至免费获得外卖餐品。这种活动通常由商家为了宣传或提高知名度而赞助。对于商家来说,霸王餐是一种有效的营销手段,可以增加销量、提升店铺评分和品牌曝光度。对于消费者而言,参与霸王餐活动可......
  • 餐饮行业点餐API接口对接开发详细介绍
    一、点餐API接口的核心功能1.菜单管理允许餐厅管理员或员工通过API添加、编辑、删除菜单项。支持设置菜单项的价格、描述、图片等属性。2.订单处理顾客可以通过API提交订单,包括选择菜品、数量、特殊要求等。餐厅端可以接收、查看、确认或拒绝订单。3.支付集成API可以与......
  • Flink(八):DataStream API (五) Join
    1. WindowJoinWindowjoin作用在两个流中有相同key且处于相同窗口的元素上。这些窗口可以通过 windowassigner 定义,并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction 或 FlatJoinFunction,用户可以用它们输......
  • RestAPI实现聚合
    API语法聚合条件与query条件同级别,因此需要使用request.source()来指定聚合条件。聚合的结果解析:@OverridepublicMap<String,List<String>>filters(RequestParamsparams){try{//1.准备RequestSearchRequestrequest=newSearchRequest("h......
  • Issac Gym出现error: subprocess-exited-with-error报错
    1.前言一方面便于日后自己的温故学习,另一方面也便于大家的学习和交流。如有不对之处,欢迎评论区指出错误,你我共同进步学习!2.正文我在安装humanoidgympipinstall-e.的时候,出现下列问题:解决方法:pipinstall--upgradesetuptools没解决就先卸载setuptools,再直接安......
  • 老照片修复API——老照片修复、老照片上色,修复划痕、斑点
    老照片修复:让记忆重现光彩随着数字化技术的飞速发展,老照片的修复已经不再是一个费时费力的任务。许多人怀念过去的时光,希望能够将珍贵的老照片修复,保存那些珍贵的记忆。今天,我们的网站提供了一款高效的老照片修复API,致力于让用户轻松修复老照片,恢复过去的美好瞬间。传统......
  • [Babel] Intro Babel - 05. API
    APIs关于babel里面的APIs主要位于@babel/core这个依赖里面,你可以在官网左下角的ToolingPackages分类下找到这个依赖包。这里顺便介绍一下每一种依赖包的作用:@babel/parser:是Babel的解析器,用于将源代码转换为AST。@babel/core:Babel的核心包,它提供了Babel的......
  • 【微服务】使用 Apifox、Postman 测试 Dubbo 服务,Apache Dubbo OpenAPI 即将发布
    ApacheDubboOpenAPI简介1.1设计背景在微服务体系中,RPC服务的文档管理、测试、调用协作一直都是影响研发效能的关键一环,这些难题通常是由于RPC的特性所决定的:RPC服务的定义方式、RPC协议格式不一,缺少放之宇宙而皆准的统一规范。长期以来,ApacheDubbo的开发者们也面临同......
  • IAR编译工程报错:CreateProcess failed:The system cannot find the file specified
    IAR安装完成后,编译STM32的工程时报如下错误信息:CreateProcessfailed:Thesystemcannotfindthefilespecified全网检索各种答案都有,尝试重新安装、版本升级等都没能解决。因为之前有过安装8.32版本,卸载后又进行安装的9.2版本,抱着试探的心理,打开注册表将和IAR相关......
  • 使用Python爬虫获取1688网站item_get_company API接口的公司档案信息
    一、引言在当今的商业环境中,获取供应商的详细信息对于采购决策、市场分析和供应链管理至关重要。1688作为中国领先的B2B电子商务平台,提供了丰富的供应商档案信息。通过使用1688的item_get_companyAPI接口,我们可以方便地获取这些信息。本文将详细介绍如何使用Python爬虫来调用该A......