首页 > 其他分享 >flink stream转table POJO对象遇到的坑

flink stream转table POJO对象遇到的坑

时间:2024-08-02 12:16:44浏览次数:12  
标签:stream statDateTime flink POJO private LocalDateTime Integer entityId public

核心代码

public class TrackLog {
    private Integer entityId;
    // flink的时间类型,必须使用LocalDateTime
    private LocalDateTime statDateTime;
	public Integer getEntityId() {
        return entityId;
    }

    public void setEntityId(Integer entityId) {
        this.entityId = entityId;
    }
	public LocalDateTime getStatDateTime() {
        return statDateTime;
    }

    public void setStatDateTime(LocalDateTime statDateTime) {
        this.statDateTime = statDateTime;
    }
}

SideOutputDataStream<TrackLog> patrolStream = traceStream.getSideOutput(outputLogTag);
Table table = tableEnv.fromDataStream(patrolStream);
table.printSchema();

会输出:

(
  `entityId` INT,
  `statDateTime` RAW('java.time.LocalDateTime', '...')
)

问题一: 往POJO类(TrackLog)中private 属性isDup,未定义getter方法

public class TrackLog {
    private Integer entityId;
    // flink的时间类型,必须使用LocalDateTime
    private LocalDateTime statDateTime;
	
	private boolean isDup = false;
	
	public Integer getEntityId() {
        return entityId;
    }

    public void setEntityId(Integer entityId) {
        this.entityId = entityId;
    }
	public LocalDateTime getStatDateTime() {
        return statDateTime;
    }

    public void setStatDateTime(LocalDateTime statDateTime) {
        this.statDateTime = statDateTime;
    }
}

再运行:

(
  `f0` RAW('com.tide.entity.TrackLog', '...')
)

schema中,只有f0一个field,类型是TrackLog,也就是说,在把POJO类的fields映射到表时,出现了问题。
很奇怪,debug了好久才发现问题所在。

标签:stream,statDateTime,flink,POJO,private,LocalDateTime,Integer,entityId,public
From: https://www.cnblogs.com/xushengbin/p/18338491

相关文章

  • 使用GZipStream类在C#中进行数据压缩和解压缩操作
    GZipStream是.NET中用于实现GZip算法的类。GZip是一种用于压缩和解压缩数据的算法,广泛应用于文件压缩和网络传输等场景GZip算法简介GZip是基于DEFLATE算法的压缩方法,由Jean-LoupGailly和MarkAdler创建,最初用于Unix系统中的gzip工具。GZip主要用于减少文件大小以便更高效地存储......
  • Flink的DateStream API中的ProcessWindowFunction和AllWindowFunction两种用于窗口处
    目录ProcessWindowFunctionAllWindowFunction具体区别ProcessWindowFunction示例AllWindowFunction示例获取时间不同,一个数据产生的时间一个是数据处理的时间ProcessWindowFunctionAllWindowFunction具体示例ProcessWindowFunction示例AllWindowFunction示例总......
  • 当我尝试在 flink 集群上运行 Beam Pipeline 时,为什么会出现 ERROR:root:java.lang.Nu
    我正在尝试在本地托管的Flink集群上运行一个简单的Beam管道,但在执行此操作时遇到错误。我已经尝试了在互联网上可以找到的所有内容。importapache_beamasbeamfromapache_beam.ioimportReadFromTextfromapache_beam.ioimportWriteToTextfromapache_beam.option......
  • Java中的数据流处理框架:Apache Flink
    Java中的数据流处理框架:ApacheFlink大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!今天我们来探讨一下Java中的数据流处理框架——ApacheFlink。Flink是一款用于处理数据流和批处理的分布式处理框架。它具有高吞吐量、低延迟和容错的特性,广泛应用于实时......
  • 使用Spring Cloud Stream处理Java消息流
    使用SpringCloudStream处理Java消息流大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!今天我们来探讨如何使用SpringCloudStream来处理Java消息流。SpringCloudStream是一个用于构建消息驱动微服务的框架,能够与各种消息中间件集成,如RabbitMQ、Kafka......
  • (六)Redis 消息队列 List、Streams
    Redis适合做消息队列吗?有什么解决方案?首先要明白消息队列的消息存取需求和工作流程。1、消息队列我们一般把消息队列中发送消息的组件称为生产者,把接收消息的组件称为消费者,下图是一个通用的消息队列的架构模型:消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重......
  • Flink-CDC
    Flink-CDC(FlinkChangeDataCapture)是一个基于Flink计算框架的数据集成工具,它实现了对数据库变更数据的捕获、处理和传输,支持全量和增量数据的一体化读取。下面将从多个方面对Flink-CDC进行详细的解析。一、Flink-CDC概述1.1CDC简介CDC(ChangeDataCapture)是变更数据捕......
  • 从流读取时,PyAudio Stream 导致 Windows 堆损坏(-1073740940 (0xC0000374))
    我在尝试读取PyAudio的Stream时遇到了问题。它因退出代码而崩溃-1073740940这是一个Windows堆损坏错误0xC0000374它发生在我从PyAudio流读取的行中,如下所示:stream.read(chunk_size)我也看到它崩溃了-1073741819ACCESS_VIOLATION_......
  • InputStream inputStream = classLoader.getResourceAsStream("aaa.properties") ; 
    问:InputStreaminputStream=classLoader.getResourceAsStream("aaa.properties"); 获取到的 inputStream 是null答:当您尝试使用ClassLoader的getResourceAsStream方法来获取一个资源文件(如"aaa.properties")的InputStream,但得到的结果是null时,这通常意味着资源文......
  • Java8 Stream操作流10条常用方法
    1.filter过滤满足条件的元素2.mapmap方法将每个元素转换成另一个类型,并返回新的流3.flatMap与map类似,可以用来扁平化多层嵌套4.distinct去除流中的重复元素5.sorted对流中的元素进行排序6.limit可以限制流中的元素数量7.skip可以跳过流中的前n个元素8.reduce......