Flink 使用介绍相关文档目录
背景
本篇从近期遇到的Stream Java API 问题,引出TypeInformation
的使用。
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(XXXTest.java:77)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
这个问题的原因是由于JVM运行时候会擦除类型(泛型类型),Flink无法准确的获取到数据类型。因此,在使用Java API的时候,我们需要手工指定类型。使用Scala的时候无需指定。
需要使用SingleOutputStreamOperator
的returns
方法来指定算子的返回数据类型。
- returns(Class<T> typeClass):使用Class的方式指定返回数据类型。
- returns(TypeHint<T> typeHint):使用
TypeHint
方式指定返回数据类型,通常泛型类型需要使用TypeHint
来指定。 - returns(TypeInformation<T> typeInfo):使用
TypeInformation
指定。
TypeInformation
TypeInformation
是Flink类型系统的核心,是生成序列化/反序列化工具和Comparator
的工具类。同时它还是连接schema和编程语言内部类型系统的桥梁。
我们可以使用of
方法创建TypeInformation
:
- of(Class<T> typeClass):从
Class
创建。 - of(TypeHint<T> typeHint):从
TypeHint
创建。
TypeHint
由于泛型类型在运行时会被JVM擦除,所以说我们无法使用TypeInformation.of(XXX.class)
方式指定带有泛型的类型。
为了可以支持泛型类型,Flink引入了TypeHint
。例如我们需要获取Tuple2<String, Long>
的类型信息,可以使用如下方式:
TypeInformation<Tuple2<String, Long>> info = TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){});
// 或者
TypeInformation<Tuple2<String, Long>> info = new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo();
Types
在Flink中经常使用的类型已经预定义在了Types
中。它们的serializer/deserializer和Comparator
已经定义好了。
Tuple
类型既可以使用TypeHint
指定又可以使用Types
指定。例如Tuple2<String, Integer>
类型我们可以使用如下
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
Types.TUPLE(Types.STRING, Types.INT)
方式定义。