即将推出的 PostgreSQL 15 引入了由富士通 OSS 团队与 PostgreSQL 开源社区合作添加的一项新功能,该功能允许在使用发布/订阅的逻辑复制中支持两阶段提交。让我们来看看如何使用它。
此功能支持创建允许对两阶段事务的复制进行解码的发布/订阅。我们还修改了逻辑解码插件 pgoutput 以支持所有必需的两阶段回调 。
启用两阶段提交时,准备好的事务在 PREPARE TRANSACTION 时发送给订阅者,订阅者也将其作为两阶段事务处理。
一、背景
PostgreSQL 14 已经添加了框架和解码器端基础设施,以允许在 PREPARE TRANSACTION 时解码两阶段提交。PostgreSQL 14 还修改了 test_decoding 插件以使用这个框架。
但是,使用 PUBLICATION/SUBSCRIPTION 进行逻辑复制的客户端无法直接访问该功能。这意味着在 PostgreSQL 14 中,准备好的事务在解码 PREPARE TRANSACTION 时不会发送给订阅者,而是仅在解码相应的 COMMIT PREPARED 时发送给订阅者。
例如,PostgreSQL 14 的行为如下:
1发布方
postgres=# CREATE TABLE test (col1 INT, col2 TEXT, PRIMARY KEY(col1));
CREATE TABLE
postgres=# CREATE PUBLICATION pub FOR TABLE test;
2订阅方
postgres=# CREATE TABLE test (col1 INT, col2 TEXT, PRIMARY KEY(col1));
CREATE TABLE
postgres=# CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres host=localhost' PUBLICATION pub;
NOTICE: created replication slot "sub" on publisher
CREATE SUBSCRIPTION
3 发布方
postgres=# BEGIN;
BEGIN
postgres=*# INSERT INTO test VALUES (7,'aa');
INSERT 0 1
postgres=*# PREPARE TRANSACTION 't1';
PREPARE TRANSACTION
postgres=# SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
------------+-----+-------------------------------+-------+----------
790 | t1 | 2022-03-14 06:59:49.341013-04 | ajin | postgres
(1 row)
4 订阅方
postgres=# SELECT * FORM pg_prepared_xacts;
transaction | gid | prepared | owner | database
-------------+-----+----------+-------+----------
(0 rows)
请注意,准备好的事务不会在订阅者上复制。
二、特征
1.概述
新的 SUBSCRIPTION 选项two_phase指定是否为此 SUBSCRIPTION 启用两阶段提交。默认值为false。
CREATE SUBSCRIPTION sub
CONNECTION 'conninfo'
PUBLICATION pub
WITH (two_phase = on);
启用两阶段提交时,准备好的事务在 PREPARE TRANSACTION 时发送给订阅者,订阅者也将其作为两阶段事务处理。否则,准备好的事务只有在提交时才会发送给订阅者,并立即处理。(我的测试结果和这句话有点不符:从我的测试结果来看,没用两阶段提交,也会把两阶段提交的状态文件传输到订阅节点,只不过在发布节点COMMIT PREPARE的时候,订阅节点的不能正常把变更写到库里)
2.克服PREPARE 并发
两阶段事务在 PREPARE TRANSACTION 重放,然后分别在 COMMIT PREPARED 和 ROLLBACK PREPARED 提交或回滚。
当 tablesync worker仍在忙于执行初始复制时,准备好的事务可能会到达应用worker。在这种情况下,apply worker 启动一个新事务,但随后会跳过所有后续更改(例如,insert),假设正在运行的 tablesync worker 正在处理它们。同时,tablesync worker 可能根本看不到准备好的事务(因为它在 tablesync worker 开始应用更改的一致点之前)。
现在,tablesync worker 退出,没有对准备好的事务做任何事情。稍后,当apply worker执行 COMMIT PREPARED 时,它会得到一个空的PREPARE错误(事务是空的,因为Apply worker之前跳过了插入)。
为了避免这种复杂性,两阶段提交的实现要求复制已成功完成初始表同步阶段。这意味着即使为订阅启用了two_phase,内部两阶段状态也会暂时挂起,直到所有表初始化完成。请参阅以下三态部分。
通过这些步骤,我们有:
1在启用two_phase的情况下创建订阅。
2最初,订阅处于tablesync阶段 - 为每个表启动 tablesync worker。
3每个 tablesync worker 为发布者上的每个表创建一个 tablesync 槽。
4两阶段状态设置为挂起(通过在 pg_subscription 目录中设置列subtwophasestate - 稍后会详细介绍)。
之后,进入应用worker阶段。
在Apply worker阶段,我们有:
1 tablesync worker将其 tablesync 插槽放在发布者上并死亡。
2Apply worker接管。
3Apply worker在发布者上创建订阅复制槽。
4两相状态设置为启用。
3.三态启用
在上图中,两阶段状态的改变是通过设置 pg_subscription 的新列subtwophasestate来完成的,它表示两阶段模式的状态。
即使用户指定他们想要使用two_phase = on的订阅,在内部它也会以pending的三态开始,并且仅在所有 tablesync 初始化完成后才启用- 也就是说,当所有 tablesync worker都达到其就绪状态时. 换句话说,pending只是订阅启动时的一个过渡状态。
在两阶段正确可用(启用三态)之前,订阅的行为就像two_phase = off一样。当apply worker 检测到所有tablesyncs 已经准备好(当三态处于pending时)它会重启apply worker 进程。
当(重新启动的)应用worker发现所有 tablesync worker都已准备好进行两阶段三态挂起时,它调用 wal_startstreaming 以正确 启用发布者以进行两阶段提交并将三态值挂起更新为已启用。
如果用户需要知道三态值,他们可以从 pg_subscription 目录中获取它。例如:
postgres=# SELECT subtwophasestate FROM pg_subscription;
subtwophasestate
------------------
e
4.ALTER SUBSCRIPTION 限制
ALTER SUBSCRIPTION 无法更改two_phase选项。
此限制是为了规避准备好的事务和相应的 COMMIT PREPARED 跨越two_phase选项的启用或禁用的情况。在这种情况下,解码器将无法决定事务是否需要完全解码,或者只发送 COMMIT PREPARED。
5.订阅者的全局 ID (GID)
在订阅者上复制的准备好的事务将与在发布者上指定的 GID 不同。如果有多个订阅者在发布者上应用特定的准备事务,并且所有订阅者都使用与发布者相同的 GID,那么当第二个事务尝试使用相同的 GID 进行准备时,这将失败。
为了避免这种冲突,订阅者上的应用worker根据订阅者 ID 和发布者上的事务 ID 替换生成的唯一 GID:pg_gid_
示例:pg_gid_24576_790
6.回调 API
对于此功能,实现了以下 pgoutput 函数,以便可以分配两阶段提交所需的回调。有关这些回调的详细信息,请参阅我之前的博客文章PostgreSQL 14 中两阶段提交的逻辑解码。
cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
cb->prepare_cb = pgoutput_prepare_txn;
cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
三、例子
1发布方
- 创建表和发布。
postgres=# CREATE TABLE test (col1 INT, col2 TEXT, PRIMARY KEY(col1));
CREATE TABLE
postgres=# CREATE PUBLICATION pub FOR TABLE test;
2订阅方
- 创建同一个表,并创建一个启用 two_phase 模式的订阅。
- 检查subtwophasestate列以验证它是否启用了two_phase(如果值为e,则开启了)。
postgres=# CREATE TABLE test (col1 INT, col2 TEXT, PRIMARY KEY(col1));
CREATE TABLE
postgres=# CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres host=localhost' PUBLICATION pub WITH (two_phase = on);
NOTICE: created replication slot "sub" on publisher
CREATE SUBSCRIPTION
postgres=# SELECT subtwophasestate FROM pg_subscription;
subtwophasestate
------------------
e
(1 row)
3发布方
- 开始交易。
- 插入一些数据。
- 准备事务并检查 GID。
postgres=# BEGIN;
BEGIN
postgres=*# INSERT INTO test VALUES (7,'aa');
INSERT 0 1
postgres=*# PREPARE TRANSACTION 't1';
PREPARE TRANSACTION
postgres=# SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
------------+-----+-------------------------------+-------+----------
790 | t1 | 2022-03-14 06:59:49.341013-04 | ajin | postgres
(1 row)
4订阅方
- 检查订户端并查看生成的准备好的事务 GID 也在那里复制。
postgres=# SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
------------+------------------+-------------------------------+-------+----------
877 | pg_gid_24576_790 | 2022-03-14 06:59:49.350815-04 | ajin | postgres
(1 row)
5发布方
- 提交准备好的事务。
- 观察准备好的事务 GID 现在已经消失(它已提交)。
- 选择插入的数据。
postgres=# COMMIT PREPARED 't1';
COMMIT PREPARED
postgres=# SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
------------+-----+----------+-------+----------
(0 rows)
postgres=# SELECT * FROM test;
a | b
---+----
7 | aa
(1 row)
6订阅方
- 订阅方生成的 GID 也消失了(已提交)。
- 选择显示已复制发布的数据。
postgres=# SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
------------+-----+----------+-------+----------
(0 rows)
postgres=# SELECT * from test;
a | b
---+----
7 | aa
(1 row)
四、未来展望
PostgreSQL 15 现在提供了支持两阶段提交的分布式数据库的底层框架。对于在分布式数据库中工作的两阶段事务,备用数据库需要通知主数据库有关失败的 PREPARE 并启动回滚。这种类型的备用反馈机制目前在 PostgreSQL 中不存在,并且是未来改进的候选者。
原文链接
参考链接
https://github.com/postgres/postgres/commit/a8fd13cab0ba815e9925dc9676e6309f699b5f72
https://github.com/postgres/postgres/commit/63cf61cdeb7b0450dcf3b2f719c553177bac85a2