
PostgreSQL FDW: Parquet S3 DefaultParquetReader class



S3RandomAccessFile

The S3RandomAccessFile class, declared in parquet_s3_fdw.hpp, is the class used to access objects in S3 object storage. Its member functions are defined in parquet_s3_fdw.cpp; the constructor simply initializes the private members (offset is set to 0, isclosed to false).

class S3RandomAccessFile : public arrow::io::RandomAccessFile{
private:
Aws::String bucket_;
Aws::String object_;
Aws::S3::S3Client *s3_client_;
int64_t offset;
bool isclosed;

public:
S3RandomAccessFile(Aws::S3::S3Client *s3_client, const Aws::String &bucket, const Aws::String &object);
arrow::Status Close();
arrow::Result<int64_t> Tell() const; // return offset
bool closed() const; // return isclosed
arrow::Status Seek(int64_t position); // offset = position
arrow::Result<int64_t> Read(int64_t nbytes, void* out);
arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes);
arrow::Result<int64_t> GetSize();
};
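
The remaining member functions are one-liners whose behavior is only hinted at by the comments in the declaration. A minimal sketch of what those bodies amount to, reconstructed from that description rather than copied from parquet_s3_fdw.cpp:

// Sketch of the trivial members, following the comments in the declaration above.
S3RandomAccessFile::S3RandomAccessFile(Aws::S3::S3Client *s3_client,
                                       const Aws::String &bucket, const Aws::String &object)
    : bucket_(bucket), object_(object), s3_client_(s3_client), offset(0), isclosed(false) {}

arrow::Status S3RandomAccessFile::Close() { isclosed = true; return arrow::Status::OK(); }
arrow::Result<int64_t> S3RandomAccessFile::Tell() const { return offset; }   // return offset
bool S3RandomAccessFile::closed() const { return isclosed; }                 // return isclosed
arrow::Status S3RandomAccessFile::Seek(int64_t position) { offset = position; return arrow::Status::OK(); }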

The Read function first builds an object_request, calls s3_client_->GetObject with a byte range derived from the current offset, and then copies the response body into the out buffer through a stringstream.

arrow::Result<int64_t> S3RandomAccessFile::Read(int64_t nbytes, void* out){
Aws::S3::Model::GetObjectRequest object_request;
object_request.WithBucket(bucket_.c_str()).WithKey(object_.c_str());
string bytes = "bytes=" + to_string(offset) + "-" + to_string(offset + nbytes - 1);
object_request.SetRange(bytes.c_str());
object_request.SetBucket(this->bucket_);
object_request.SetKey(this->object_);
object_request.SetResponseStreamFactory([](){ return Aws::New<Aws::StringStream >(S3_ALLOCATION_TAG); });

Aws::S3::Model::GetObjectOutcome get_object_outcome = this->s3_client_->GetObject(object_request);
if (!get_object_outcome.IsSuccess()) {
auto err = get_object_outcome.GetError();
Aws::String msg = "GetObject failed. " + err.GetExceptionName() + ": " + err.GetMessage();
return arrow::Status(arrow::StatusCode::IOError, msg.c_str());
}

int64_t n_read = get_object_outcome.GetResult().GetContentLength();
offset += n_read;
std::stringstream string_stream;
string_stream << get_object_outcome.GetResult().GetBody().rdbuf();
string_stream.read((char*)out, n_read);
return n_read;
}

The other Read overload obtains the data by calling the Read function above and then wraps it in an arrow::Buffer before returning.

arrow::Result<std::shared_ptr<arrow::Buffer>> S3RandomAccessFile::Read(int64_t nbytes){
char *out = (char*)malloc(nbytes);
arrow::Result<int64_t> res = this->Read(nbytes, out);
int64_t n = res.ValueOrDie();
std::shared_ptr<arrow::Buffer> buf = make_shared<arrow::Buffer>((const uint8_t*)out, n);
return buf;
}

The GetSize function obtains the object size via a HeadObjectRequest.

arrow::Result<int64_t> S3RandomAccessFile::GetSize(){
Aws::S3::Model::HeadObjectRequest headObj;
headObj.SetBucket(bucket_); headObj.SetKey(object_);
auto object = this->s3_client_->HeadObject(headObj);
if (!object.IsSuccess()){
return arrow::Status(arrow::StatusCode::IOError, "HeadObject failed");
}
int64_t fileSize = object.GetResultWithOwnership().GetContentLength();
return fileSize;
}
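
Taken together, these methods let Arrow treat an S3 object like a local file. A minimal usage sketch, given an Aws::S3::S3Client *s3client (the bucket and object names are made up; the pattern is the same one parquetGetFileReader uses below):

// Sketch: open a Parquet object on S3 through S3RandomAccessFile.
std::shared_ptr<arrow::io::RandomAccessFile> input(
    new S3RandomAccessFile(s3client, "my-bucket", "data/example.parquet"));  // hypothetical names
std::unique_ptr<parquet::arrow::FileReader> reader;
arrow::Status status = parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &reader);
if (!status.ok())
    throw Error("failed to open Parquet file %s", status.message().c_str());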

ReaderCacheEntry

The arrow::FileReader class is the Arrow read adapter for deserializing Parquet files as Arrow row batches. Quoting the Arrow documentation: "This interface caters for different use cases and thus provides different interfaces. In its most simplistic form, we cater for a user that wants to read the whole Parquet file at once with the FileReader::ReadTable method. More advanced users that also want to implement parallelism on top of each single Parquet file should do this on the RowGroup level. For this, they can call FileReader::RowGroup(i)->ReadTable to receive only the specified RowGroup as a table. In the most advanced situation, where a consumer wants to independently read RowGroups in parallel and consume each column individually, they can call FileReader::RowGroup(i)->Column(j)->Read and receive an arrow::Column instance." The Parquet format also supports an optional integer field_id that can be assigned to a field; Arrow converts these field IDs to a metadata key named PARQUET:field_id on the corresponding field.
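
A compact sketch of the three access granularities described in that quote (reader, i and j are placeholders; the exact signatures differ slightly between Arrow versions, this follows the wording above and the ReadTable call used later in read_next_rowgroup):

// Sketch only: three granularities of the parquet::arrow::FileReader API (error handling omitted).
std::shared_ptr<arrow::Table> whole_file, one_group;
reader->ReadTable(&whole_file);                     // the whole Parquet file at once
reader->RowGroup(i)->ReadTable(&one_group);         // a single row group as a table
std::shared_ptr<arrow::ChunkedArray> one_column;
reader->RowGroup(i)->Column(j)->Read(&one_column);  // one column of one row group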
The comment below, taken from parquet_s3_fdw, explains why std::unique_ptr<parquet::arrow::FileReader> has to be wrapped in a FileReaderCache class: "We would like to cache FileReader. When creating a new hash entry, the memory of the entry is allocated by the PostgreSQL core. But FileReader is a unique_ptr. In order to initialize it in parquet_s3_fdw, we define the FileReaderCache class and the cache entry has the pointer of this class."

class FileReaderCache{
public: std::unique_ptr<parquet::arrow::FileReader> reader;
};

The FileReaderHash variable points to the reader-cache HTAB; ReaderCacheKey is its hash key and ReaderCacheEntry is its entry type.

static HTAB *FileReaderHash = NULL;
typedef struct ReaderCacheKey{
char dname[256]; char fname[256];
} ReaderCacheKey;
typedef struct ReaderCacheEntry{
ReaderCacheKey key; /* hash key (must be first) */
FileReaderCache *file_reader;
arrow::MemoryPool *pool;
} ReaderCacheEntry;

The parquetGetFileReader function, defined in parquet_s3_fdw_connection.cpp, looks up (or creates) the ReaderCacheEntry for a given directory name and file name.

ReaderCacheEntry *parquetGetFileReader(Aws::S3::S3Client *s3client, char *dname, char *fname){
bool found;
ReaderCacheEntry *entry; ReaderCacheKey key = {0};
if (FileReaderHash == NULL){ /* First time through, initialize connection cache hashtable */
HASHCTL ctl; MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(ReaderCacheKey); ctl.entrysize = sizeof(ReaderCacheEntry);
ctl.hcxt = CacheMemoryContext; /* allocate ConnectionHash in the cache context */
FileReaderHash = hash_create("parquet_s3_fdw file reader cache", 8, &ctl,
#if (PG_VERSION_NUM >= 140000)
HASH_ELEM | HASH_BLOBS);
#else
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
#endif
/* Register some callback functions that manage connection cleanup. This should be done just once in each backend. */
CacheRegisterSyscacheCallback(FOREIGNSERVEROID, parquet_fdw_inval_callback, (Datum) 0);
CacheRegisterSyscacheCallback(USERMAPPINGOID, parquet_fdw_inval_callback, (Datum) 0);
}

/* Create hash key for the entry. Assume no pad bytes in key struct */
strcpy(key.dname, dname); strcpy(key.fname, fname);
/* Find or create cached entry for requested connection. */
entry = (ReaderCacheEntry *) hash_search(FileReaderHash, &key, HASH_ENTER, &found);
if (!found){
/* We need only clear "file_reader" here; remaining fields will be filled later when "file_reader" is set. */
entry->file_reader = NULL;
}

/* If cache entry doesn't have a reader, we have to establish a new reader. */
if (entry->file_reader == NULL || entry->file_reader->reader == nullptr){ // construct a new arrow::FileReader
std::unique_ptr<parquet::arrow::FileReader> reader;
entry->pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::RandomAccessFile> input(new S3RandomAccessFile(s3client, dname, fname)); // build an arrow::io::RandomAccessFile from the S3RandomAccessFile class described above
arrow::Status status = parquet::arrow::OpenFile(input, entry->pool, &reader);

if (!status.ok()) throw Error("failed to open Parquet file %s", status.message().c_str());

if (!entry->file_reader) entry->file_reader = new FileReaderCache();
entry->file_reader->reader = std::move(reader);
elog(DEBUG3, "parquet_s3_fdw: new parquet file reader for s3handle %p %s/%s", s3client, dname, fname);
}
return entry;
}

DefaultParquetReader

The DefaultParquetReader class inherits from the ParquetReader class. Its table member holds the current row group, chunk_info holds the per-column ChunkInfo bookkeeping (current chunk, position and length) for the row group's columns, and chunks holds plain pointers to the row group's column data.

class DefaultParquetReader : public ParquetReader{
private:
std::shared_ptr<arrow::Table> table; /* Current row group */
struct ChunkInfo{
int chunk; /* current chunk number */
int64 pos; /* current pos within chunk */
int64 len; /* current chunk length */
ChunkInfo (int64 len) : chunk(0), pos(0), len(len) {}
};
std::vector<ChunkInfo> chunk_info; /* current chunk and position per-column */
std::vector<arrow::Array *> chunks; /* Plain pointers to the inner structures of the row group. Needed to prevent excessive shared_ptr management. */

int row_group; /* current row group index */
uint32_t row; /* current row within row group */
uint32_t num_rows; /* total rows in row group */
...
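
chunks and chunk_info mirror the layout of an arrow::Table: each table column is an arrow::ChunkedArray made of one or more arrow::Array chunks, and the reader keeps, per column, a raw pointer to the current chunk plus its position/length bookkeeping. A small illustration of that correspondence (general Arrow table layout, not code from the extension):

// Illustration: how one table column maps onto chunks / chunk_info.
const auto &column = table->column(i);          // std::shared_ptr<arrow::ChunkedArray>
arrow::Array *first = column->chunk(0).get();   // chunks[i]: raw pointer to chunk 0
int64_t len = first->length();                  // chunk_info[i].len
// chunk_info[i].chunk advances from 0 to column->num_chunks() - 1,
// and chunk_info[i].pos walks from 0 to len - 1 within the current chunk.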

open

The main job of the open member functions is to obtain a FileReader. The overload with parameters calls the parquetGetFileReader function described in the previous section; the parameterless overload creates the FileReader directly with arrow::FileReader::Make and does not go through the ReaderCache FileReader cache.

void open(const char *dirname, Aws::S3::S3Client *s3_client){
arrow::Status status;
char *dname; char *fname;
parquetSplitS3Path(dirname, filename.c_str(), &dname, &fname);
this->reader_entry = parquetGetFileReader(s3_client, dname, fname);
elog(DEBUG1, "parquet_s3_fdw: open Parquet file on S3. %s%s", dname, fname);
pfree(dname); pfree(fname);

this->reader = std::move(this->reader_entry->file_reader->reader); // relationship between this->reader_entry and this->reader: the cached FileReader is moved into this->reader
this->reader->set_use_threads(this->use_threads && parquet_fdw_use_threads); /* Enable parallel columns decoding/decompression if needed */
}

void open() {
arrow::Status status;
std::unique_ptr<parquet::arrow::FileReader> reader;
status = parquet::arrow::FileReader::Make( arrow::default_memory_pool(), parquet::ParquetFileReader::OpenFile(filename, use_mmap), &reader);
if (!status.ok()) throw Error("parquet_s3_fdw: failed to open Parquet file %s", status.message().c_str());
this->reader = std::move(reader);
this->reader->set_use_threads(this->use_threads && parquet_fdw_use_threads); /* Enable parallel columns decoding/decompression if needed */
}

next

The next function hands the records extracted from a row group back to the caller: it reads the next row group when the current one is exhausted (via the read_next_rowgroup function) and fills a TupleTableSlot with the current row (via the populate_slot function, which does the actual extraction of column values).

ReadStatus next(TupleTableSlot *slot, bool fake=false){
allocator->recycle();
if (this->row >= this->num_rows) {
/* Read next row group. We do it in a loop to skip possibly empty row groups. */
do {
if (!this->read_next_rowgroup())
return RS_EOF;
} while (!this->num_rows);
}

this->populate_slot(slot, fake);
this->row++;

return RS_SUCCESS;
}
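
The foreign scan executor drives this method one row at a time; conceptually each call from the scan loop looks like the sketch below, given the reader and a TupleTableSlot (a simplified illustration, not the actual IterateForeignScan code of parquet_s3_fdw):

// Simplified sketch of a caller: fetch one row per executor call.
ExecClearTuple(slot);
if (reader->next(slot) == RS_SUCCESS)
    ExecStoreVirtualTuple(slot);   // tts_values / tts_isnull were filled by populate_slot
// RS_EOF means every row group listed in reader->rowgroups has been consumed.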

read_next_rowgroup: load row group data into chunks

The read_next_rowgroup function first determines the row_group index. For a parallel foreign scan it asks the coordinator for the row group counter value stored for this DefaultParquetReader's reader_id; for a serial foreign scan it simply increments the reader's own row_group counter. That counter indexes into this->rowgroups[this->row_group] to obtain the actual row group number. The function then fetches the row group data with reader->RowGroup(rowgroup)->ReadTable(this->indices, &this->table). Finally it walks over the columns projected by the query, extracts each column's data and length from the row group, and stores them in chunk_info and chunks.

bool read_next_rowgroup() {
arrow::Status status;
if (coordinator) { /* In case of parallel query get the row group index from the coordinator. Otherwise just increment it. */
coordinator->lock();
if ((this->row_group = coordinator->next_rowgroup(reader_id)) == -1){
coordinator->unlock();
return false;
}
coordinator->unlock();
} else
this->row_group++;
/* row_group cannot be less than zero at this point so it is safe to cast it to unsigned int */
if ((uint) this->row_group >= this->rowgroups.size()) return false;

int rowgroup = this->rowgroups[this->row_group];
auto rowgroup_meta = this->reader->parquet_reader()->metadata()->RowGroup(rowgroup);
status = this->reader->RowGroup(rowgroup)->ReadTable(this->indices, &this->table);
if (!status.ok()) throw Error("parquet_s3_fdw: failed to read rowgroup #%i: %s", rowgroup, status.message().c_str());
if (!this->table) throw std::runtime_error("parquet_s3_fdw: got empty table");

/* TODO: don't clear each time */
this->chunk_info.clear(); this->chunks.clear();
for (uint64_t i = 0; i < types.size(); ++i){ // iterate over the columns projected by the query
const auto &column = this->table->column(i);
int64 len = column->chunk(0)->length();
this->chunk_info.emplace_back(len); // length of the column's first chunk
this->chunks.push_back(column->chunk(0).get()); // raw pointer to the column's first chunk
}

this->row = 0;
this->num_rows = this->table->num_rows();

return true;
}

populate_slot: assemble a TupleTableSlot from the records in chunks

The populate_slot function fills a TupleTableSlot with values extracted from a parquet row. In schemaless mode it calls the schemaless_populate_slot function. Otherwise, in the normal schema-specified mode, it only fills the slot attributes whose columns are referenced in the targetlist or clauses; every column not referenced is marked NULL in the slot.

/* populate_slot Fill slot with the values from parquet row.
* If `fake` set to true the actual reading and populating the slot is skipped.
* The purpose of this feature is to correctly skip rows to collect sparse
* samples. */
void populate_slot(TupleTableSlot *slot, bool fake=false){
if (this->schemaless == true) {
schemaless_populate_slot(slot, fake); return;
}
/* Fill slot values */
for (int attr = 0; attr < slot->tts_tupleDescriptor->natts; attr++) {
int arrow_col = this->map[attr]; /* We only fill slot attributes if column was referred in targetlist or clauses. In other cases mark attribute as NULL. */
if (arrow_col >= 0){
ChunkInfo &chunkInfo = this->chunk_info[arrow_col];
arrow::Array *array = this->chunks[arrow_col];
TypeInfo &typinfo = this->types[arrow_col];

if (chunkInfo.pos >= chunkInfo.len) { // the current chunk is exhausted, move on to the next chunk
const auto &column = this->table->column(arrow_col);
if (++chunkInfo.chunk >= column->num_chunks()) /* There are no more chunks */
break;
array = column->chunk(chunkInfo.chunk).get();
this->chunks[arrow_col] = array;
chunkInfo.pos = 0;
chunkInfo.len = array->length();
}

if (fake) continue; /* Don't do actual reading data into slot in fake mode */

if (array->IsNull(chunkInfo.pos)) { // check whether this column value is null
slot->tts_isnull[attr] = true; chunkInfo.pos++; continue;
}
slot->tts_isnull[attr] = false;

/* Currently only primitive types and lists are supported */ // convert the arrow value to a postgres datum according to the arrow column type
switch (typinfo.arrow.type_id) {
case arrow::Type::LIST:{
arrow::ListArray *larray = (arrow::ListArray *) array;
slot->tts_values[attr] = this->nested_list_to_datum(larray, chunkInfo.pos, typinfo);
break;
}
case arrow::Type::MAP: {
arrow::MapArray* maparray = (arrow::MapArray*) array;
slot->tts_values[attr] = this->map_to_datum(maparray, chunkInfo.pos, typinfo);
break;
}
default: slot->tts_values[attr] = this->read_primitive_type(array, typinfo, chunkInfo.pos);
}
chunkInfo.pos++;
}else{
slot->tts_isnull[attr] = true;
}
}
}

};

The schemaless_populate_slot function fills the TupleTableSlot in schemaless mode with data extracted from the parquet row. It relies on read_schemaless_column to fetch one record from chunks and assemble it into a jsonb value, calls the allocator to reserve space in the memory context, copies the jsonb into that space, and stores the pointer in the slot's tts_values. The loop stops after the first non-dropped jsonb attribute, since in schemaless mode the foreign table exposes its data through a single jsonb column.

/* schemaless_populate_slot      Fill slot with the jsonb values made from parquet row.
* If `fake` set to true the actual reading and populating the slot is skipped.
* The purpose of this feature is to correctly skip rows to collect sparse
* samples. */
void schemaless_populate_slot(TupleTableSlot *slot, bool fake=false){
memset(slot->tts_isnull, true, sizeof(bool) * slot->tts_tupleDescriptor->natts);
for (int attr = 0; attr < slot->tts_tupleDescriptor->natts; attr++) {
Jsonb *result;
Form_pg_attribute attr_desc = TupleDescAttr(slot->tts_tupleDescriptor, attr);
if (attr_desc->attisdropped || attr_desc->atttypid != JSONBOID) continue;
result = read_schemaless_column(fake); // fetch one record from chunks and assemble it into a jsonb value

/* Copy jsonb into memory block allocated by FastAllocator to prevent its destruction though to be able to recycle it once it fulfilled its purpose. */
void *res = allocator->fast_alloc(VARSIZE_ANY(result));
memcpy(res, (Jsonb *) result, VARSIZE_ANY(result));
pfree((Jsonb *) result);

slot->tts_isnull[attr] = false;
slot->tts_values[attr] = (Datum) res;
break;
}
}

The main job of the read_schemaless_column function is to fetch one record from chunks and assemble it into a jsonb value.

/* Get a record and construct it into a jsonb value */
Jsonb *read_schemaless_column(bool fake=false) {
JsonbParseState *jb_pstate = NULL; JsonbValue *result;
result = pushJsonbValue(&jb_pstate, WJB_BEGIN_OBJECT, NULL); /* Fill jsonb values */

for (size_t i = 0; i < this->column_names.size(); i++) {
char *key = pstrdup(this->column_names[i].c_str());
bool isnull = false; Datum value = (Datum) 0;

ChunkInfo &chunkInfo = this->chunk_info[i]; // chunk bookkeeping for column i
arrow::Array *array = this->chunks[i]; // current data chunk of column i
TypeInfo &typinfo = this->types[i]; // type-conversion info for column i
int sorted_col = this->sorted_col_map[i]; // sorted-column index for column i (negative if not a sorted column)

Oid value_type = InvalidOid; Oid typoutput; bool typIsVarlena;
FmgrInfo finfo;

if (chunkInfo.pos >= chunkInfo.len){ // the current chunk is exhausted, try to fetch the next one
const auto &column = this->table->column(i);
if (++chunkInfo.chunk >= column->num_chunks()) break; /* There are no more chunks */
array = column->chunk(chunkInfo.chunk).get(); // switch to the next chunk's data
this->chunks[i] = array;
chunkInfo.pos = 0; chunkInfo.len = array->length();
}

if (fake) continue; /* Don't do actual reading data into slot in fake mode */

if (strlen(key) == 0) throw std::runtime_error("key is null");
if (!array->IsNull(chunkInfo.pos)){
/* Currently only primitive types, lists and map are supported */
switch (typinfo.arrow.type_id) {
case arrow::Type::LIST: {
arrow::ListArray *larray = (arrow::ListArray *) array;
value = this->nested_list_to_jsonb_datum(larray, chunkInfo.pos, typinfo);
value_type = JSONBOID;
break;
}
case arrow::Type::MAP:{
arrow::MapArray* maparray = (arrow::MapArray*) array;
value = this->map_to_datum(maparray, chunkInfo.pos, typinfo);
value_type = JSONBOID;
break;
}
default:{
value_type = to_postgres_type(typinfo.arrow.type_id);
value = this->read_primitive_type(array, typinfo, chunkInfo.pos);
}
}
getTypeOutputInfo(value_type, &typoutput, &typIsVarlena);
fmgr_info(typoutput, &finfo);

if (sorted_col >= 0) {
this->sorted_cols_data[sorted_col].val = value;
this->sorted_cols_data[sorted_col].is_null = false;
}
} else {
isnull = true;
elog(DEBUG2, "key %s is null", key);
}

/* TODO: adding cstring would be cheaper than adding text */
push_jsonb_string_key(jb_pstate, key);
datum_to_jsonb(value, value_type, isnull, &finfo, jb_pstate, WJB_VALUE);
chunkInfo.pos++;
}

result = pushJsonbValue(&jb_pstate, WJB_END_OBJECT, NULL);
return JsonbValueToJsonb(result);
}
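
For a parquet row whose columns were, say, id = 1 and name = 'foo' (hypothetical column names), the returned value would be the jsonb object {"id": 1, "name": "foo"}: every parquet column becomes a key of the single jsonb record, with its value converted through datum_to_jsonb.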


From: https://blog.51cto.com/feishujun/6007522
