学习 探索 分享数据库知识 共建数据库技术交流圈
上篇图文openGauss数据库源码解析系列文章——事务机制源码解析(一)中,从事务状态机、事务ID分配及CLOG/CSNLOG两方面对事务并发控制进行了内容分享,本篇将接着从MVCC可见性判断机制和进程内多线程管理机制两方面对事务并发控制展开介绍。
(三)MVCC可见性判断机制
openGauss利用多版本并发控制来维护数据的一致性。当扫描数据时每个事务看到的只是拿快照那一刻的数据,而不是数据当前的最新状态。这样就可以避免一个事务看到其他并发事务的更新而导致不一致的场景。使用多版本并发控制的主要优点是读取数据的锁请求与写数据的锁请求不冲突,以此来实现读不阻塞写,写也不阻塞读。下面介绍事务的隔离级别以及openGauss可见性判断CSN机制。
1. 事务隔离级别
SQL标准考虑了并行事务间应避免的现象,定义了以下几种隔离级别,如表1所示。
表1 事务隔离级别
隔离级别 | P0:脏写 | P1:脏读 | P2:不可重复读 | P3:幻读 |
读未提交 | 不可能 | 可能 | 可能 | 可能 |
读已提交 | 不可能 | 不可能 | 可能 | 可能 |
可重复读 | 不可能 | 不可能 | 不可能 | 可能 |
可串行化 | 不可能 | 不可能 | 不可能 | 不可能 |
(2) 脏读(dirty read):一个事务可以读取另一个事务未提交的修改数据。
(3) 不可重复读(fuzzy read):一个事务重复读取前面读取过的数据,数据的结果被另外的事务修改。
(4) 幻读(phantom):一个事务重复执行范围查询,返回一组符合条件的数据,每次查询的结果集因为其他事务的修改发生改变(条数)。
在各类数据库实现的过程中,并发事务产生了一些新的现象,在原来的隔离级别的基础上,有了一些扩展。如表2所示。
表2 事务隔离级别扩展
隔离级别 | P0:脏写 | P1:脏读 | P4:更新丢失 | P2:不可重复读 | P3:幻读 | A5A:读偏斜 | A5B:写偏斜 |
读未提交 | 不可能 | 可能 | 可能 | 可能 | 可能 | 可能 | 可能 |
读已提交 | 不可能 | 不可能 | 可能 | 可能 | 可能 | 可能 | 可能 |
可重复读 | 不可能 | 不可能 | 不可能 | 不可能 | 可能 | 不可能 | 不可能 |
快照一致性读 | 不可能 | 不可能 | 不可能 | 不可能 | 偶尔 | 不可能 | 可能 |
可串行化 | 不可能 | 不可能 | 不可能 | 不可能 | 不可能 | 不可能 | 不可能 |
(5) 更新丢失(lost update):一个事务在读取元组并更新该元组的过程中,有另一个事务修改了该元组的值,导致最终这次修改丢失。
(6) 读偏斜(read skew):假设数据x,y有隐式的约束x+y=100;事务一读取x=50;事务二写x=25并更新y=75保证约束成立,事务二提交,事务一再读取y=75,导致事务一中读取x+y=125,不满足约束。
(7) 写偏斜(write skew):假设数据x,y有隐式的约束x+y<=100;事务一读取x=50,并写入y=50;事务二读取y=30并写入x=70,并提交;事务一再提交;最终导致x=70,y=50不满足x+y<=100的约束。
openGauss提供读已提交隔离级别和可重复读隔离级别:在实现上可重复读隔离级别无幻读问题,有A5B写偏斜问题。
2. CSN机制
1) CSN原理简单如图1所示。
图1 CSN原理
每个非只读事务在运行过程中会取得一个xid号,在事务提交时会推进CSN,同时会将当前CSN与事务的xid映射关系保存起来(CSNLOG)。图5-12中,实心竖线标识取snapshot(快照)时刻,会获取最新提交CSN(3)的下一个值4。TX1、TX3、TX5已经提交,对应的CSN号分别是1、2、3。TX2、TX4、TX6正在运行,TX7、TX8是未来还未开启的事务。对于当前snapshot而言,严格小于CSN号4的事务提交结果均可见;其余事务提交结果在获取快照时刻还未提交,不可见。
2) MVCC快照可见性判断的流程
获取快照时记录当前活跃的最小的xid,记为snapshot.xmin。当前最新提交的“事务id(latestCompleteXid) + 1”,记为snapshot.xmax。当前最新提交的“CSN号 + 1”(NextCommitSeqNo),记为snapshot.csn。可见性判断的简易流程如图2所示。
图2 MVCC快照可见性判断流程
(1) xid大于等于snapshot.xmax时,该事务id不可见。
(2) xid比snapshot.xmin小时,说明该事务id在本次事务启动以前已经结束,需要去CLOG查询事务的提交状态,并在元组头上设置相应的标记位。
(3) xid处于snapshot.xmin和snapshot.xmax之间时,需要从CSN-XID映射中读取事务结束的CSN;如果CSN有值且比snapshot.csn小,表示该事务可见,否则不可见。
3) 提交流程
事务提交流程如图3所示。
图3 提交流程
(1) 设置CSN-XID映射commit-in-progress标记。
(2) 原子更新NextCommitSeqNo值。
(3) 生成redo日志,写CLOG,写CSNLOG。
(4) 更新PGPROC将对应的事务信息从PGPROC中移除,xid设置为InvalidTransactionId、xmin设置为InvalidTransactionId等。
4) 热备支持
在事务的提交流程步骤(1)与(2)之间,增加commit-in-progress的XLOG日志。备机在读快照时,首先获取轻量锁ProcArrayLock,并计算当前快照。如果使用当前快照中的CSN时,碰到xid对应的CSN号有COMMITSEQNO_COMMIT_INPROGRESS标记,则必须等待相应的事务提交XLOG回放结束后再读取相应的CSN判断是否可见。为了实现上述等待操作,备机在对commit-in-progress的XLOG日志做redo操作时,会调用XactLockTableInsert函数获取相应xid的事务排他锁;其他的读事务如果访问到该xid,会等待在此xid的事务锁上直到相应的事务提交XLOG回放结束后再继续运行。
3. 关键数据结构及函数
1) 快照
快照相关代码如下:
typedef struct SnapshotData {
SnapshotSatisfiesFunc satisfies; /* 判断可见性的函数;通常使用MVCC,即HeapTupleSatisfiesMVCC */
TransactionId xmin; /*当前活跃事务最小值,小于该值的事务说明已结束 */
TransactionId xmax; /*最新提交事务id(latestCompeleteXid)+1,大于等于改值说明事务还未开始,该事务id不可见 */
TransactionId* xip; /*记录当前活跃事务链表,在CSN版本中该值无用 */
TransactionId* subxip; /* 记录缓存子事务活跃链表,在CSN版本中该值无用 */
uint32 xcnt; /* 记录活跃事务的个数(xip中元组数)在CSN版本中该值无用 */
GTM_Timeline timeline; /* openGauss单机中无用 */
uint32 max_xcnt; /* xip的最大个数,CSN版本中该值无用 */
int32 subxcnt; /* 缓存子事务活跃链表的个数,在CSN版本中该值无用 */
int32 maxsubxcnt; /* 缓存子事务活跃链表最大个数,在CSN版本中该值无用 */
bool suboverflowed; /* 子事务活跃链表是否已超过共享内存中预分配的上限,在CSN版本中无用。 */
CommitSeqNo snapshotcsn; /* 快照的CSN号,一般为最新提交事务的CSN号+1(NextCommitSeqNo),CSN号严格小于该值的事务可见。 */
int prepared_array_capacity; /* 单机openGauss无用 */
int prepared_count; /* 单机openGauss无用 */
TransactionId* prepared_array; /* 单机openGauss无用 */
bool takenDuringRecovery; /* 是否Recovery过程中产生的快照 */
bool copied; /* 该快照是会话级别静态的,还是新分配内存拷贝的 */
CommandId curcid; /*事务块中的命令序列号,即同一事务中,前面插入的数据,后面可见。 */
uint32 active_count; /* ActiveSnapshot stack的refcount */
uint32 regd_count; /* RegisteredSnapshotList 的refcount*/
void* user_data; /* 本地多版本快照使用,标记该快照还有线程使用,不能直接释放 */
SnapshotType snapshot_type; /* openGauss单机无用 */
} SnapshotData;
2) HeapTupleSatisfiesMVCC
用于一般读事务的快照扫描,基于CSN的大体逻辑,详细代码如下:
bool HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer)
{
…… /* 初始化变量 */
if (!HeapTupleHeaderXminCommitted(tuple)) { /* 此处先判断用一个bit位记录的hint bit(提示比特位:openGauss判断可见性时,通常需要知道元组xmin和xmax对应的clog的提交状态;为了避免重复访问clog,openGauss内部对可见性判断进行了优化。hint bit是把事务状态直接记录在元组头中,用一个bit位来表示提交和回滚状态。openGauss并不会在事务提交或者回滚时主动更新元组上的 hint bit,而是等到访问该元组并进行可见性判断时,如果发现hint bit没有设置,则从 CLOG 中读取并设置,否则直接读取hint bit的值),防止同一条tuple反复获取事务最终提交状态。如果一次扫描发现该元组的xmin/xmax已经提交,就会打上相应的标记,加速扫描;如果没有标记则继续判断。 */
if (HeapTupleHeaderXminInvalid(tuple)) /* 同样判断hint bit。如果xmin已经标记为invalid说明插入该元组的事务已经回滚,直接返回不可见 */
return false;
if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(page, tuple))) { /* 如果是一个事务内部,需要去判断该元组的CID,也即是同一个事务内,后面的查询可以查到当前事务之前插入的扫描结果 */
…….
} else { /* 如果扫描别的事务,需要根据快照判断事务是否可见 */
visible = XidVisibleInSnapshot(HeapTupleHeaderGetXmin(page, tuple), snapshot, &hintstatus); /* 通过csnlog判断事务是否可见,并且返回该事务的最终提交状态 */
if (hintstatus == XID_COMMITTED) /* 如果该事务提交,则打上提交的hint bit用于加速判断 */
SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED, HeapTupleHeaderGetXmin(page, tuple));
if (hintstatus == XID_ABORTED) {
… /* 如果事务回滚,则打上回滚标记 */
SetHintBits(tuple, buffer, HEAP_XMIN_INVALID, InvalidTransactionId);
}
if (!visible) { /* 如果xmin不可见,则该元组不可见,否则表示插入该元组的事务对于该次快照已经提交,继续判断删除该元组的事务是否对该次快照提交 */
return false;
}
}
}
} else { /* 如果该条元组的xmin已经被打上提交的hint bit,则通过函数接口CommittedXidVisibleInSnapshot判断是否对本次快照可见 */
/* xmin is committed, but maybe not according to our snapshot */
if (!HeapTupleHeaderXminFrozen(tuple) &&
!CommittedXidVisibleInSnapshot(HeapTupleHeaderGetXmin(page, tuple), snapshot)) {
return false;
}
}
…… /* 后续xmax的判断同xmin类似,如果xmax对于本次快照可见,则说明删除该条元组的事务已经提交,则不可见,否则可见,此处不再赘述 */
if (!(tuple->t_infomask & HEAP_XMAX_COMMITTED)) {
if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmax(page, tuple))) {
if (HeapTupleHeaderGetCmax(tuple, page) >= snapshot->curcid)
return true; /* 在扫面前该删除事务已经提交 */
else
return false; /* 扫描开始后删除操作的事务才提交 */
}
visible = XidVisibleInSnapshot(HeapTupleHeaderGetXmax(page, tuple), snapshot, &hintstatus);
if (hintstatus == XID_COMMITTED) {
/* 设置xmax的hint bit */
SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED, HeapTupleHeaderGetXmax(page, tuple));
}
if (hintstatus == XID_ABORTED) {
/* 回滚或者故障 */
SetHintBits(tuple, buffer, HEAP_XMAX_INVALID, InvalidTransactionId);
}
if (!visible) {
return true; /* 快照中xmax对应的事务不可见,则认为该元组仍然活跃 */
}
} else {
/* xmax对应的事务已经提交,但是快照中该事务不可见,认为删除该元组的操作未完成,仍然认为该元组可见 */
if (!CommittedXidVisibleInSnapshot(HeapTupleHeaderGetXmax(page, tuple), snapshot)) {
return true; /* 认为元组可见 */
}
}
return false;
}
该函数的逻辑同MVCC类似,只是此时并没有统一快照,而仅仅是判断当前xmin/xmax的状态,而不再继续调用XidVisibleInSnapshot函数、CommittedXidVisibleInSnapshot函数来判断是否对快照可见。
4) HeapTupleSatisfiesVacuum
根据传入的OldestXmin的值返回相应的状态。死亡元组(openGauss多版本机制中不可见的旧版本元组)且没有任何其他未结束的事务可能访问该元组(xmax<oldestXmin),可以被VACUUM清理。本函数具体代码如下:
HTSV_Result HeapTupleSatisfiesVacuum(HeapTuple htup, TransactionId OldestXmin, Buffer buffer)
{
…… /* 初始化变量
if (!HeapTupleHeaderXminCommitted(tuple)) { * hint bit标记加速,与MVCC的逻辑相同。 */
if (HeapTupleHeaderXminInvalid(tuple)) /* 如果xmin未提交,则返回该元组死亡,可以清理。 */
return HEAPTUPLE_DEAD;
xidstatus = TransactionIdGetStatus(HeapTupleGetRawXmin(htup), false); /* 通过CSNLog来获取当前的事务状态 */
if (xidstatus == XID_INPROGRESS) {
if (tuple->t_infomask & HEAP_XMAX_INVALID) /* 如果xmax还没有,说明没有人删除,此时判断该元组正在插入过程中,否则在删除过程中 */
return HEAPTUPLE_INSERT_IN_PROGRESS;
return HEAPTUPLE_DELETE_IN_PROGRESS; /* 返回正在删除的过程中 */
} else if (xidstatus == XID_COMMITTED) { /* 如果xmin提交了,打上hint bit,后面继续看xmax是否提交。 */
SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED, HeapTupleGetRawXmin(htup));
} else {
…. /* 事务结束了且未提交,可能是abort或者是crash的事务,一般返回死亡,可删除;单机情形 t_thrd.xact_cxt.useLocalSnapshot没有作用,恒为false。 */
SetHintBits(tuple, buffer, HEAP_XMIN_INVALID, InvalidTransactionId);
return ((!t_thrd.xact_cxt.useLocalSnapshot || IsInitdb) ? HEAPTUPLE_DEAD : HEAPTUPLE_LIVE);
}
}
/* 接着判断xmax。如果还没有设置xmax说明没有人删除该元组,返回元组存活,不可删除。 */
if (tuple->t_infomask & HEAP_XMAX_INVALID)
return HEAPTUPLE_LIVE;
……
if (!(tuple->t_infomask & HEAP_XMAX_COMMITTED)) { /*如果xmax提交,则看xmax是否比oldesxmin小。小的话说明没有未结束的事务会访问该元组,可以删除。 */
xidstatus = TransactionIdGetStatus(HeapTupleGetRawXmax(htup), false);
if (xidstatus == XID_INPROGRESS)
return HEAPTUPLE_DELETE_IN_PROGRESS;
else if (xidstatus == XID_COMMITTED)
SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED, HeapTupleGetRawXmax(htup));
else {
… /* xmax对应的事务abort或者crash */
SetHintBits(tuple, buffer, HEAP_XMAX_INVALID, InvalidTransactionId);
return HEAPTUPLE_LIVE;
}
}
/*判断该元组是否可以删除,xmax<OldestXmin可以删除。 */
if (!TransactionIdPrecedes(HeapTupleGetRawXmax(htup), OldestXmin))
return ((!t_thrd.xact_cxt.useLocalSnapshot || IsInitdb) ? HEAPTUPLE_RECENTLY_DEAD : HEAPTUPLE_LIVE);
/* 该元组可以认为已经DEAD,不被任何活跃事务访问,可以删除。 */
return ((!t_thrd.xact_cxt.useLocalSnapshot || IsInitdb) ? HEAPTUPLE_DEAD : HEAPTUPLE_LIVE);
}
设置xid对应CSNLog的标记位COMMITSEQNO_COMMIT_INPROGRESS(详情见“五(二)事务ID分配及CLOG/CSNLOG”的第2节),表示此xid对应的事务正在提交过程中。该操作是为了保证可见性判断时的原子性,即为了防止并发读事务在CSN号设置的过程中读到不一致的数据。
6) CSNLogSetCommitSeqNo
给对应的xid设置相应的CSNLog。
7) RecordTransactionCommit
记录事务提交,主要是写CLOG、CSNLOG的XLOG日志以及写CLOG及CSNLOG。
(四)进程内多线程管理机制
简述进程内多线程管理机制相关数据结构及多版本快照计算机制。
1. 事务信息管理
数据库启动时候维护了一段共享内存,每个线程初始化的时候会从这个共享内存中获取一个槽位并将其线程信息记录到槽位中。获取快照时,需要在共享内存数组中更新槽位信息,事务结束时,需要从槽位中将其事务信息清除。计算快照时,通过遍历该全局数组,获取当前所有并发线程的事务信息,并计算出快照信息(xmin、xmax、snapshotcsn等)。事务信息管理的关键数据结构代码如下:
typedef struct PGXACT {
GTM_TransactionHandle handle; /* 单机模式无用参数 */
TransactionId xid; /* 该线程持有的xid号,如果没有则为0 */
TransactionId prepare_xid; /* 准备阶段的xid号*/
TransactionId xmin; /* 当前事务开启时最小的活跃xid,vaccum操作不会删除那些xid大于等于xmin的元组。 */
CommitSeqNo csn_min; /* 当前事务开启时最小的活跃CSN号*/
TransactionId next_xid; /* 单机模式无用参数*/
int nxids; /* 子事物个数*/
uint8 vacuumFlags; /* vacuum操作相关的flag */
bool needToSyncXid; /* 单机模式无用参数*/
bool delayChkpt; /* 如果该线程需要checkpoint线程延迟等待,此值为true
#ifdef __aarch64__ */
char padding[PG_CACHE_LINE_SIZE - PGXACT_PAD_OFFSET]; /* 为了性能考虑的结构体对齐*/
#endif
} PGXACT;
struct PGPROC {
SHM_QUEUE links; /* 链表中的指针 */
PGSemaphoreData sem; /* 休眠等待的信号量 */
int waitStatus; /* 等待状态 */
Latch procLatch; /* 线程的通用闩锁 */
LocalTransactionId lxid; /* 当前线程本地顶层事务ID */
ThreadId pid; /* 线程的PID */
ThreadId sessMemorySessionid;
uint64 sessionid; /* 线程池模式下当前的会话ID */
int logictid; /* 逻辑线程ID */
TransactionId gtt_session_frozenxid; /* 会话级全局临时表的冻结XID */
int pgprocno;
int nodeno;
/* 线程启动时下面这些数据结构为0 */
BackendId backendId; /* 线程的后台ID */
Oid databaseId; /* 当前访问数据库的OID */
Oid roleId; /* 当前用户的OID */
/* 版本号,用于升级过程中新老版本的判断 */
uint32 workingVersionNum;
/*热备模式下,标记当前事务是否收到冲突信号。设置该值时需要持有ProcArray锁。 */
bool recoveryConflictPending;
/* 线程等待的轻量级锁信息. */
bool lwWaiting; /* 当等待轻量级锁时,为真 */
uint8 lwWaitMode; /* 预获取锁的模式 */
bool lwIsVictim; /* 强制放弃轻量级锁 */
dlist_node lwWaitLink; /* 等待在相同轻量级锁对象的下一个等待者 */
/* 线程等待的常规锁信息 */
LOCK* waitLock; /* 等待的常规锁对象 */
PROCLOCK* waitProcLock; /* 等待常规锁对象的持有者 */
LOCKMODE waitLockMode; /* 预获取常规锁对象的模式 */
LOCKMASK heldLocks; /* 本线程获取锁对象模式的位掩码 */
/* 等待主备机回放日志同步的信息 */
XLogRecPtr waitLSN; /* 等待的lsn*/
int syncRepState; /* 等待主备同步的状态 */
bool syncRepInCompleteQueue; /* 是否等待在完成队列中 */
SHM_QUEUE syncRepLinks; /* 指向同步队列的指针 */
DataQueuePtr waitDataSyncPoint; /* 数据页复制的数据同步点 */
int dataSyncRepState; /* 数据页复制的同步状态 */
SHM_QUEUE dataSyncRepLinks; /* 指向数据页同步队列的指针*/
MemoryContext topmcxt; /* 本线程的顶层内存上下文 */
char myProgName[64];
pg_time_t myStartTime;
syscalllock deleMemContextMutex;
SHM_QUEUE myProcLocks[NUM_LOCK_PARTITIONS];
/* 以下结构为了实现XID批量提交 */
/* 是否为XID批量提交中的成员 */
bool procArrayGroupMember;
/* XID批量提交中的下一个成员 */
pg_atomic_uint32 procArrayGroupNext;
/* 父事务XID和子事物XID中的最大者 */
TransactionId procArrayGroupMemberXid;
/* 提交序列号 */
CommitSeqNo commitCSN;
/* 以下结构为了实现CLOG批量提交 */
bool clogGroupMember; /* 是否为CLOG批量提交中的成员*/
pg_atomic_uint32 clogGroupNext; /* CLOG批量提交中的下一个成员*/
TransactionId clogGroupMemberXid; /* CLOG批量提交的事务ID */
CLogXidStatus clogGroupMemberXidStatus; /* CLOG批量提交的事务状态 */
int64 clogGroupMemberPage; /* CLOG批量提交对应的CLOG页面 */
XLogRecPtr clogGroupMemberLsn; /* CLOG批量提交成员的提交回放日志位置 */
#ifdef __aarch64__
/* 以下结构体是为了实现ARM架构下回放日志批量插入 */
bool xlogGroupMember;
pg_atomic_uint32 xlogGroupNext;
XLogRecData* xlogGrouprdata;
XLogRecPtr xlogGroupfpw_lsn;
XLogRecPtr* xlogGroupProcLastRecPtr;
XLogRecPtr* xlogGroupXactLastRecEnd;
void* xlogGroupCurrentTransactionState;
XLogRecPtr* xlogGroupRedoRecPtr;
void* xlogGroupLogwrtResult;
XLogRecPtr xlogGroupReturntRecPtr;
TimeLineID xlogGroupTimeLineID;
bool* xlogGroupDoPageWrites;
bool xlogGroupIsFPW;
uint64 snap_refcnt_bitmap;
#endif
LWLock* subxidsLock;
struct XidCache subxids; /* 子事物XID */
LWLock* backendLock; /* 每个线程的轻量级锁,用于保护以下数据结构的并发访问 */
/* Lock manager data, recording fast-path locks taken by this backend. */
uint64 fpLockBits; /* 快速路径锁的持有模式 */
FastPathTag fpRelId[FP_LOCK_SLOTS_PER_BACKEND]; /* 表对象的槽位 */
bool fpVXIDLock; /* 是否获得本地XID的快速路径锁 */
LocalTransactionId fpLocalTransactionId; /* 本地的XID */
};
图4 事务信息
如图4所示,proc_base_all_procs以及proc_base_all_xacts为全局的共享区域,每个线程启动的时候会在这个共享区域中注册一个槽位,并且将线程级指针变量t_thrd.proc以及t_thrd.pgxact指向该区域。当该线程有事务开始时,会将对应事务的xmin、xid等信息填写到pgxact结构体中。关键函数及接口如下。
(1) GetOldestXmin:返回当前多版本快照缓存的oldestXmin。(多版本快照机制见后续章节)
(2) ProcArrayAdd:线程启动时在共享区域中注册一个槽位。
(3) ProcArrayRemove:将当前线程从ProcArray数组中移除。
(4) TransactionIdIsInProgress:判断xid是否还在运行之中。
2. 多版本快照机制
因为openGauss使用一段共享内存来实现快照的获取以及各线程事务信息的管理,计算快照持有共享锁以及事务结束持有排他锁有严重的锁争抢问题。为了解决该冲突,openGauss引入了多版本快照机制解决锁冲突。每当事务结束时,持有排他锁、计算快照的一个版本,记录到一个环形缓冲区队列内存里;当别的线程获取快照时,并不持有共享锁去重新计算,而是通过原子操作到该环形队列顶端获取最新快照并将其引用计数加1;待拷贝完了快照信息后,将引用计数减1;当槽位引用计数为0时,表示可以被新的快照复用。
1) 多版本快照数据结构
多版本快照数据结构代码如下:
typedef struct _snapxid {
TransactionId xmin;
TransactionId xmax;
CommitSeqNo snapshotcsn;
TransactionId localxmin;
bool takenDuringRecovery;
ref_cnt_t ref_cnt[NREFCNT]; /* 该快照的引用计数,如果为0则可复用 */
} snapxid_t; /*多版本快照内容,在openGauss CSN方案下,仅需要记录xmin、xmax、snapshotcsn等关键信息即可。*/
static snapxid_t* g_snap_buffer = NULL; /* 缓冲区队列内存区指针 */
static snapxid_t* g_snap_buffer_copy = NULL; /* 缓冲区队列内存的浅拷贝 */
static size_t g_bufsz = 0;
static bool g_snap_assigned = false; /*多版本快照buffer队列是否已初始化 */
#define SNAP_SZ sizeof(snapxid_t) /* 每一个多版本快照的size大小 */
#define MaxNumSnapVersion 64 /* 多版本快照队列的大小,64个版本 */
static volatile snapxid_t* g_snap_current = NULL; /* 当前的快照指针 */
static volatile snapxid_t* g_snap_next = NULL; /* 下一个可用槽位的快照指针 */
在创建共享内存时,根据MaxNumSnapVersion函数的size生成“MaxNumSnapVersion * SNAP_SZ”大小的共享内存区。并将g_snap_current置为0号偏移,g_snap_next置为“1 * SNAP_SZ”偏移。
3) 多版本快照的计算
(1) 获取当前g_snap_next。
(2) 保证当前已持有Proc数组的排他锁,进行xmin、xmax、CSN等关键结构的计算,并存放到g_snap_next中。
(3) 寻找下一个refcount为0可复用的槽位,将g_snap_current赋值为g_snap_next,g_snap_next赋值为可复用的槽位偏移。
4) 多版本快照的获取
(1) 获取g_snap_current指针并将当前快照槽位的引用计数加1,防止并发更新快照时被复用。
(2) 将当前快中的信息拷贝到当前连接的静态快照内存中。
(3) 释放当前多版本快照,并将当前快照槽位的引用计数减1。
5) 关键函数
(1) CreateSharedRingBuffer:创建多版本快照共享内存信息。
(2) GetNextSnapXid:获取下一个多版本快照位置。函数代码如下:
static inline snapxid_t* GetNextSnapXid()
{
return g_snap_buffer ? (snapxid_t*)g_snap_next : NULL;
}
(3) SetNextSnapXid:获取下一个可用的槽位,并且将当前多版本快照最新更新。函数代码如下:
static void SetNextSnapXid()
{
if (g_snap_buffer != NULL) {
g_snap_current = g_snap_next; /* 将最新的多版本快照更新到最新。*/
pg_write_barrier(); /* 此处是防止buffer ring初始化时的ARM乱序问题。*/
g_snap_assigned = true;
snapxid_t* ret = (snapxid_t*)g_snap_current;
size_t idx = SNAPXID_INDEX(ret);
loop: /* 主循环,整体思路是不停遍历多版本槽位信息,一直找到一个refcout为0的可重用槽位。*/
do {
++idx;
/* 如果发生回卷,那么重头再找 */
if (idx == g_bufsz)
idx = 0;
ret = SNAPXID_AT(idx);
if (IsZeroRefCount(ret)) {
g_snap_next = ret;
return;
}
} while (ret != g_snap_next);
ereport(WARNING, (errmsg("snapshot ring buffer overflow.")));
/* 当前多版本快照个数为64个,理论上可能是会出现槽位被占满,如果没有空闲槽位,重新遍历即可。 */
goto loop;
}
}
(4) CalculateLocalLatestSnapshot:计算多版本快照信息。函数代码如下:
void CalculateLocalLatestSnapshot(bool forceCalc)
{
…/* 初始化变量 */
snapxid_t* snapxid = GetNextSnapXid(); /*设置下一个空闲多版本快照槽位信息 */
/* 初始化xmax为 latestCompletedXid + 1 */
xmax = t_thrd.xact_cxt.ShmemVariableCache->latestCompletedXid;
TransactionIdAdvance(xmax);
/*并不是每个事务提交都会重新计算xmin和oldestxmin,只有每1000个事务或者每隔1s才会计算,此时xmin及oldestxmin一般偏小,但是不影响可见性判断。 */
currentTimeStamp = GetCurrentTimestamp();
if (forceCalc || ((++snapshotPendingCnt == MAX_PENDING_SNAPSHOT_CNT) ||
(TimestampDifferenceExceeds(snapshotTimeStamp, currentTimeStamp, CALC_SNAPSHOT_TIMEOUT)))) {
snapshotPendingCnt = 0;
snapshotTimeStamp = currentTimeStamp;
/* 初始化xmin */
globalxmin = xmin = xmax;
int* pgprocnos = arrayP->pgprocnos;
int numProcs;
/*
循环遍历proc并计算快照相应值
*/
numProcs = arrayP->numProcs;
/*主要流程,遍历proc_base_all_xacts,将其中pgxact->xid的最小值记为xmin,其中pgxact->xmin的最小值记为oldestxmin。 */
for (index = 0; index < numProcs; index++) {
int pgprocno = pgprocnos[index];
volatile PGXACT* pgxact = &g_instance.proc_base_all_xacts[pgprocno];
TransactionId xid;
if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
continue;
/* 对于autovacuum的xmin,跳过,避免长VACUUM阻塞脏元组回收 */
if (pgxact->vacuumFlags & PROC_IN_VACUUM)
continue;
/* 用最小的xmin来更新globalxmin */
xid = pgxact->xmin;
if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, globalxmin))
globalxmin = xid;
xid = pgxact->xid;
if (!TransactionIdIsNormal(xid))
xid = pgxact->next_xid;
if (!TransactionIdIsNormal(xid) || !TransactionIdPrecedes(xid, xmax))
continue;
if (TransactionIdPrecedes(xid, xmin))
xmin = xid;
}
if (TransactionIdPrecedes(xmin, globalxmin))
globalxmin = xmin;
t_thrd.xact_cxt.ShmemVariableCache->xmin = xmin;
t_thrd.xact_cxt.ShmemVariableCache->recentLocalXmin = globalxmin;
}
/* 此处给多版本快照信息赋值,xmin、oldestxmin因为不是及时计算故可能偏小,xmax、CSN号都是当前的准确值,注意计算快照的时候必须持有排他锁。 */
snapxid->xmin = t_thrd.xact_cxt.ShmemVariableCache->xmin;
snapxid->xmax = xmax;
snapxid->localxmin = t_thrd.xact_cxt.ShmemVariableCache->recentLocalXmin;
snapxid->snapshotcsn = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo;
snapxid->takenDuringRecovery = RecoveryInProgress();
SetNextSnapXid(); /*设置当前多版本快照 */
}
(5) GetLocalSnapshotData:获取最新的多版本快照供事务使用。函数代码如下:
Snapshot GetLocalSnapshotData(Snapshot snapshot)
{
/* 检查是否有多版本快照。在recover启动之前,是没有计算出多版本快照的,此时直接返回。 */
if (!g_snap_assigned || (g_snap_buffer == NULL)) {
ereport(DEBUG1, (errmsg("Falling back to origin GetSnapshotData: not assigned yet or during shutdown\n")));
return NULL;
}
pg_read_barrier(); /*为了防止ringBuffer初始化时的ARM乱序问题*/
snapxid_t* snapxid = GetCurrentSnapXid(); /* 将当前的多版本快照refcount++,避免被并发计算新快照的事务重用。 */
snapshot->user_data = snapxid;
… /* 此处将多版本快照snapxid中的信息赋值给快照,注意此处是深拷贝,因为多版本快照仅有几个变量的关键信息,直接赋值即可,之后就可以将相应的多版本快照refcount释放。 */
u_sess->utils_cxt.RecentXmin = snapxid->xmin;
snapshot->xmin = snapxid->xmin;
snapshot->xmax = snapxid->xmax;
snapshot->snapshotcsn = snapxid->snapshotcsn;
…
ReleaseSnapshotData(snapshot); /* 将多版本快照的refcount释放,以便可以被重用。 */
return snapshot;
}