首页 > 其他分享 >Doris写入数据异常提示actual column number in csv file is less than schema column number

Doris写入数据异常提示actual column number in csv file is less than schema column number

时间:2023-07-14 19:00:10浏览次数:50  
标签:actual column 写入 number label dorisStreamLoad Doris

版本信息:

  • Flink 1.17.1
  • Doris 1.2.3
  • Flink Doris Connector 1.4.0

写入方式

采用 String 数据流,依照社区网站的样例代码,在sink之前将数据转换为DataStream,分隔符采用"\t"。

运行异常

通过Stream Load返回结果json中的ErrorUrl可以看到如题的异常

Reason: actual column number in csv file is less than schema column number. actual number: 10, ..., schema column number: 11; src line: [...]

数据库表明明只有10个字段,提示schema column number却是11个。是自己眼花数错字段了吗?经过反复确认及同事确认,没有错,目标表就是10个字段,我写入的也是10个字段,是Flink Doris Connector 的bug吗?

分析过程

既然怀疑是bug,那就去扒代码。
实际数据写入逻辑封装在org.apache.doris.flink.sink.writer.DorisWriter,该类实现了org.apache.flink.api.connector.sink.SinkWriter接口。查看该类发现,写入Doris的过程实际是使用微批写入的。

    @Override
    public void write(IN in, Context context) throws IOException {
        checkLoadException();
        byte[] serialize = serializer.serialize(in);
        if(Objects.isNull(serialize)){
            //ddl record
            return;
        }
        if(!loading) {
            //Start streamload only when there has data
            dorisStreamLoad.startLoad(currentLabel);
            loading = true;
        }
        dorisStreamLoad.writeRecord(serialize);
    }
    @Override
    public List<DorisCommittable> prepareCommit(boolean flush) throws IOException {
        if(!loading){
            //There is no data during the entire checkpoint period
            return Collections.emptyList();
        }
        // disable exception checker before stop load.
        loading = false;
        Preconditions.checkState(dorisStreamLoad != null);
        RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
        if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
            String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
            throw new DorisRuntimeException(errMsg);
        }
        if (!executionOptions.enabled2PC()) {
            return Collections.emptyList();
        }
        long txnId = respContent.getTxnId();
        return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
    }

每一条记录都会触发write操作,从上述代码可以看到根据boolean变量loading的值,程序将会触发dorisStreamLoad.startLoad(currentLabel);,而loading的状态在preCommit方法中进行修改,而preCommit是在checkpoint时触发,所以数据提交动作是通过checkpoint触发的。查看startLoad源代码


    /**
     * start write data for new checkpoint.
     * @param label
     * @throws IOException
     */
    public void startLoad(String label) throws IOException{
        loadBatchFirstRecord = true;
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        recordStream.startInput();
        LOG.info("stream load started for {} on host {}", label, hostPort);
        try {
            InputStreamEntity entity = new InputStreamEntity(recordStream);
            putBuilder.setUrl(loadUrlStr)
                    .baseAuth(user, passwd)
                    .addCommonHeader()
                    .addHiddenColumns(enableDelete)
                    .setLabel(label)
                    .setEntity(entity)
                    .addProperties(streamLoadProp);
            if (enable2PC) {
               putBuilder.enable2PC();
            }
            pendingLoadFuture = executorService.submit(() -> {
                LOG.info("start execute load");
                return httpClient.execute(putBuilder.build());
            });
        } catch (Exception e) {
            String err = "failed to stream load data with label: " + label;
            LOG.warn(err, e);
            throw e;
        }
    }

DorisStreamLoad类负责将数据实际写入Doris,在上面的代码中我看到了一个陌生的词汇HiddenColumns,“隐藏列”,什么是隐藏列?.addHiddenColumns(enableDelete)的参数enableDelete 是一个boolean值,继续扒代码发现,默认值enableDelete = true;,addHiddenColumn(true)?是否意味着我的put操作数据中必须包含隐藏列?继续扒

    public HttpPutBuilder addHiddenColumns(boolean add) {
        if(add){
            header.put("hidden_columns", LoadConstants.DORIS_DELETE_SIGN);
        }
        return this;
    }

在http请求header中添加了一个配置,似乎是指明了"hidden_columns"="DORIS_DELETE_SIGN",看着好像是一个列名称,使用IDEA的跟踪调用功能,查看下哪里用到了这个变量。

