Problem
Writing to a Doris table from Flink SQL fails with the error: actual column number in csv file is less than schema column number.
Symptom
The schema defined in Flink SQL clearly has m columns, yet at write time the error reports that there are actually m+1 columns.
CREATE TABLE DORIS_SINK (
    ID STRING,
    NAME STRING,
    BANK STRING,
    AGE INT
) with (
    'connector' = 'doris',
    -- connection options (fenodes, table.identifier, username, password) omitted
    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true',
    'sink.properties.columns' = 'id,name,bank,age',
    'sink.properties.partial_columns' = 'true'
);
Analysis
The cause is that sink.enable-delete defaults to true, so when the connector performs a Stream Load it automatically appends one extra column.
First, it adds the column to the request header:
public HttpPutBuilder addHiddenColumns(boolean add) {
    if (add) {
        // Declare the hidden delete-sign column via the hidden_columns header
        header.put("hidden_columns", LoadConstants.DORIS_DELETE_SIGN);
    }
    return this;
}
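To make the header side concrete, here is a JDK-only sketch (java.net.http, Java 11+) of the request a sink might assemble. It assumes that each 'sink.properties.X' option is forwarded as Stream Load header X and that LoadConstants.DORIS_DELETE_SIGN equals "__DORIS_DELETE_SIGN__"; the class, its helper names, and the fe-host/db/tbl URL are hypothetical placeholders, not connector code. It only builds the request and never sends it. With enableDelete on, the request carries a hidden_columns header that the DDL never declared.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;

public class StreamLoadHeaderSketch {
    // Assumed to match LoadConstants.DORIS_DELETE_SIGN in the connector.
    static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";

    // Hypothetical helper: the Stream Load headers a sink would send.
    static Map<String, String> buildHeaders(Map<String, String> sinkProperties, boolean enableDelete) {
        // Assumption: every 'sink.properties.X' option becomes header X.
        Map<String, String> headers = new LinkedHashMap<>(sinkProperties);
        if (enableDelete) {
            // Mirrors addHiddenColumns(true): a column the DDL never declared.
            headers.put("hidden_columns", DORIS_DELETE_SIGN);
        }
        return headers;
    }

    // Builds (but does not send) an HTTP PUT carrying the given headers.
    static HttpRequest toRequest(String loadUrl, Map<String, String> headers, String body) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(loadUrl))
                .PUT(HttpRequest.BodyPublishers.ofString(body));
        headers.forEach(builder::header);
        return builder.build();
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("format", "json");
        props.put("read_json_by_line", "true");
        props.put("columns", "id,name,bank,age");
        props.put("partial_columns", "true");

        Map<String, String> headers = buildHeaders(props, true);
        // fe-host, db and tbl are placeholders.
        HttpRequest request = toRequest("http://fe-host:8030/api/db/tbl/_stream_load", headers, "{}");
        System.out.println(request.headers().firstValue("hidden_columns").orElse("<none>"));
        System.out.println(request.headers().firstValue("columns").orElse("<none>"));
    }
}
```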
Then it adds the column to the Stream Load data itself:
// org.apache.doris.flink.sink.writer.RowDataSerializer#buildJsonString
public String buildJsonString(RowData record, int maxIndex) throws IOException {
    int fieldIndex = 0;
    Map<String, String> valueMap = new HashMap<>();
    // One entry per field declared in the Flink SQL schema (m entries)
    while (fieldIndex < maxIndex) {
        Object field = rowConverter.convertExternal(record, fieldIndex);
        String value = field != null ? field.toString() : null;
        valueMap.put(fieldNames[fieldIndex], value);
        fieldIndex++;
    }
    // With sink.enable-delete on, the delete sign is appended: m + 1 entries
    if (enableDelete) {
        valueMap.put(DORIS_DELETE_SIGN, parseDeleteSign(record.getRowKind()));
    }
    return objectMapper.writeValueAsString(valueMap);
}
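The payload side can be reproduced without Flink or Jackson. This sketch follows the shape of buildJsonString but uses stand-ins: a local RowKind enum instead of org.apache.flink.types.RowKind, a plain List of values instead of RowData, a LinkedHashMap instead of HashMap so the key order is stable, and a minimal hand-rolled JSON writer. The delete-sign mapping (INSERT/UPDATE_AFTER to "0", DELETE/UPDATE_BEFORE to "1") is our assumption about parseDeleteSign. Running main prints "5 keys for 4 declared columns", which is the m+1 from the symptom.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class JsonLineSketch {
    static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";

    // Hypothetical stand-in for org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Assumed mapping: "0" keeps the row, "1" marks it for deletion.
    static String parseDeleteSign(RowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return "0";
            case DELETE:
            case UPDATE_BEFORE:
                return "1";
            default:
                throw new IllegalArgumentException("Unknown row kind " + kind);
        }
    }

    // Same shape as buildJsonString: one key per declared field, plus the delete sign.
    static Map<String, String> buildValueMap(String[] fieldNames, List<?> values, RowKind kind, boolean enableDelete) {
        Map<String, String> valueMap = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            Object field = values.get(i);
            valueMap.put(fieldNames[i], field != null ? field.toString() : null);
        }
        if (enableDelete) {
            valueMap.put(DORIS_DELETE_SIGN, parseDeleteSign(kind));
        }
        return valueMap;
    }

    // Minimal JSON writer; escapes only quotes and backslashes.
    static String toJson(Map<String, String> map) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        map.forEach((k, v) -> joiner.add(quote(k) + ":" + (v == null ? "null" : quote(v))));
        return joiner.toString();
    }

    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "bank", "age"};
        List<Object> row = List.of("1", "alice", "abc", 30);
        Map<String, String> valueMap = buildValueMap(fields, row, RowKind.INSERT, true);
        System.out.println(valueMap.size() + " keys for " + fields.length + " declared columns");
        System.out.println(toJson(valueMap));
    }
}
```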
See also: "Doris data write error: actual column number in csv file is less than schema column number".
Conclusion
This has probably been fixed upstream, but how it was fixed, and why the fix works, is unknown.
https://github.com/apache/doris-flink-connector/issues/28
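The first version that includes the fix is hard to pin down: the source in the released jar does not match the branch source on GitHub.
In testing, version 24.0.1 works without adding the hidden column.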
Also, for older flink-doris-connector versions that do not include the fix, sink.enable-delete must be set to false; adding a __DORIS_DELETE_SIGN__ column to the Flink SQL schema does not help.
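A hedged sketch of that workaround: it renders the sink's WITH clause and adds 'sink.enable-delete' = 'false' for older connectors. Because the first fixed version is unknown, the hypothetical mustDisableDelete helper conservatively treats anything older than 24.0.1 (the only version tested here) as unfixed, and it assumes plain numeric versions such as 1.5.2. Connection options are left out, as in the DDL above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class EnableDeleteWorkaround {
    // Assumption: 24.0.1 is the only version confirmed not to need the workaround;
    // anything older is treated as unfixed.
    static final int[] LAST_CONFIRMED_OK = {24, 0, 1};

    // Compares dotted numeric versions component by component (missing parts count as 0).
    static int compareVersions(String version, int[] other) {
        String[] parts = version.split("\\.");
        for (int i = 0; i < Math.max(parts.length, other.length); i++) {
            int a = i < parts.length ? Integer.parseInt(parts[i]) : 0;
            int b = i < other.length ? other[i] : 0;
            if (a != b) {
                return Integer.compare(a, b);
            }
        }
        return 0;
    }

    static boolean mustDisableDelete(String connectorVersion) {
        return compareVersions(connectorVersion, LAST_CONFIRMED_OK) < 0;
    }

    // Renders options as the WITH (...) clause of a Flink SQL DDL.
    static String withClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(",\n", "with (\n", "\n)");
        options.forEach((k, v) -> joiner.add("    '" + k + "' = '" + v + "'"));
        return joiner.toString();
    }

    static Map<String, String> sinkOptions(String connectorVersion) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "doris");
        options.put("sink.properties.format", "json");
        options.put("sink.properties.read_json_by_line", "true");
        options.put("sink.properties.columns", "id,name,bank,age");
        options.put("sink.properties.partial_columns", "true");
        if (mustDisableDelete(connectorVersion)) {
            // Stops the connector from appending __DORIS_DELETE_SIGN__.
            options.put("sink.enable-delete", "false");
        }
        return options;
    }

    public static void main(String[] args) {
        System.out.println(withClause(sinkOptions("1.5.2")));
    }
}
```

As far as we understand, turning the option off also means the connector stops writing the delete sign, so DELETE rows from a changelog source would no longer be applied as deletes in Doris; that is acceptable for append-only or upsert-only pipelines.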
To tell whether a given connector version has the fix, check this method:
// org.apache.doris.flink.deserialization.converter.DorisRowConverter
public GenericRowData convertInternal(List record) {
    // BUG: GenericRowData rowData = new GenericRowData(record.size());
    // Fixed: size the row by the number of schema converters
    GenericRowData rowData = new GenericRowData(deserializationConverters.length);
    for (int i = 0; i < deserializationConverters.length; i++) {
        rowData.setField(i, deserializationConverters[i].deserialize(record.get(i)));
    }
    return rowData;
}
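The difference between the buggy and fixed lines comes down to how wide the row is made. This JDK-only illustration uses Object[] as a stand-in for GenericRowData and plain Functions as stand-ins for deserializationConverters (all names hypothetical). It feeds in a record with one more value than the schema declares, an input chosen only to show the effect: the buggy sizing yields "[1, alice, null]" (arity 3), the fixed sizing "[1, alice]" (arity 2).

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class RowArityCheck {
    // Buggy sizing: the row is as wide as the incoming record, so an extra
    // trailing value leaves an unset (null) slot beyond the declared schema.
    static Object[] convertBuggy(List<?> record, List<Function<Object, Object>> converters) {
        Object[] row = new Object[record.size()];
        for (int i = 0; i < converters.size(); i++) {
            row[i] = converters.get(i).apply(record.get(i));
        }
        return row;
    }

    // Fixed sizing: exactly one slot per converter, i.e. per schema column.
    static Object[] convertFixed(List<?> record, List<Function<Object, Object>> converters) {
        Object[] row = new Object[converters.size()];
        for (int i = 0; i < converters.size(); i++) {
            row[i] = converters.get(i).apply(record.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        Function<Object, Object> asString = v -> v == null ? null : v.toString();
        List<Function<Object, Object>> converters = List.of(asString, asString);
        // Hypothetical input: two schema columns plus one extra value.
        List<String> record = Arrays.asList("1", "alice", "0");

        Object[] buggy = convertBuggy(record, converters);
        Object[] fixed = convertFixed(record, converters);
        System.out.println("buggy arity = " + buggy.length + ", row = " + Arrays.toString(buggy));
        System.out.println("fixed arity = " + fixed.length + ", row = " + Arrays.toString(fixed));
    }
}
```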
From: https://www.cnblogs.com/slankka/p/18554428