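Introduction
PREPARE TRANSACTION prepares the current transaction for two-phase commit. After this command, the transaction is no longer associated with the current session. Instead, its state is fully stored on disk, and there is a very high probability that it can be committed successfully, even if a database crash occurs before the commit is requested.
Once prepared, a transaction can later be committed or rolled back with COMMIT PREPARED or ROLLBACK PREPARED, respectively. Those commands can be issued from any session, not only the one that executed the original transaction.
From the point of view of the issuing session, PREPARE TRANSACTION is not unlike a ROLLBACK command: after executing it, there is no active current transaction, and the effects of the prepared transaction are no longer visible. (The effects will become visible again if the transaction is committed.)
If the PREPARE TRANSACTION command fails for any reason, it becomes a ROLLBACK: the current transaction is canceled.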
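Overview
Two-phase commit (2PC) is used to support distributed transaction processing.
Prepare phase
The first phase of the two-phase commit protocol (2PC) is the prepare phase.
1. In a distributed transaction, one of the participating DBMS nodes acts as the coordinator. The coordinator starts a distributed transaction locally and sends a "Prepare" message to the other DBMS nodes. The message carries a dedicated transaction ID that identifies this distributed transaction.
2. When another DBMS node receives the "Prepare" message, it starts a local transaction to do its share of the distributed transaction's work. It decides on its own whether that transaction should be committed or aborted, and sends its decision to the coordinator.
3. If a database decides to commit its local transaction, the transaction enters the "prepared" state. In this state the database may not abort the local transaction unless the coordinator sends an abort message.
4. If a database decides to abort the transaction, it sends an abort message to the coordinator, which then aborts the transaction globally.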
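The participant side of steps 2 to 4 can be pictured as a small state machine. The C sketch below is illustrative only: the type and function names (ParticipantState, participant_vote, participant_receive) are made up for this example and are not PostgreSQL APIs. It shows that a participant may abort on its own only before it has voted; once prepared, only a coordinator message changes its state.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical participant states; the names are illustrative only. */
typedef enum
{
    P_ACTIVE,       /* local transaction is running */
    P_PREPARED,     /* voted to commit, waiting for the coordinator */
    P_COMMITTED,
    P_ABORTED
} ParticipantState;

/* Hypothetical messages a coordinator can send in the second phase. */
typedef enum
{
    MSG_NONE,
    MSG_COMMIT,
    MSG_ABORT
} CoordinatorMsg;

/*
 * The participant decides on its own whether its local work can be
 * committed.  Returns true for a "prepared" vote, false for an abort vote.
 */
bool
participant_vote(ParticipantState *st, bool local_work_ok)
{
    assert(*st == P_ACTIVE);
    *st = local_work_ok ? P_PREPARED : P_ABORTED;
    return local_work_ok;
}

/*
 * Once prepared, the participant cannot abort unilaterally: only a
 * coordinator message moves it out of the prepared state.
 */
void
participant_receive(ParticipantState *st, CoordinatorMsg msg)
{
    if (*st != P_PREPARED)
        return;                 /* nothing to finish */
    if (msg == MSG_COMMIT)
        *st = P_COMMITTED;
    else if (msg == MSG_ABORT)
        *st = P_ABORTED;
    /* MSG_NONE: stay prepared and keep waiting */
}
```

A participant that receives no message simply stays in P_PREPARED, which is why a prepared transaction has to survive restarts.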
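Global commit phase
The second phase of 2PC is the global commit phase.
If the coordinator receives no reply from a node, it treats the missing reply as an abort message. If every DBMS node reports that it is prepared, the coordinator commits the distributed transaction and sends a commit message to all DBMS nodes. If the coordinator receives even one abort message, it tells all DBMS nodes to abort the distributed transaction globally.
Each local database then commits or aborts its local transaction according to the coordinator's message.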
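The coordinator's decision rule described above can be sketched in a few lines of C. This is a simplified model under stated assumptions, not code from any DBMS: Vote, Decision and coordinator_decide are hypothetical names, and a node that never answered is represented by VOTE_MISSING.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical reply of one participant in the prepare phase. */
typedef enum
{
    VOTE_MISSING,   /* no reply received; treated like an abort vote */
    VOTE_PREPARED,
    VOTE_ABORT
} Vote;

typedef enum
{
    DECISION_COMMIT,
    DECISION_ABORT
} Decision;

/*
 * Commit globally only if every participant voted "prepared".
 * A single abort vote or a single missing reply aborts the transaction.
 */
Decision
coordinator_decide(const Vote *votes, size_t nvotes)
{
    assert(votes != NULL || nvotes == 0);

    for (size_t i = 0; i < nvotes; i++)
    {
        if (votes[i] != VOTE_PREPARED)
            return DECISION_ABORT;
    }
    return DECISION_COMMIT;
}
```

The resulting decision is what the coordinator would then broadcast to every node. In this sketch an empty participant list yields DECISION_COMMIT, since nobody objected.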
Experiment
The session below exercises the behavior described in the introduction: PREPARE TRANSACTION detaches the transaction from the session and stores its state durably, and COMMIT PREPARED (or ROLLBACK PREPARED), which may be issued from any session, finishes it later.
#max_prepared_transactions = 0 # zero disables the feature
# (change requires restart)
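max_prepared_transactions defaults to 0, which disables prepared transactions, as the configuration excerpt above shows. Set it to a nonzero value and restart the server before running the experiment.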
postgres=# begin;
BEGIN
postgres=# create table test(id int);
CREATE TABLE
postgres=# insert into test values(1);
INSERT 0 1
postgres=# prepare transaction 'test_add';
PREPARE TRANSACTION
/**
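1. transaction: transaction ID of the prepared transaction
2. gid: global transaction identifier, the name the user gave to the prepared transaction
3. prepared: time at which the transaction was prepared (timestamp with time zone)
4. owner: name of the user that executed the transaction
5. database: name of the database in which the transaction was executed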
*/
postgres=# select * from pg_prepared_xacts;
 transaction |   gid    |           prepared            |  owner   | database
-------------+----------+-------------------------------+----------+----------
         502 | test_add | 2021-09-06 04:40:35.399164-04 | postgres | postgres
(1 row)
postgres=# commit prepared 'test_add';
COMMIT PREPARED
postgres=# select * from pg_prepared_xacts;
transaction | gid | prepared | owner | database
-------------+-----+----------+-------+----------
(0 rows)
A prepared transaction survives a server shutdown or crash: for each prepared transaction that is still active at that point,
PostgreSQL keeps a state file in the $PGDATA/pg_twophase directory.
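To find the state file that belongs to a given prepared transaction, it helps to know how the file is named. The sketch below assumes the naming scheme used in the PostgreSQL 12 sources that the references target, where the file name is the transaction ID written as eight uppercase hexadecimal digits; other versions may differ. The helper twophase_file_path is a hypothetical name for this example, not a PostgreSQL function.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Directory name relative to $PGDATA. */
#define TWOPHASE_DIR "pg_twophase"

/*
 * Build the relative path of the state file for a transaction ID,
 * assuming the "<dir>/<xid as 8 hex digits>" convention.
 * Returns the value of snprintf (number of characters needed).
 */
int
twophase_file_path(char *path, size_t pathlen, uint32_t xid)
{
    assert(path != NULL && pathlen > 0);
    return snprintf(path, pathlen, TWOPHASE_DIR "/%08X", (unsigned int) xid);
}
```

Under that assumption, transaction 502 from the experiment above would map to pg_twophase/000001F6, because 502 is 0x1F6.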
Source code walkthrough
/*
 * Executing PREPARE TRANSACTION calls the PrepareTransaction function.
 *
 * NB: if you change this routine, better look at CommitTransaction too!
 */
static void
PrepareTransaction(void)
{
    TransactionState s = CurrentTransactionState;
    TransactionId xid = GetCurrentTransactionId();
    GlobalTransaction gxact;
    TimestampTz prepared_at;

    Assert(!IsInParallelMode());

    ShowTransactionState("PrepareTransaction");

    /*
     * check the current transaction state
     */
    if (s->state != TRANS_INPROGRESS)
        elog(WARNING, "PrepareTransaction while in %s state",
             TransStateAsString(s->state));
    Assert(s->parent == NULL);

    /*
     * Do pre-commit processing that involves calling user-defined code, such
     * as triggers.  Since closing cursors could queue trigger actions,
     * triggers could open cursors, etc, we have to keep looping until there's
     * nothing left to do.
     */
    for (;;)
    {
        /*
         * Fire all currently pending deferred triggers.
         */
        AfterTriggerFireDeferred();

        /*
         * Close open portals (converting holdable ones into static portals).
         * If there weren't any, we are done ... otherwise loop back to check
         * if they queued deferred triggers.  Lather, rinse, repeat.
         */
        if (!PreCommit_Portals(true))
            break;
    }
    CallXactCallbacks(XACT_EVENT_PRE_PREPARE);

    /*
     * The remaining actions cannot call any user-defined code, so it's safe
     * to start shutting down within-transaction services.  But note that most
     * of this stuff could still throw an error, which would switch us into
     * the transaction-abort path.
     */

    /* Shut down the deferred-trigger manager */
    AfterTriggerEndXact(true);

    /*
     * Let ON COMMIT management do its thing (must happen after closing
     * cursors, to avoid dangling-reference problems)
     */
    PreCommit_on_commit_actions();

    /* close large objects before lower-level cleanup */
    AtEOXact_LargeObject(true);

    /* NOTIFY requires no work at this point */

    /*
     * Mark serializable transaction as complete for predicate locking
     * purposes.  This should be done as late as we can put it and still allow
     * errors to be raised for failure patterns found at commit.
     */
    PreCommit_CheckForSerializationFailure();

    /*
     * Don't allow PREPARE TRANSACTION if we've accessed a temporary table in
     * this transaction.  Having the prepared xact hold locks on another
     * backend's temp table seems a bad idea --- for instance it would prevent
     * the backend from exiting.  There are other problems too, such as how to
     * clean up the source backend's local buffers and ON COMMIT state if the
     * prepared xact includes a DROP of a temp table.
     *
     * Other objects types, like functions, operators or extensions, share the
     * same restriction as they should not be created, locked or dropped as
     * this can mess up with this session or even a follow-up session trying
     * to use the same temporary namespace.
     *
     * We must check this after executing any ON COMMIT actions, because they
     * might still access a temp relation.
     *
     * XXX In principle this could be relaxed to allow some useful special
     * cases, such as a temp table created and dropped all within the
     * transaction.  That seems to require much more bookkeeping though.
     */
    if ((MyXactFlags & XACT_FLAGS_ACCESSEDTEMPNAMESPACE))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot PREPARE a transaction that has operated on temporary objects")));

    /*
     * Likewise, don't allow PREPARE after pg_export_snapshot.  This could be
     * supported if we added cleanup logic to twophase.c, but for now it
     * doesn't seem worth the trouble.
     */
    if (XactHasExportedSnapshots())
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot PREPARE a transaction that has exported snapshots")));

    /*
     * Don't allow PREPARE but for transaction that has/might kill logical
     * replication workers.
     */
    if (XactManipulatesLogicalReplicationWorkers())
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
    /* Prevent cancel/die interrupt while cleaning up */
    HOLD_INTERRUPTS();

    /*
     * set the current transaction state information appropriately during
     * prepare processing
     */
    s->state = TRANS_PREPARE;

    prepared_at = GetCurrentTimestamp();

    /* Tell bufmgr and smgr to prepare for commit */
    BufmgrCommit();

    /*
     * Reserve the GID for this transaction.  This could fail if the requested
     * GID is invalid or already in use.
     */
    gxact = MarkAsPreparing(xid, prepareGID, prepared_at,
                            GetUserId(), MyDatabaseId);
    prepareGID = NULL;

    /*
     * Collect data for the 2PC state file.  Note that in general, no actual
     * state change should happen in the called modules during this step,
     * since it's still possible to fail before commit, and in that case we
     * want transaction abort to be able to clean up.  (In particular, the
     * AtPrepare routines may error out if they find cases they cannot
     * handle.)  State cleanup should happen in the PostPrepare routines
     * below.  However, some modules can go ahead and clear state here because
     * they wouldn't do anything with it during abort anyway.
     *
     * Note: because the 2PC state file records will be replayed in the same
     * order they are made, the order of these calls has to match the order in
     * which we want things to happen during COMMIT PREPARED or ROLLBACK
     * PREPARED; in particular, pay attention to whether things should happen
     * before or after releasing the transaction's locks.
     */
    StartPrepare(gxact);

    AtPrepare_Notify();
    AtPrepare_Locks();
    AtPrepare_PredicateLocks();
    AtPrepare_PgStat();
    AtPrepare_MultiXact();
    AtPrepare_RelationMap();

    /*
     * Here is where we really truly prepare.
     *
     * We have to record transaction prepares even if we didn't make any
     * updates, because the transaction manager might get confused if we lose
     * a global transaction.
     */
    EndPrepare(gxact);
    /*
     * Now we clean up backend-internal state and release internal resources.
     */

    /* Reset XactLastRecEnd until the next transaction writes something */
    XactLastRecEnd = 0;

    /*
     * Let others know about no transaction in progress by me.  This has to be
     * done *after* the prepared transaction has been marked valid, else
     * someone may think it is unlocked and recyclable.
     */
    ProcArrayClearTransaction(MyProc);

    /*
     * In normal commit-processing, this is all non-critical post-transaction
     * cleanup.  When the transaction is prepared, however, it's important
     * that the locks and other per-backend resources are transferred to the
     * prepared transaction's PGPROC entry.  Note that if an error is raised
     * here, it's too late to abort the transaction.  XXX: This probably should
     * be in a critical section, to force a PANIC if any of this fails, but
     * that cure could be worse than the disease.
     */

    CallXactCallbacks(XACT_EVENT_PREPARE);

    ResourceOwnerRelease(TopTransactionResourceOwner,
                         RESOURCE_RELEASE_BEFORE_LOCKS,
                         true, true);

    /* Check we've released all buffer pins */
    AtEOXact_Buffers(true);

    /* Clean up the relation cache */
    AtEOXact_RelationCache(true);

    /* notify doesn't need a postprepare call */

    PostPrepare_PgStat();

    PostPrepare_Inval();

    PostPrepare_smgr();

    PostPrepare_MultiXact(xid);

    PostPrepare_Locks(xid);
    PostPrepare_PredicateLocks(xid);

    ResourceOwnerRelease(TopTransactionResourceOwner,
                         RESOURCE_RELEASE_LOCKS,
                         true, true);
    ResourceOwnerRelease(TopTransactionResourceOwner,
                         RESOURCE_RELEASE_AFTER_LOCKS,
                         true, true);

    /*
     * Allow another backend to finish the transaction.  After
     * PostPrepare_Twophase(), the transaction is completely detached from our
     * backend.  The rest is just non-critical cleanup of backend-local state.
     */
    PostPrepare_Twophase();

    /* PREPARE acts the same as COMMIT as far as GUC is concerned */
    AtEOXact_GUC(true, 1);
    AtEOXact_SPI(true);
    AtEOXact_Enum();
    AtEOXact_on_commit_actions(true);
    AtEOXact_Namespace(true, false);
    AtEOXact_SMgr();
    AtEOXact_Files(true);
    AtEOXact_ComboCid();
    AtEOXact_HashTables(true);
    /* don't call AtEOXact_PgStat here; we fixed pgstat state above */
    AtEOXact_Snapshot(true, true);
    pgstat_report_xact_timestamp(0);

    CurrentResourceOwner = NULL;
    ResourceOwnerDelete(TopTransactionResourceOwner);
    s->curTransactionOwner = NULL;
    CurTransactionResourceOwner = NULL;
    TopTransactionResourceOwner = NULL;

    AtCommit_Memory();

    s->fullTransactionId = InvalidFullTransactionId;
    s->subTransactionId = InvalidSubTransactionId;
    s->nestingLevel = 0;
    s->gucNestLevel = 0;
    s->childXids = NULL;
    s->nChildXids = 0;
    s->maxChildXids = 0;

    XactTopFullTransactionId = InvalidFullTransactionId;
    nParallelCurrentXids = 0;

    /*
     * done with 1st phase commit processing, set current transaction state
     * back to default
     */
    s->state = TRANS_DEFAULT;

    RESUME_INTERRUPTS();
}
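A comment in PrepareTransaction notes that the 2PC state file records are replayed in the same order in which they were made, which is why the order of the AtPrepare calls matters. The sketch below models that idea in a heavily simplified form. All names in it (StateFile, Record, statefile_append, statefile_replay, trace_callback) and the three resource-manager ids are hypothetical and do not reflect PostgreSQL's real record format; it only shows append-in-order followed by replay-in-order through a per-manager callback table.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_RECORDS 16
#define NUM_RMGRS   3           /* hypothetical resource-manager ids 0..2 */

/* One simplified state record: who wrote it and a small payload. */
typedef struct
{
    int rmid;
    int payload;
} Record;

/* Simplified stand-in for a state file: records kept in insertion order. */
typedef struct
{
    Record recs[MAX_RECORDS];
    size_t n;
} StateFile;

typedef void (*Callback) (int payload, void *ctx);

/* Prepare time: each module appends its record, in call order. */
void
statefile_append(StateFile *sf, int rmid, int payload)
{
    assert(sf->n < MAX_RECORDS);
    assert(rmid >= 0 && rmid < NUM_RMGRS);
    sf->recs[sf->n].rmid = rmid;
    sf->recs[sf->n].payload = payload;
    sf->n++;
}

/*
 * Finish time: walk the records in the order they were written and call
 * the callback registered for each record's resource manager, if any.
 */
void
statefile_replay(const StateFile *sf, const Callback *callbacks, void *ctx)
{
    for (size_t i = 0; i < sf->n; i++)
    {
        Callback cb = callbacks[sf->recs[i].rmid];

        if (cb != NULL)
            cb(sf->recs[i].payload, ctx);
    }
}

/* Example callback: remember the payloads in the order they were seen. */
typedef struct
{
    int    seen[MAX_RECORDS];
    size_t n;
} Trace;

void
trace_callback(int payload, void *ctx)
{
    Trace *t = (Trace *) ctx;

    assert(t->n < MAX_RECORDS);
    t->seen[t->n++] = payload;
}
```

Using separate callback tables for commit and abort, as FinishPreparedTransaction does with twophase_postcommit_callbacks and twophase_postabort_callbacks, would let the same records drive either outcome.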
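To cancel a 2PC transaction, run ROLLBACK PREPARED. This command calls the
FinishPreparedTransaction function, which first reads the state information recorded during the prepare phase (from the WAL or from the state file on disk)
and then calls RecordTransactionAbortPrepared to abort the transaction.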
src/backend/access/transam/twophase.c
/*
 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
 */
void
FinishPreparedTransaction(const char *gid, bool isCommit)
{
    GlobalTransaction gxact;
    PGPROC     *proc;
    PGXACT     *pgxact;
    TransactionId xid;
    char       *buf;
    char       *bufptr;
    TwoPhaseFileHeader *hdr;
    TransactionId latestXid;
    TransactionId *children;
    RelFileNode *commitrels;
    RelFileNode *abortrels;
    RelFileNode *delrels;
    int         ndelrels;
    SharedInvalidationMessage *invalmsgs;

    /*
     * Validate the GID, and lock the GXACT to ensure that two backends do not
     * try to commit the same GID at once.
     */
    gxact = LockGXact(gid, GetUserId());
    proc = &ProcGlobal->allProcs[gxact->pgprocno];
    pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
    xid = pgxact->xid;

    /*
     * Read and validate 2PC state data. State data will typically be stored
     * in WAL files if the LSN is after the last checkpoint record, or moved
     * to disk if for some reason they have lived for a long time.
     */
    if (gxact->ondisk)
        buf = ReadTwoPhaseFile(xid, false);
    else
        XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);

    /*
     * Disassemble the header area
     */
    hdr = (TwoPhaseFileHeader *) buf;
    Assert(TransactionIdEquals(hdr->xid, xid));
    bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
    bufptr += MAXALIGN(hdr->gidlen);
    children = (TransactionId *) bufptr;
    bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
    commitrels = (RelFileNode *) bufptr;
    bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
    abortrels = (RelFileNode *) bufptr;
    bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
    invalmsgs = (SharedInvalidationMessage *) bufptr;
    bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));

    /* compute latestXid among all children */
    latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
    /* Prevent cancel/die interrupt while cleaning up */
    HOLD_INTERRUPTS();

    /*
     * The order of operations here is critical: make the XLOG entry for
     * commit or abort, then mark the transaction committed or aborted in
     * pg_xact, then remove its PGPROC from the global ProcArray (which means
     * TransactionIdIsInProgress will stop saying the prepared xact is in
     * progress), then run the post-commit or post-abort callbacks. The
     * callbacks will release the locks the transaction held.
     */
    if (isCommit)
        RecordTransactionCommitPrepared(xid,
                                        hdr->nsubxacts, children,
                                        hdr->ncommitrels, commitrels,
                                        hdr->ninvalmsgs, invalmsgs,
                                        hdr->initfileinval, gid);
    else
        RecordTransactionAbortPrepared(xid,
                                       hdr->nsubxacts, children,
                                       hdr->nabortrels, abortrels,
                                       gid);

    ProcArrayRemove(proc, latestXid);

    /*
     * In case we fail while running the callbacks, mark the gxact invalid so
     * no one else will try to commit/rollback, and so it will be recycled if
     * we fail after this point.  It is still locked by our backend so it
     * won't go away yet.
     *
     * (We assume it's safe to do this without taking TwoPhaseStateLock.)
     */
    gxact->valid = false;

    /*
     * We have to remove any files that were supposed to be dropped. For
     * consistency with the regular xact.c code paths, must do this before
     * releasing locks, so do it before running the callbacks.
     *
     * NB: this code knows that we couldn't be dropping any temp rels ...
     */
    if (isCommit)
    {
        delrels = commitrels;
        ndelrels = hdr->ncommitrels;
    }
    else
    {
        delrels = abortrels;
        ndelrels = hdr->nabortrels;
    }

    /* Make sure files supposed to be dropped are dropped */
    DropRelationFiles(delrels, ndelrels, false);

    /*
     * Handle cache invalidation messages.
     *
     * Relcache init file invalidation requires processing both before and
     * after we send the SI messages. See AtEOXact_Inval()
     */
    if (hdr->initfileinval)
        RelationCacheInitFilePreInvalidate();
    SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
    if (hdr->initfileinval)
        RelationCacheInitFilePostInvalidate();
    /*
     * Acquire the two-phase lock.  We want to work on the two-phase callbacks
     * while holding it to avoid potential conflicts with other transactions
     * attempting to use the same GID, so the lock is released once the shared
     * memory state is cleared.
     */
    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

    /* And now do the callbacks */
    if (isCommit)
        ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
    else
        ProcessRecords(bufptr, xid, twophase_postabort_callbacks);

    PredicateLockTwoPhaseFinish(xid, isCommit);

    /* Clear shared memory state */
    RemoveGXact(gxact);

    /*
     * Release the lock as all callbacks are called and shared memory cleanup
     * is done.
     */
    LWLockRelease(TwoPhaseStateLock);

    /* Count the prepared xact as committed or aborted */
    AtEOXact_PgStat(isCommit, false);

    /*
     * And now we can clean up any files we may have left.
     */
    if (gxact->ondisk)
        RemoveTwoPhaseFile(xid, true);

    MyLockedGxact = NULL;

    RESUME_INTERRUPTS();

    pfree(buf);
}
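The "Disassemble the header area" step in FinishPreparedTransaction walks one buffer by repeatedly advancing a pointer by the aligned size of each section. The sketch below reproduces only that offset arithmetic. It is a simplified model: MAXALIGN_SKETCH assumes an 8-byte maximum alignment, and the Layout fields, including every element size, are hypothetical inputs rather than the real sizes of PostgreSQL's structs.

```c
#include <assert.h>
#include <stddef.h>

/* Assumption: the platform's maximum alignment is 8 bytes. */
#define ALIGNOF_MAX 8
#define MAXALIGN_SKETCH(len) \
    (((size_t) (len) + (ALIGNOF_MAX - 1)) & ~((size_t) (ALIGNOF_MAX - 1)))

/* Hypothetical description of the variable-length parts of the buffer. */
typedef struct
{
    size_t header_size;     /* size of the fixed header */
    size_t gidlen;          /* bytes used by the GID string */
    size_t nsubxacts;       /* number of subtransaction ids */
    size_t xid_size;        /* bytes per transaction id */
    size_t ncommitrels;     /* files to drop on commit */
    size_t nabortrels;      /* files to drop on abort */
    size_t rel_size;        /* bytes per file entry */
    size_t ninvalmsgs;      /* invalidation messages */
    size_t inval_size;      /* bytes per invalidation message */
} Layout;

/* Byte offsets of each section, measured from the start of the buffer. */
typedef struct
{
    size_t children;
    size_t commitrels;
    size_t abortrels;
    size_t invalmsgs;
    size_t records;         /* where the per-module records begin */
} SectionOffsets;

/* Same walk as in the function above, but returning offsets. */
SectionOffsets
compute_offsets(const Layout *l)
{
    SectionOffsets o;
    size_t pos;

    assert(l != NULL);

    pos = MAXALIGN_SKETCH(l->header_size);
    pos += MAXALIGN_SKETCH(l->gidlen);
    o.children = pos;
    pos += MAXALIGN_SKETCH(l->nsubxacts * l->xid_size);
    o.commitrels = pos;
    pos += MAXALIGN_SKETCH(l->ncommitrels * l->rel_size);
    o.abortrels = pos;
    pos += MAXALIGN_SKETCH(l->nabortrels * l->rel_size);
    o.invalmsgs = pos;
    pos += MAXALIGN_SKETCH(l->ninvalmsgs * l->inval_size);
    o.records = pos;
    return o;
}
```

Because every section is padded to the alignment boundary, a 9-byte GID occupies 16 bytes in this model, and an empty section contributes nothing, so its offset coincides with the next section's.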
References
http://postgres.cn/docs/12/sql-prepare-transaction.html
http://postgres.cn/docs/12/sql-commit-prepared.html
http://postgres.cn/docs/12/sql-rollback-prepared.html
《PostgreSQL数据库内核分析》