
PostgreSQL Feature Matrix Series, Part 25: Two-Phase Commit


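Introduction

PREPARE TRANSACTION prepares the current transaction for two-phase commit. After this command, the transaction is no longer associated with the current session. Instead, its state is fully stored on disk, and there is a very high probability that it can be committed successfully, even if the database crashes before the commit is requested.

Once prepared, a transaction can later be committed or rolled back with COMMIT PREPARED or ROLLBACK PREPARED, respectively. These commands can be issued from any session, not only the one that executed the original transaction.

From the point of view of the issuing session, PREPARE TRANSACTION is not unlike a ROLLBACK command: after executing it, there is no active current transaction, and the effects of the prepared transaction are no longer visible. (The effects become visible again if the transaction is committed.)

If the PREPARE TRANSACTION command fails for any reason, it becomes a ROLLBACK: the current transaction is canceled.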

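Overview

Two-phase commit is used to support distributed transaction processing.

Pre-commit phase

The first phase of the two-phase commit protocol (2PC) is the pre-commit phase.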

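  1. In a distributed transaction, one of the participating DBMS nodes acts as the "coordinator". The coordinator starts the distributed transaction locally and sends a "Prepare" message to the other DBMS nodes. The message carries a dedicated transaction ID that identifies this distributed transaction.

  2. On receiving the "Prepare" message, each of the other DBMS nodes starts a local transaction to do its share of the distributed transaction. Each node decides on its own whether that transaction should commit or abort, then reports the decision to the coordinator.

  3. If a database decides to commit its local transaction, that transaction enters the "pre-commit" state. In this state, the database may not abort the local transaction unless the coordinator sends an abort message.

  4. If a database decides to abort the transaction, it sends an abort message to the coordinator, which then carries out the global abort.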
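
The participant rules in the list above can be read as a small state machine. The following is only an illustrative sketch: the type names, event names, and the function `participant_step` are hypothetical and do not come from PostgreSQL. It shows that a participant can still abort on its own while active, but once it has voted to commit only the coordinator can move it forward.

```c
#include <assert.h>

/* Hypothetical participant states (not PostgreSQL identifiers). */
typedef enum { PART_ACTIVE, PART_PREPARED, PART_COMMITTED, PART_ABORTED } PartState;

/* Hypothetical events a participant can observe. */
typedef enum { EV_PREPARE_OK, EV_LOCAL_FAILURE, EV_GLOBAL_COMMIT, EV_GLOBAL_ABORT } PartEvent;

/*
 * Return the next state of a participant.  Events that are not allowed in
 * the current state leave the state unchanged.
 */
PartState
participant_step(PartState state, PartEvent event)
{
    assert(state <= PART_ABORTED);

    switch (state)
    {
        case PART_ACTIVE:
            /* Before voting, the participant is free to commit or abort. */
            if (event == EV_PREPARE_OK)
                return PART_PREPARED;
            if (event == EV_LOCAL_FAILURE || event == EV_GLOBAL_ABORT)
                return PART_ABORTED;
            break;
        case PART_PREPARED:
            /* After voting to commit, only the coordinator decides. */
            if (event == EV_GLOBAL_COMMIT)
                return PART_COMMITTED;
            if (event == EV_GLOBAL_ABORT)
                return PART_ABORTED;
            break;
        case PART_COMMITTED:
        case PART_ABORTED:
            /* Terminal states. */
            break;
    }
    return state;
}
```

For example, `participant_step(PART_PREPARED, EV_LOCAL_FAILURE)` returns `PART_PREPARED`: a prepared participant ignores its own wish to abort and waits for the coordinator.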

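Global commit phase

The second phase of 2PC is the global commit phase.

  1. If the coordinator receives no message from a node, it treats that as an abort message. If every DBMS node reports to the coordinator that it is prepared, the coordinator commits the distributed transaction and sends a commit message to all DBMS nodes. If the coordinator receives even one abort message, it tells all DBMS nodes to abort the distributed transaction globally.

  2. Each local database commits or aborts its local transaction according to the coordinator's message.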
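
The coordinator's decision rule described above can be sketched in a few lines of C. This is a minimal illustration under stated assumptions, not code from PostgreSQL: the `Vote` and `Decision` types and the function `coordinator_decide` are hypothetical names introduced here. A missing reply is modeled as `VOTE_NONE` and counts as an abort vote.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical votes a participant can return in the pre-commit phase. */
typedef enum { VOTE_NONE = 0, VOTE_PREPARED, VOTE_ABORT } Vote;

/* Hypothetical global outcomes the coordinator can broadcast. */
typedef enum { GLOBAL_COMMIT, GLOBAL_ABORT } Decision;

/*
 * Decide the outcome of a distributed transaction from the collected votes.
 * The result is GLOBAL_COMMIT only if every participant answered
 * VOTE_PREPARED; a missing reply (VOTE_NONE) or an explicit VOTE_ABORT
 * forces GLOBAL_ABORT.
 */
Decision
coordinator_decide(const Vote *votes, size_t nvotes)
{
    assert(votes != NULL || nvotes == 0);

    for (size_t i = 0; i < nvotes; i++)
    {
        if (votes[i] != VOTE_PREPARED)
            return GLOBAL_ABORT;
    }
    return GLOBAL_COMMIT;
}
```

The coordinator would then send the returned decision to every node, and each node would apply it to its local transaction as described in step 2.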

Experiment


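Prepared transactions are disabled by default, so max_prepared_transactions must first be set to a nonzero value in postgresql.conf. Changing it requires a server restart.

    #max_prepared_transactions = 0 # zero disables the feature
    # (change requires restart)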

    postgres=# begin;
    BEGIN
    postgres=# create table test(id int);
    CREATE TABLE
    postgres=# insert into test values(1);
    INSERT 0 1
    postgres=# prepare transaction 'test_add';
    PREPARE TRANSACTION

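    /**
    1. transaction: transaction ID of the prepared transaction
    2. gid: the name (global identifier) the user gave the prepared transaction
    3. prepared: time at which the transaction was prepared, as a timestamp with time zone
    4. owner: name of the user who executed the prepared transaction
    5. database: name of the database in which the transaction was executed
    */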
    postgres=# select * from pg_prepared_xacts;
     transaction |   gid    |           prepared            |  owner   | database
    -------------+----------+-------------------------------+----------+----------
             502 | test_add | 2021-09-06 04:40:35.399164-04 | postgres | postgres
    (1 row)

    postgres=# commit prepared 'test_add';
    COMMIT PREPARED
    postgres=# select * from pg_prepared_xacts;
    transaction | gid | prepared | owner | database
    -------------+-----+----------+-------+----------
    (0 rows)

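If a PostgreSQL server that holds one or more active prepared transactions is shut down or crashes, a state file is created for each active prepared transaction in the $PGDATA/pg_twophase directory, so the prepared transactions survive the restart.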
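
To make the state-file location concrete, here is a small sketch that builds the path of such a file from a transaction ID. It assumes the naming scheme used by the PostgreSQL 12 sources, where the file name is the XID printed as eight uppercase hexadecimal digits; other releases may use a different scheme. The function `sketch_twophase_path` and the macro `SKETCH_TWOPHASE_DIR` are hypothetical names for this illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Directory, relative to $PGDATA, that holds 2PC state files. */
#define SKETCH_TWOPHASE_DIR "pg_twophase"

/*
 * Write the relative path of the state file for the given XID into path.
 * Assumption: the file name is the XID as eight uppercase hex digits.
 * Returns the number of characters the full path needs, as snprintf does.
 */
int
sketch_twophase_path(char *path, size_t pathlen, uint32_t xid)
{
    assert(path != NULL && pathlen > 0);

    return snprintf(path, pathlen, SKETCH_TWOPHASE_DIR "/%08X",
                    (unsigned int) xid);
}
```

Under that assumption, the transaction with ID 502 from the session above would map to `pg_twophase/000001F6`.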

Source code walkthrough


      /*
       * Executing PREPARE TRANSACTION calls the PrepareTransaction function.
       *
       * NB: if you change this routine, better look at CommitTransaction too!
       */
      static void
      PrepareTransaction(void)
      {
          TransactionState s = CurrentTransactionState;
          TransactionId xid = GetCurrentTransactionId();
          GlobalTransaction gxact;
          TimestampTz prepared_at;

          Assert(!IsInParallelMode());

          ShowTransactionState("PrepareTransaction");

          /*
           * check the current transaction state
           */
          if (s->state != TRANS_INPROGRESS)
              elog(WARNING, "PrepareTransaction while in %s state",
                   TransStateAsString(s->state));
          Assert(s->parent == NULL);

          /*
           * Do pre-commit processing that involves calling user-defined code, such
           * as triggers. Since closing cursors could queue trigger actions,
           * triggers could open cursors, etc, we have to keep looping until there's
           * nothing left to do.
           */
          for (;;)
          {
              /*
               * Fire all currently pending deferred triggers.
               */
              AfterTriggerFireDeferred();

              /*
               * Close open portals (converting holdable ones into static portals).
               * If there weren't any, we are done ... otherwise loop back to check
               * if they queued deferred triggers. Lather, rinse, repeat.
               */
              if (!PreCommit_Portals(true))
                  break;
          }

          CallXactCallbacks(XACT_EVENT_PRE_PREPARE);

          /*
           * The remaining actions cannot call any user-defined code, so it's safe
           * to start shutting down within-transaction services. But note that most
           * of this stuff could still throw an error, which would switch us into
           * the transaction-abort path.
           */

          /* Shut down the deferred-trigger manager */
          AfterTriggerEndXact(true);

          /*
           * Let ON COMMIT management do its thing (must happen after closing
           * cursors, to avoid dangling-reference problems)
           */
          PreCommit_on_commit_actions();

          /* close large objects before lower-level cleanup */
          AtEOXact_LargeObject(true);

          /* NOTIFY requires no work at this point */

          /*
           * Mark serializable transaction as complete for predicate locking
           * purposes. This should be done as late as we can put it and still allow
           * errors to be raised for failure patterns found at commit.
           */
          PreCommit_CheckForSerializationFailure();

          /*
           * Don't allow PREPARE TRANSACTION if we've accessed a temporary table in
           * this transaction. Having the prepared xact hold locks on another
           * backend's temp table seems a bad idea --- for instance it would prevent
           * the backend from exiting. There are other problems too, such as how to
           * clean up the source backend's local buffers and ON COMMIT state if the
           * prepared xact includes a DROP of a temp table.
           *
           * Other objects types, like functions, operators or extensions, share the
           * same restriction as they should not be created, locked or dropped as
           * this can mess up with this session or even a follow-up session trying
           * to use the same temporary namespace.
           *
           * We must check this after executing any ON COMMIT actions, because they
           * might still access a temp relation.
           *
           * XXX In principle this could be relaxed to allow some useful special
           * cases, such as a temp table created and dropped all within the
           * transaction. That seems to require much more bookkeeping though.
           */
          if ((MyXactFlags & XACT_FLAGS_ACCESSEDTEMPNAMESPACE))
              ereport(ERROR,
                      (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                       errmsg("cannot PREPARE a transaction that has operated on temporary objects")));

          /*
           * Likewise, don't allow PREPARE after pg_export_snapshot. This could be
           * supported if we added cleanup logic to twophase.c, but for now it
           * doesn't seem worth the trouble.
           */
          if (XactHasExportedSnapshots())
              ereport(ERROR,
                      (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                       errmsg("cannot PREPARE a transaction that has exported snapshots")));

          /*
           * Don't allow PREPARE but for transaction that has/might kill logical
           * replication workers.
           */
          if (XactManipulatesLogicalReplicationWorkers())
              ereport(ERROR,
                      (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                       errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));

          /* Prevent cancel/die interrupt while cleaning up */
          HOLD_INTERRUPTS();

          /*
           * set the current transaction state information appropriately during
           * prepare processing
           */
          s->state = TRANS_PREPARE;

          prepared_at = GetCurrentTimestamp();

          /* Tell bufmgr and smgr to prepare for commit */
          BufmgrCommit();

          /*
           * Reserve the GID for this transaction. This could fail if the requested
           * GID is invalid or already in use.
           */
          gxact = MarkAsPreparing(xid, prepareGID, prepared_at,
                                  GetUserId(), MyDatabaseId);
          prepareGID = NULL;

          /*
           * Collect data for the 2PC state file. Note that in general, no actual
           * state change should happen in the called modules during this step,
           * since it's still possible to fail before commit, and in that case we
           * want transaction abort to be able to clean up. (In particular, the
           * AtPrepare routines may error out if they find cases they cannot
           * handle.) State cleanup should happen in the PostPrepare routines
           * below. However, some modules can go ahead and clear state here because
           * they wouldn't do anything with it during abort anyway.
           *
           * Note: because the 2PC state file records will be replayed in the same
           * order they are made, the order of these calls has to match the order in
           * which we want things to happen during COMMIT PREPARED or ROLLBACK
           * PREPARED; in particular, pay attention to whether things should happen
           * before or after releasing the transaction's locks.
           */
          StartPrepare(gxact);

          AtPrepare_Notify();
          AtPrepare_Locks();
          AtPrepare_PredicateLocks();
          AtPrepare_PgStat();
          AtPrepare_MultiXact();
          AtPrepare_RelationMap();

          /*
           * Here is where we really truly prepare.
           *
           * We have to record transaction prepares even if we didn't make any
           * updates, because the transaction manager might get confused if we lose
           * a global transaction.
           */
          EndPrepare(gxact);

          /*
           * Now we clean up backend-internal state and release internal resources.
           */

          /* Reset XactLastRecEnd until the next transaction writes something */
          XactLastRecEnd = 0;

          /*
           * Let others know about no transaction in progress by me. This has to be
           * done *after* the prepared transaction has been marked valid, else
           * someone may think it is unlocked and recyclable.
           */
          ProcArrayClearTransaction(MyProc);

          /*
           * In normal commit-processing, this is all non-critical post-transaction
           * cleanup. When the transaction is prepared, however, it's important
           * that the locks and other per-backend resources are transferred to the
           * prepared transaction's PGPROC entry. Note that if an error is raised
           * here, it's too late to abort the transaction. XXX: This probably should
           * be in a critical section, to force a PANIC if any of this fails, but
           * that cure could be worse than the disease.
           */

          CallXactCallbacks(XACT_EVENT_PREPARE);

          ResourceOwnerRelease(TopTransactionResourceOwner,
                               RESOURCE_RELEASE_BEFORE_LOCKS,
                               true, true);

          /* Check we've released all buffer pins */
          AtEOXact_Buffers(true);

          /* Clean up the relation cache */
          AtEOXact_RelationCache(true);

          /* notify doesn't need a postprepare call */

          PostPrepare_PgStat();

          PostPrepare_Inval();

          PostPrepare_smgr();

          PostPrepare_MultiXact(xid);

          PostPrepare_Locks(xid);
          PostPrepare_PredicateLocks(xid);

          ResourceOwnerRelease(TopTransactionResourceOwner,
                               RESOURCE_RELEASE_LOCKS,
                               true, true);
          ResourceOwnerRelease(TopTransactionResourceOwner,
                               RESOURCE_RELEASE_AFTER_LOCKS,
                               true, true);

          /*
           * Allow another backend to finish the transaction. After
           * PostPrepare_Twophase(), the transaction is completely detached from our
           * backend. The rest is just non-critical cleanup of backend-local state.
           */
          PostPrepare_Twophase();

          /* PREPARE acts the same as COMMIT as far as GUC is concerned */
          AtEOXact_GUC(true, 1);
          AtEOXact_SPI(true);
          AtEOXact_Enum();
          AtEOXact_on_commit_actions(true);
          AtEOXact_Namespace(true, false);
          AtEOXact_SMgr();
          AtEOXact_Files(true);
          AtEOXact_ComboCid();
          AtEOXact_HashTables(true);
          /* don't call AtEOXact_PgStat here; we fixed pgstat state above */
          AtEOXact_Snapshot(true, true);
          pgstat_report_xact_timestamp(0);

          CurrentResourceOwner = NULL;
          ResourceOwnerDelete(TopTransactionResourceOwner);
          s->curTransactionOwner = NULL;
          CurTransactionResourceOwner = NULL;
          TopTransactionResourceOwner = NULL;

          AtCommit_Memory();

          s->fullTransactionId = InvalidFullTransactionId;
          s->subTransactionId = InvalidSubTransactionId;
          s->nestingLevel = 0;
          s->gucNestLevel = 0;
          s->childXids = NULL;
          s->nChildXids = 0;
          s->maxChildXids = 0;

          XactTopFullTransactionId = InvalidFullTransactionId;
          nParallelCurrentXids = 0;

          /*
           * done with 1st phase commit processing, set current transaction state
           * back to default
           */
          s->state = TRANS_DEFAULT;

          RESUME_INTERRUPTS();
      }

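To cancel a 2PC transaction, run the ROLLBACK PREPARED command. This command calls the FinishPreparedTransaction function, which first reads back the information recorded during the earlier pre-commit phase (from the state file on disk, or from WAL if it has not yet been moved to disk) and then calls RecordTransactionAbortPrepared to abort the transaction. COMMIT PREPARED goes through the same function with isCommit set to true and calls RecordTransactionCommitPrepared instead.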

        src/backend/access/transam/twophase.c
        /*
         * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
         */
        void
        FinishPreparedTransaction(const char *gid, bool isCommit)
        {
            GlobalTransaction gxact;
            PGPROC *proc;
            PGXACT *pgxact;
            TransactionId xid;
            char *buf;
            char *bufptr;
            TwoPhaseFileHeader *hdr;
            TransactionId latestXid;
            TransactionId *children;
            RelFileNode *commitrels;
            RelFileNode *abortrels;
            RelFileNode *delrels;
            int ndelrels;
            SharedInvalidationMessage *invalmsgs;

            /*
             * Validate the GID, and lock the GXACT to ensure that two backends do not
             * try to commit the same GID at once.
             */
            gxact = LockGXact(gid, GetUserId());
            proc = &ProcGlobal->allProcs[gxact->pgprocno];
            pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
            xid = pgxact->xid;

            /*
             * Read and validate 2PC state data. State data will typically be stored
             * in WAL files if the LSN is after the last checkpoint record, or moved
             * to disk if for some reason they have lived for a long time.
             */
            if (gxact->ondisk)
                buf = ReadTwoPhaseFile(xid, false);
            else
                XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);

            /*
             * Disassemble the header area
             */
            hdr = (TwoPhaseFileHeader *) buf;
            Assert(TransactionIdEquals(hdr->xid, xid));
            bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
            bufptr += MAXALIGN(hdr->gidlen);
            children = (TransactionId *) bufptr;
            bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
            commitrels = (RelFileNode *) bufptr;
            bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
            abortrels = (RelFileNode *) bufptr;
            bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
            invalmsgs = (SharedInvalidationMessage *) bufptr;
            bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));

            /* compute latestXid among all children */
            latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);

            /* Prevent cancel/die interrupt while cleaning up */
            HOLD_INTERRUPTS();

            /*
             * The order of operations here is critical: make the XLOG entry for
             * commit or abort, then mark the transaction committed or aborted in
             * pg_xact, then remove its PGPROC from the global ProcArray (which means
             * TransactionIdIsInProgress will stop saying the prepared xact is in
             * progress), then run the post-commit or post-abort callbacks. The
             * callbacks will release the locks the transaction held.
             */
            if (isCommit)
                RecordTransactionCommitPrepared(xid,
                                                hdr->nsubxacts, children,
                                                hdr->ncommitrels, commitrels,
                                                hdr->ninvalmsgs, invalmsgs,
                                                hdr->initfileinval, gid);
            else
                RecordTransactionAbortPrepared(xid,
                                               hdr->nsubxacts, children,
                                               hdr->nabortrels, abortrels,
                                               gid);

            ProcArrayRemove(proc, latestXid);

            /*
             * In case we fail while running the callbacks, mark the gxact invalid so
             * no one else will try to commit/rollback, and so it will be recycled if
             * we fail after this point. It is still locked by our backend so it
             * won't go away yet.
             *
             * (We assume it's safe to do this without taking TwoPhaseStateLock.)
             */
            gxact->valid = false;

            /*
             * We have to remove any files that were supposed to be dropped. For
             * consistency with the regular xact.c code paths, must do this before
             * releasing locks, so do it before running the callbacks.
             *
             * NB: this code knows that we couldn't be dropping any temp rels ...
             */
            if (isCommit)
            {
                delrels = commitrels;
                ndelrels = hdr->ncommitrels;
            }
            else
            {
                delrels = abortrels;
                ndelrels = hdr->nabortrels;
            }

            /* Make sure files supposed to be dropped are dropped */
            DropRelationFiles(delrels, ndelrels, false);

            /*
             * Handle cache invalidation messages.
             *
             * Relcache init file invalidation requires processing both before and
             * after we send the SI messages. See AtEOXact_Inval()
             */
            if (hdr->initfileinval)
                RelationCacheInitFilePreInvalidate();
            SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
            if (hdr->initfileinval)
                RelationCacheInitFilePostInvalidate();

            /*
             * Acquire the two-phase lock. We want to work on the two-phase callbacks
             * while holding it to avoid potential conflicts with other transactions
             * attempting to use the same GID, so the lock is released once the shared
             * memory state is cleared.
             */
            LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

            /* And now do the callbacks */
            if (isCommit)
                ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
            else
                ProcessRecords(bufptr, xid, twophase_postabort_callbacks);

            PredicateLockTwoPhaseFinish(xid, isCommit);

            /* Clear shared memory state */
            RemoveGXact(gxact);

            /*
             * Release the lock as all callbacks are called and shared memory cleanup
             * is done.
             */
            LWLockRelease(TwoPhaseStateLock);

            /* Count the prepared xact as committed or aborted */
            AtEOXact_PgStat(isCommit, false);

            /*
             * And now we can clean up any files we may have left.
             */
            if (gxact->ondisk)
                RemoveTwoPhaseFile(xid, true);

            MyLockedGxact = NULL;

            RESUME_INTERRUPTS();

            pfree(buf);
        }
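
The "Disassemble the header area" step in FinishPreparedTransaction walks a single buffer in which every variable-length section starts on an aligned boundary. The sketch below reproduces only that offset arithmetic so it can be tried in isolation. It is an illustration under stated assumptions: `SketchHeader`, `SketchLayout`, `sketch_layout`, and `SKETCH_MAXALIGN` are hypothetical stand-ins, the header is reduced to four counters, and an alignment of 8 bytes is assumed, whereas the real MAXALIGN depends on the platform.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed maximum alignment; the real value is platform dependent. */
#define SKETCH_ALIGNOF 8
/* Round len up to the next multiple of SKETCH_ALIGNOF. */
#define SKETCH_MAXALIGN(len) \
    (((size_t) (len) + (SKETCH_ALIGNOF - 1)) & ~((size_t) (SKETCH_ALIGNOF - 1)))

/* Simplified stand-in for the state file header (hypothetical). */
typedef struct SketchHeader
{
    uint32_t gidlen;       /* length of the GID, including the terminator */
    uint32_t nsubxacts;    /* number of subtransaction XIDs */
    uint32_t ncommitrels;  /* number of relations to drop on commit */
    uint32_t nabortrels;   /* number of relations to drop on abort */
} SketchHeader;

/* Byte offsets of each section, measured from the start of the buffer. */
typedef struct SketchLayout
{
    size_t gid_off;
    size_t children_off;
    size_t commitrels_off;
    size_t abortrels_off;
    size_t end_off;
} SketchLayout;

/* Compute section offsets the same way bufptr is advanced in the code above. */
SketchLayout
sketch_layout(const SketchHeader *hdr, size_t xid_size, size_t rel_size)
{
    SketchLayout layout;
    size_t off;

    assert(hdr != NULL);

    off = SKETCH_MAXALIGN(sizeof(SketchHeader));
    layout.gid_off = off;
    off += SKETCH_MAXALIGN(hdr->gidlen);
    layout.children_off = off;
    off += SKETCH_MAXALIGN((size_t) hdr->nsubxacts * xid_size);
    layout.commitrels_off = off;
    off += SKETCH_MAXALIGN((size_t) hdr->ncommitrels * rel_size);
    layout.abortrels_off = off;
    off += SKETCH_MAXALIGN((size_t) hdr->nabortrels * rel_size);
    layout.end_off = off;
    return layout;
}
```

Because each section length is rounded up before the pointer advances, a 9-byte GID still occupies 16 bytes in this sketch, which is why the reader must apply the same rounding as the writer to find the next section.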

References

http://postgres.cn/docs/12/sql-prepare-transaction.html

http://postgres.cn/docs/12/sql-commit-prepared.html

http://postgres.cn/docs/12/sql-rollback-prepared.html

《PostgreSQL数据库内核分析》 (PostgreSQL Database Kernel Analysis)

This article is reposted from CP的PostgreSQL厨房.
