首页 > 其他分享 >53、Flink 测试工具测试用户自定义函数详解

53、Flink 测试工具测试用户自定义函数详解

时间:2024-07-01 09:55:39浏览次数:3  
标签:function testHarness 自定义 Flink 测试 测试工具 new public

1.测试用户自定义函数
a)单元测试无状态、无时间限制的 UDF

示例:无状态的 MapFunction

public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}

通过传递合适地参数并验证输出进行测试。

public class IncrementMapFunctionTest {

    @Test
    public void testIncrement() throws Exception {
        // instantiate your function
        IncrementMapFunction incrementer = new IncrementMapFunction();

        // call the methods that you have implemented
        assertEquals(3L, incrementer.map(2L));
    }
}

对于使用 org.apache.flink.util.Collector 的用户自定义函数(例如FlatMapFunction 或者 ProcessFunction),可以通过提供模拟对象而不是真正的 collector 来测试。

具有与 IncrementMapFunction 相同功能的 FlatMapFunction 可以按照以下方式进行测试。

public class IncrementFlatMapFunctionTest {

    @Test
    public void testIncrement() throws Exception {
        // instantiate your function
        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();

        Collector<Integer> collector = mock(Collector.class);

        // call the methods that you have implemented
        incrementer.flatMap(2L, collector);

        //verify collector was called with the right output
        Mockito.verify(collector, times(1)).collect(3L);
    }
}
b)对有状态或及时 UDF 和自定义算子进行单元测试

概述

对使用管理状态或定时器的用户自定义函数的功能测试会更加困难,因为它涉及到测试用户代码和 Flink 运行时的交互。 为此,Flink 提供了一组所谓的测试工具,可用于测试用户自定义函数和自定义算子:

  • OneInputStreamOperatorTestHarness (适用于 DataStream 上的算子)
  • KeyedOneInputStreamOperatorTestHarness (适用于 KeyedStream 上的算子)
  • TwoInputStreamOperatorTestHarness (适用于两个 DataStreamConnectedStreams 算子)
  • KeyedTwoInputStreamOperatorTestHarness (适用于两个 KeyedStream 上的 ConnectedStreams 算子)

DataStream API 测试依赖项

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.19.0</version>
    <scope>test</scope>
</dependency>

该模块提供了 MiniCluster (一个可配置的轻量级 Flink 集群,能在 JUnit 测试中运行),可以直接执行作业。

Table API 测试依赖项

如果想在 IDE 中本地测试 Table API 和 SQL 程序,除了前述提到的 flink-test-utils 之外,还要添加以下依赖项。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>1.19.0</version>
    <scope>test</scope>
</dependency>

这将自动引入查询计划器和运行时,分别用于计划和执行查询。

可以使用测试工具将记录和 watermark 推送到用户自定义函数或自定义算子中,控制处理时间,最后对算子的输出(包括旁路输出)进行校验。

public class StatefulFlatMapTest {
    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
    private StatefulFlatMap statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {

        //instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap user defined function into a the corresponding operator
        testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));

        // optionally configured the execution environment
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    @Test
    public void testingStatefulFlatMapFunction() throws Exception {

        //push (timestamped) elements into the operator (and hence user defined function)
        testHarness.processElement(2L, 100L);

        //trigger event time timers by advancing the event time of the operator with a watermark
        testHarness.processWatermark(100L);

        //trigger processing time timers by advancing the processing time of the operator directly
        testHarness.setProcessingTime(100L);

        //retrieve list of emitted records for assertions
        assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));

        //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
        //assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
    }
}

KeyedOneInputStreamOperatorTestHarnessKeyedTwoInputStreamOperatorTestHarness 可以通过为键的类另外提供一个包含 TypeInformationKeySelector 来实例化。

public class StatefulFlatMapFunctionTest {
    private OneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
    private StatefulFlatMap statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {

        //instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap user defined function into a the corresponding operator
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    //tests

}

在 Flink 代码库里可以找到更多使用这些测试工具的示例,例如:

  • org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest 是测试算子和用户自定义函数(取决于处理时间和事件时间)的一个很好的例子。

注意 AbstractStreamOperatorTestHarness 及其派生类目前不属于公共 API,可以进行更改。

单元测试 Process Function

除了之前可以直接用于测试 ProcessFunction 的测试工具之外,Flink 还提供了一个名为 ProcessFunctionTestHarnesses 的测试工具工厂类,可以简化测试工具的实例化。示例如下:

public static class PassThroughProcessFunction extends ProcessFunction<Integer, Integer> {

	@Override
	public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
        out.collect(value);
	}
}

