
iceberg flink read operations

A walk through the Iceberg Flink read path, following the call chain from FlinkInputFormat.open down to the Parquet value readers.



org.apache.iceberg.flink.source.FlinkInputFormat.open

@Override
public void open(FlinkInputSplit split) {
  // split.getTask().files(): the CombinedScanTask's Collection<FileScanTask> (SplitScanTask instances)
  // encryption: org.apache.iceberg.encryption.PlaintextEncryptionManager
  // tableSchema & context.project(): org.apache.iceberg.Schema
  // caseSensitive: false
  this.iterator = new RowDataIterator(
      split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
      context.caseSensitive());
}
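For context, Flink drives an input format by calling open(split) once per split and then pulling records until the split is drained. A minimal stdlib-only sketch of that contract (MiniInputFormat and drain are illustrative names, not Flink API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical miniature of the InputFormat lifecycle: open() binds an
// iterator to one split; reachedEnd()/nextRecord() then drain it.
public class MiniInputFormat {
    private Iterator<String> iterator;

    // analogous to FlinkInputFormat.open(FlinkInputSplit)
    public void open(List<String> split) {
        this.iterator = split.iterator();
    }

    public boolean reachedEnd() {
        return !iterator.hasNext();
    }

    public String nextRecord() {
        return iterator.next();
    }

    // the caller's loop, as the runtime would drive it
    public static List<String> drain(MiniInputFormat format, List<String> split) {
        format.open(split);
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            out.add(format.nextRecord());
        }
        return out;
    }
}
```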

org.apache.iceberg.flink.source.DataIterator.DataIterator

DataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption) {
  this.tasks = task.files().iterator();

  Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
  task.files().stream()
      .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
      .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
  Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
      .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));

  // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
  Iterable<InputFile> decryptedFiles = encryption.decrypt(encrypted::iterator);

  Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
  decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));

  // paths of every Parquet file this task needs to scan
  this.inputFiles = Collections.unmodifiableMap(files);

  this.currentIterator = CloseableIterator.empty();
}
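The constructor flattens each task's data file and delete files into one stream before indexing them, so the downstream decrypt() runs as a single batch call. A stdlib-only sketch of that flatten-and-index pattern, with plain path strings standing in for Iceberg's file objects (FileIndex and the "decrypted:" values are illustrative):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

// Sketch of DataIterator's flatten step: each "task" is a list whose first
// element is the data file and the rest are delete files; all are folded into
// one path-keyed map so a batch call (decrypt(), in Iceberg) runs once,
// not once per file. putIfAbsent deduplicates shared delete files.
public class FileIndex {
    static Map<String, String> index(List<List<String>> tasks) {
        Map<String, String> files = new LinkedHashMap<>();
        tasks.stream()
            .flatMap(task -> Stream.concat(
                Stream.of(task.get(0)),                  // the data file
                task.subList(1, task.size()).stream()))  // its delete files
            .forEach(path -> files.putIfAbsent(path, "decrypted:" + path));
        return Collections.unmodifiableMap(files);
    }
}
```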

org.apache.iceberg.hadoop.HadoopFileIO#newInputFile {...}

org.apache.iceberg.flink.source.RowDataIterator.RowDataIterator

RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
                Schema projectedSchema, String nameMapping, boolean caseSensitive) {
  super(task, io, encryption);
  // tableSchema & projectedSchema: org.apache.iceberg.Schema
  this.tableSchema = tableSchema;
  this.projectedSchema = projectedSchema;
  this.nameMapping = nameMapping;
  // false
  this.caseSensitive = caseSensitive;
}

org.apache.iceberg.flink.source.DataIterator.hasNext

@Override
public boolean hasNext() {
  updateCurrentIterator();
  return currentIterator.hasNext();
}

org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator

private void updateCurrentIterator() {
  try {
    while (!currentIterator.hasNext() && tasks.hasNext()) {
      currentIterator.close();
      currentIterator = openTaskIterator(tasks.next());
    }
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}
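updateCurrentIterator is an "iterator of iterators": whenever the current file's iterator is drained it is closed and the next task's iterator is opened, looping past empty files. A stdlib-only sketch of the same advance loop over plain Iterators (ConcatIterator is an illustrative name; Iceberg's version also close()s the exhausted iterator):

```java
import java.util.Collections;
import java.util.Iterator;

// Sketch of DataIterator's advance loop: lazily concatenates per-task
// iterators, opening the next one only when the current one is exhausted.
public class ConcatIterator<T> implements Iterator<T> {
    private final Iterator<Iterator<T>> tasks;
    private Iterator<T> current = Collections.emptyIterator();

    public ConcatIterator(Iterator<Iterator<T>> tasks) {
        this.tasks = tasks;
    }

    // same shape as updateCurrentIterator(): skip past empty iterators
    private void advance() {
        while (!current.hasNext() && tasks.hasNext()) {
            current = tasks.next();
        }
    }

    @Override
    public boolean hasNext() {
        advance();
        return current.hasNext();
    }

    @Override
    public T next() {
        advance();
        return current.next();
    }
}
```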

org.apache.iceberg.flink.source.RowDataIterator.openTaskIterator

org.apache.iceberg.flink.source.RowDataIterator#newIterable

org.apache.iceberg.flink.source.RowDataIterator#newParquetIterable

org.apache.iceberg.parquet.ParquetReader.iterator

@Override
public CloseableIterator<T> iterator() {
  FileIterator<T> iter = new FileIterator<>(init());
  addCloseable(iter);
  return iter;
}

org.apache.iceberg.flink.source.DataIterator.next

@Override
public T next() {
  updateCurrentIterator();
  return currentIterator.next();
}

org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator (same method as listed above; next() shares the advance logic with hasNext())

org.apache.iceberg.parquet.ParquetReader.FileIterator.next

@Override
public T next() {
  // valuesRead: 0
  // nextRowGroupStart: 0
  if (valuesRead >= nextRowGroupStart) {
    advance();
  }

  if (reuseContainers) {
    this.last = model.read(last);
  } else {
    this.last = model.read(null);
  }
  valuesRead += 1;

  return last;
}

org.apache.iceberg.parquet.ParquetValueReaders.StructReader.read

@Override
public final T read(T reuse) {
  I intermediate = newStructData(reuse);

  for (int i = 0; i < readers.length; i += 1) {
    set(intermediate, i, readers[i].read(get(intermediate, i)));
  }

  return buildStruct(intermediate);
}

org.apache.iceberg.flink.data.FlinkParquetReaders.StringReader.read

@Override
public StringData read(StringData ignored) {
  Binary binary = column.nextBinary();
  ByteBuffer buffer = binary.toByteBuffer();
  if (buffer.hasArray()) {
    return StringData.fromBytes(
        buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
  } else {
    return StringData.fromBytes(binary.getBytes());
  }
}
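StringReader takes the zero-copy path only when the Binary is backed by an accessible heap array; note the arrayOffset() + position() arithmetic, since the buffer's logical start need not be index 0 of the backing array. The same hasArray() branch in a stdlib-only sketch (BufferToString is an illustrative name):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of StringReader's zero-copy check: a heap buffer exposes its backing
// array directly, while a direct buffer has no accessible array and forces a
// copy before decoding.
public class BufferToString {
    static String decode(ByteBuffer buffer) {
        if (buffer.hasArray()) {
            // zero-copy path: read straight out of the backing array
            return new String(buffer.array(),
                buffer.arrayOffset() + buffer.position(), buffer.remaining(),
                StandardCharsets.UTF_8);
        } else {
            // direct buffer: copy the remaining bytes out first
            byte[] copy = new byte[buffer.remaining()];
            buffer.duplicate().get(copy);
            return new String(copy, StandardCharsets.UTF_8);
        }
    }
}
```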

 

From: https://blog.51cto.com/u_11290086/5804037
