概述
在之前的版本中,我们使用逻辑复制时,只可以在主节点使用逻辑解码和创建逻辑复制槽,使用逻辑复制时当数据库发生主备切换后,备库没有复制槽信息,导致故障期间的数据无法通过逻辑复制同步给订阅端。
pg17_beta1版本中提到了复制槽同步功能,首先看一下如何使用该功能,在文档中找到相关描述,https://www.postgresql.org/docs/17/logicaldecoding-explanation.html
The logical replication slots on the primary can be synchronized to the hot standby by using the failover parameter of pg_create_logical_replication_slot, or by using the failover option of CREATE SUBSCRIPTION during slot creation, and then calling pg_sync_replication_slots on the standby. By setting sync_replication_slots on the standby, the failover slots can be synchronized periodically in the slotsync worker. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby (i.e., primary_slot_name should be configured on the standby), and hot_standby_feedback must be enabled on the standby. It is also necessary to specify a valid dbname in the primary_conninfo. It's highly recommended that the said physical replication slot is named in standby_slot_names list on the primary, to prevent the subscriber from consuming changes faster than the hot standby. Even when correctly configured, some latency is expected when sending changes to logical subscribers due to the waiting on slots named in standby_slot_names. When standby_slot_names is utilized, the primary server will not completely shut down until the corresponding standbys, associated with the physical replication slots specified in standby_slot_names, have confirmed receiving the WAL up to the latest flushed position on the primary server.
复制
机器翻译,整理一下要点
如果要同步复制槽到备库,复制槽的failover属性需要指定,可以使用函数pg_create_logical_replication_slot在创建复制槽时指定failover参数,可以看到创建复制槽函数中多了一个failover参数,默认是false,指定为true就代表该复制槽需要同步到备库
postgres=# \df+ pg_create_logical_replication_slot
List of functions
-[ RECORD 1 ]-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------
Schema | pg_catalog
Name | pg_create_logical_replication_slot
Result data type | record
Argument data types | slot_name name, plugin name, temporary boolean DEFAULT false, twophase boolean DEFAULT false, failover boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn
Type | func
Volatility | volatile
Parallel | unsafe
Owner | postgres
Security | invoker
Access privileges |
Language | internal
Internal name | pg_create_logical_replication_slot
Description | set up a logical replication slot
复制
或者创建订阅并创建复制槽时指定faileover参数,官方文档中创建订阅部分也有该参数的说明。https://www.postgresql.org/docs/17/sql-createsubscription.html
failover (boolean) Specifies whether the replication slots associated with the subscription are enabled to be synced to the standbys so that logical replication can be resumed from the new primary after failover. The default is false.
复制
需要同步的复制槽创建后,在备库执行函数pg_sync_replication_slots即可手工同步;
还可以通过在备库设置参数sync_replication_slots=on,实现定期自动同步复制槽。
接下来是同步复制槽需要具备的前提条件
1)主备库之间有一个物理复制槽(备库设置参数primary_slot_name,可以pg_basebackup时指定复制槽)
2)备库开启hot_standby_feedback
3)备库参数primary_conninfo中还需要指定一个可用的dbname
另外还强烈建议在主库配置参数standby_slot_names=主备间的物理复制槽,确保复制槽同步的一致性。
验证过程
手动同步
首先搭建一个主备的环境,其中备库创建前先在主库创建一个物理复制槽,设置pg_hba,创建流复制用户,修改数据库参数hot_standby_feedback = on等
通过pg_basebackup创建备库
pg_basebackup -D data -h mydb1a -p 5417 -U rep_user -R -S s_5417
复制
主库创建需要同步的复制槽
select pg_create_logical_replication_slot('test_slot','test_decoding',false,false,true);
复制
备库参数primary_conninfo中增加dbname选项,备库使用函数pg_sync_replication_slots同步复制槽信息,执行同步函数后,复制槽被手工同步到备库一次,并不会随主库复制槽变化而更新
cat data/postgresql.auto.conf
# Do not edit this file manually!
# It will be overwritten by the ALTER SYSTEM command.
primary_conninfo = 'user=rep_user passfile=''/home/postgres/.pgpass'' host=mydb1a port=5417 dbname=postgres sslmode=prefer sslcompression=0 gssencmode=disable krbsrvname=postgres target_session_attrs=any'
primary_slot_name = 's_5417'
[postgres@mydb1b pg17_beta1]$ psql -p 5417
psql (17beta1)
Type "help" for help.
postgres=# select pg_sync_replication_slots();
pg_sync_replication_slots
---------------------------
(1 row)
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_st
atus | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
-----------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+-------
-----+---------------+-----------+-------------------------------+-------------+---------------------+----------+--------
test_slot | test_decoding | logical | 5 | postgres | f | f | | | 769 | 0/9021540 | 0/9021578 | reserv
ed | | f | 2024-05-28 15:25:57.046706+08 | f | | t | t
(1 row)
复制
自动同步
在备库设置参数sync_replication_slots = on,可自动定期同步主库复制槽变更
主库创建表test,并发布该表,使用pg_recvlogical模拟逻辑复制
会话1-主库
给发布表插入数据,观察复制槽变化
postgres=# select slot_name,catalog_xmin , restart_lsn , confirmed_flush_lsn from pg_replication_slots where slot_type='logical';
slot_name | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+--------------+-------------+---------------------
test_slot | 774 | 0/90228F0 | 0/90229D8
(1 row)
postgres=# insert into test select 1;
INSERT 0 1
postgres=# insert into test select 1;
INSERT 0 1
postgres=# select slot_name,catalog_xmin , restart_lsn , confirmed_flush_lsn from pg_replication_slots where slot_type='logical';
slot_name | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+--------------+-------------+---------------------
test_slot | 776 | 0/9022E20 | 0/9022E20
(1 row)
复制
会话2-备库
修改sync_replication_slots参数,发现复制槽变更已同步到备库
postgres=# select slot_name,catalog_xmin , restart_lsn , confirmed_flush_lsn from pg_replication_slots where slot_type='logical';
slot_name | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+--------------+-------------+---------------------
test_slot | 769 | 0/9021540 | 0/9021578
(1 row)
postgres=# alter system set sync_replication_slots TO on;
ALTER SYSTEM
postgres=# select pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)
postgres=# select slot_name,catalog_xmin , restart_lsn , confirmed_flush_lsn from pg_replication_slots where slot_type='logical';
slot_name | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+--------------+-------------+---------------------
test_slot | 776 | 0/9022E20 | 0/9022E58
(1 row)
复制
会话3-pg_recvlogical
[postgres@mydb1a ~]$ pg_recvlogical -p 5417 -S test_slot -d postgres -U postgres -h mydb1a -P test_decoding --start -f -
BEGIN 773
table public.test: INSERT: id[integer]:1
COMMIT 773
BEGIN 774
table public.test: INSERT: id[integer]:1
COMMIT 774
BEGIN 775
table public.test: INSERT: id[integer]:1
COMMIT 775
复制
模拟主备切换
切换后逻辑复制可以在新主库接续
会话1-备库promote
[postgres@mydb1b pg17_beta1]$ pg_ctl -D data promote
waiting for server to promote.... done
server promoted
[postgres@mydb1b pg17_beta1]$ psql -p 5417
psql (17beta1)
Type "help" for help.
postgres=# select pg_is_in_recovery();
pg_is_in_recovery
-------------------
f
(1 row)
postgres=# insert into test select 1;
INSERT 0 1
postgres=#
复制
会话2-pg_recvlogical
注意此时host修改为mydb1b(新主库)
[postgres@mydb1a ~]$ pg_recvlogical -p 5417 -S test_slot -d postgres -U postgres -h mydb1b -P test_decoding --start -f -
BEGIN 776
table public.test: INSERT: id[integer]:1
COMMIT 776
复制
总结
- 复制槽同步需要满足几个必要条件
1)复制槽属性failover=true
2)主备库之间有一个物理复制槽
3)备库开启hot_standby_feedback
4)备库参数primary_conninfo中还需要指定一个可用的dbname
##5)主库配置参数standby_slot_names=主备间的物理复制槽(建议设置,非必要)
- 备库可以使用函数pg_sync_replication_slots手工同步,也可以配置参数sync_replication_slots 实现定期自动同步,且该参数修改无需重启数据库
- 通过模拟主备切换,备库promote后,订阅端重新连接新主库即可实现逻辑复制接续,实际高可用场景下使用还需详细验证