首页 > 其他分享 >iceberg flink 写操作

iceberg flink 写操作

时间:2022-10-28 12:07:59浏览次数:45  
标签:writer iceberg flink write io org apache 操作


org.apache.iceberg.io.PartitionedFanoutWriter#write

public void write(T row) throws IOException {
// org.apache.flink.table.data.RowData -> org.apache.iceberg.PartitionKey
PartitionKey partitionKey = partition(row);
// org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter
RollingFileWriter writer = writers.get(partitionKey);
if (writer == null) {
// NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers.
PartitionKey copiedKey = partitionKey.copy();
writer = new RollingFileWriter(copiedKey);
writers.put(copiedKey, writer);
}
writer.write(row);
}

org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#write(T)

public void write(T record) throws IOException {
// org.apache.iceberg.io.BaseTaskWriter.RollingFileWriter
write(currentWriter, record);
this.currentRows++;
// currentRows % ROWS_DIVISOR == 0
if (shouldRollToNewFile()) {
closeCurrent();
openCurrent();
}
}

org.apache.iceberg.io.BaseTaskWriter.RollingFileWriter#write

@Override
void write(DataWriter<T> writer, T record) {
// org.apache.iceberg.io.DataWriter
writer.add(record);
}

org.apache.iceberg.io.DataWriter#add

public void add(T row) {
// org.apache.iceberg.parquet.ParquetWriter
appender.add(row);
}

org.apache.iceberg.parquet.ParquetWriter#add

@Override
public void add(T value) {
recordCount += 1;
// org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter
model.write(0, value);
// org.apache.parquet.column.impl.ColumnWriteStoreV1
writeStore.endRecord();
checkSize();
}

org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#close

@Override
public void close() throws IOException {
closeCurrent();
}

org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#closeCurrent

private void closeCurrent() throws IOException {
if (currentWriter != null) {
// org.apache.iceberg.io.DataWriter
currentWriter.close();

if (currentRows == 0L) {
io.deleteFile(currentFile.encryptingOutputFile());
} else {
complete(currentWriter);
}

this.currentFile = null;
this.currentWriter = null;
this.currentRows = 0;
}
}

 

标签:writer,iceberg,flink,write,io,org,apache,操作
From: https://blog.51cto.com/u_11290086/5804039

相关文章

  • git提交指定文件,如果配合IDEA操作,改动文件自动add。使用命令行 git status 查看仓库状
    git提交指定文件如果配合IDEA操作,改动文件自动add。使用命令行gitstatus查看仓库状态,gitcommitsrc/main/java/com/test01.javasrc/main/java/com/test01.java......
  • 记录的简单操作
    1.以名调用并且修改值VueX的mutations中:setVal(state,{ valName, val}){state[valName]=val;}调用时:store.commit('setVal',{valeName:'......
  • Oracle 数据库 日常操作函数
    --获取当天数据select*fromPOS_TRANMAINwheretrunc(SLDATE)=trunc(sysdate);--decode等同于casewhenthendecode(条件,值1,返回值1,值2,返回值2....)--to_char......
  • C#文件路径操作
    System.Environment.CurrentDirectorySystem.IO.Directory.GetCurrentDirectory()这两个方法获得的路径是一样的,获得的是当前路径,这个路径不一定是程序所在的路径。任何会......
  • 07-项目训练_管理员数据操作部分
    目录​​1,导入后台操作模板​​​​2,创建数据库及管理员信息表​​​​3,编写管理员数据与数据库的交互逻辑​​​​3.1面向接口编程,创建BaseAdminDao接口​​​​3.2编写B......
  • 利用PV操作实现进程的同步的理论意义
    进程的同步是由于进程间合作引起的相互制约的问题,要实现进程的同步可用一个信号量与消息联系起来。当信号量的值为0时表示希望的消息未产生,当信号量的值为非0时表示希望的消......
  • composer 基础操作
    一、composer入门1、每次安装新的包文件,会更新/vendor/autoload.php文件2、composer.lock与composer.json的关系文件composer.lock会根据composer.json的内容自动生成,......
  • Clickhouse之集群操作
    查看集群:在任意一台机上,使用/usr/bin/clickhouse-client--hostlocalhost--port9000连接本地服务器select*from`system`.clusters; 建库:建库create......
  • cmd的基础操作和快捷指令
    打开CMD的方式开始+系统+命令提示符win键+r输入cmd打开控制台(推荐使用)在桌面空白处按住shift键+鼠标右键+s键资源管理器的地址栏前面加上cmd+空格 管......
  • 不同角度理解线程的状态(操作系统 & Java API)
    3.12五种状态(操作系统层面)这是从操作系统层面来描述的【初始状态】仅是在语言层面创建了线程对象,还未与操作系统线程关联【可运行状态】(就绪状态)指该线程......