学习 探索 分享数据库知识 共建数据库技术交流圈
上篇图文openGauss数据库源码解析系列文章——事务机制源码解析(二)中,从MVCC可见性判断机制和进程内多线程管理机制两方面对事务并发控制进行了分享,本篇将对事务机制的锁机制方面内容进行详细介绍。
三、锁机制
数据库对公共资源的并发控制是通过锁来实现的,根据锁的用途不同,通常可以分为3种:自旋锁(spinlock)、轻量级锁(LWLock,light weight lock)和常规锁(或基于这3种锁的进一步封装)。使用锁的一般操作流程可以简述为3步:加锁、临界区操作、放锁。在保证正确性的情况下,锁的使用及争抢成为制约性能的重要因素,下面先简单介绍openGauss中的3种锁,最后再着重介绍openGauss基于鲲鹏架构所做的锁相关性能优化。
(一)自旋锁
(1) SpinLockInit:自旋锁的初始化。
(2) SpinLockAcquire:自旋锁加锁。
(3) SpinLockRelease:自旋锁释放锁。
(4) SpinLockFree:自旋锁销毁并清理相关资源。
(二)LWLock轻量级锁
(1) LWLockAssign:申请一个LWLock。
(2) LWLockAcquire:加锁。
(3 )LWLockConditionalAcquire:条件加锁,如果没有获取锁则返回false,并不一直等待。
(4) LWLockRelease:释放锁。
(5) LWLockReleaseAll:释放拥有的所有锁。当事务过程中出错了,会将持有的所有LWLock全部回滚释放,避免残留阻塞后续操作。
#define LW_FLAG_HAS_WAITERS ((uint32)1 << 30)
#define LW_FLAG_RELEASE_OK ((uint32)1 << 29)
#define LW_FLAG_LOCKED ((uint32)1 << 28)
#define LW_VAL_EXCLUSIVE ((uint32)1 << 24)
#define LW_VAL_SHARED 1 /* 用于标记LWLock的state状态,实现锁的获取和释放*/
typedef struct LWLock {
uint16 tranche; /* 轻量级锁的ID标识 */
pg_atomic_uint32 state; /* 锁的状态位 */
dlist_head waiters; /* 等锁线程的链表 */
#ifdef LOCK_DEBUG
pg_atomic_uint32 nwaiters; /* 等锁线程的个数 */
struct PGPROC *owner; /* 最后独占锁的持有者 */
#endif
#ifdef ENABLE_THREAD_CHECK
pg_atomic_uint32 rwlock;
pg_atomic_uint32 listlock;
#endif
} LWLock;
#define AccessShareLock 1 /* SELECT语句 */
#define RowShareLock 2 /* SELECT FOR UPDATE/FOR SHARE语句 */
#define RowExclusiveLock 3 /* INSERT, UPDATE, DELETE语句 */
#define ShareUpdateExclusiveLock \
4 /* VACUUM (non-FULL),ANALYZE, CREATE INDEX CONCURRENTLY语句 */
#define ShareLock 5 /* CREATE INDEX (WITHOUT CONCURRENTLY)语句 */
#define ShareRowExclusiveLock \
6 /* 类似于独占模式, 但是允许ROW SHARE模式并发 */
#define ExclusiveLock \
7 /* 阻塞ROW SHARE,如SELECT...FOR UPDATE语句 */
#define AccessExclusiveLock \
8 /* ALTER TABLE, DROP TABLE, VACUUM FULL, LOCK TABLE语句 */
typedef struct LOCKTAG {
uint32 locktag_field1; /* 32比特位*/
uint32 locktag_field2; /* 32比特位*/
uint32 locktag_field3; /* 32比特位*/
uint32 locktag_field4; /* 32比特位*/
uint16 locktag_field5; /* 32比特位*/
uint8 locktag_type; /* 详情见枚举类LockTagType*/
uint8 locktag_lockmethodid; /* 锁方法类型*/
} LOCKTAG;
typedef enum LockTagType {
LOCKTAG_RELATION, /* 表关系*/
/* LOCKTAG_RELATION的ID信息为所属库的OID+表OID;如果库的OID为0表示此表是共享表,其中OID为openGauss内核通用对象标识符 */
LOCKTAG_RELATION_EXTEND, /* 扩展表的优先权*/
/* LOCKTAG_RELATION_EXTEND的ID信息 */
LOCKTAG_PARTITION, /* 分区*/
LOCKTAG_PARTITION_SEQUENCE, /* 分区序列*/
LOCKTAG_PAGE, /* 表中的页*/
/* LOCKTAG_PAGE的ID信息为RELATION信息+BlockNumber(页面号)*/
LOCKTAG_TUPLE, /* 物理元组*/
/* LOCKTAG_TUPLE的ID信息为PAGE信息+OffsetNumber(页面上的偏移量) */
LOCKTAG_TRANSACTION, /* 事务ID (为了等待相应的事务结束) */
/* LOCKTAG_TRANSACTION的ID信息为事务ID号 */
LOCKTAG_VIRTUALTRANSACTION, /* 虚拟事务ID */
/* LOCKTAG_VIRTUALTRANSACTION的ID信息为它的虚拟事务ID号 */
LOCKTAG_OBJECT, /* 非表关系的数据库对象 */
/* LOCKTAG_OBJECT的ID信息为数据OID+类OID+对象OID+子ID */
LOCKTAG_CSTORE_FREESPACE, /* 列存储空闲空间 */
LOCKTAG_USERLOCK, /* 预留给用户锁的锁对象 */
LOCKTAG_ADVISORY, /* 用户顾问锁 */
LOCK_EVENT_NUM
} LockTagType;
typedef struct LOCK {
/* 哈希键 */
LOCKTAG tag; /* 锁对象的唯一标识 */
/* 数据 */
LOCKMASK grantMask; /* 已经获取锁对象的位掩码 */
LOCKMASK waitMask; /* 等待锁对象的位掩码 */
SHM_QUEUE procLocks; /* 与锁关联的PROCLOCK对象链表 */
PROC_QUEUE waitProcs; /* 等待锁的PGPROC对象链表 */
int requested[MAX_LOCKMODES]; /* 请求锁的计数 */
int nRequested; /* requested数组总数 */
int granted[MAX_LOCKMODES]; /* 已获取锁的计数 */
int nGranted; /* granted数组总数 */
} LOCK;
typedef struct PROCLOCK {
/* 标识 */
PROCLOCKTAG tag; /* proclock对象的唯一标识 */
/* 数据 */
LOCKMASK holdMask; /* 已获取锁类型的位掩码 */
LOCKMASK releaseMask; /* 预释放锁类型的位掩码 */
SHM_QUEUE lockLink; /* 指向锁对象链表的指针 */
SHM_QUEUE procLink; /* 指向PGPROC链表的指针 */
} PROCLOCK;
(1) LockAcquire:对锁对象加锁。
(2) LockRelease:对锁对象释放锁。
(3 )LockReleaseAll:释放所有锁资源。
(四)死锁检测机制
1. LWLock死锁检测自愈
(1) pgstat_read_light_detect:从统计信息结构体中读取线程及锁id相关的时间戳,并记录到指针队列中。
(2) lwm_compare_light_detect:跟几秒检测之前的时间对比,如果找到可能发生死锁的线程及锁id则返回true,否则返回false。
(1) lwm_heavy_diagnosis:检测是否有死锁。
(2) lwm_deadlock_report:报告死锁详细信息,方便定位诊断。
(3) lw_deadlock_auto_healing:治愈死锁,选择环中一个线程退出。
(1) lock_entry_id记录线程信息,有thread_id及sessionid是为了适配线程池框架,可以准确的从统计信息中找到相应的信息。对应的代码如下:
typedef struct {
ThreadId thread_id;
uint64 st_sessionid;
} lock_entry_id;
(2) lwm_light_detect记录可能出现死锁的线程,最后用一个链表的形式将当前所有信息串联起来。对应的代码如下:
typedef struct {
/* 线程ID */
lock_entry_id entry_id;
/* 轻量级锁检测引用计数 */
int lw_count;
} lwm_light_detect;
(3) lwm_lwlocks 记录线程相关的锁信息,持有锁数量,以及等锁信息。对应的代码如下:
typedef struct {
lock_entry_id be_tid; /* 线程ID */
int be_idx; /* 后台线程的位置 */
LWLockAddr want_lwlock; /* 预获取锁的信息 */
int lwlocks_num; /* 线程持有的轻量级锁个数 */
lwlock_id_mode* held_lwlocks; /* 线程持有的轻量级锁数组 */
} lwm_lwlocks;
2. 常规锁死锁检测
(1) DeadLockCheck:死锁检测函数。
(2) DeadLockCheckRecurse:如果死锁则返回true,如果有soft edge,返回false并且尝试解决死锁冲突。
(3) check_stack_depth:openGauss会检查死锁递归检测堆栈(死锁检测递归栈过长,会引发在死锁检测时,长期持有所有锁的LWLock分区,从而阻塞业务)。
(4) CheckDeadLockRunningTooLong:openGauss会检查死锁检测时间,防止死锁检测时间过长,阻塞后面所有业务。对应的代码如下:
static void CheckDeadLockRunningTooLong(int depth)
{ /* 每4层检测一下 */
if (depth > 0 && ((depth % 4) == 0)) {
TimestampTz now = GetCurrentTimestamp();
long secs = 0;
int usecs = 0;
if (now > t_thrd.storage_cxt.deadlock_checker_start_time) {
TimestampDifference(t_thrd.storage_cxt.deadlock_checker_start_time, now, &secs, &usecs);
if (secs > 600) { /* 如果从死锁检测开始超过十分钟,则报错处理。*/
#ifdef USE_ASSERT_CHECKING
DumpAllLocks();/* 在debug版本时,导出所有的锁信息,便于定位问题。*/
#endif
ereport(defence_errlevel(), (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Deadlock checker runs too long and is greater than 10 minutes.")));
}
}
}
}
(5) FindLockCycle:检查是否有死锁环。
(6) FindLockCycleRecurse:死锁检测内部递归调用函数。
(1) 死锁检测中最核心最关键的有向边数据结构。对应的代码如下:
typedef struct EDGE {
PGPROC *waiter; /* 等待的线程 */
PGPROC *blocker; /* 阻塞的线程 */
int pred; /* 拓扑排序的工作区 */
int link; /* 拓扑排序的工作区 */
} EDGE;
(2) 可重排的一个等待队列。对应的代码如下:
typedef struct WAIT_ORDER {
LOCK *lock; /* the lock whose wait queue is described */
PGPROC **procs; /* array of PGPROC *'s in new wait order */
int nProcs;
} WAIT_ORDER;
(3) 死锁检测最后打印的相应信息。对应的代码如下:
typedef struct DEADLOCK_INFO {
LOCKTAG locktag; /* 等待锁对象的唯一标识 */
LOCKMODE lockmode; /* 等待锁对象的锁类型 */
ThreadId pid; /* 阻塞线程的线程ID */
} DEADLOCK_INFO;
(五)无锁原子操作
(1) gs_atomic_add_32:32位原子加,并且返回加之后的值。对应的代码如下:
static inline int32 gs_atomic_add_32(volatile int32* ptr, int32 inc)
{
return __sync_fetch_and_add(ptr, inc) + inc;
}
(2) gs_atomic_add_64:64位原子加,并且返回加之后的值。对应的代码如下:
static inline int64 gs_atomic_add_64(int64* ptr, int64 inc)
{
return __sync_fetch_and_add(ptr, inc) + inc;
}
(3) gs_compare_and_swap_32:32位CAS操作,如果dest在更新前没有被更新,则将newval写到dest地址。dest地址的值没有被更新,就返回true;否则返回false。对应的代码如下:
static inline bool gs_compare_and_swap_32(int32* dest, int32 oldval, int32 newval)
{
if (oldval == newval)
return true;
volatile bool res = __sync_bool_compare_and_swap(dest, oldval, newval);
return res;
}
(4) gs_compare_and_swap_64:64位CAS操作,如果dest在更新前没有被更新,则将newval写到dest地址。dest地址的值没有被更新,就返回true;否则返回false。对应的代码如下:
static inline bool gs_compare_and_swap_64(int64* dest, int64 oldval, int64 newval)
{
if (oldval == newval)
return true;
return __sync_bool_compare_and_swap(dest, oldval, newval);
}
(5) arm_compare_and_swap_u128:openGauss提供跨平台的128位CAS操作,在ARM平台下,使用单独的指令集汇编了128位原子操作,用于提升内核测锁并发的性能,详情可以参考下一小结。对应的代码如下:
static inline uint128_u arm_compare_and_swap_u128(volatile uint128_u* ptr, uint128_u oldval, uint128_u newval)
{
#ifdef __ARM_LSE
return __lse_compare_and_swap_u128(ptr, oldval, newval);
#else
return __excl_compare_and_swap_u128(ptr, oldval, newval);
#endif
}
#endif
(6) atomic_compare_and_swap_u128:128位CAS操作,如果dest地址的值在更新前没有被其他线程更新,则将newval写到dest地址。dest地址的值没有被更新,就返回新值;否则返回被别人更新后的值。需要注意必须由上层的调用者保证传入的参数是128位对齐的。对应的代码如下:
static inline uint128_u atomic_compare_and_swap_u128(
volatile uint128_u* ptr,
uint128_u oldval = uint128_u{0},
uint128_u newval = uint128_u{0})
{
#ifdef __aarch64__
return arm_compare_and_swap_u128(ptr, oldval, newval);
#else
uint128_u ret;
ret.u128 = __sync_val_compare_and_swap(&ptr->u128, oldval.u128, newval.u128);
return ret;
#endif
}
(六)基于鲲鹏服务器的性能优化
1. WAL Group inset优化
(1) 不需要所有线程都竞争锁。
(2) 在同一时间窗口所有线程在争抢锁之前先加入到一个group中,第一个加入group的线程作为leader。通过CAS原子操作来实现队列的管理。
(3) leader线程代表整个group去争抢锁。group中的其他线程(follower)开始睡眠,等待leader唤醒。
(4) 争抢到锁后,leader线程将group里的所有线程想要插入的日志遍历一遍得到需要空间总大小。leader线程只执行一次reserve space操作。
(5) leader线程将group中所有线程想要写入的日志都写入到日志缓冲区中。
(6) 释放锁,唤醒所有follower线程。
(7) follower线程由于需要写入的日志已经被leader写入,不需要再争抢锁,直接进入后续流程。
static XLogRecPtr XLogInsertRecordGroup(XLogRecData* rdata, XLogRecPtr fpw_lsn)
{
…/* 初始化变量以及简单校验 */
START_CRIT_SECTION(); /* 开启临界区 */
proc->xlogGroupMember = true;
…
proc->xlogGroupDoPageWrites = &t_thrd.xlog_cxt.doPageWrites;
nextidx = pg_atomic_read_u32(&t_thrd.shemem_ptr_cxt.LocalGroupWALInsertLocks[groupnum].l.xlogGroupFirst);
while (true) {
pg_atomic_write_u32(&proc->xlogGroupNext, nextidx); /* 将上一个成员记录到proc结构体中 */
/* 防止ARM乱序:保证所有前面的写操作都可见 */
pg_write_barrier();
if (pg_atomic_compare_exchange_u32(&t_thrd.shemem_ptr_cxt.LocalGroupWALInsertLocks[groupnum].l.xlogGroupFirst,
&nextidx,
(uint32)proc->pgprocno)) {
break;
} /* 这一步原子操作获取上一个成员的proc no,如果是invalid,说明是leader。*/
}
/* 非leader成员不去获取WAL Insert锁,仅仅进行等待,直到被leader唤醒 */
if (nextidx != INVALID_PGPROCNO) {
int extraWaits = 0;
for (;;) {
/* 充当读屏障 */
PGSemaphoreLock(&proc->sem, false);
/* 充当读屏障 */
pg_memory_barrier();
if (!proc->xlogGroupMember) {
break;
}
extraWaits++;
}
while (extraWaits-- > 0) {
PGSemaphoreUnlock(&proc->sem);
}
END_CRIT_SECTION();
return proc->xlogGroupReturntRecPtr;
}
/* leader成员持有锁 */
WALInsertLockAcquire();
/* 计算每个成员线程的xlog record size */
…
/* leader线程将所有成员线程的xlog record插入到缓冲区 */
while (nextidx != INVALID_PGPROCNO) {
localProc = g_instance.proc_base_all_procs[nextidx];
if (unlikely(localProc->xlogGroupIsFPW)) {
nextidx = pg_atomic_read_u32(&localProc->xlogGroupNext);
localProc->xlogGroupIsFPW = false;
continue;
}
XLogInsertRecordNolock(localProc->xlogGrouprdata,
localProc,
XLogBytePosToRecPtr(StartBytePos),
XLogBytePosToEndRecPtr(
StartBytePos + MAXALIGN(((XLogRecord*)(localProc->xlogGrouprdata->data))->xl_tot_len)),
XLogBytePosToRecPtr(PrevBytePos));
PrevBytePos = StartBytePos;
StartBytePos += MAXALIGN(((XLogRecord*)(localProc->xlogGrouprdata->data))->xl_tot_len);
nextidx = pg_atomic_read_u32(&localProc->xlogGroupNext);
}
WALInsertLockRelease(); /* 完成工作放锁并唤醒所有成员线程 */
while (wakeidx != INVALID_PGPROCNO) {
PGPROC* proc = g_instance.proc_base_all_procs[wakeidx];
wakeidx = pg_atomic_read_u32(&proc->xlogGroupNext);
pg_atomic_write_u32(&proc->xlogGroupNext, INVALID_PGPROCNO);
proc->xlogGroupMember = false;
pg_memory_barrier();
if (proc != t_thrd.proc) {
PGSemaphoreUnlock(&proc->sem);
}
}
END_CRIT_SECTION();
return proc->xlogGroupReturntRecPtr;
}
2. Cache align消除伪共享
#ifdef __aarch64__
#define LWLOCK_PADDED_SIZE PG_CACHE_LINE_SIZE(128)
#else
#define LWLOCK_PADDED_SIZE (sizeof(LWLock) <= 32 ? 32 : 64)
#endif
typedef union LWLockPadded
{
LWLocklock;
charpad[LWLOCK_PADDED_SIZE];
} LWLockPadded;
当前锁逻辑中LWLock的访问仍然是最突出的热点之一。如果LWLOCK_PADDED_SIZE是32字节,且LWLock是按照一个连续的数组来存储的,对于64字节的缓存行可以同时容纳两个LWLockPadded,128字节的缓存行则可以同时含有4个LWLockPadded。当系统中对LWLock竞争激烈时,对应的缓存行不停地获取和失效,浪费大量CPU资源。故在ARM机器的优化下将padding_size直接设置为128,消除伪共享,提升整体LWLock的使用性能。
3. WAL INSERT 128CAS无锁临界区保护
优化的主要涉及思想是将两个64位的全局数据位置信息通过128位原子操作替换原子锁,消除原子锁本身在跨CPU访问、原子锁退避(backoff)、缓存一致性代价。如图6所示。
typedef union {
uint128 u128;
uint64 u64[2];
uint32 u32[4];
} uint128_u; * 为了代码可读及操作,将u128设计成union的联合结构体,内存位置进行64位数值的赋值。*/
static void ReserveXLogInsertLocation(uint32 size, XLogRecPtr* StartPos, XLogRecPtr* EndPos, XLogRecPtr* PrevPtr)
{
volatile XLogCtlInsert* Insert = &t_thrd.shemem_ptr_cxt.XLogCtl->Insert;
uint64 startbytepos;
uint64 endbytepos;
uint64 prevbytepos;
size = MAXALIGN(size);
#if defined(__x86_64__) || defined(__aarch64__)
uint128_u compare;
uint128_u exchange;
uint128_u current;
compare = atomic_compare_and_swap_u128((uint128_u*)&Insert->CurrBytePos);
loop1:
startbytepos = compare.u64[0];
endbytepos = startbytepos + size;
exchange.u64[0] = endbytepos; * 此处为了代码可读,将uint128设置成一个union的联合结构体。将起始和结束位置写入到exchange中。*/
exchange.u64[1] = startbytepos;
current = atomic_compare_and_swap_u128((uint128_u*)&Insert->CurrBytePos, compare, exchange);
if (!UINT128_IS_EQUAL(compare, current)) { * 如果被其他线程并发更新,重新循环*/
UINT128_COPY(compare, current);
goto loop1;
}
prevbytepos = compare.u64[1];
#else
SpinLockAcquire(&Insert->insertpos_lck); /* 其余平台使用自旋锁原子锁来保护变量更新 */
startbytepos = Insert->CurrBytePos;
prevbytepos = Insert->PrevBytePos;
endbytepos = startbytepos + size;
Insert->CurrBytePos = endbytepos;
Insert->PrevBytePos = startbytepos;
SpinLockRelease(&Insert->insertpos_lck);
#endif * __x86_64__|| __aarch64__ */
*StartPos = XLogBytePosToRecPtr(startbytepos);
*EndPos = XLogBytePosToEndRecPtr(endbytepos);
*PrevPtr = XLogBytePosToRecPtr(prevbytepos);
}
4. CLOG Partition优化
如图7所示,CLOG的日志缓冲池在共享内存中且全局唯一,名称为名称为“CLOG Ctl”,为各工作线程共享该资源。在高并发的场景下,该资源的竞争成为性能瓶颈,优化分区后如图8。按页面号进行取模运算(求两个数相除的余数)将日志均分到多个共享内存的缓冲池中,由线程局部对象的数组ClogCtlData来记录,名称为“CLOG Ctl i”,同步增加共享内存中的缓冲池对象及对应的全局锁。也就是通过打散的方式提高整体吞吐。
/* CLOG分区*/
#define NUM_CLOG_PARTITIONS 256 /*分区打散的个数*/
/* CLOG轻量级分区锁*/
#define CBufHashPartition(hashcode) \
((hashcode) % NUM_CLOG_PARTITIONS)
#define CBufMappingPartitionLock(hashcode) \
(&t_thrd.shemem_ptr_cxt.mainLWLockArray[FirstCBufMappingLock + CBufHashPartition(hashcode)].lock)
#define CBufMappingPartitionLockByIndex(i) \
(&t_thrd.shemem_ptr_cxt.mainLWLockArray[FirstCBufMappingLock + i].lock)
5. 支持NUMA-aware数据和线程访问分布
(1) “int numa_run_on_node(int node);”将当前任务及子任务运行在指定的Node上。该API对应函数如下所示。
numa_run_on_node函数在特定节点上运行当前任务及其子任务。在使用numa_run_on_node_mask函数重置节点关联之前,这些任务不会迁移到其他节点的CPU上。传递-1让内核再次在所有节点上调度。成功时返回0;错误-1时返回,错误码记录在errno中。 |
(2) “void numa_set_localalloc(void);”将调用者线程的内存分配策略设置为本地分配,即优先从本节点进行内存分配。该API对应函数如下所示。
numa_set_localalloc函数 设置调用任务的内存分配策略为本地分配。在此模式下,内存分配的首选节点为内存分配时任务正在执行的节点。 |
(3) “void numa_alloc_onnode(void);”在指定的NUMA Node上申请内存。该API对应函数如下所示。
numa_alloc_onnode函数在特定节点上分配内存。分配大小为系统页的倍数并向上取整。如果指定的节点在外部拒绝此进程,则此调用将失败。与函数系列Malloc(3)相比,此函数相对较慢。必须使用numa_free函数释放内存。错误时返回NULL。 |
1) 全局PGPROC数组优化
#ifdef __USE_NUMA
if (nNumaNodes > 1) {
ereport(INFO, (errmsg("InitProcGlobal nNumaNodes: %d, inheritThreadPool: %d, groupNum: %d",
nNumaNodes, g_instance.numa_cxt.inheritThreadPool,
(g_threadPoolControler ? g_threadPoolControler->GetGroupNum() : 0))));
int groupProcCount = (TotalProcs + nNumaNodes - 1) nNumaNodes;
size_t allocSize = groupProcCount * sizeof(PGPROC);
for (int nodeNo = 0; nodeNo < nNumaNodes; nodeNo++) {
initProcs[nodeNo] = (PGPROC *)numa_alloc_onnode(allocSize, nodeNo);
if (!initProcs[nodeNo]) {
ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("InitProcGlobal NUMA memory allocation in node %d failed.", nodeNo)));
}
add_numa_alloc_info(initProcs[nodeNo], allocSize);
int ret = memset_s(initProcs[nodeNo], groupProcCount * sizeof(PGPROC), 0, groupProcCount * sizeof(PGPROC));
securec_check_c(ret, "\0", "\0");
}
} else {
#endif
2) 全局WALInsertLock数组优化
WALInsertLockPadded** insertLockGroupPtr =
(WALInsertLockPadded**)CACHELINEALIGN(palloc0(nNumaNodes * sizeof(WALInsertLockPadded*) + PG_CACHE_LINE_SIZE));
#ifdef __USE_NUMA
if (nNumaNodes > 1) {
size_t allocSize = sizeof(WALInsertLockPadded) * g_instance.xlog_cxt.num_locks_in_group + PG_CACHE_LINE_SIZE;
for (int i = 0; i < nNumaNodes; i++) {
char* pInsertLock = (char*)numa_alloc_onnode(allocSize, i);
if (pInsertLock == NULL) {
ereport(PANIC, (errmsg("XLOGShmemInit could not alloc memory on node %d", i)));
}
add_numa_alloc_info(pInsertLock, allocSize);
insertLockGroupPtr[i] = (WALInsertLockPadded*)(CACHELINEALIGN(pInsertLock));
}
} else {
#endif
char* pInsertLock = (char*)CACHELINEALIGN(palloc(
sizeof(WALInsertLockPadded) * g_instance.attr.attr_storage.num_xloginsert_locks + PG_CACHE_LINE_SIZE));
insertLockGroupPtr[0] = (WALInsertLockPadded*)(CACHELINEALIGN(pInsertLock));
#ifdef __USE_NUMA
}
#endif
typedef struct
{
LWLock lock;
#ifdef __aarch64__
pg_atomic_uint32xlogGroupFirst;
#endif
XLogRecPtrinsertingAt;
} WALInsertLock;
四、小结
文章转载自Gauss松鼠会,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。