哈希Hash分布是Greenlum最常用的数据分布方式。根据预定义的分布键(distributed by key)计算用户数据的哈希值,然后把哈希值映射到某个segment上。 分布键可以包含多个字段。分布键选择是否恰当是Greenplum能否发挥性能的主要因素,适合的分布键将数据均匀分布到各个 segment 上,避免数据倾斜。
重要结构体和函数
Greenplum 计算分布键哈希值的代码在src/backend/cdb/cdbhash.c文件中。结构体CdbHash是处理分布键哈希的主要数据结构。 计算分布键哈希值的逻辑为:
- 使用
CdbHash *makeCdbHash(int numsegs, int natts, Oid *hashfuncs)
创建一个 CdbHash 结构体 - 然后对每个 tuple 执行下面操作,计算该 tuple 对应的哈希值,并确定该tuple应该分布到哪个segment上:
○cdbhashinit():执行初始化操作,仅仅是初始化hash初始值
○cdbhash():这个函数会调用hashDatum()针对不同类型做不同的预处理,最后将处理后的列值添加到哈希计算中
○cdbhashreduce() :映射哈希值到某个 segment
CdbHash结构体
CdbHash结构体的定义如下所示,其中hash成员就是哈希结果值,也就是根据分布键类型对应的哈希函数计算的用户数据哈希值,numsegs成员就是参与哈希值映射segment的数量。reducealg成员是枚举类型REDUCE_LAZYMOD
、REDUCE_BITMASK
和REDUCE_JUMP_HASH
,其指定了哈希值映射到某个segment使用的不同mod函数。is_legacy_hash成员指明使用greeplum数据块实现的hash函数而不是postgresql提供的hash函数。natts是参与哈希的分布键的数量,hashfuncs成员存放分布键类型对应的哈希函数组成的数组。
typedef struct CdbHash{
uint32 hash; /* The result hash value */ /* 哈希结果值 */
int numsegs; /* number of segments in Greenplum Database used for partitioning */ /* segment 的个数 */
CdbHashReduce reducealg;/* the algorithm used for reducing to buckets */ /* 用于减少桶的算法 */
bool is_legacy_hash;
int natts;
FmgrInfo *hashfuncs; // 分布键类型对应的哈希函数组成的数组
} CdbHash;
typedef enum{
REDUCE_LAZYMOD = 1,
REDUCE_BITMASK,
REDUCE_JUMP_HASH
} CdbHashReduce;
makeCdbHash
CdbHash *makeCdbHash(int numsegs, int natts, Oid *hashfuncs)
创建一个 CdbHash 结构体,它维护了以下信息:Segment 的个数、Reduction 方法和分布键类型对应的哈希函数。
CdbHash *makeCdbHash(int numsegs, int natts, Oid *hashfuncs){
CdbHash *h = palloc(sizeof(CdbHash)); /* Allocate a new CdbHash, with space for the datatype OIDs. */
/* set this hash session characteristics. */
h->hash = 0;
h->numsegs = numsegs;
h->hashfuncs = (FmgrInfo *) palloc(natts * sizeof(FmgrInfo)); /* Load hash function info */
bool is_legacy_hash = false;
for (int i = 0; i < natts; i++){
Oid funcid = hashfuncs[i]; // 从形参上取出分布键类型对应的哈希函数
if (isLegacyCdbHashFunction(funcid)) is_legacy_hash = true; // 只有有一个是legacy hash函数,就设置
fmgr_info(funcid, &h->hashfuncs[i]);
}
h->natts = natts;
h->is_legacy_hash = is_legacy_hash;
/* set the reduction algorithm: If num_segs is power of 2 use bit mask, else use lazy mod (h mod n) */
if (is_legacy_hash) h->reducealg = ispowof2(numsegs) ? REDUCE_BITMASK : REDUCE_LAZYMOD;
else h->reducealg = REDUCE_JUMP_HASH; // 非legacy hash,使用JUMP_HASH
return h;
}
Legacy哈希函数的oid是在源代码里面写死的,有如下29种哈希函数:F_CDBLEGACYHASH_INT2、F_CDBLEGACYHASH_INT4、F_CDBLEGACYHASH_INT8、F_CDBLEGACYHASH_FLOAT4、F_CDBLEGACYHASH_FLOAT8、F_CDBLEGACYHASH_NUMERIC、F_CDBLEGACYHASH_CHAR、F_CDBLEGACYHASH_TEXT、F_CDBLEGACYHASH_BPCHAR、F_CDBLEGACYHASH_BYTEA、F_CDBLEGACYHASH_NAME、F_CDBLEGACYHASH_OID、F_CDBLEGACYHASH_TID、F_CDBLEGACYHASH_TIMESTAMP、F_CDBLEGACYHASH_TIMESTAMPTZ、F_CDBLEGACYHASH_DATE、F_CDBLEGACYHASH_TIME、F_CDBLEGACYHASH_TIMETZ、F_CDBLEGACYHASH_INTERVAL、F_CDBLEGACYHASH_INET、F_CDBLEGACYHASH_MACADDR、F_CDBLEGACYHASH_BIT、F_CDBLEGACYHASH_BOOL、F_CDBLEGACYHASH_ARRAY、F_CDBLEGACYHASH_OIDVECTOR、F_CDBLEGACYHASH_CASH、F_CDBLEGACYHASH_UUID、F_CDBLEGACYHASH_COMPLEX、F_CDBLEGACYHASH_ANYENUM。
如果segment 个数是2的幂,则使用 REDUCE_BITMASK(使用hash&(numsegment-1)),否则使用 REDUCE_LAZYMOD(使用hash%numsegment),没啥区别,就是是2的幂直接使用bitmask与替换mod就行。
cdbhashinit
结构体内的 hash 值将会为每个 tuple 初始化,这个操作发生在 cdbhashinit() 中。如果使用legacy hash,则重置hash值为初始偏移基础量h->hash = FNV1_32_INIT;;否则重置为零。
void cdbhashinit(CdbHash *h) {
if (!h->is_legacy_hash) h->hash = 0;
else{ /* reset the hash value to the initial offset basis */
h->hash = FNV1_32_INIT;
}
}
cdbhash
void cdbhash(CdbHash *h, int attno, Datum datum, bool isnull)
函数通过传入attno指定第几个分布键,通过传入datumn传入该分布键的值或者设置isnull表明传入的是null值;通过attno找到指定的hashfuncs[attno - 1]
对分布键的值或null进行hash计算。
void cdbhash(CdbHash *h, int attno, Datum datum, bool isnull){
uint32 hashkey = h->hash;
if (!h->is_legacy_hash){ // 非legacy hash
hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0); /* rotate hashkey left 1 bit at each step */
if (!isnull){ // 不处理null值
LOCAL_FCINFO(fcinfo, 1);
uint32 hkey;
InitFunctionCallInfoData(*fcinfo, &h->hashfuncs[attno - 1], 1, DEFAULT_COLLATION_OID, /* have to specify collation for attribute of text or bpchar */ NULL, NULL);
fcinfo->args[0].value = datum; fcinfo->args[0].isnull = false;
hkey = DatumGetUInt32(FunctionCallInvoke(fcinfo)); // 计算hash值
if (fcinfo->isnull) elog(ERROR, "function %u returned NULL", fcinfo->flinfo->fn_oid); /* Check for null result, since caller is clearly not expecting one */
hashkey ^= hkey;
}
}else{
magic_hash_stash = hashkey;
if (!isnull){ // 处理非null值
LOCAL_FCINFO(fcinfo, 1);
uint32 hkey;
InitFunctionCallInfoData(*fcinfo, &h->hashfuncs[attno - 1], 1, DEFAULT_COLLATION_OID, /* legacy hash functions don't care about collations */ NULL, NULL);
fcinfo->args[0].value = datum; fcinfo->args[0].isnull = false;
hkey = DatumGetUInt32(FunctionCallInvoke(fcinfo));
if (fcinfo->isnull) elog(ERROR, "function %u returned NULL", fcinfo->flinfo->fn_oid); /* Check for null result, since caller is clearly not expecting one */
hashkey = hkey;
}else hashkey = cdblegacyhash_null();
magic_hash_stash = FNV1_32_INIT;
}
h->hash = hashkey;
}
legacyhash对null值进行hash计算时使用的函数是cdblegacyhash_null,null值定义为((uint32)0XF0F0F0F1)。
/* Update the hash value for a null Datum, for any datatype. */
uint32 cdblegacyhash_null(void){
uint32 nullbuf = NULL_VAL; /* stores the constant value that represents a NULL */
return hashFn((char *) &nullbuf, sizeof(nullbuf));
}
cdbhashreduce
unsigned int cdbhashreduce(CdbHash *h)
:函数映射哈希值到某个 segment,主要逻辑是取模,如下所示:
/* Reduce the hash to a segment number. */
unsigned int cdbhashreduce(CdbHash *h){
int result = 0;
switch (h->reducealg){ /* Reduce our 32-bit hash value to a segment number */
case REDUCE_BITMASK: result = FASTMOD(h->hash, (uint32) h->numsegs); /* fast mod (bitmask) */ break;
case REDUCE_LAZYMOD: result = (h->hash) % (h->numsegs); /* simple mod */ break;
case REDUCE_JUMP_HASH: result = jump_consistent_hash(h->hash, h->numsegs); break;
}
return result;
}
jump_consistent_hash函数是不使用legacy hash,而使用postgresql提供的hash函数时使用的取模函数,其jump consistent hash algorithm来自于论文https://arxiv.org/abs/1406.229
。具体原理参考一致性哈希及其在Greenplum中的应用。
/* The following jump consistent hash algorithm is just the one from the original paper: https://arxiv.org/abs/1406.229 */
static inline int32 jump_consistent_hash(uint64 key, int32 num_segments){
int64 b = -1; int64 j = 0;
while (j < num_segments){
b = j;
key = key * 2862933555777941757ULL + 1;
j = (b + 1) * ((double)(1LL << 31) / (double)((key >> 33) + 1));
}
return b;
}
代码中的例子
对于每一个 tuple 要执行下面的flow:cdbhashinit、cdbhash、cdbhashreduce。我们以src/backend/cdb/cdbmutate.c文件中的cdbhash_const_list函数为例来介绍整个flow。该函数通过形参提供的分布键的值列表plConsts获取其应该分布到的segment。该函数的执行流程如下所示:
- 调用makeCdbHash创建一个 CdbHash 结构体pcdbhash
- 调用cdbhashinit初始化hash值
- 遍历plConsts中的分布键值,传入cdbhash函数计算hash值
- 调用cdbhashreduce函数将hash值映射到第几个segment
/* Hash a list of const values with GPDB's hash function */
int32 cdbhash_const_list(List *plConsts, int iSegments, Oid *hashfuncs) {
ListCell *lc;
CdbHash *pcdbhash = makeCdbHash(iSegments, list_length(plConsts), hashfuncs);
cdbhashinit(pcdbhash);
int i = 0;
foreach(lc, plConsts){
Const *pconst = (Const *) lfirst(lc);
cdbhash(pcdbhash, i + 1, pconst->constvalue, pconst->constisnull);
i++;
}
return cdbhashreduce(pcdbhash);
}
哈希函数
我们看到makeCdbHash函数需要传入分布键类型对应的hash函数oid(F_CDBLEGACYHASH_INT2、F_CDBLEGACYHASH_INT4、F_CDBLEGACYHASH_INT8、F_CDBLEGACYHASH_FLOAT4、F_CDBLEGACYHASH_FLOAT8、F_CDBLEGACYHASH_NUMERIC、F_CDBLEGACYHASH_CHAR、F_CDBLEGACYHASH_TEXT、F_CDBLEGACYHASH_BPCHAR、F_CDBLEGACYHASH_BYTEA、F_CDBLEGACYHASH_NAME、F_CDBLEGACYHASH_OID、F_CDBLEGACYHASH_TID、F_CDBLEGACYHASH_TIMESTAMP、F_CDBLEGACYHASH_TIMESTAMPTZ、F_CDBLEGACYHASH_DATE、F_CDBLEGACYHASH_TIME、F_CDBLEGACYHASH_TIMETZ、F_CDBLEGACYHASH_INTERVAL、F_CDBLEGACYHASH_INET、F_CDBLEGACYHASH_MACADDR、F_CDBLEGACYHASH_BIT、F_CDBLEGACYHASH_BOOL、F_CDBLEGACYHASH_ARRAY、F_CDBLEGACYHASH_OIDVECTOR、F_CDBLEGACYHASH_CASH、F_CDBLEGACYHASH_UUID、F_CDBLEGACYHASH_COMPLEX、F_CDBLEGACYHASH_ANYENUM),如下为这些hash函数对应的实现(定义在src/backend/cdb/cdblegacyhash.c文件中),其实这些函数都是调用hashFn函数来实现的。
uint32 magic_hash_stash = FNV1_32_INIT;
static uint32 hashFn(char *buf, int len) {
return fnv1_32_buf(buf, len, magic_hash_stash);
}
static uint32 fnv1_32_buf(void *buf, size_t len, uint32 hval){
unsigned char *bp = (unsigned char *) buf; /* start of buffer */
unsigned char *be = bp + len; /* beyond end of buffer */
/* FNV-1 hash each octet in the buffer */
while (bp < be) {
hval += (hval << 1) + (hval << 4) + (hval << 7) + (hval << 8) + (hval << 24); /* multiply by the 32 bit FNV magic prime mod 2^32 */
hval ^= (uint32) *bp++; /* xor the bottom with the current octet */
}
return hval; /* return our new hash value */
}
hash函数OID和hash函数API在系统表(pg_amproc和pg_proc)中的对应关系在pg_amproc.dat定义,由inidb初始化数据库时插入系统表中。pg_amproc系统表记录的是对应的access method operator familiy所需要的函数历程,amprocfamily列指定的就是operator familiy,比如这里的hash/cdbhash_integer_ops,而amproclefttype和amprocrighttype就是执行该operator familiy函数历程的参数类型,比如这里的int2和int2,amprocnum指的是支持该类型的函数历程的数量,最后的amproc指的就是函数历程的oid,可以看到这里就是cdblegacyhash_int2。
pg_proc系统表中记录了函数、存储过程、聚合函数和窗口函数的信息。这里可以看出procsrc指明了该函数的函数名为cdblegacyhash_int2,返回值类型为int4,参数类型为int2。
gp_use_legacy_hashops