Several parameters control the behavior of parallel execution:
zjh@postgres=# show %parallel%;
name | setting | description
----------------------------------+---------+----------------------------------------------------------------------------------------------------
enable_parallel_append | on | Enables the planner's use of parallel append plans.
enable_parallel_hash | on | Enables the planner's use of parallel hash plans.
force_parallel_mode | off | Forces use of parallel query facilities.
max_parallel_maintenance_workers | 8 | Sets the maximum number of parallel processes per maintenance operation.
max_parallel_workers | 64 | Sets the maximum number of parallel workers that can be active at one time.
max_parallel_workers_per_gather | 8 | Sets the maximum number of parallel processes per executor node.
min_parallel_index_scan_size | 128MB | Sets the minimum amount of index data for a parallel scan.
min_parallel_table_scan_size | 2GB | Sets the minimum amount of table data for a parallel scan.
parallel_leader_participation | on | Controls whether Gather and Gather Merge also run subplans.
parallel_setup_cost | 10000 | Sets the planner's estimate of the cost of starting up worker processes for parallel query.
parallel_tuple_cost | 0.1 | Sets the planner's estimate of the cost of passing each tuple (row) from worker to master backend.
(11 rows)
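As a minimal sketch (the table name big_table and the threshold values are illustrative), lowering the scan-size threshold at session level and running EXPLAIN shows whether the planner chooses a parallel plan:

```sql
-- Session-level overrides of the settings listed above.
SET max_parallel_workers_per_gather = 4;
SET min_parallel_table_scan_size = '8MB';  -- the instance default above is 2GB

EXPLAIN SELECT count(1) FROM big_table;
-- A parallel plan has the shape:
--   Finalize Aggregate
--     ->  Gather  (Workers Planned: 4)
--           ->  Partial Aggregate
--                 ->  Parallel Seq Scan on big_table
```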
LightDB can determine the degree of parallelism automatically, and it also supports optimizer hints:
select /*+ parallel(t 4 hard)*/ count(1) from big_table t;
LightDB also supports the Oracle-compatible form of the optimizer hint, i.e. parallel(t 4); see Parallel(table [# of workers] [soft|hard]) for details.
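For example (assuming, per the hint syntax above, that soft leaves the planner free to reduce the degree while hard forces it):

```sql
select /*+ parallel(t 4) */ count(1) from big_table t;       -- Oracle-compatible form
select /*+ parallel(t 4 soft) */ count(1) from big_table t;  -- degree is a suggestion
```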
Parallel builds of regular (btree) indexes are supported:
drop index idx_file_name;
CREATE INDEX idx_file_name ON big_search_doc_new_ic USING btree (filename);
GIN index builds do not support parallel execution, which is why full-text search is a good fit for a Citus-based distributed architecture:
CREATE INDEX big_search_doc_new_ic_tsvector_content_idx ON big_search_doc_new_ic USING gin (tsvector_content);
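A sketch of a parallel btree build: the worker count is capped by max_parallel_maintenance_workers (shown as 8 above) and can be raised per session before running CREATE INDEX:

```sql
SET max_parallel_maintenance_workers = 8;
CREATE INDEX idx_file_name ON big_search_doc_new_ic USING btree (filename);
-- The workers are visible during the build in pg_stat_activity
-- (backend_type = 'parallel worker').
```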
Parallel execution does not support INSERT ... SELECT or CREATE TABLE AS SELECT. The reason is that the visibility machinery (i.e., the low-cost MVCC implementation under high concurrency) does not yet handle parallel writes well:
- The combo CID mappings. This is needed to ensure consistent answers to
tuple visibility checks. The need to synchronize this data structure is
a major reason why we can't support writes in parallel mode: such writes
might create new combo CIDs, and we have no way to let other workers
(or the initiating backend) know about them.
Beyond that, individual functions and features may themselves not support parallelism. Functions are classified into three levels, PROPARALLEL_UNSAFE, PROPARALLEL_RESTRICTED, and PROPARALLEL_SAFE, which can be checked with select proparallel from pg_catalog.pg_proc where proname = 'floor'. See https://www.hs.net/lightdb/docs/html/catalog-pg-proc.html for details.
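The parallel-safety level can also be declared explicitly when defining a function; the add_one function below is hypothetical and marked PARALLEL SAFE, and pg_proc stores the level as a single character (u/r/s):

```sql
CREATE FUNCTION add_one(i integer) RETURNS integer
    AS 'SELECT i + 1' LANGUAGE sql
    IMMUTABLE PARALLEL SAFE;

SELECT proname, proparallel
FROM pg_catalog.pg_proc
WHERE proname = 'add_one';  -- proparallel = 's'
```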
Oracle parallel execution
Since PostgreSQL relies heavily on the Linux page cache, I/O is not a problem here.
For SMP parallel execution there are two modes. The one we usually think of in databases is intra-parallelism: the data is sliced and the slices are processed in parallel.
The other mode is inter-operator parallel execution, i.e. the pipeline/ETL model, often adopted by streaming engines such as Spark and Flink.
Implementation of parallel execution inside PostgreSQL
Processes communicate with each other via signals, e.g. PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */
typedef struct ParallelExecutorInfo
{
PlanState *planstate; /* plan subtree we're running in parallel */
ParallelContext *pcxt; /* parallel context we're using */
BufferUsage *buffer_usage; /* points to bufusage area in DSM */
WalUsage *wal_usage; /* walusage area in DSM */
SharedExecutorInstrumentation *instrumentation; /* optional */
struct SharedJitInstrumentation *jit_instrumentation; /* optional */
dsa_area *area; /* points to DSA area in DSM */
dsa_pointer param_exec; /* serialized PARAM_EXEC parameters */
bool finished; /* set true by ExecParallelFinish */
/* These two arrays have pcxt->nworkers_launched entries: */
shm_mq_handle **tqueue; /* tuple queues for worker output */
struct TupleQueueReader **reader; /* tuple reader/writer support */
} ParallelExecutorInfo;
The reader array is responsible for reading tuples from the shared-memory queues into which the workers write their results.
typedef struct ParallelWorkerInfo
{
BackgroundWorkerHandle *bgwhandle;
shm_mq_handle *error_mqh;
int32 pid;
} ParallelWorkerInfo;
typedef struct ParallelContext
{
dlist_node node;
SubTransactionId subid;
int nworkers; /* Maximum number of workers to launch */
int nworkers_to_launch; /* Actual number of workers to launch */
int nworkers_launched;
char *library_name;
char *function_name;
ErrorContextCallback *error_context_stack;
shm_toc_estimator estimator;
dsm_segment *seg;
void *private_memory;
shm_toc *toc;
ParallelWorkerInfo *worker;
int nknown_attached_workers;
bool *known_attached_workers;
} ParallelContext;
typedef struct ParallelWorkerContext
{
dsm_segment *seg;
shm_toc *toc;
} ParallelWorkerContext;
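As a sketch of how ParallelContext and ParallelWorkerInfo fit together, the leader-side lifecycle built on the API in access/parallel.h roughly looks like this (not a standalone program; it runs inside a PostgreSQL backend, and my_extension/my_worker_main are hypothetical names):

```c
/* Sketch only: runs inside a PostgreSQL backend, not standalone. */
#include "postgres.h"
#include "access/parallel.h"

void
run_in_parallel(void)
{
    ParallelContext *pcxt;

    EnterParallelMode();

    /* workers will call my_worker_main() in my_extension (hypothetical) */
    pcxt = CreateParallelContext("my_extension", "my_worker_main", 4);

    /* size and create the DSM segment (fills pcxt->seg and pcxt->toc) */
    InitializeParallelDSM(pcxt);

    /* start background workers; fills pcxt->worker[] and nworkers_launched */
    LaunchParallelWorkers(pcxt);

    /* ... leader participates, reads worker tuple queues, etc. ... */

    WaitForParallelWorkersToFinish(pcxt);
    DestroyParallelContext(pcxt);
    ExitParallelMode();
}
```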