Based on Paimon version 0.5.
The write pipeline is constructed in org.apache.paimon.flink.sink.FlinkSinkBuilder#build.
Operator flow:
BucketingStreamPartitioner (partition routing) -> RowDataStoreWriteOperator (write) -> CommitterOperator (commit)
Primary key table write path
BucketingStreamPartitioner computes the target channel for each record from its bucket and partition, so all records of the same bucket go to the same writer subtask.
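The channel selection can be sketched roughly as follows. This is a minimal illustration under assumptions: the class and method names are hypothetical and the exact hash combination is not Paimon's actual BucketingStreamPartitioner code; the point is only that the channel is a deterministic function of (partition, bucket).

```java
import java.util.Objects;

// Hypothetical sketch of bucket/partition-based channel selection,
// not Paimon's actual BucketingStreamPartitioner implementation.
public class ChannelSelectorSketch {
    // Pick a downstream channel so that all records of the same
    // (partition, bucket) pair always go to the same writer subtask.
    static int selectChannel(String partition, int bucket, int numChannels) {
        int hash = Objects.hash(partition, bucket);
        // Math.floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(hash, numChannels);
    }

    public static void main(String[] args) {
        int c1 = selectChannel("dt=2023-09-01", 3, 4);
        int c2 = selectChannel("dt=2023-09-01", 3, 4);
        // Deterministic: the same (partition, bucket) maps to the same channel.
        System.out.println(c1 == c2);
    }
}
```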
RowDataStoreWriteOperator#processElement
StoreSinkWriteImpl(StoreSinkWrite)#write
org.apache.paimon.operation.AbstractFileStoreWrite#write
org.apache.paimon.operation.KeyValueFileStoreWrite#createWriter creates the writer, a MergeTreeWriter
org.apache.paimon.mergetree.MergeTreeWriter#write
org.apache.paimon.mergetree.SortBufferWriteBuffer#put adds the record to the buffer
BinaryInMemorySortBuffer/BinaryExternalSortBuffer#write serializes the record and writes it into the buffer
org.apache.paimon.mergetree.MergeTreeWriter#flushWriteBuffer flushes the write buffer once memory is full: it iterates over the buffer, applies the merge function, creates a level-0 file writer, and writes the merged records into a data file. If the changelog producer is configured as input, the raw input records are additionally written out to a changelog file.
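The flush step above can be sketched as follows, assuming a "deduplicate" merge function (keep the latest value per key). The class and method names are illustrative, not Paimon's actual MergeTreeWriter; a TreeMap stands in for the sorted buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of the flush step: records are buffered sorted by key, merged with a
// deduplicate-style merge function, then emitted as one sorted run, which
// corresponds to a level-0 data file. Names are hypothetical, not Paimon's.
public class MergeFlushSketch {
    // TreeMap keeps keys sorted, mimicking the sort buffer; put() overwrites
    // earlier values, mimicking the deduplicate merge applied at flush time.
    private final TreeMap<Integer, String> buffer = new TreeMap<>();

    void write(int key, String value) {
        buffer.put(key, value);
    }

    // "Flush": drain the buffer into a single sorted level-0 run.
    List<String> flush() {
        List<String> levelZeroFile = new ArrayList<>();
        buffer.forEach((k, v) -> levelZeroFile.add(k + "=" + v));
        buffer.clear();
        return levelZeroFile;
    }
}
```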
Append-only table write path
An append-only table is a table without a primary key. The table type is determined at creation time from the primary key and the write-mode option: in general, a table without a PK is append-only. An append-only table does not process changelog (update/delete) streams.
WriteMode writeMode = coreOptions.get(CoreOptions.WRITE_MODE);
if (writeMode == WriteMode.AUTO) {
    writeMode =
            tableSchema.primaryKeys().isEmpty()
                    ? WriteMode.APPEND_ONLY
                    : WriteMode.CHANGE_LOG;
    coreOptions.set(CoreOptions.WRITE_MODE, writeMode);
}
if (writeMode == WriteMode.APPEND_ONLY) {
    table = new AppendOnlyFileStoreTable(fileIO, tablePath, tableSchema);
} else {
    if (tableSchema.primaryKeys().isEmpty()) {
        // No PK, but the write mode is changelog: all columns are used as
        // the key, and an occurrence count is kept so rows can be retracted
        table = new ChangelogValueCountFileStoreTable(fileIO, tablePath, tableSchema);
    } else {
        table = new ChangelogWithKeyFileStoreTable(fileIO, tablePath, tableSchema);
    }
}
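The "full row as key plus occurrence count" idea behind ChangelogValueCountFileStoreTable can be sketched like this. This only models the counting semantics; the class and method names are made up and this is not Paimon's storage code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of value-count semantics for a PK-less changelog table: the whole
// row acts as the key and the value is how many times that row currently
// exists, so deletions are expressed as count decrements (retractions).
public class ValueCountSketch {
    private final Map<List<Object>, Long> counts = new HashMap<>();

    void insert(List<Object> row) {
        counts.merge(row, 1L, Long::sum);
    }

    void delete(List<Object> row) {
        // Decrement; drop the entry entirely once the count reaches zero.
        counts.merge(row, -1L, Long::sum);
        counts.remove(row, 0L);
    }

    long countOf(List<Object> row) {
        return counts.getOrDefault(row, 0L);
    }
}
```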
org.apache.paimon.operation.AbstractFileStoreWrite#write
org.apache.paimon.operation.AppendOnlyFileStoreWrite#createWriter creates an AppendOnlyWriter
org.apache.paimon.io.RollingFileWriter#write an append-only table writes records straight to files; there is no write buffer as in the PK table path
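The rolling behavior can be sketched as: append records to the current file and start a new one once a threshold is crossed. The names and the record-count threshold below are assumptions for illustration; Paimon's RollingFileWriter rolls based on a target file size in bytes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a rolling writer: append records to the current
// "file" and roll to a new one when it reaches a target size. Not Paimon's
// actual RollingFileWriter, which rolls on target file size in bytes.
public class RollingWriterSketch {
    private final int recordsPerFile;
    private final List<List<String>> closedFiles = new ArrayList<>();
    private List<String> current = new ArrayList<>();

    RollingWriterSketch(int recordsPerFile) {
        this.recordsPerFile = recordsPerFile;
    }

    void write(String record) {
        current.add(record);
        if (current.size() >= recordsPerFile) {
            roll();
        }
    }

    private void roll() {
        closedFiles.add(current);
        current = new ArrayList<>();
    }

    // Close the writer and return all produced "files".
    List<List<String>> close() {
        if (!current.isEmpty()) {
            roll();
        }
        return closedFiles;
    }
}
```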