跟踪这些代码更确信,这是一个列名称。我的10列加上这一列就是11列啊,设置enableDelete = false,是否意味着我的put操作不再包含这一隐含列?

解决方案

修改构造DorisSink的代码添加.setDeletable(false);

        DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.setLabelPrefix(labelPrefix) //streamload label prefix
                .setDeletable(false);

重新运行代码,写入成功,问题解决。

总结

出现该异常是因为,Flink Doris Connector 在构造Sink时默认用户写入数据中包含了隐藏列__DORIS_DELETE_SIGN__
尽管问题解决了,但是还是有很多疑问,什么是隐藏列,__DORIS_DELETE_SIGN__这个隐藏列是什么意思,从前面的代码中可以看出其取值为0或1,导入数据时为什么默认需要传递该列,该列在最前面还是在最后面?不传递该列是否会有问题?

标签:actual,column,写入,number,label,dorisStreamLoad,Doris
From: https://www.cnblogs.com/aaronking/p/17554705.html

相关文章

  • EF Core 7.0 – JSON Column
    @@EFCore7json列 前言SQLServer支持JSON,以前写过一篇介绍SQLServer–WorkwithJSON. 但EFCore一直没有支持。直到EFCore7.0才支持。EFCore7包含对JSON列的提供程序无关的支持,以及SQLServer的实现。此支持允许将从.NET类型生成的聚合映射到......
  • PROPERTIES OF SQUARE NUMBERS
     Whenanumberismultipliedbyitself,theresultingnumberiscalledasasquarenumber. Forexample,whenwemultiply5by5,weget52 =25.Here,25isasquarenumber.Ingeometry,theareaofasquareisthefinestexampleofasquarenumber.Are......
  • CF1175F The Number of Subpermutations 对自己的警告--zhengjun
    太久没见过启发式合并了,然后没想出做法。首先笛卡尔树建出来。然后直接枚举跨过\(mid\)的长度为\(a_{mid}\)的区间,RMQ\(O(1)\)验证即可。发现这样的区间个数不超过左右区间大小的较小值,时间复杂度:\(O(n\logn)\)。代码#include<bits/stdc++.h>usingnamespacestd;us......
  • 特殊类型 调用Number函数
    //特殊类型null a=null; a=Number(a); console.log("nulla转换后类型="+typeofa); console.log("nulla转换后的值="+a); //特殊类型undefined a=undefined; a=Number(a); console.log("undefineda转换后类型="+typeofa); co......
  • mybatis-plus Error attempting to get column 'xxx' from result set.
     报错信息:mybatis-plusErrorattemptingtogetcolumn'xxx'fromresultset. 解决:1、获取数据的实体类中新建了一个有参的构造方法,却没有无参构造方法,使用MyBatis-Plus内置方法进行查询时会报错。解决办法:新建一个无参构造方法。......
  • 【论文解析】EJOR 2011 A clustering procedure for reducing the number of represen
    论文名称:AclusteringprocedureforreducingthenumberofrepresentativesolutionsintheParetoFrontofmultiobjectiveoptimizationproblems动机假设一个三目标优化问题\[\begin{aligned}&\text{Availability:}\max_\thetaJ_1(\theta)=\max_{\theta_p,......
  • ORA-01438处理方法 value larger than specified precision allowed for this column
    http://ora-01438.ora-code.com/ORA-01438:valuelargerthanspecifiedprecisionallowedforthiscolumnCause:Wheninsertingorupdatingrecords,anumericvaluewasenteredthatexceededtheprecisiondefinedforthecolumn.Action:Enteravaluethatcompli......
  • convert string list to number list
    #stringwithintegerssepatedbyspacesstring1="12345678"print("ActualStringcontainingintegers:",string1)print("Typeofstring:",type(string1))#convertingthestringintolistofstringslist1=list(string1.s......
  • saveOrUpdate failed with new sequence number
    Domainobject:<hibernate-mapping><classname="Trade"table="Trades"><idname="seqNum"column="SEQ_NUM"type="long"><generatorclass="sequence"><par......
  • CF1637H Minimize Inversions Number
    我直接??????????????????考虑一个数怎么做,就是逆序对减去一个\(i\)前面的逆序对再加上顺序对。考虑很多数怎么做,就是这个玩意的和再加上子序列种的顺序对减去逆序对,顺序对可以用逆序对表示,现在只考虑顺序对。注意到,如果\(i<j,p_i>p_j\)且\(i\)在子序列中\(j\)不在子序列中,那么把\(j\)弄......