首页 > 其他分享 >Flink实时写Hudi报NumberFormatException异常

Flink实时写Hudi报NumberFormatException异常

时间:2024-03-13 12:32:54浏览次数:32  
标签:java stream Flink util apache NumberFormatException Hudi fileId

Flink实时写Hudi报NumberFormatException异常

问题描述

在Flink项目中,针对Hudi表 xxxx_table 的 bucket_write 操作由于 java.lang.NumberFormatException 异常而从运行状态切换到失败状态。异常信息显示在解析字符串"ddd7a1ec"为整数时出现了问题。报错如下:

bucket_write: xxxx_table switched from RUNNING to FAILED with failure cause: java.lang.NumberFormatException: For input string: "ddd7a1ec"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:79)
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.lambda$bootstrapIndexIfNeed$1(BucketStreamWriteFunction.java:162)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.bootstrapIndexIfNeed(BucketStreamWriteFunction.java:160)
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.processElement(BucketStreamWriteFunction.java:112)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)

原因分析:

报错相关源码如下:

  public static int bucketIdFromFileId(String fileId) {
    return Integer.parseInt(bucketIdStrFromFileId(fileId));
  }
  public static String bucketIdStrFromFileId(String fileId) {
    return fileId.substring(0, 8);
  }

通过查看 BucketIdentifier 源代码,发现 bucketIdFromFileId 方法尝试将 fileId 参数的子字符串解析为整数。 fileId 应包含作为前缀的桶标识符,而 bucketIdStrFromFileId 方法则通过取 fileId 的前8个字符来提取桶标识符。

这说明异常发生在解析历史数据文件时。查看hdfs文件目录发现,历史数据文件未按照桶索引逻辑进行编写,正常桶索引写入的文件名具有桶标识符作为前缀,而历史文件则缺乏此桶标识符前缀。因此,在尝试从历史文件名中解析桶标识符时,由于缺少预期的桶标识符前缀,解析过程失败。

那么历史数据是谁写入的呢?经过调查发现,该非法的历史数据是由下游系统为方便调试程序时写入。


解决方案:

删除无用的非法数据文件即可解决。如果非法文件数据有留存必要,那可能要备份后再处理。

为确保该问题不再发生,需要确保历史数据文件遵循桶索引逻辑,这样可以保证解析过程顺利进行,避免 java.lang.NumberFormatException 异常的发生。

标签:java,stream,Flink,util,apache,NumberFormatException,Hudi,fileId
From: https://blog.csdn.net/qq_36382892/article/details/136674842

相关文章

  • 【快捷部署】002_Flink
    Flink一键安装(本地模式)install-flink.sh脚本内容#!/bin/bash####变量###执行脚本的当前目录mydir=$(cd"$(dirname"$0")";pwd)echo$mydir#flink安装目录flink=/flink#检查点目录cp=$flink/checkpoints/#保留点目录sp=$flink/savepoints/#tasknumber数量ta......
  • flink部署模式和运行模式
    flink部署模式部署模式:flink里面的计算程序运行的方式sessionsession模式一个flink集群可以跑多个计算任务,资源共享session模式下集群是提前启动的,然后向flink集群提交jobper-job(高版本已经不推荐了)per-job模式下,一个集群只跑一个计算任务,资源独立,集群的启动是跟随......
  • centos7安装flink
    local模式环境说明,flink需要jdk,并且flin.2k1.17,需要的是jdk11,jdk17不行,实测jdk1.8也行下载flink包wgethttps://dlcdn.apache.org解压#解压tar-zxvfflink-1.17.2-bin-scala_2.12.tgz#进入flink目录cdflink-1.17.2修改配置文件viconf/flink-conf.yaml#允......
  • Flink CDC简介-flinkcdc-jian-jie
    FlinkCDC官方文档什么是FlinkCDC¶FlinkCDCConnectors是ApacheFlink的一组源连接器,使用变更数据捕获(CDC)从不同数据库中获取变更。FlinkCDCConnectors集成Debezium作为捕获数据变化的引擎。所以它可以充分发挥Debezium的能力。详细了解Debezium是什么。支......
  • Flink CDC 写 StarRocks
    Flink版本:1.17.1CDC版本:2.3.0StarRocks版本:2.5.8前言最近需要实时同步几个Mysql表到StarRocks,薅出之前写的Demo代码,简单改造了一下,加了个配置文件,可以通过修改配置文件指定source、sink表,这样就不用讲表名什么的写死到代码里面。再利用flinksession模式,把一堆任......
  • flink 中的水位线(Watermark)
    水位线Watermark实时统计使用了flinksql程序,使用flink-TVF表值函数滚动窗口按分钟进行数据聚合操作,消费的kafka数据需要在规定的时间窗口内进行推送数据并消费计算,为了解决处理乱序事件或延迟数据引入了Watermark,用来设置延迟计算时间等待迟到的数据,但不能无限期的等下去,必......
  • flink总结
    基本概念介绍flink的基本处理流程读取数据(source)->各种算子计算处理数据(rdd)-->输出数据(sink)有界流和无界流如果是从文件有限数据的地方读取数据就是有界流,如果是接到kafka或者socket这种地方就是无界流。有状态和无状态算子计算的过程中,是否要保存中间结算结果......
  • 使用 SPL 高效实现 Flink SLS Connector 下推
    作者:潘伟龙(豁朗)背景日志服务SLS是云原生观测与分析平台,为Log、Metric、Trace等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入SLS进行存储、分析;阿里云Flink是阿里云基于ApacheFlink构建的大数据分析平台......
  • 【Flink入门修炼】2-2 Flink State 状态
    什么是状态?状态有什么作用?如果你来设计,对于一个流式服务,如何根据不断输入的数据计算呢?又如何做故障恢复呢?一、为什么要管理状态流计算不像批计算,数据是持续流入的,而不是一个确定的数据集。在进行计算的时候,不可能把之前已经输入的数据全都保存下来,然后再和新数据合并计算。......
  • Flink AggregatingState 实例
    FlinkAggregatingState实例AggregatingState介绍AggregatingState需要和AggregateFunction配合使用add()方法添加一个元素,触发AggregateFunction计算get()获取State的值需求:计算每个设备10秒内的平均温度importorg.apache.flink.api.common.eventtime.SerializableTimesta......