Serialization types supported by Flink
- Officially supported
- Java Tuples and Scala case classes
- Java POJOs
- Primitive types
- Regular classes
- Values
- Hadoop Writables
- Special types
- Verification code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
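// Disable generic types so that any fallback to Kryo throws an exception instead of passing silently
env.getConfig().disableGenericTypes();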
Tuple2<String, String> tuple2 = new Tuple2<>();
tuple2.f0 = "a";
tuple2.f1 = "b";
DataStreamSource<Tuple2<String, String>> tupleStream = env.fromElements(tuple2);
tupleStream.print();
// 1. List test
List<String> list = new ArrayList<>();
list.add("hadoop");
list.add("hive");
list.add("spark");
list.add("flink");
DataStreamSource<List<String>> listStream = env.fromElements(list);
listStream.print("listStream");
// 2. Map test
Map<String, String> map = new HashMap<>();
map.put("a","java");
map.put("b","python");
map.put("c","scala");
DataStreamSource<Map<String, String>> mapStream = env.fromElements(map);
mapStream.print("mapStream");
// 3. JSON data test
JSONObject json = new JSONObject();
json.put("a","java");
json.put("b","python");
json.put("c", 123);
DataStreamSource<JSONObject> jsonStream = env.fromElements(json);
jsonStream.print("jsonStream");
env.execute();
- Test results
class java.util.ArrayList does not contain a setter for field size
Class class java.util.ArrayList cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
class java.util.HashMap does not contain a getter for field threshold
class java.util.HashMap does not contain a setter for field threshold
Class class java.util.HashMap cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
class com.alibaba.fastjson2.JSONObject does not contain a getter for field accessOrder
class com.alibaba.fastjson2.JSONObject does not contain a setter for field accessOrder
Class class com.alibaba.fastjson2.JSONObject cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.ArrayList is treated as a generic type.
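- Analysis
- With generic types disabled, the program fails immediately with an exception.
- Flink falls back to the Kryo serializer for every type it treats as a generic type.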
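The "does not contain a getter/setter for field ..." lines in the log come from Flink's POJO check on each field. The sketch below is only a simplified approximation of that accessor rule in plain Java reflection, not Flink's actual implementation: it assumes that a non-public field needs a getter named `get<Field>`, `is<Field>`, or the bare field name, plus a setter named `set<Field>`, and it ignores the other POJO requirements (public class, public no-argument constructor). The names `PojoFieldCheck`, `GoodEvent`, `BadEvent`, and `findProblems` are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class PojoFieldCheck {

    /** Hypothetical class that follows the POJO conventions. */
    public static class GoodEvent {
        private String name;   // private, but has both accessors
        public int count;      // public fields need no accessors

        public GoodEvent() {}

        public String getName() { return name; }

        public void setName(String name) { this.name = name; }
    }

    /** Hypothetical class shaped like ArrayList: size() acts as a getter, no setter exists. */
    public static class BadEvent {
        private int size;

        public BadEvent() {}

        public int size() { return size; }
    }

    /** Returns one message per non-public field that lacks a getter or a setter. */
    public static List<String> findProblems(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            // Static, transient, synthetic and public fields are skipped in this sketch.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)
                    || Modifier.isPublic(mods) || field.isSynthetic()) {
                continue;
            }
            String lower = field.getName().toLowerCase(Locale.ROOT);
            boolean hasGetter = false;
            boolean hasSetter = false;
            for (Method method : clazz.getMethods()) {
                String name = method.getName().toLowerCase(Locale.ROOT);
                boolean getterName = name.equals("get" + lower)
                        || name.equals("is" + lower)
                        || name.equals(lower);
                if (getterName && method.getParameterCount() == 0
                        && method.getReturnType() == field.getType()) {
                    hasGetter = true;
                }
                if (name.equals("set" + lower) && method.getParameterCount() == 1
                        && method.getParameterTypes()[0] == field.getType()) {
                    hasSetter = true;
                }
            }
            if (!hasGetter) {
                problems.add(clazz + " does not contain a getter for field " + field.getName());
            }
            if (!hasSetter) {
                problems.add(clazz + " does not contain a setter for field " + field.getName());
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(findProblems(GoodEvent.class));
        findProblems(BadEvent.class).forEach(System.out::println);
    }
}
```

Under these assumptions, `BadEvent` is reported only for a missing setter, which matches the single message the log shows for `java.util.ArrayList` and its `size` field. A class that fails this kind of check is what Flink then processes as a GenericType.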
- Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.HashMap is treated as a generic type.
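- Because of Java type erasure, the key and value types of the Map are not available at runtime, so it is recognized as a generic type.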
- Adding the matching type information fixes this: dataStream.returns(Types.MAP(Types.STRING, Types.STRING))
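Why explicit type information is needed can be seen in plain Java, without Flink. The sketch below first shows that two maps with different type arguments share one runtime class, then shows that an anonymous subclass keeps its type arguments readable through reflection. `ErasureDemo`, `TypeToken`, `sameRuntimeClass`, and `stringMapType` are hypothetical names for this illustration; Flink's own `TypeHint` is used with the same anonymous-subclass syntax, but this is not its implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

public class ErasureDemo {

    /** Hypothetical type token: an anonymous subclass records T in its class metadata. */
    public abstract static class TypeToken<T> {
        public Type captured() {
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    /** True when both objects share one runtime class, whatever their compile-time type arguments were. */
    public static boolean sameRuntimeClass(Object a, Object b) {
        return a.getClass() == b.getClass();
    }

    /** Captures Map<String, String> through an anonymous subclass, so the arguments survive erasure. */
    public static ParameterizedType stringMapType() {
        return (ParameterizedType) new TypeToken<Map<String, String>>() {}.captured();
    }

    public static void main(String[] args) {
        Map<String, String> strings = new HashMap<>();
        Map<Integer, Long> numbers = new HashMap<>();
        // Both instances are plain java.util.HashMap at runtime.
        System.out.println(sameRuntimeClass(strings, numbers));

        ParameterizedType type = stringMapType();
        System.out.println(type.getRawType());
        System.out.println(type.getActualTypeArguments()[0]);
        System.out.println(type.getActualTypeArguments()[1]);
    }
}
```

An object handed to `fromElements` carries only its runtime class, so the key and value types have to be supplied separately, which is what `Types.MAP(Types.STRING, Types.STRING)` does.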