首页 > 其他分享 >fink泛型参数问题和TypeHint TypeInformation Types区别

fink泛型参数问题和TypeHint TypeInformation Types区别

时间:2024-03-08 16:12:26浏览次数:34  
标签:TypeHint java String fink TypeInformation add 泛型 returns Types

TypeHint,TypeInformation,Types 区别

TypeInformation是flink的类型定义,TypeHint是描述用于描述泛型参数的辅助类,Types 是一个封装了常用TypeInformation的工具类

描述问题

下面一段代码的有两个参数,第一个来自数据流元素,他的本质是入参。第二个是出参,效果和返回值类似
第一个参数是数据流元素,可以通过推断得到。第二个参数不能通过推测得到,因为只有程序执行了才知道它的类型。

一般我们是用返回值来接收输出参数的,如果是出参中使用泛型就是不明确的类型。

DataStreamSource<String> dataStreamSource = environment.fromElements(
        "add 1 value1","add 5 value3","add 3 value2",
        "add 2 value1","add 6 value3","add 4 value2" );
/*DataStreamSource<String> dataStreamSource = environment.fromElements(
        "1","2","3");*/

dataStreamSource.flatMap((String a, Collector<String> b) -> {
    String[] s = a.split(" ");
    for (String item : s) {
        b.collect(item);
    }
});

运行程序抛出如下异常
The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved。

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:560)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:177)
	at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:611)
	at bigdata.flink.Issue.type(Issue.java:72)
	... 27 more

解决办法两个

  • 使用匿名内部类代替lamda表达式

    dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String a, Collector<String> b) throws Exception {
            String[] s = a.split(" ");
            for (String item : s) {
                b.collect(item);
            }
        }
    });
    
  • 给此算子指定明确的类型
    rdd.returns( Types.STRING )指定类型

    DataStreamSource<String> dataStreamSource = environment.fromElements(
            "add 1 value1","add 5 value3","add 3 value2",
            "add 2 value1","add 6 value3","add 4 value2" );
    /*DataStreamSource<String> dataStreamSource = environment.fromElements(
            "1","2","3");*/
    
    dataStreamSource.flatMap((String a, Collector<String> b) -> {
        String[] s = a.split(" ");
        for (String item : s) {
            b.collect(item);
        }
    }).returns( Types.STRING );
    

出现异常原因

java的泛型是假泛型,java的泛型擦除机制保留的细节不够多,尤其是在使用函数接口的时候。
函数接口是运行时生成的,内部类是编译时就生成的,并且内部类保留了更多细节。

fink的 所有带有出参的接口都会有类似的问题,比如flatMap算子,比如 process 算子

FlatMapFunction接口

image-20240308154334032

ProcessFunc接口

image-20240308154441769

returns函数

returns有三个种重载

dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String a, Collector<String> b) throws Exception {
                String[] s = a.split(" ");
                for (String item : s) {
                    b.collect(item);
                }
            }
        })
		//returns的三种重载
        .returns(String.class)
        .returns(new TypeHint<String>(){})
        .returns(Types.STRING);

当出参没有泛型嵌套的时候他们是等价的都能用,比如上面 出参是 Collector

如果出参是复杂泛型,比如 Collector<List> ,那么returns(String.class) 就不能用了

//这种是不合法,已经丢失了内嵌泛型
.returns(List<String>.class.class)  
//下面两种还是可以
.returns(new TypeHint<List<String>>(){})
.returns(Types.LIST(Types.STRING));

TypeHint 类似json转java对象的第三个参数 TypeReference,用于保存泛型细节。

通过 TypeHint可以得到 TypeInformation,TypeInformation.of(typeHint)的返回值就是TypeInformation
Types.LIST(Types.STRING)返回的也是TypeInformation,只是官提供的工具方法

标签:TypeHint,java,String,fink,TypeInformation,add,泛型,returns,Types
From: https://www.cnblogs.com/cxygg/p/18061218

相关文章

  • Flink 使用之 TypeInformation 由于泛型类型在运行时会被JVM擦除,所以要指定类型
    Flink使用之TypeInformation由于泛型类型在运行时会被JVM擦除,所以要指定类型Flink使用介绍相关文档目录Flink使用介绍相关文档目录背景本篇从近期遇到的StreamJavaAPI问题,引出TypeInformation的使用。Exceptioninthread"main"org.apache.flink.api.common.functi......
  • flink 的安装以及fink-cdc 基于多数据源导入的es 的简单使用
    此文档是参照flink-cdc文档(https://ververica.github.io/flink-cdc-connectors/master/content/快速上手/mysql-postgres-tutorial-zh.html)案例 的最佳实践1.下载flinkrelease最新版本1.18.0并解压, https://repo.maven.apache.org/maven2/org/apache/flink/flink-......
  • Fink
    一。Flink和Spark一样,是一个大数据处理引擎。主要区别在于Flink做的是流处理,Spark做的是批处理。二。Flink处理的是无界的和有界的数据流,做有状态的计算。Flink设计旨在所有常见的集群环境中运行,以任意规模和内存级速度执行计算。三。Flink具有以下几个特点:支持流处理和批处......
  • Kafka+Fink 实战+工具类
    LogServiceImpl@Service@Slf4jpublicclassLogServiceImplimplementsLogService{privatestaticfinalStringTOPIC_NAME="ods_link_visit_topic";@AutowiredprivateKafkaTemplatekafkaTemplate;/***记录日志**......
  • Fink集群搭建
    Fink集群搭建1、Flink集群搭建Local本地运行模式在IDEA上运行就可以了,主要用于开发StandAlone模式Flink自带的资源管理框架,不需要依赖于任何其他的框架1、上传解......