ParallelCoordinator类定义在src/reader.hpp文件中，用于支持PostgreSQL的并行外部表扫描（parallel foreign scan）。它支持两种并行模式：PC_SINGLE用于单个parquet文件以及简单的多文件顺序扫描，PC_MULTI用于多文件归并（multimerge）扫描。latch是供多个进程同步使用的自旋锁（slock_t），用于保护data数据域，以确定各进程所使用的ParquetReader应扫描哪个parquet文件以及哪个rowgroup。data联合体包含PC_SINGLE使用的结构体single和PC_MULTI使用的结构体multi。
/*
 * Shared coordination state for a parallel foreign scan: under `latch`,
 * cooperating processes decide which parquet file (reader) and which rowgroup
 * each of them scans next.
 *
 * The object is variable-length: both variants of `data` end in a flexible
 * array, so it must be allocated with room for the per-file / per-reader
 * int32 entries. NOTE(review): presumably allocated in shared memory by the
 * parallel-scan setup code -- confirm against the caller.
 * NOTE(review): flexible array members are not standard C++; this relies on
 * compiler support for them.
 */
class ParallelCoordinator{
private:
/* Parallel mode; selects which member of `data` is active. */
enum Type { PC_SINGLE = 0, PC_MULTI };
Type type;
/* Spinlock protecting `data`; taken via lock()/unlock(). */
slock_t latch;
/* Mode-specific state. Must remain the last member: both variants end in a
 * flexible array that extends past the nominal end of the object. */
union {
struct {
int32 reader; /* current reader */
int32 nfiles; /* number of parquet files to read */
int32 rowgroup; /* current rowgroup */
int32 nrowgroups[FLEXIBLE_ARRAY_MEMBER]; /* per-file rowgroups numbers */
} single; /* single file and simple multifile case */
struct {
int32 next_rowgroup[FLEXIBLE_ARRAY_MEMBER]; /* per-reader counters */
} multi; /* multimerge case */
} data;
public:
/* Thin wrappers around the PostgreSQL spinlock API on `latch`; used by
 * callers to serialize access to the coordinator state in either mode. */
void lock() { SpinLockAcquire(&latch); }
void unlock() { SpinLockRelease(&latch); }
};
init
init_single函数为PC_SINGLE模式初始化data.single数据域并初始化自旋锁latch（slock_t）：将reader和rowgroup置为-1，设置nfiles，并把每个文件的rowgroup数量复制到nrowgroups数组中。init_multi函数为PC_MULTI模式将data.multi.next_rowgroup中每个reader的计数器清零。
/*
 * Initialize the coordinator for PC_SINGLE mode (a single file, or several
 * files read one after another).
 *
 * nrowgroups - per-file rowgroup counts; `nfiles` entries are copied into the
 *              trailing flexible array data.single.nrowgroups, so the object
 *              must have been allocated with room for them.
 * nfiles     - number of parquet files; a non-positive value copies nothing.
 *
 * reader and rowgroup start at -1, so the first next_reader() call advances
 * to reader 0 with a fresh rowgroup counter.
 */
void init_single(int32 *nrowgroups, int32 nfiles){
    type = PC_SINGLE;
    data.single.reader = -1;
    data.single.rowgroup = -1;
    data.single.nfiles = nfiles;

    SpinLockInit(&latch);

    /* Test for > 0 rather than != 0: a negative count would be converted to a
     * huge size_t in the memcpy length below. */
    if (nfiles > 0)
        memcpy(data.single.nrowgroups, nrowgroups, sizeof(int32) * nfiles);
}
void init_multi(int nfiles){
type = PC_MULTI;
for (int i = 0; i < nfiles; ++i)
data.multi.next_rowgroup[i] = 0;
}
next_reader
next_reader函数仅支持PC_SINGLE模式（其他模式下触发Assert并返回-1），调用者必须持有锁。它检查两个条件：当前reader是否为有效的文件下标（0 ≤ reader < nfiles），以及该文件是否还有尚未分配的rowgroup（nrowgroups[reader] > rowgroup + 1）。若两者都满足，则返回当前reader；否则将reader加1、把rowgroup计数器重置为-1，并返回新的reader。注意返回值可能大于或等于nfiles，此时已没有可读的文件，需要由调用者判断。
/*
 * Return the reader (file index) the caller should scan next.
 * PC_SINGLE only. Caller must hold the lock.
 *
 * Keeps returning the current reader while it still has rowgroups that have
 * not been handed out; otherwise advances to the following reader and resets
 * the rowgroup counter to -1. The result is not clamped, so it can be
 * >= nfiles once every file has been handed out.
 */
int32 next_reader(){
    if (type != PC_SINGLE) {
        Assert(false && "unsupported");
        return -1;
    }

    const int32 cur = data.single.reader;
    const bool valid_reader = cur >= 0 && cur < data.single.nfiles;

    /* `rowgroup` is the last index handed out for `cur`; more remain while
     * rowgroup + 1 is still below that file's rowgroup count. */
    if (valid_reader && data.single.rowgroup + 1 < data.single.nrowgroups[cur])
        return cur;

    data.single.rowgroup = -1;
    return ++data.single.reader;
}
next_rowgroup
next_rowgroup函数同时支持PC_SINGLE和PC_MULTI，调用者必须持有锁。对于PC_MULTI，它返回该reader的计数器next_rowgroup[reader_id]的当前值，然后将计数器加1。对于PC_SINGLE，若reader_id不是当前reader则返回-1；否则先将data.single.rowgroup加1，再返回加1后的值。该函数不检查返回的rowgroup是否超出对应文件的rowgroup数量，这一检查由调用者负责。
/*
 * Get the next rowgroup id for reader `reader_id`. Caller must hold the lock.
 *
 * PC_SINGLE: only the reader currently selected by next_reader() may claim
 *            rowgroups; any other reader_id gets -1. Otherwise the shared
 *            counter is pre-incremented and returned.
 * PC_MULTI:  each reader has its own counter; its current value is returned
 *            and then incremented.
 *
 * No bounds check is made against the file's rowgroup count, so the result
 * may be past the last rowgroup; callers must check it.
 * NOTE(review): in PC_MULTI, reader_id is used as an array index unchecked --
 * callers presumably pass an id below the count given to init_multi().
 */
int32 next_rowgroup(int32 reader_id) {
    if (type == PC_SINGLE) {
        if (reader_id != data.single.reader)
            return -1;
        return ++data.single.rowgroup;
    }
    if (type == PC_MULTI)
        return data.multi.next_rowgroup[reader_id]++;

    /* Reachable only if `type` holds an unexpected value. */
    Assert(false && "unsupported");
    return -1;
}