首页 > 其他分享 >54、Flink 测试工具测试 Flink 作业详解

54、Flink 测试工具测试 Flink 作业详解

时间:2024-07-01 09:55:54浏览次数:28  
标签:pipeline 54 Flink 集群 测试 测试工具 MiniClusterWithClientResource public

测试 Flink 作业
a)JUnit 规则 MiniClusterWithClientResource

Apache Flink 提供了一个名为 MiniClusterWithClientResource 的 Junit 规则,用于针对本地嵌入式小型集群测试完整的作业。 叫做 MiniClusterWithClientResource.

要使用 MiniClusterWithClientResource,需要添加一个额外的依赖项(测试范围)。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.19.0</version>    
    <scope>test</scope>
</dependency>

示例:MapFunction

public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}

在本地 Flink 集群使用这个 MapFunction 的简单 pipeline,如下所示。

public class ExampleIntegrationTest {

     @ClassRule
     public static MiniClusterWithClientResource flinkCluster =
         new MiniClusterWithClientResource(
             new MiniClusterResourceConfiguration.Builder()
                 .setNumberSlotsPerTaskManager(2)
                 .setNumberTaskManagers(1)
                 .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new IncrementMapFunction())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(2L, 22L, 23L));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}

使用 MiniClusterWithClientResource 进行集成测试的注意

  • 为了不将整个 pipeline 代码从生产复制到测试,请将 source 和 sink 在生产代码中设置成可插拔的,并在测试中注入特殊的测试 source 和测试 sink。
  • 这里使用 CollectSink 中的静态变量,是因为Flink 在将所有算子分布到整个集群之前先对其进行了序列化。 解决此问题的一种方法是与本地 Flink 小型集群通过实例化算子的静态变量进行通信。 或者,可以使用测试的 sink 将数据写入临时目录的文件中。
  • 如果作业使用事件时间定时器,则可以实现自定义的 并行 源函数来发出 watermark。
  • 建议始终以 parallelism > 1 的方式在本地测试 pipeline,以识别只有在并行执行 pipeline 时才会出现的 bug。
  • 优先使用 @ClassRule 而不是 @Rule,这样多个测试可以共享同一个 Flink 集群。可以节省大量的时间,因为 Flink 集群的启动和关闭通常会占用实际测试的执行时间。
  • 如果 pipeline 包含自定义状态处理,则可以通过启用 checkpoint 并在小型集群中重新启动作业来测试其正确性。为此,需要在 pipeline 中(仅测试)抛出用户自定义函数的异常来触发失败。

标签:pipeline,54,Flink,集群,测试,测试工具,MiniClusterWithClientResource,public
From: https://blog.csdn.net/m0_50186249/article/details/140093154

相关文章

  • 53、Flink 测试工具测试用户自定义函数详解
    1.测试用户自定义函数a)单元测试无状态、无时间限制的UDF示例:无状态的MapFunction。publicclassIncrementMapFunctionimplementsMapFunction<Long,Long>{@OverridepublicLongmap(Longrecord)throwsException{returnrecord+1;}......
  • 【Flink metric(3)】chunjun是如何实现脏数据管理的
    文章目录一.基础逻辑二.DirtyManager1.初始化2.收集脏数据并check3.关闭资源三.DirtyDataCollector1.初始化2.收集脏数据并check3.run:消费脏数据4.释放资源四.LogDirtyDataCollector一.基础逻辑脏数据管理模块的基本逻辑是:当数据消费失败时,将脏数据......
  • Codeforces Round 954 (Div. 3)
    A.XAxis题意:给3个x轴上的点xi,我们要放置一个点到x轴上,到这3个点的距离最短。(1<=xi<=10)思路:直接暴力破解即可inta,b,c;inlineintcal(intx){ returnabs(x-a)+abs(x-b)+abs(x-c);}voidsolve(){ cin>>a>>b>>c; intans=(int)1e9; for......
  • Postman接口测试工具详解
    个人名片......
  • Postman接口测试工具详解
    一、引言在现代软件开发和测试流程中,接口测试占据了举足轻重的地位。接口作为系统与系统之间、模块与模块之间数据交互的桥梁,其稳定性和可靠性直接关系到整个系统的性能和用户体验。Postman作为一款强大的接口测试工具,凭借其简单易用、功能丰富等特点,深受开发者和测试人员......
  • [题解]CF154B Colliders
    思路首先我们将两种操作分开讨论:Part1加入操作那么,我们可以用一个数组\(vis_i=0/1\)表示\(i\)是关闭/开启状态,\(p_i\)表示因数有\(i\)的数。如果$vis_x=1$,说明此机器在之前已经启动过了,输出Success。然后,对\(x\)分解质因数,将质因数全部塞进一个集合\(a\)......
  • [题解]AT_agc054_b [AGC054B] Greedy Division
    思路首先不难发现一个规律,当\(sum\)为奇数时不可能有解。定义\(dp_{i,j,k,0/1}\)表示A在前\(i\)个数中选出和为\(j\)的\(k\)个数,且第\(i\)个不选/选的方案数。那么,我们只需要对于第\(i\)个数的状态分类讨论就能得到状态转移方程:不选\(i\),\(dp_{i,j,k,0}=......
  • 从工具产品体验对比spark、hadoop、flink
    作为一名大数据开发,从工具产品的角度,对比一下大数据工具最常使用的框架spark、hadoop和flink。工具无关好坏,但人的喜欢有偏好。目录评价标准1效率2用户体验分析从用户的维度来看从市场的维度来看从产品的维度来看3用户体验的基本原则成本和产出是否成正比操作是否“......
  • Apache Flink 和 Apache Spark详细介绍、优缺点、使用场景以及选型抉择?
    ApacheFlink和ApacheSpark我该投入谁的怀抱?ApacheFlink简介:ApacheFlink是一个用于分布式流处理和批处理的开源框架。它以实时数据处理和事件驱动的流处理著称,提供高吞吐量和低延迟的处理能力。功能:流处理:Flink可以处理实时数据流,支持低延迟和高吞吐量的流处理......
  • 【广度优先搜索 深度优先搜索 图论】854. 相似度为 K 的字符串
    本文涉及知识点广度优先搜索深度优先搜索图论图论知识汇总深度优先搜索汇总C++BFS算法LeetCode854.相似度为K的字符串对于某些非负整数k,如果交换s1中两个字母的位置恰好k次,能够使结果字符串等于s2,则认为字符串s1和s2的相似度为k。给你两个字母......