在openGauss(PostgreSQL)的两表JOIN中,如果遇到等值连接、且JOIN条件上没有索引时,HASH JOIN通常是一种较为高效的JOIN连接方式。本文将从源码的角度详细介绍openGauss中如何实现HASH JOIN。
HASH JOIN原理
HASH JOIN指的是两表JOIN时,内表先按照连接列建立HASH表,然后遍历外表的每一条记录,使用HASH表的HASH函数对外表连接值求HASH值,从而快速判断内表是否存在满足条件的记录,若满足则直接完成两条记录的JOIN。当HASH表能够完全放在内存中时,这种方法相对于NESTLOOP JOIN来说减少扫描内表的全表扫描的次数为1次,因此极大提升了JOIN的性能。当内表较大时,内存中无法存放该HASH表,此时需要将一部分记录存放在磁盘中,此时如何保证外表的每条记录一定不会漏掉内表中匹配的记录是本文想要介绍的HASH JOIN算法的关键内容。
在openGauss中HASH JOIN分为以下6个阶段:
- HJ_BUILD_HASHTABLE:HASH JOIN的第一个阶段建立HASH表;
- HJ_NEED_NEW_OUTER:已经建立完HASH索引之后HASH JOIN的HASH过程中,读取外表的一条记录;
- HJ_SCAN_BUCKET:读取到的外表记录的HASH值已经和HASH表匹配上,此时需要读取HASH表对应的bucket中的记录;
- HJ_FILL_OUTER_TUPLE:发现内表的记录不匹配,考虑到左连接、全连接等场景,构造空的内表记录和外表JOIN发送给客户端;
- HJ_FILL_INNER_TUPLES:对于当前HASH表,已经没有能够外表记录与之匹配,考虑到右连接、全连接等场景,需要构造空的外表记录和内表JOIN,然后发送到给客户端;
- HJ_NEED_NEW_BATCH:在HASH表溢出磁盘存储场景中,当前内存中的HASH表已经全部被外表匹配完,需要将磁盘中的下一批数据加载到HASH表中。
接下来将详细介绍以上6个阶段,并介绍HASH JOIN是如何循环以上6个阶段来完成JOIN的过程。
Build Hashtable:HJ_BUILD_HASHTABLE
这个阶段是HASH JOIN的起始阶段,且只会进入该阶段一次,即扫描内表建立HASH表。这个过程中如果HASH表内存不够,会自适应扩内存/远程借用内存或者直接将HASH表中的元组溢出存储在本地文件中。
创建hash表
openGauss中的HASH表的结构如下图所示:
/*
* HashTable (total of nbuckets)
* [ ] <--hash_header
* [ ]
* [ ]->[ ]->[ ]->....
* [ ]
* [ ]
* [ ]
* ^^^ bucket_bytes
* ^^^^^^^^^^^^^^^inner_rel_bytes
*
* bucket_bytes = hash_header_size * nbuckets
* nbuckets = ntuples/ntuple_per_bucket
* inner_rel_bytes = ntuples * tupsize
* hash_table_bytes = bucket_bytes + inner_rel_bytes;
*/
openGauss的内存HASH表由结构体HashJoinTableData
定义,其中数组buckets
记录了该HASH表的所有bucket地址,相同HASH值的元组在同一个bucket中使用单链表链接在一起,HASH表中元组以及bucket的头部的定义如下:
typedef struct HashJoinTupleData {
struct HashJoinTupleData* next; /* link to next tuple in same bucket */
uint32 hashvalue; /* tuple's hash code */
/* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
} HashJoinTupleData;
typedef struct HashJoinTupleData* HashJoinTuple;
openGauss首先会根据统计信息中内表的元组数量、大小以及当前允许内存HASH表(work_mem
)的大小,初始化HASH表中bucket的数量,并假定每个bucket只有一个元组,这个数量不一定准确,如果最终发现内存不够会将元组溢出到磁盘或者重新分配bucket的数量,重新进行HASH,但是最终bucket的数量一定是2的幂,这个计算过程在函数ExecChooseHashTableSize
中。
bucket与batch
如上所述,bucket是内存HASH表中的概念,openGauss会根据元组计算HASH值,根据该HASH值对HASH表中的nbucket取余的结果,定位该元组所属的bucket。
如果内表数据过多,那么会有一部分数据溢出到磁盘中,为了尽量避免多次重复扫描,openGauss对磁盘中的文件也进行了分区,每个分区的大小也均为HASH表的内存大小,代码中每个HASH表内存大小的分区就称为一个batch
,在build HASH表阶段,当前HASH表中是batch 0,临时文件中分为是batch 1, batch 2 …。HASH JOIN的probe阶段就是依次按照每个batch进行JOIN的。
因此对于一个元组,它的位置包括batch no
和bucket no
,这两个值都是元组的HASH值计算得出:
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int* bucketno, int* batchno)
{
uint32 nbuckets = (uint32)hashtable->nbuckets;
uint32 nbatch = (uint32)hashtable->nbatch;
if (nbatch > 1) {
/* we can do MOD by masking, DIV by shifting */
*bucketno = hashvalue & (nbuckets - 1);
/* 向右循环移动nbucket的位数,即去掉nbucket的位数对nbatch取余 */
*batchno = pg_rotate_right32(hashvalue, hashtable->log2_nbuckets) & (nbatch - 1);
} else {
*bucketno = hashvalue & (nbuckets - 1);
*batchno = 0;
}
}
在创建HASH表阶段,首先会根据内表的大小和HASH表的最大大小估算出此次build阶段batch的数量,因此nbucket * nbatch可以认为是估计的总的hash元组数
扫描内表插入HASH表
向HASH表中插入记录的算法比较简单,关键过程在函数ExecHashTableInsert
中:
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int planid, int dop,
Instrumentation *instrument)
{
/* 从执行器的slot中读取记录,在build hash阶段该记录来自内表,在probe阶段该记录来自临时文件(batchno > 0) */
MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
....
/* 根据hash值计算该记录所属的batchno和bucketno*/
ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
/* 记录的batchno和内存hash表同属一个batch,则该记录需要插入到hash表中 */
if (batchno == hashtable->curbatch) {
...
double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
/* 将该元组放在对应bucket的前方 */
hashTuple = (HashJoinTuple)dense_alloc(hashtable, hashTupleSize);
hashTuple->hashvalue = hashvalue;
errorno = memcpy_s(HJTUPLE_MINTUPLE(hashTuple), tuple->t_len, tuple, tuple->t_len);
hashTuple->next = hashtable->buckets[bucketno];
hashtable->buckets[bucketno] = hashTuple;
...
/* 如果当前hash表中的元组数,已经超过了hash的最大允许元组数,则认为需要对hash表的bucket扩容,
这个地方只是做个标记,caller MultiExecHash会完成正在的扩容行为。*/
if ((hashtable->nbatch == 1) && (hashtable->nbuckets_optimal <= INT_MAX / 2) && /* overflow protection */
(ntuples >= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))) {
hashtable->nbuckets_optimal *= 2;
hashtable->log2_nbuckets_optimal += 1;
}
/* 计算当前hash表的内存开销,判断是否需要扩容或者split到文件 */
hashtable->spaceUsed += hashTupleSize;
.....
} else {
/* 否则的话,将该元组插入到临时文件中 */
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(tuple, hashvalue, &hashtable->innerBatchFile[batchno]);
hashtable->spill_count += 1;
*hashtable->spill_size += sizeof(uint32) + tuple->t_len;
pgstat_increase_session_spill_size(sizeof(uint32) + tuple->t_len);
}
}
在HASH表的插入过程中,HASH表中的元组都是从HASH表的chunk(HashMemoryChunkData
)中分配的,HashMemoryChunkData
是由多个最大32KB的内存块的链表组成,每个chunk中同时记录了当前chunk中HASH元组的记录数。
HASH表扩容或split到磁盘
判断HASH表扩容或者分裂到磁盘主要考虑到如下因素:
- 预估HASH表的内存是否已经超过了最大允许内存
(hashtable->spaceUsed + int64(hashtable->nbuckets_optimal * sizeof(HashJoinTuple)) > hashtable->spaceAllowed)
; - 当前系统内存是否已经重度使用:
gs_sysmemory_busy(hashtable->spaceUsed * dop, false)
; - 是否本地内存不足且无法借用远端内存:
u_sess->local_memory_exhaust && rackBusy
;
openGauss首先判断条件2和条件3是否满足,任意一个满足都需要分裂部分数据到临时文件(batch扩容)中,如果仅条件1满足且当前batch no为0且openGauss允许动态扩展内存(扩展比例高于MEM_AUTO_SPREAD_MIN_RATIO
:10%),那么就可以直接扩展bucket。
- bucket扩容
bucket的扩容在函数ExecHashIncreaseBuckets
中完成,,每次扩容都是使用repalloc将bucket扩一倍。因为hashtable->nbuckets
一定是2的指数,因此直接遍历hash表的每个bucket,将其中的每个元组的HASH值对hashtable->nbuckets
取整,若整数不为0,则直接该元组移动到扩容后数组的后半部分对应的位置即可。...... hashtable->buckets = (HashJoinTuple*)repalloc(hashtable->buckets, hashtable->nbuckets * 2 * sizeof(HashJoinTuple)); /* 遍历桶中的每个元素 */ for (int i = 0; i < hashtable->nbuckets; i++) { HashJoinTuple htuple = hashtable->buckets[i]; ... while (htuple != NULL) { /* 通过移位操作实现对hashtable->buckets取整 */ int offset = (htuple->hashvalue >> hashtable->log2_nbuckets) & 1; ... if (offset == 1) { /* 因为扩容操作每次都是*2,因此offset是取整结果和1做与操作相当于对2取模,若结果为1,表示该元组属于后半个数组 */ if (prev == NULL) hashtable->buckets[i] = htuple->next; else prev->next = htuple->next; htuple->next = hashtable->buckets[i + hashtable->nbuckets]; hashtable->buckets[i + hashtable->nbuckets] = htuple; Assert((int32)(htuple->hashvalue % (hashtable->nbuckets * 2)) == i + hashtable->nbuckets); nmove++; } else { prev = htuple; Assert((int32)(htuple->hashvalue % (hashtable->nbuckets * 2)) == i); } htuple = next; } ... } ...
- batch扩容
batch扩容发生的场景是HASH
表的内存使用超过了限制,因此batch扩容会重新申请内存,在这个过程中可能会同步修改bucket的数量,因此整个HASH表都需要重建。openGauss会从hash表的chunk中扫描所有的元组,然后插入到新的HASH表中,每扫描完一个chunk就释放一个chunk。
可以看到batch扩容的过程中,只会将HASH表中的元组重新分配到新的batch中,但是存量的batch并不会,存量batch的re-HASH会在加载到内存HASH表中完成,即...... /* 调整bucket的数量 */ if (hashtable->nbuckets_optimal != hashtable->nbuckets) { hashtable->nbuckets = hashtable->nbuckets_optimal; hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; /* 申请bucket */ hashtable->buckets = (struct HashJoinTupleData **)repalloc(hashtable->buckets, sizeof(HashJoinTuple) * hashtable->nbuckets); } /* 重置所有的buckets */ rc = memset_s(hashtable->buckets, sizeof(HashJoinTuple) * hashtable->nbuckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets); oldchunks = hashtable->chunks; hashtable->chunks = NULL; /* 扫描hash表的每个旧chunk */ while (oldchunks != NULL) { HashMemoryChunk nextchunk = oldchunks->next; /* 遍历每个chunk的元组 */ while (idx < oldchunks->used) { ninmemory++; /* 重新计算bucket no和batch no */ ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, &bucketno, &batchno); /* 如果计算出来的batch no和hash表中curbatch相等,说明命中当前hash表,直接插入内存 */ if (batchno == curbatch) { /* 重新申请内存,当前内存后续会释放掉 */ HashJoinTuple copyTuple = (HashJoinTuple)dense_alloc(hashtable, hashTupleSize); rc = memcpy_s(copyTuple, hashTupleSize, hashTuple, hashTupleSize); /* and add it back to the appropriate bucket */ copyTuple->next = hashtable->buckets[bucketno]; hashtable->buckets[bucketno] = copyTuple; } else { /* 直接将该元组顺序插入到临时文件中 */ ExecHashJoinSaveTuple( HJTUPLE_MINTUPLE(hashTuple), hashTuple->hashvalue, &hashtable->innerBatchFile[batchno]); hashtable->spaceUsed -= hashTupleSize; nfreed++; } /* next tuple in this chunk */ idx += MAXALIGN(hashTupleSize); ... } /* we're done with this chunk - free it and proceed to the next one */ pfree_ext(oldchunks); oldchunks = nextchunk; }
HJ_NEED_NEW_BATCH
阶段。
至此,HASH表已经建立完成,接下来就是probe阶段,首先获取外表的一条记录,即进入到HJ_NEED_NEW_OUTER
阶段。
寻找下一条外表记录:HJ_NEED_NEW_OUTER
从外表中读取一条记录,如果若:
- 该记录是空记录:当前JOIN是RIGHT JOIN/RIGHT ANTI JIOIN/RIGHT FULL ANTI JOIN/FULL JOIN,则此时进入到HJ_FILL_INNER_TUPLES,否则进入到HJ_NEED_NEW_BATCH;
- 如果该记录存在:计算该元组的HASH值,并根据HASH值计算bucket no和batch no,如果batch no和HASH表的cur_batch不相等,则需要将该记录插入到外表对应的临时文件中(对应batch no),然后重新进入HJ_NEED_NEW_OUTER寻找下一条外表记录,否则的话进入HJ_SCAN_BUCKET状态;
扫描bucket记录:HJ_SCAN_BUCKET
进入该阶段意味着openGuass已经外表记录已经在HASH表中找到了对应的桶,此时需要扫描HASH表中对应的桶(ExecScanHashBucket
),在这个过程中,为了提高RIGHT ANIT/SEMI/ANTI FULL JOIN的效率,还会记录相同hash值记录的前一条记录。在找到hash值完全相同的记录的情况下,针对不同的JOIN类型有不同的操作:
- RIGHT ANIT:此时不能输出该JOIN记录,并可以直接删除hash表中的记录;
- RIGHT SEMI:需要输出该JOIN记录,且该记录后续也不需要,可以直接删除;
- RIGHT ANTI FULL:此时不能输出该JOIN记录,并可以直接删除hash表中的记录;
- LEFT ANTI/LEFT ANTI FULL:直接跳过该JOIN记录,重新进入HJ_NEED_NEW_OUTER;
- sigle match(UNIQUE_INNER/LEFT SEMI):输出该JOIN记录,并进入HJ_NEED_NEW_OUTER;
- 其他JOIN:输出该匹配记录,并再次进入HJ_SCAN_BUCKET,寻找下个可能匹配的记录;
如果ExecScanHashBucket
没有返回匹配的记录,进入到HJ_FILL_OUTER_TUPLE(可能存在LEFT JOIN)
填充完整外表JOIN记录:HJ_FILL_OUTER_TUPLE
将HASH JOIN的下一轮状态标记为HJ_NEED_NEW_OUTER,且若此时发现内表中无匹配记录且该JOIN需要填充NULL内表记录(LEFT JOIN),直接将外表记录和NULL记录JOIN输出。
填充完整内表JOIN记录:HJ_FILL_INNER_TUPLES
进入该阶段意味着当前batch的HASH表的已经找不到对应的外表记录,此时为处理RIGHT JOIN/RIGHT ANTI JIOIN/RIGHT FULL ANTI JOIN/FULL JOIN,我们需要将当前内存HASH表所有记录输出,然后进入HJ_NEED_NEW_BATCH阶段。
加载下一轮batch到hash表:HJ_NEED_NEW_BATCH
前面提到如果内表过大,在HJ_BUILD_HASHTABLE阶段HASH表放不下所有记录,此时会将内表溢出到临时文件存储(这个过程会伴随着HASH表bucket的扩容或者batch的扩容),probe阶段会再次按顺序将临时文件加载到内存HASH表中。此外,在probe阶段,如果外表记录的batch no无法匹配当前HASH表中的batch no,也会将外表记录写入对应batch no的外表临时文件中。
因此进入该阶段,意味着当前batch的JOIN已经处理完毕,需要将临时文件的重新加载到HASH表中。
如果发现当前HASH JOIN的cur_batch不为0,可以直接关闭外表对应batch的临时表,这说明当前batch的外表记录已经完成HASH JOIN,接着就需要继续确定加载哪个batch的内表临时文件的batch到HASH表了:
...
curbatch++;
while (curbatch < nbatch &&
(hashtable->outerBatchFile[curbatch] == NULL || hashtable->innerBatchFile[curbatch] == NULL)) {
/* 外表当前batch存在临时文件,且需要处理LEFT/FULL JOIN等case. */
if (hashtable->outerBatchFile[curbatch] && HJ_FILL_OUTER(hjstate))
break; /* must process due to rule 1 */
/* 内表当前batch存在临时文件,且需要处理RIGHT/FULL JOIN等case. */
if (hashtable->innerBatchFile[curbatch] && HJ_FILL_INNER(hjstate))
break; /* must process due to rule 1 */
/* 内表当前batch存在临时文件,且在建立HASH表/probe阶段发生过batch扩容,这意味当前batch可能会存中其他batch的记录,需要进行重新hash,因此需要加载当前batch。 */
if (hashtable->innerBatchFile[curbatch] && nbatch != hashtable->nbatch_original)
break; /* must process due to rule 2 */
/* 外表当前batch存在临时文件,且在build HASH表/probe阶段发生过batch扩容,这意味当前batch可能会存中其他batch的记录,需要进行重新hash,因此需要加载当前batch。 */
if (hashtable->outerBatchFile[curbatch] && nbatch != hashtable->nbatch_outstart)
break; /* must process due to rule 3 */
/* 未发生过扩容,如果对应的batch,不是内外表都具备的,可以直接跳过。*/
if (hashtable->innerBatchFile[curbatch])
BufFileClose(hashtable->innerBatchFile[curbatch]);
hashtable->innerBatchFile[curbatch] = NULL;
if (hashtable->outerBatchFile[curbatch])
BufFileClose(hashtable->outerBatchFile[curbatch]);
hashtable->outerBatchFile[curbatch] = NULL;
curbatch++;
}
...
确定好batch后,如果内表存在对应batch的临时文件(否则hash表为空,此轮batch的hash join只需要处理外表记录即可),就需要将内表对应batch临时文件的记录读入到HASH表中,这个过程是在函数ExecHashTableInsert
完成的,因此在这个过程中如果发现内存使用紧张或者HASH冲突比较严重,仍然会再次发生bucket/batch的扩容。加载完成后,继续进入HJ_NEED_NEW_OUTER,后续外表记录的读取就直接从当前batch对应的临时文件中读取即可。
...
/* 重置hash表 */
ExecHashTableReset(hashtable);
innerFile = hashtable->innerBatchFile[curbatch];
if (innerFile != NULL) {
if (BufFileSeek(innerFile, 0, 0L, SEEK_SET)) {
ereport(
ERROR, (errcode_for_file_access(), errmsg("could not rewind hash-join build side temporary file: %m")));
}
/* 顺序从临时文件中读取记录并插入到hash表中 */
while ((slot = ExecHashJoinGetSavedTuple(hjstate, innerFile, &hashvalue, hjstate->hj_HashTupleSlot))) {
/*
* NOTE: some tuples may be sent to future batches. Also, it is
* possible for hashtable->nbatch to be increased here!
*/
ExecHashTableInsert(hashtable,
slot,
hashvalue,
hjstate->js.ps.plan->righttree->plan_node_id,
SET_DOP(hjstate->js.ps.plan->righttree->dop));
}
...
/* 关闭并删除内表对应batch的临时文件 */
BufFileClose(innerFile);
hashtable->innerBatchFile[curbatch] = NULL;
}
/* 下一轮batch的JOIN从外表当前batch的临时文件中读取记录。 */
if (hashtable->outerBatchFile[curbatch] != NULL) {
if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
ereport(
ERROR, (errcode_for_file_access(), errmsg("could not rewind hash-join probe side temporary file: %m")));
}
...
可以看到,如果存在临时文件,那么openGauss也总是先使用内表确定好总共的batch数量,因此在JOIN过程中,即使batch的数量一直在变化,外表记录对应的batch no仍能找到正确的batch号。
如果发现没有batch可以加载,则整个HASH JOIN结束。
总结
本文介绍了openGauss基础的HASH JOIN执行过程,从HJ_BUILD_HASHTABLE
阶段开始在以下5个阶段循环,中间需要在HJ_NEED_NEW_OUTER
阶段读取外表记录,在HJ_SCAN_BUCKET
阶段读取内表记录,如果涉及半连接/反半连接/全连接等此时会进入HJ_FILL_OUTER_TUPLE
或者HJ_FILL_INNER_TUPLES
阶段,如果存在临时文件需要在HJ_NEED_NEW_BATCH
阶段加载临时文件到hash表,直到HJ_NEED_NEW_BATCH
无法找到新的batch。
此外,如果内表元组存在数据倾斜,openGauss还会为倾斜数据建立独立的hash表;对于并行HASH JOIN,分别针对build和probe阶段进行并行;这两个优化与HASH JOIN的基本原理类似,本文不再赘述。