首页 > 数据库 >PostgreSQL数据库FDW——Parquet S3 CachingParquetReader

PostgreSQL数据库FDW——Parquet S3 CachingParquetReader

时间:2023-01-14 11:06:45浏览次数:49  
标签:slot break PostgreSQL S3 CachingParquetReader column arrow data row


CachingParquetReader类继承自ParquetReader类,相对于DefaultParquetReader类,其新增is_active成员用于控制next函数的返回值RS_INACTIVE。新增column_data和column_nulls成员取代chuck,用于存储列数据。

class CachingParquetReader : public ParquetReader{
private:
std::vector<void *> column_data;
std::vector<std::vector<bool> > column_nulls;
bool is_active; /* weather reader is active */

int row_group; /* current row group index */
uint32_t row; /* current row within row group */
uint32_t num_rows; /* total rows in row group */
...

next

next函数用于对外返回rowgroup提取的记录(通过read_next_rowgroup函数),并将其放置到TupleTableSlot中(通过populate_slot函数,populate_slot函数也会提取记录)。可以看出新增is_active成员用于控制next函数的返回值RS_INACTIVE。

ReadStatus next(TupleTableSlot *slot, bool fake=false) {
if (this->row >= this->num_rows) {
if (!is_active) return RS_INACTIVE;

/* Read next row group. We do it in a loop to skip possibly empty row groups. */
do{
if (!this->read_next_rowgroup()) return RS_EOF;
}
while (!this->num_rows);
}
if (!fake) this->populate_slot(slot, false);
return RS_SUCCESS;
}

read_next_rowgroup

read_next_rowgroup和DefaultParquetReader类的read_next_rowgroup类似,不同之处在于Read columns data and store it into column_data vector环节,通过​​ this->read_column(table, col, has_nulls)​​函数读取数据。

bool read_next_rowgroup() {
arrow::Status status;
std::shared_ptr<arrow::Table> table;

/* TODO: release previously stored data */
this->column_data.resize(this->types.size(), nullptr); this->column_nulls.resize(this->types.size());

if (this->coordinator) { /* In case of parallel query get the row group index from the coordinator. Otherwise just increment it. */
coordinator->lock();
if ((this->row_group = coordinator->next_rowgroup(reader_id)) == -1){
coordinator->unlock(); return false;
}
coordinator->unlock();
} else this->row_group++;
/* row_group cannot be less than zero at this point so it is safe to cast it to unsigned int */
if ((uint) this->row_group >= this->rowgroups.size()) return false;

int rowgroup = this->rowgroups[this->row_group];
auto rowgroup_meta = this->reader->parquet_reader()->metadata()->RowGroup(rowgroup);
status = this->reader->RowGroup(rowgroup)->ReadTable(this->indices, &table);
if (!status.ok()) throw Error("parquet_s3_fdw: failed to read rowgroup #%i: %s", rowgroup, status.message().c_str());


allocator->recycle(); /* Release resources acquired in the previous iteration */

/* Read columns data and store it into column_data vector */
for (std::vector<TypeInfo>::size_type col = 0; col < types.size(); ++col){
std::shared_ptr<parquet::Statistics> stats;
if (types[col].index >= 0) stats = rowgroup_meta->ColumnChunk(types[col].index)->statistics();
bool has_nulls = stats ? stats->null_count() > 0 : true;

this->num_rows = table->num_rows();
this->column_nulls[col].resize(this->num_rows);

this->read_column(table, col, has_nulls); //读取数据
}

this->row = 0;
return true;
}

read_column同样也是使用arrow::ChunkedArray存储获取的列数据,通过allocator为每列根据数据类型分配存储空间,拷贝arrow::ChunkedArray中的列数据到新分配的存储空间中。最终将数据空间设置到this->column_data[col]中。

void read_column(std::shared_ptr<arrow::Table> table, int col, bool has_nulls){
std::shared_ptr<arrow::ChunkedArray> column = table->column(col);

TypeInfo &typinfo = this->types[col];
size_t sz; int row = 0;
switch(typinfo.arrow.type_id) { // 确定列存储空间
case arrow::Type::BOOL: sz = sizeof(bool); break;
case arrow::Type::INT8: sz = sizeof(int8); break;
case arrow::Type::INT16: sz = sizeof(int16); break;
case arrow::Type::INT32: sz = sizeof(int32); break;
case arrow::Type::FLOAT: sz = sizeof(float); break;
case arrow::Type::DATE32: sz = sizeof(int); break;
default: sz = sizeof(Datum);
}
void *data = allocator->fast_alloc(sz * num_rows); //分配存储空间

for (int i = 0; i < column->num_chunks(); ++i) {
arrow::Array *array = column->chunk(i).get();
/* XXX We could probably optimize here by copying the entire array by using copy_to_c_array when has_nulls = false. */
for (int j = 0; j < array->length(); ++j) { // 处理coll列的每行
if (has_nulls && array->IsNull(j)) {
this->column_nulls[col][row++] = true; continue; // 某行某列为null值
}
switch (typinfo.arrow.type_id) {
/* For types smaller than Datum (assuming 8 bytes) we copy raw values to save memory and only convert them into Datum on the slot population stage. */
case arrow::Type::BOOL: {
arrow::BooleanArray *boolarray = (arrow::BooleanArray *) array;
((bool *) data)[row] = boolarray->Value(row); break;
}
case arrow::Type::INT8: {
arrow::Int8Array *intarray = (arrow::Int8Array *) array;
((int8 *) data)[row] = intarray->Value(row); break;
}
case arrow::Type::INT16: {
arrow::Int16Array *intarray = (arrow::Int16Array *) array;
((int16 *) data)[row] = intarray->Value(row); break;
}
case arrow::Type::INT32: {
arrow::Int32Array *intarray = (arrow::Int32Array *) array;
((int32 *) data)[row] = intarray->Value(row); break;
}
case arrow::Type::FLOAT: {
arrow::FloatArray *farray = (arrow::FloatArray *) array;
((float *) data)[row] = farray->Value(row); break;
}
case arrow::Type::DATE32: {
arrow::Date32Array *tsarray = (arrow::Date32Array *) array;
((int *) data)[row] = tsarray->Value(row); break;
}
case arrow::Type::LIST: {
auto larray = (arrow::ListArray *) array;
/* In schemaless mode, read nested list as jsonb array */
if (schemaless == true) {
Datum jsonb = this->nested_list_to_jsonb_datum(larray, j, typinfo);
/* Copy jsonb into memory block allocated by FastAllocator to prevent its destruction though to be able to recycle it once it fulfilled its purpose. */
void *res = allocator->fast_alloc(VARSIZE_ANY(jsonb));
memcpy(res, (Jsonb *) jsonb, VARSIZE_ANY(jsonb));
((Datum *) data)[row] = (Datum) res;
pfree((Jsonb *) jsonb); break;
}
((Datum *) data)[row] = this->nested_list_to_datum(larray, j, typinfo);
break;
}
case arrow::Type::MAP: {
arrow::MapArray* maparray = (arrow::MapArray*) array;
Datum jsonb = this->map_to_datum(maparray, j, typinfo);
/* Copy jsonb into memory block allocated by FastAllocator to prevent its destruction though to be able to recycle it once it fulfilled its purpose. */
void *res = allocator->fast_alloc(VARSIZE_ANY(jsonb));
memcpy(res, (Jsonb *) jsonb, VARSIZE_ANY(jsonb));
((Datum *) data)[row] = (Datum) res;
pfree((Jsonb *) jsonb); break;
}
default: /* For larger types we copy already converted into Datum values. */
((Datum *) data)[row] = this->read_primitive_type(array, typinfo, j);
}
this->column_nulls[col][row] = false;
row++;
}
}

this->column_data[col] = data;
}

populate_slot

populate_slot和和DefaultParquetReader类的populate_slot类似,不同之处在于该函数从​​this->column_data​​处获取数据,且不需要向chuck一样再次加载下一个chuck。

void populate_slot(TupleTableSlot *slot, bool fake=false){
if (this->schemaless == true) {
schemaless_populate_slot(slot, fake); return;
}
for (int attr = 0; attr < slot->tts_tupleDescriptor->natts; attr++){
int arrow_col = this->map[attr];
/* We only fill slot attributes if column was referred in targetlist or clauses. In other cases mark attribute as NULL. */
if (arrow_col >= 0){
TypeInfo &typinfo = this->types[arrow_col];
void *data = this->column_data[arrow_col];
bool need_cast = typinfo.need_cast;

switch(typinfo.arrow.type_id){
case arrow::Type::BOOL:slot->tts_values[attr] = BoolGetDatum(((bool *) data)[this->row]); break;
case arrow::Type::INT8:slot->tts_values[attr] = Int8GetDatum(((int8 *) data)[this->row]); break;
case arrow::Type::INT16:slot->tts_values[attr] = Int16GetDatum(((int16 *) data)[this->row]); break;
case arrow::Type::INT32:slot->tts_values[attr] = Int32GetDatum(((int32 *) data)[this->row]); break;
case arrow::Type::FLOAT:slot->tts_values[attr] = Float4GetDatum(((float *) data)[this->row]); break;
case arrow::Type::DATE32: {
int dt = ((int *) data)[this->row] + (UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE);
slot->tts_values[attr] = DateADTGetDatum(dt);
} break;
default: slot->tts_values[attr] = ((Datum *) data)[this->row]; need_cast = false;
}

if (need_cast) slot->tts_values[attr] = do_cast(slot->tts_values[attr], typinfo);
slot->tts_isnull[attr] = this->column_nulls[arrow_col][this->row];
} else {
slot->tts_isnull[attr] = true;
}
}
this->row++;
}


标签:slot,break,PostgreSQL,S3,CachingParquetReader,column,arrow,data,row
From: https://blog.51cto.com/feishujun/6007525

相关文章