
50. Unit Testing in Flink: Introduction and Examples

Published: 2024-01-14 13:31:53

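Flink Article Series

I. Flink Column

The Flink column introduces one topic at a time in a systematic way, illustrated with concrete examples.

  • 1. Flink Deployment Series: covers the basics of deploying and configuring Flink.

  • 2. Flink Fundamentals Series: covers the fundamentals of Flink, such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.

  • 3. Flink Table API and SQL Fundamentals Series: covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, and catalogs.

  • 4. Flink Table API and SQL Advanced and Applied Series: covers applied Table API and SQL topics that are closer to real production use, as well as material that is harder to develop.

  • 5. Flink Monitoring Series: covers topics related to day-to-day operations and monitoring.

II. Flink Examples Column

The Flink Examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-use examples. This column is not divided into sections; the link titles indicate what each article covers.

Entry point for all articles in both columns: Flink Article Series Index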




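This article describes unit testing in Flink in detail. It covers tests for stateless functions, stateful functions, and complete jobs, and in particular gives common usage examples for stateless unit tests.

For more, see my Flink column, which covers these topics more systematically.

Apart from the Maven dependencies listed below, the examples need no other setup. Note that several examples also import Lombok and Guava (com.google.common.collect.Lists), which do not appear in the dependency list and must be added separately.

I. Overview of Testing in Flink

Apache Flink provides tooling for testing application code at multiple levels of the testing pyramid. The Maven dependencies used by the examples in this article are: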

	<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.13</version>
		</dependency>
		<dependency>
			<groupId>org.mockito</groupId>
			<artifactId>mockito-core</artifactId>
			<version>4.0.0</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

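II. Testing User-Defined Functions

You can usually assume that Flink produces correct results outside of user-defined functions. It is therefore recommended to cover the classes that contain the main business logic with unit tests as much as possible.

1. Unit Testing Stateless, Timeless UDFs

1) Example: MapFunction

Take the following stateless MapFunction as an example: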

	public class IncrementMapFunction implements MapFunction<Long, Long> {

		@Override
		public Long map(Long record) throws Exception {
			return record + 1;
		}
	}

Such a function is easy to unit test with your favorite testing framework: pass suitable arguments and verify the output.

import static org.junit.Assert.assertEquals;

import org.apache.flink.api.common.functions.MapFunction;
import org.junit.Test;

/**
 * @author alanchan
 *
 */
public class TestDemo {
	public class IncrementMapFunction implements MapFunction<Long, Long> {

		@Override
		public Long map(Long record) throws Exception {
			return record + 1;
		}
	}

	@Test
	public void testIncrement() throws Exception {
		IncrementMapFunction incrementer = new IncrementMapFunction();
		assertEquals((Long) 3L, incrementer.map(2L));
	}

}

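2) Example: FlatMapFunction

User-defined functions that use an org.apache.flink.util.Collector (for example, a FlatMapFunction or ProcessFunction) can be tested easily by supplying a mock object instead of a real collector. The FlatMapFunction below, which sums the comma-separated numbers in a string and emits the total, is unit tested this way.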

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * @author alanchan
 *
 */
@RunWith(MockitoJUnitRunner.class)
public class TestDemo2 {
	public static class IncrementFlatMapFunction implements FlatMapFunction<String, Long> {

		@Override
		public void flatMap(String value, Collector<Long> out) throws Exception {
			Long sum = 0L;
			for (String num : value.split(",")) {
				sum += Long.valueOf(num);
			}
			out.collect(sum);
		}

	}

	@Test
	public void testSum() throws Exception {
		IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();

		Collector<Long> collector = mock(Collector.class);
		incrementer.flatMap("1,2,3,4,5", collector);

		Mockito.verify(collector, times(1)).collect(15L);
	}
}
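
If you would rather not depend on Mockito, a hand-written collector that stores records in a list works as well. The following is a minimal sketch under stated assumptions: the Collector interface here is a stand-in with the same two methods as org.apache.flink.util.Collector, so that the snippet compiles without Flink on the classpath, and ListBackedCollector and ListCollectorSketch are hypothetical names. In a real test you would implement Flink's own interface instead.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.flink.util.Collector (assumption: same two methods),
// declared here only so the sketch compiles without Flink.
interface Collector<T> {
    void collect(T record);

    void close();
}

// Hypothetical helper: remembers every record passed to collect(), in order.
final class ListBackedCollector<T> implements Collector<T> {
    private final List<T> records = new ArrayList<>();

    @Override
    public void collect(T record) {
        records.add(record);
    }

    @Override
    public void close() {
        // nothing to release
    }

    List<T> getRecords() {
        return records;
    }
}

final class ListCollectorSketch {

    // Same logic as the flatMap above: sum the comma-separated numbers and emit once.
    static void sumFlatMap(String value, Collector<Long> out) {
        long sum = 0L;
        for (String num : value.split(",")) {
            sum += Long.parseLong(num.trim());
        }
        out.collect(sum);
    }

    // Runs the function against a list-backed collector and returns what it emitted.
    static List<Long> run(String input) {
        ListBackedCollector<Long> collector = new ListBackedCollector<>();
        sumFlatMap(input, collector);
        return collector.getRecords();
    }

    public static void main(String[] args) {
        System.out.println(run("1,2,3,4,5")); // prints [15]
    }
}
```

Compared with a mock, a list-backed collector lets the test assert on the full sequence of emitted records, including their order, with an ordinary equality check.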

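2. Unit Testing Stateful or Timely UDFs and Custom Operators

Testing the functionality of a user-defined function that uses managed state or timers is harder, because it involves testing the interaction between user code and the Flink runtime. For this, Flink provides a set of so-called test harnesses that can be used to test user-defined functions as well as custom operators:

  • OneInputStreamOperatorTestHarness (for operators on DataStreams)
  • KeyedOneInputStreamOperatorTestHarness (for operators on KeyedStreams)
  • TwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two DataStreams)
  • KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)

To use the test harnesses, an additional set of dependencies is required, for the DataStream API and the Table API respectively.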

1) DataStream API Test Dependencies

To develop tests for a job built with the DataStream API, add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
</dependency>

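Among various test utilities, this module provides MiniCluster, a lightweight, configurable Flink cluster that runs inside a JUnit test and can execute jobs directly.

2) Table API Test Dependencies

To test Table API and SQL programs locally in your IDE, add the following dependency in addition to the flink-test-utils dependency mentioned above: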

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
</dependency>

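This automatically pulls in the query planner and the runtime, which are needed to plan and execute queries respectively.

The flink-table-test-utils module was introduced in Flink 1.15 and, as of Flink 1.17, is still considered experimental.

3) Unit Testing a FlatMap Function

With a test harness you can push records and watermarks into a user-defined function or custom operator, control processing time, and finally assert on the operator's output (including side outputs).

  • Example:
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Unit test for a flatMap that, for even inputs, emits the value and its square
 */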
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

public class TestStatefulFlatMapDemo3 {

    static class AlanFlatMapFunction implements FlatMapFunction<Integer, Integer> {
        @Override
        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            if (value % 2 == 0) {
                out.collect(value);
                out.collect(value * value);
            }
        }

    }

    OneInputStreamOperatorTestHarness<Integer, Integer> testHarness;

    @Before
    public void setupTestHarness() throws Exception {
        StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<Integer, Integer>(new AlanFlatMapFunction());

        testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
        testHarness.open();
    }

    @Test
    public void testFlatMap2() throws Exception {
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
        testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
        testHarness.processWatermark(new Watermark(initialTime + 2));
        testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
        testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
        testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
        testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
        testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
        testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));

        expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
        expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));
        expectedOutput.add(new Watermark(initialTime + 2));
        expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
        expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));
        expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
        expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));
        expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));
        expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

    }
}

KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector and the TypeInformation of the key class.

  • Example:
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Key by city and convert the city abbreviation to upper case
 */
import com.google.common.collect.Lists;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestStatefulFlatMapDemo2 {
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private int id;
        private String name;
        private int age;
        private String city;
    }

    static class AlanFlatMapFunction extends RichFlatMapFunction<User, User> {
        // The state is only accessible by functions applied on a {@code KeyedStream}
        ValueState<User> previousInput;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            previousInput = getRuntimeContext()
                    .getState(new ValueStateDescriptor<User>("previousInput", User.class));
        }

        @Override
        public void flatMap(User input, Collector<User> out) throws Exception {
            previousInput.update(input);
            input.setCity(input.getCity().toUpperCase());
            out.collect(input);
        }
    }

    AlanFlatMapFunction alanFlatMapFunction = new AlanFlatMapFunction();
    OneInputStreamOperatorTestHarness<User, User> testHarness;

    @Before
    public void setupTestHarness() throws Exception {
        alanFlatMapFunction = new AlanFlatMapFunction();

        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(alanFlatMapFunction),
                new KeySelector<User, String>() {

                    @Override
                    public String getKey(User value) throws Exception {
                        return value.getCity();
                    }
                }, Types.STRING);
        
        testHarness.open();
    }

    @Test
    public void testFlatMap() throws Exception {
        testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);

        ValueState<User> previousInput = alanFlatMapFunction.getRuntimeContext().getState(
                new ValueStateDescriptor<>("previousInput", User.class));
        User stateValue = previousInput.value();

        Assert.assertEquals(
                Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
                testHarness.extractOutputStreamRecords());

        Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);

        testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
                        new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
                testHarness.extractOutputStreamRecords());
        Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());

    }
}

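4) Unit Testing a ProcessFunction

Besides the test harnesses above, which can be used directly to test a ProcessFunction, Flink also provides a test harness factory named ProcessFunctionTestHarnesses that simplifies instantiating a harness.

  • KeyedOneInputStreamOperatorTestHarness example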

import com.google.common.collect.Lists;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestProcessOperatorDemo1 {
    // public abstract class KeyedProcessFunction<K, I, O>
    static class AlanProcessFunction extends KeyedProcessFunction<String, String, String> {

        @Override
        public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
                Collector<String> out) throws Exception {
            ctx.timerService().registerProcessingTimeTimer(50);
            out.collect("vx->" + value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Runs when the timer fires
            out.collect(String.format("Timer fired at %d", timestamp));
        }

    }

    private OneInputStreamOperatorTestHarness<String, String> testHarness;
    private AlanProcessFunction processFunction;

    @Before
    public void setupTestHarness() throws Exception {
        processFunction = new AlanProcessFunction();

        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(processFunction),
                x -> "1",
                Types.STRING);
        // Function time is initialized to 0
        testHarness.open();
    }

    @Test
    public void testProcessElement() throws Exception {
        testHarness.processElement("alanchanchn", 10);
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>("vx->alanchanchn", 10)),
                testHarness.extractOutputStreamRecords());
    }

    @Test
    public void testOnTimer() throws Exception {
        // test first record
        testHarness.processElement("alanchanchn", 10);
        Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

        // Advance processing time to 100; the timer registered for 50 fires,
        // and onTimer receives the timer's own timestamp (50), not the current time
        testHarness.setProcessingTime(100);
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>("vx->alanchanchn", 10),
                        new StreamRecord<>("Timer fired at 50")),
                testHarness.extractOutputStreamRecords());
    }
}
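
When asserting on timer output, keep in mind that onTimer receives the timestamp the timer was registered for, not the time the clock was advanced to. The following toy model illustrates only that observable behaviour. It is not Flink code and does not reflect Flink's internals; ToyProcessingTimeService and its methods are hypothetical names chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model only: imitates what a test observes, not how Flink implements timers.
final class ToyProcessingTimeService {
    // Sorted set: registering the same timestamp twice keeps a single timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> output = new ArrayList<>();

    void registerProcessingTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advances the clock and fires every timer whose timestamp is <= newTime,
    // in ascending timestamp order.
    void setProcessingTime(long newTime) {
        while (!timers.isEmpty() && timers.first() <= newTime) {
            long timerTimestamp = timers.pollFirst();
            onTimer(timerTimestamp);
        }
    }

    // Receives the timer's registered timestamp, not the current clock value.
    private void onTimer(long timestamp) {
        output.add("Timer fired at " + timestamp);
    }

    List<String> getOutput() {
        return output;
    }

    static List<String> demo() {
        ToyProcessingTimeService service = new ToyProcessingTimeService();
        service.registerProcessingTimeTimer(50);
        service.setProcessingTime(100); // the clock jumps past the timer
        return service.getOutput();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [Timer fired at 50]
    }
}
```

The same reasoning applies to the harness test above: a timer registered for 50 reports 50 even though the test advances processing time to 100.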

  • ProcessFunctionTestHarnesses example

This example uses ProcessFunctionTestHarnesses to test a ProcessFunction, KeyedProcessFunction, CoProcessFunction, KeyedCoProcessFunction, and BroadcastProcessFunction, which covers most of the factory's use cases.

import java.util.Arrays;
import java.util.Collections;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.util.BroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * 
 * @LastEditors: alanchan
 * 
 * @Description:
 */
public class TestProcessOperatorDemo3 {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private int id;
        private String name;
        private int age;
        private String city;
    }

    // Test processElement of a ProcessFunction
    @Test
    public void testProcessFunction() throws Exception {
        // public abstract class ProcessFunction<I, O>
        ProcessFunction<String, String> function = new ProcessFunction<String, String>() {

            @Override
            public void processElement(
                    String value, Context ctx, Collector<String> out) throws Exception {
                out.collect("vx->" + value);
            }
        };

        OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                .forProcessFunction(function);

        harness.processElement("alanchanchn", 10);

        Assert.assertEquals(Collections.singletonList("vx->alanchanchn"), harness.extractOutputValues());
    }

    // Test processElement of a KeyedProcessFunction
    @Test
    public void testKeyedProcessFunction() throws Exception {
        // public abstract class KeyedProcessFunction<K, I, O>
        KeyedProcessFunction<String, String, String> function = new KeyedProcessFunction<String, String, String>() {

            @Override
            public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
                    Collector<String> out) throws Exception {
                out.collect("vx->" + value);
            }

        };

        OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                .forKeyedProcessFunction(function, x -> "name", BasicTypeInfo.STRING_TYPE_INFO);

        harness.processElement("alanchan", 10);

        // The function emits "vx->" + value, so the expected output is a String
        Assert.assertEquals(Collections.singletonList("vx->alanchan"), harness.extractOutputValues());
    }

    // Test processElement1 and processElement2 of a CoProcessFunction
    @Test
    public void testCoProcessFunction() throws Exception {
        // public abstract class CoProcessFunction<IN1, IN2, OUT>
        CoProcessFunction<String, User, User> function = new CoProcessFunction<String, User, User>() {

            @Override
            public void processElement1(String value, CoProcessFunction<String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                String[] userStr = value.split(",");
                out.collect(
                        new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
            }

            @Override
            public void processElement2(User value, CoProcessFunction<String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                out.collect(value);
            }

        };

        TwoInputStreamOperatorTestHarness<String, User, User> harness = ProcessFunctionTestHarnesses
                .forCoProcessFunction(function);

        harness.processElement2(new User(2, "alan", 19, "bj"), 100);
        harness.processElement1("1,alanchan,18,sh", 10);

        // Outputs appear in processing order: processElement2 ran first
        Assert.assertEquals(
                Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
                harness.extractOutputValues());
    }

    // Test processElement1 and processElement2 of a KeyedCoProcessFunction
    @Test
    public void testKeyedCoProcessFunction() throws Exception {
        // public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT>
        KeyedCoProcessFunction<String, String, User, User> function = new KeyedCoProcessFunction<String, String, User, User>() {

            @Override
            public void processElement1(String value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                String[] userStr = value.split(",");
                out.collect(
                        new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
            }

            @Override
            public void processElement2(User value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                out.collect(value);
            }

        };

        // public static <K,IN1,IN2,OUT>
        // KeyedTwoInputStreamOperatorTestHarness<K,IN1,IN2,OUT>
        // forKeyedCoProcessFunction(
        // KeyedCoProcessFunction<K,IN1,IN2,OUT> function,
        // KeySelector<IN1,K> keySelector1,
        // KeySelector<IN2,K> keySelector2,
        // TypeInformation<K> keyType)
        KeyedTwoInputStreamOperatorTestHarness<String, String, User, User> harness = ProcessFunctionTestHarnesses
                .forKeyedCoProcessFunction(function, new KeySelector<String, String>() {

                    @Override
                    public String getKey(String value) throws Exception {
                        return value.split(",")[3];
                    }

                }, new KeySelector<User, String>() {

                    @Override
                    public String getKey(User value) throws Exception {
                        return value.getCity();
                    }

                }, TypeInformation.of(String.class));

        harness.processElement2(new User(2, "alan", 19, "bj"), 100);
        harness.processElement1("1,alanchan,18,sh", 10);

        // Outputs appear in processing order: processElement2 ran first
        Assert.assertEquals(
                Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
                harness.extractOutputValues());
    }

    // Test processElement and processBroadcastElement of a BroadcastProcessFunction
    @Test
    public void testBroadcastOperator() throws Exception {

        // Define the broadcast state descriptor
        // Broadcast data format:
        // sh,上海
        // bj,北京
        // public class MapStateDescriptor<UK, UV>
        MapStateDescriptor<String, String> broadcastDesc = new MapStateDescriptor<>("Alan_RulesBroadcastState",
                String.class,
                String.class);

        // public abstract class BroadcastProcessFunction<IN1, IN2, OUT>
        // * @param <IN1> The input type of the non-broadcast side.
        // * @param <IN2> The input type of the broadcast side.
        // * @param <OUT> The output type of the operator.
        BroadcastProcessFunction<User, String, User> function = new BroadcastProcessFunction<User, String, User>() {

            // Handles elements of the broadcast stream
            @Override
            public void processBroadcastElement(String value, BroadcastProcessFunction<User, String, User>.Context ctx,
                    Collector<User> out) throws Exception {
                System.out.println("Received broadcast element: " + value);
                // Store the mapping in broadcast state
                ctx.getBroadcastState(broadcastDesc).put(value.split(",")[0], value.split(",")[1]);
            }

            // Handles the non-broadcast stream and enriches it with the dimension data
            @Override
            public void processElement(User value, BroadcastProcessFunction<User, String, User>.ReadOnlyContext ctx,
                    Collector<User> out) throws Exception {
                // Read-only view of the broadcast state
                ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastDesc);

                value.setCity(state.get(value.getCity()));
                out.collect(value);
            }

        };

        BroadcastOperatorTestHarness<User, String, User> harness = ProcessFunctionTestHarnesses
                .forBroadcastProcessFunction(function, broadcastDesc);

        harness.processBroadcastElement("sh,上海", 10);
        harness.processBroadcastElement("bj,北京", 20);

        harness.processElement(new User(2, "alan", 19, "bj"), 10);
        harness.processElement(new User(1, "alanchan", 18, "sh"), 30);

        // Outputs appear in processing order: the "bj" user was processed first
        Assert.assertEquals(
                Arrays.asList(new User(2, "alan", 19, "北京"), new User(1, "alanchan", 18, "上海")),
                harness.extractOutputValues());
    }
}

III. Testing Flink Jobs

1. The JUnit Rule MiniClusterWithClientResource

Apache Flink provides a JUnit rule named MiniClusterWithClientResource for testing complete jobs against a local, embedded mini cluster.

To use MiniClusterWithClientResource, one additional dependency (test scope) is needed.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.2</version>    
    <scope>test</scope>
</dependency>

The example uses the same simple MapFunction as in the previous sections.

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
package com.win;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class TestExampleIntegrationDemo {

    static class AlanIncrementMapFunction implements MapFunction<Long, Long> {
        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                    .setNumberSlotsPerTaskManager(2)
                    .setNumberTaskManagers(1)
                    .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new AlanIncrementMapFunction())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}

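A few remarks on integration testing with MiniClusterWithClientResource:

  • To avoid copying your entire pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.

  • The static variable in CollectSink is used because Flink serializes all operators before distributing them across the cluster. Communicating with operators instantiated by a local Flink mini cluster through static variables is one way around this issue. Alternatively, your test sink could write the data to files in a temporary directory.

  • If your job uses event-time timers, you can implement a custom parallel source function to emit watermarks.

  • It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs that only surface when the pipeline is executed in parallel.

  • Prefer @ClassRule over @Rule so that multiple tests can share the same Flink cluster. This saves a significant amount of time, since the startup and shutdown of Flink clusters usually dominate the execution time of the actual tests.

  • If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. For this, you need to trigger a failure by throwing an exception from a (test-only) user-defined function in your pipeline.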
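
The remark about the static variable in CollectSink can be illustrated with plain Java serialization. The sketch below is not a Flink SinkFunction; RecordingSink and SinkSerializationSketch are hypothetical names, and the round trip through ObjectOutputStream only approximates how an operator instance is shipped. It shows that a value written by the deserialized copy never reaches the instance field of the original object, while a static field is shared as long as both objects live in the same JVM, which is the case for a local mini cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sink-like class (not a Flink SinkFunction) with two places to record values.
final class RecordingSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Shared by every instance in the same JVM, including deserialized copies.
    static final List<Long> STATIC_VALUES = Collections.synchronizedList(new ArrayList<>());

    // Belongs to a single instance; a deserialized copy gets its own list.
    final List<Long> instanceValues = new ArrayList<>();

    void invoke(Long value) {
        instanceValues.add(value);
        STATIC_VALUES.add(value);
    }
}

final class SinkSerializationSketch {

    // Round-trips the sink through Java serialization and returns the copy.
    static RecordingSink copyViaSerialization(RecordingSink sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (RecordingSink) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns {size of the original's instance list, size of the static list}.
    static int[] runDemo() {
        RecordingSink.STATIC_VALUES.clear();
        RecordingSink original = new RecordingSink(); // the object the test holds
        RecordingSink shipped = copyViaSerialization(original); // the object that runs
        shipped.invoke(42L);
        return new int[] {original.instanceValues.size(), RecordingSink.STATIC_VALUES.size()};
    }

    public static void main(String[] args) {
        int[] sizes = runDemo();
        System.out.println("instance list on the original: " + sizes[0] + " element(s)");
        System.out.println("static list: " + sizes[1] + " element(s)");
    }
}
```

Because the static list is shared across all tests in the JVM, clear it at the start of each test, as the CollectSink example does with CollectSink.values.clear().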

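In summary, this article described unit testing in Flink in detail. It covered tests for stateless functions, stateful functions, and complete jobs, and in particular gave common usage examples for stateless unit tests.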

From: https://blog.51cto.com/alanchan2win/9240374
