暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

PostgreSQL逻辑复制原理详解

作者:李传成
李传成
中国PG分会认证专家
PostgreSQL资深内核研发工程师

目录

1 前言

2 流复制和逻辑复制

3 认清逻辑家族

     3.1 核心技术:logical decoding

     3.2 test_decoding plugin

     3.3 wal2json

     3.4 logical replication

     3.5 BDR、pglogical

4 logical2sql

     4.1 logical2sql 设计用途

     4.2 logical2sql 设计实现(1)

     4.3 logical2sql 设计实现(2)

前言

      逻辑复制是Postgres10出现的功能,一提到逻辑复制很多人可能会联想到很多其他的词汇,比如‘流复制’、‘逻辑解析’、‘逻辑订阅’、‘pglogical’、‘wal2json’、'BDR',这些鱼龙混杂的术语和功能看的头都大了,本文将围绕逻辑复制逐步讲解一下这些概念之间的关系。

      最终会提出一个logical2sql的新名词,它是设计实现Postgres到不同架构、不同数据库的数据同步的构想。

流复制和逻辑复制

       流复制(或者现在称它为物理复制)是一个更为古老或者传统数据同步方式,在pg10之前流复制承载了pg主备之间数据同步的功能,它的实现方式是将wal日志中记录的内容按照确切的块地址逐字节的拷贝到备库,因此主备之间数据分布是一致的,这意味着在主备机器上,同一条记录的ctid是相同的。

      本文的主要目的是讲述逻辑复制,这里提到流复制这是让读者有一个印象,流复制和逻辑复制是对wal日志的不同使用。


流复制原理图

        逻辑复制与流复制最直观的不同就是,逻辑复制支持表级复制。但是他们最大的区分点是他们的实现原理不一样。如下详细分析:

       逻辑复制同步数据的原理是,在wal日志产生的数据库上,由逻辑解析模块对wal日志进行初步的解析,它的解析结果为ReorderBufferChange(可以简单理解为HeapTupleData),再由pgoutput plugin对中间结果进行过滤和消息化拼接后,然后将其发送到订阅端,订阅端根据接收到的HeapTupleData重新对其执行insert、delete、update的操作。这里要注意,流复制是将数据从walrecord拷贝到数据页,逻辑复制是将数据重新执行一次insert、update或delete。


逻辑复制原理图


认清逻辑家族

     

一张图认清逻辑家族


        logical decoding是实现所有logical功能的核心技术,下面有对它的详细解释。跟这个核心技术一起加入PG内核的还有一个demo级别的功能testdecoding,但是testdecoding的输出结果看起来没有那么容易被二次开发使用,wal2json以一个contrib的形式出现,它的输出结果对二次开发更加友好。BDR和pglogical是实现了逻辑复制的插件,BDR特点是支持多主机、DDL复制、全局序列、全局DDL锁,pglogical特点是灵活性高、支持级联逻辑复制,在他们的基础上PG内核引入了logical replication,它实现了逻辑复制的功能,从使用方法来看logicalreplication与pglogical更为一致。


1.核心技术:logical decoding

logical decoding原理说明图


        walsender进程不断的从自己的复制槽获取新产生的wal record,并通过内核中的LogicalDecodingProcessRecord()函数进行wal record的初步过滤和解析,解析结果为一个ReorderBufferChange结构(对于DML语句而言这个结构里面主要的信息为oldtuple和newtuple),并将这个解析结果放置到当前事务ID对应的解析buf中去。当某个事务发生了提交,这个事务对应的buf里面的ReorderBufferChange会被指定的plugin做二次处理,并将处理结果按照指定的途径输出。


2.test_decoding plug

        test_decoding是Postgres现有的一个plugin,它的主要作用是将筛选过后的wal日志,转化为人们可以理解的形式。现在PG内部有两种方法可以使用plugin。如下是对这两种情况的简单说明。


