测试 Flink 作业
a)JUnit 规则 MiniClusterWithClientResource
Apache Flink 提供了一个名为 MiniClusterWithClientResource
的 Junit 规则,用于针对本地嵌入式小型集群测试完整的作业。 叫做 MiniClusterWithClientResource
.
要使用 MiniClusterWithClientResource
,需要添加一个额外的依赖项(测试范围)。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>1.19.0</version>
<scope>test</scope>
</dependency>
示例:MapFunction
;
public class IncrementMapFunction implements MapFunction<Long, Long> {
@Override
public Long map(Long record) throws Exception {
return record + 1;
}
}
在本地 Flink 集群使用这个 MapFunction
的简单 pipeline,如下所示。
public class ExampleIntegrationTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
@Test
public void testIncrementPipeline() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure your test environment
env.setParallelism(2);
// values are collected in a static variable
CollectSink.values.clear();
// create a stream of custom elements and apply transformations
env.fromElements(1L, 21L, 22L)
.map(new IncrementMapFunction())
.addSink(new CollectSink());
// execute
env.execute();
// verify your results
assertTrue(CollectSink.values.containsAll(2L, 22L, 23L));
}
// create a testing sink
private static class CollectSink implements SinkFunction<Long> {
// must be static
public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());
@Override
public void invoke(Long value, SinkFunction.Context context) throws Exception {
values.add(value);
}
}
}
使用 MiniClusterWithClientResource
进行集成测试的注意:
- 为了不将整个 pipeline 代码从生产复制到测试,请将 source 和 sink 在生产代码中设置成可插拔的,并在测试中注入特殊的测试 source 和测试 sink。
- 这里使用
CollectSink
中的静态变量,是因为Flink 在将所有算子分布到整个集群之前先对其进行了序列化。 解决此问题的一种方法是与本地 Flink 小型集群通过实例化算子的静态变量进行通信。 或者,可以使用测试的 sink 将数据写入临时目录的文件中。 - 如果作业使用事件时间定时器,则可以实现自定义的 并行 源函数来发出 watermark。
- 建议始终以 parallelism > 1 的方式在本地测试 pipeline,以识别只有在并行执行 pipeline 时才会出现的 bug。
- 优先使用
@ClassRule
而不是@Rule
,这样多个测试可以共享同一个 Flink 集群。可以节省大量的时间,因为 Flink 集群的启动和关闭通常会占用实际测试的执行时间。 - 如果 pipeline 包含自定义状态处理,则可以通过启用 checkpoint 并在小型集群中重新启动作业来测试其正确性。为此,需要在 pipeline 中(仅测试)抛出用户自定义函数的异常来触发失败。