通过传递合适的参数并验证输出,对使用 ProcessFunctionTestHarnesses 是很容易进行单元测试并验证输出。

public class PassThroughProcessFunctionTest {

    @Test
    public void testPassThrough() throws Exception {

        //instantiate user-defined function
        PassThroughProcessFunction processFunction = new PassThroughProcessFunction();

        // wrap user defined function into a the corresponding operator
        OneInputStreamOperatorTestHarness<Integer, Integer> harness = ProcessFunctionTestHarnesses
        	.forProcessFunction(processFunction);

        //push (timestamped) elements into the operator (and hence user defined function)
        harness.processElement(1, 10);

        //retrieve list of emitted records for assertions
        assertEquals(harness.extractOutputValues(), Collections.singletonList(1));
    }
}

有关如何使用 ProcessFunctionTestHarnesses 来测试 ProcessFunction 不同风格的更多示例,, 例如 KeyedProcessFunctionKeyedCoProcessFunctionBroadcastProcessFunction等,请自行查看 ProcessFunctionTestHarnessesTest

标签:function,testHarness,自定义,Flink,测试,测试工具,new,public
From: https://blog.csdn.net/m0_50186249/article/details/140093126

相关文章

  • Cesium 实战 - 自定义纹理材质系列之 - 涟漪效果
    Cesium实战-自定义纹理材质系列之-涟漪效果核心代码完整代码在线示例Cesium给实体对象(Entity)提供了很多实用的样式,基本满足普通项目需求;但是作为WebGL引擎,肯定不够丰富,尤其是动态效果样式。对于实体对象(Entity),可以通过自定义材质,实现各种动态效果,虽......
  • 【Flink metric(3)】chunjun是如何实现脏数据管理的
    文章目录一.基础逻辑二.DirtyManager1.初始化2.收集脏数据并check3.关闭资源三.DirtyDataCollector1.初始化2.收集脏数据并check3.run:消费脏数据4.释放资源四.LogDirtyDataCollector一.基础逻辑脏数据管理模块的基本逻辑是:当数据消费失败时,将脏数据......
  • Cesium 实战 - 自定义纹理材质系列之 - 动态扩散效果
    Cesium实战-自定义纹理材质系列之-动态扩散效果核心代码完整代码在线示例Cesium给实体对象(Entity)提供了很多实用的样式,基本满足普通项目需求;但是作为WebGL引擎,肯定不够丰富,尤其是动态效果样式。对于实体对象(Entity),可以通过自定义材质,实现各种动态效......
  • 【Gradio】Chatbot | 如何使用 Gradio Blocks 创建自定义聊天机器人
    简介重要提示:如果您刚开始接触,我们建议使用 gr.ChatInterface 来创建聊天机器人——它是一个高级抽象,使得可以快速创建漂亮的聊天机器人应用程序,往往只需一行代码。在这里了解更多信息。本教程将展示如何使用Gradio的低级BlocksAPI从头开始制作聊天机器人UI。这将使......
  • 使用fnm安装node,并自定义安装路径
    作者:咕魂时间:2024年6月23日本教程使用winget对fnm进行安装,主要分两部分,第一步安装fnm,第二步安装nodejs其中nodejs配置成功后只在powershell中生效1.fnm的安装假设我们自定义安装路径为:D:\fnm下载安装fnmwingetinstallSchniz.fnm--locationD:\fnm由于要从github上下......
  • Postman接口测试工具详解
    个人名片......
  • Postman接口测试工具详解
    一、引言在现代软件开发和测试流程中,接口测试占据了举足轻重的地位。接口作为系统与系统之间、模块与模块之间数据交互的桥梁,其稳定性和可靠性直接关系到整个系统的性能和用户体验。Postman作为一款强大的接口测试工具,凭借其简单易用、功能丰富等特点,深受开发者和测试人员......
  • 从工具产品体验对比spark、hadoop、flink
    作为一名大数据开发,从工具产品的角度,对比一下大数据工具最常使用的框架spark、hadoop和flink。工具无关好坏,但人的喜欢有偏好。目录评价标准1效率2用户体验分析从用户的维度来看从市场的维度来看从产品的维度来看3用户体验的基本原则成本和产出是否成正比操作是否“......
  • Apache Flink 和 Apache Spark详细介绍、优缺点、使用场景以及选型抉择?
    ApacheFlink和ApacheSpark我该投入谁的怀抱?ApacheFlink简介:ApacheFlink是一个用于分布式流处理和批处理的开源框架。它以实时数据处理和事件驱动的流处理著称,提供高吞吐量和低延迟的处理能力。功能:流处理:Flink可以处理实时数据流,支持低延迟和高吞吐量的流处理......
  • Go自定义数据的序列化流程
    ......