(1)内置SQL函数使用:

    postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name | lsn
    -----------------+-----------
    regression_slot | 0/16569D8
    (1 row)




    postgres=# \d t1
    Table "public.t1"
    Column | Type | Collation | Nullable | Default
    --------+-------------------+-----------+----------+---------
    i | integer | | |
    j | integer | | |
    k | character varying | | |




    postgres=# insert into t1 values(1,1,'test_decoding use by SQL');
    INSERT 0 1
    postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn | xid | data
    -----------+-----+----------------------------------------------------------------------------------------------------
    0/16569D8 | 492 | BEGIN 492
    0/16569D8 | 492 | table public.t1: INSERT: i[integer]:1 j[integer]:1 k[character varying]:'test_decoding use by SQL'
    0/1656A60 | 492 | COMMIT 492
    (3 rows)


    postgres=#

    原理说明简图


    (2)pg_recvlogical使用

      movead@home:~/ld $ pg_recvlogical -d postgres --slot=test2 --create-slot
      movead@home:~/ld $ pg_recvlogical -d postgres --slot=test2 --start -f ld.out &
      [1] 19807
      movead@home:~/ld $ psql -c "insert into t1 values(2,2,'test_decoding use by pg_recvlogical');"
      INSERT 0 1
      movead@home:~/ld $ vi ld.out
      movead@home:~/ld $

      可以在ld.out文件中看到输出结果

        BEGIN 496
        table public.t1: INSERT: i[integer]:2 j[integer]:2 k[character varying]:'test_decoding use by pg_recvlogical'
        COMMIT 496


        原理说明简图


        3. wal2json

                wal2json plugin是test_decoding的一个升级版,它优化了输出结果的结构,使之更容易被应用。

          movead@home:~/ld $ pg_recvlogical -d postgres --slot test_slot_wal2json --create-slot -P wal2json
          movead@home:~/ld $ pg_recvlogical -d postgres --slot test_slot_wal2json --start -o pretty-print=1 -f ld_wal2json.out
          movead@home:~/ld $ vi ld_wal2json.out
          movead@home:~/ld $

                  查看ld_wal2json.out文件中的输出结果

            {
            "change": [
            {
            "kind": "insert",
            "schema": "public",
            "table": "t1",
            "columnnames": ["i", "j", "k"],
            "columntypes": ["integer", "integer", "character varying"],
            "columnvalues": [1, 1, "wal2json use by pg_recvlogical"]
            }
            ]
            }


            4.logical replication

                  pger可以使用testdecoding去认识逻辑复制,但是并不能真正的应用testdecoding来做什么,除非在testdecoding的基础上写代码去实现它的后续。logical replication真正的让用户可以在Postgres上体验另外一种不同的数据同步方式。换句话说,用户无法直接使用testdecoding功能用于生产环境,除非你使用第三方插件才能使用test_decoding完成数据同步的功能,而logicalreplication使得logical decoding技术更容易的用于生产环境。下面我们演示一次配置logical replicatio的过程,其中会说明执行每一个步骤后,数据库内部实际做了什么。


            (1)环境准备

              /*发布端IP43*/
              create user user_logical43 replication encrypted password '123';
              create table t1(i int, j int ,k varchar);
              alter table t1 add primary key(i);
              grant select on t1 to user_logical43;


              /*订阅端IP89*/
              create table t1(i int,j int,k varchar);

              (2)logical replication操作步骤

                 /*IP43 创建发布
                稍后我们会使用pub43,其他publication只为说明使用*/
                create publication pub43 for table t1;
                create publication pub43_1 for all tables;
                create publication pub43_2 for all tables with (publish='insert,delete');

                        publication的创建语句会修改系统表

                  postgres=# select * from pg_publication ;
                  oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
                  -------+---------+----------+--------------+-----------+-----------+-----------+-------------
                  16420 | pub43 | 10 | f | t | t | t | t
                  16422 | pub43_1 | 10 | t | t | t | t | t
                  16423 | pub43_2 | 10 | t | t | f | t | f
                  (3 rows)
                  postgres=# select * from pg_publication_rel ;
                  oid | prpubid | prrelid
                  -------+---------+---------
                  16421 | 16420 | 16392
                  (1 row)
                  postgres=# select oid,relname from pg_class where relname ='t1' or relname ='t2';
                  oid | relname
                  -------+---------
                  16402 | t2
                  16392 | t1
                  (2 rows)


                          以上可见,创建发布时会在pgpublication插入一条记录,如果这个发布不是对所有的表的发布,那么会在pgpublication_rel表中创建发布所对应的表。同时可以使用publish option来指定更加细节的发布选项(insert,delete,update,truncate)。

                          IP89创建订阅

                    create subscription sub89 connection 'host=192.168.102.43 port=5432 dbname=postgres user=user_logical43 password=123' publication pub43;

                            订阅的创建语句会修改系统表

                      /*对IP89的影响*/
                      postgres=# select * from pg_subscription;
                      oid | subdbid | subname | subowner | subenabled | subconninfo | subslotname | subsync
                      commit | subpublications
                      -------+---------+---------+----------+------------+--------------------------------------------------------------------------------+-------------+--------
                      -------+-----------------
                      16394 | 13591 | sub89 | 10 | t | host=192.168.102.43 port=5432 dbname=postgres user=user_logical43 password=123 | sub89 | off
                      | {pub43}
                      (1 row)




                      postgres=# select * from pg_subscription_rel;
                      srsubid | srrelid | srsubstate | srsublsn
                      ---------+---------+------------+-----------
                      16394 | 16384 | r | 0/16A2700
                      (1 row)




                      postgres=# select oid,relname from pg_class where relname ='t1';
                      oid | relname
                      -------+---------
                      16384 | t1
                      (1 row)




                      /*对IP43的影响*/
                      postgres=# select * from pg_replication_slots ;
                      slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | ca
                      talog_xmin | restart_lsn | confirmed_flush_lsn
                      -----------+----------+-----------+--------+----------+-----------+--------+------------+------+---
                      -----------+-------------+---------------------
                      sub89 | pgoutput | logical | 13594 | postgres | f | t | 8940 | |
                      524 | 0/16A26C8 | 0/16A2700
                      (1 row)


                              除此之外,还在IP43和IP89分别创建了walsend和logical replication work进程为逻辑复制服务。


                      (3)数据验证

                        /*在IP43上执行数据插入*/
                        lichuancheng@IP43:~ $ psql
                        psql (13devel)
                        Type "help" for help.




                        postgres=# insert into t1 values(1,1,'data from ip43');
                        INSERT 0 1
                        postgres=#
                        /*在IP89上执行查询*/
                        [lichuancheng@IP89 data]$ psql
                        psql (13devel)
                        Type "help" for help.




                        postgres=# select *from t1;
                        i | j | k
                        ---+---+----------------
                        1 | 1 | data from ip43
                        (1 row)




                        postgres=#


                                 现在已经完成了逻辑复制的功能简单的演示,相信大家对这个过程是不陌生的,本文主要讲解原理,因此不再做过多的功能演示。在前面流复制与逻辑复制的部分已经对流复制的原理进行了详细的描述,下面是这个简单的insert语句在逻辑复制原理图中的一个缩影。


                        5. BDR、pglogical

                              在github上只有BDR1开源(第一代BDR),很不幸它只支持Postgres9.4,较新的版本BDR3是闭源的。如果2ndquadrant的老板没有开源BDR3的计划,恐怕未来会有人自己开发一个了。

                              pglogical和logicalreplicate的原理是相同的,只不过pglogical有更加强大的冲突处理能力。

                        logical2sql

                              特别说明一下,logical2sql系列只是实现构想并不是已经实现了这样的功能。


                        1.  logical2sql 设计用途

                              使用test_decoding或者wal2json可以转化出需要同步的数据,只要实现一个工具就可以完成数据同步到其他数据库功能。如下是其实现描述图。

                               但是这样会带来一写一读的延迟,看图中的红框部分,pgrecvlogical程序向temp file中写入数据然后forward tool从tempfile中读取数据,这样就造成数据传输效率的损失。而且在读取时会产生转义字符的问题。如果我们将pgrecvlogical和ForwardTool合二为一,那么就能可以避免读写数据的传输效率损失,同时因为没有中间文件的产生所以也不用担心占义字符的问题,这就是logical2sql的理念。


                        2.  logical2sql 设计实现(1)

                                我们需要实现一个新的plugin,它可以在不同的配置下,将RecordBufferChange转化为不同数据库标准的SQL,然后重新实现或者改动pg_recvlogical的工具,这个工具接收到数据后不再写入临时文件,而是直接完成SQL转发的工作。下图是logical2sql构思图(1)


                        3.  logical2sql 设计实现(2)

                                在SQL从newout put plugin到newtool的过程也是有一次数据传输的,我们甚至可以摒弃newtool,直接实现在plugin中完成SQL的转发。在logical2sql构思图Ⅰ中我们可以看到,SQL在newoutput plugin中拼接完成,然后发送到newtool中,由‘new tool’完成SQL的转发。SQL从newoutput plugin到newtool的传输也一定会有一些效率损失,我们可以在newoutput plugin拼接完SQL后不再借助于任何tool,直接完成SQL的转发过程。下图是logical2sql构思图(2)


                        欢迎投稿



                                中国开源软件推进联盟PostgreSQL分会,欢迎大家积极投稿,向PGer分享自己的实践经验、心得体会,共建PG中国生态。

                        投稿邮箱:

                        press@postgresqlchina.com

                        最后修改时间:2019-11-06 10:44:46
                        文章转载自开源软件联盟PostgreSQL分会,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论