暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

PostgreSQL逻辑解码实践(一)

前言

在日常运维过程中,我们经常需要对数据进行一些同步处理,从系统 A将一个表的数据同步到系统 B。以前我们主要使用 Oracle GoldenGate,后来 MySQL逐渐增多,使用上了分布式,阿里的 otter也开始部分使用。近来我们陆续大量上了 PostgreSQL数据库,对 PG使用较多的 CDC同步工具是 debezium
,它使用 PostgreSQL逻辑解码插件,今天我们将介绍一下它的一些基本知识。

逻辑解码的概念

PostgreSQL在9.4后提供了逻辑解码功能,其基本原理是从 WAL日志解码,跟踪所有 DML (INSERT, UPDATE, DELETE)更改。之后转换成各种可用的格式,比如 Json。最终发送给外部程序进行消费(例如 ElasticSearch)。下图显示了整个流程的前半部分(解码输出):

对于逻辑解码插件,主要有以下几类。按照格式来分主要有

  • JSON format plugins
  • Protobuf Format Plugins
  • Avro Format Plugins
  • SQL Format Plugins
  • Miscellaneous Plugins

具体可以参考Wiki上的Logical Decoding Plugins

尽管如此,我还是先浏览了每一个项目,发现许多项目都停止了更新。仍在更新的主要是下面的几个:

wal2json

Postgres Decoderbufs

pglogical

wal2mongo

wal2json逻辑解码测试

为了能使用逻辑解码,需要先安装wal2json插件。

[postgres@centos8 ~]$ git clone https://github.com/eulerto/wal2json.git

cd wal2json/
export PGHOME=/data/postgresql/pgsql/default
export PATH=$PGHOME/bin:$PATH
make 
make install

接下来要在postgresql.conf中配置三个相关参数。

wal_level=logical
max_replication_slots=10
max_wal_sender=10

wal_level
设置为logical允许WAL记录逻辑解码所需的信息。参数max_replication_slots
指定了发送端可以支持的最大复制槽数量,默认为10个,无需修改。max_wal_sender
指定发送方用于流复制的并发连接的最大数量,默认值也是10。

同时也需要创建一个插槽。pg_recvlogical是 PostgreSQL自带的工具,它管理插槽并使用插槽中的流数据。

[postgres@centos8 ~]$ pg_recvlogical --version
pg_recvlogical (PostgreSQL) 13.1


pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
2021-01-04 13:27:57.921 UTC [20543] LOG:  logical decoding found consistent point at 0/155A850
2021-01-04 13:27:57.921 UTC [20543] DETAIL:  There are no running transactions.

[postgres@centos8 ~]$ psql
psql (13.1)
Type "help" for help.

postgres=# select slot_name,slot_type,active,restart_lsn from pg_replication_slots;
 slot_name | slot_type | active | restart_lsn 
-----------+-----------+--------+-------------
 test_slot | logical   | f      | 0/155A850
(1 row)


[postgres@centos8 ~]$ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
2021-01-04 13:30:56.709 UTC [20554] LOG:  starting logical decoding for slot "
test_slot"
2021-01-04 13:30:56.709 UTC [20554] DETAIL:  Streaming transactions committing after 0/155A888, reading WAL from 0/155A850.
2021-01-04 13:30:56.709 UTC [20554] LOG:  logical decoding found consistent point at 0/155A850
2021-01-04 13:30:56.709 UTC [20554] DETAIL:  There are no running transactions.

在执行pg_recvlogical创建槽之后,输出一个日志,该日志指出逻辑解码的 lsn位置,并告诉您这里没有长事务。一旦槽被创建,再次运行pg_recvlogical命令,使用参数-- start 代表从指定的逻辑复制槽开始进行流式传送更改,将一直持续到被一个信号终止。pretty-print表示添加空格和 JSON结构的缩进,默认值是 false。add-msg-prefixes
代表仅当前缀在列表中时才包括消息。默认为所有前缀。它是一个逗号分隔的值。我们开的这个终端将始终输出,并等待接收逻辑解码流。

此时,打开另一个窗口,登录 到PSQL中的制造一些事务。

1.执行建表语句

##窗口2
postgres=# CREATE TABLE test (id SERIAL,name VARCHAR(30), PRIMARY KEY(id));
CREATE TABLE

##窗口1
{
        "change": [
        ]
}

可以看到DDL语句不记录,只显示了change。

2.执行DML语句

##窗口2
INSERT INTO test (nameVALUES ('yibao');
UPDATE test SET name = 'mingyoushiren' WHERE id = 1;
DELETE FROM test WHERE id = 1;

##窗口1
{
        "change": [
                {
                        "
kind": "insert",
                        "
schema": "public",
                        "
table": "test",
                        "
columnnames": ["id", "name"],
                        "
columntypes": ["integer", "character varying(30)"],
                        "
columnvalues": [1, "yibao"]
                }
        ]
}
{
        "
change": [
                {
                        "
kind": "update",
                        "
schema": "public",
                        "
table": "test",
                        "
columnnames": ["id", "name"],
                        "
columntypes": ["integer", "character varying(30)"],
                        "
columnvalues": [1, "mingyoushiren"],
                        "
oldkeys": {
                                "
keynames": ["id"],
                                "
keytypes": ["integer"],
                                "
keyvalues": [1]
                        }
                }
        ]
}
{
        "
change": [
                {
                        "
kind": "delete",
                        "
schema": "public",
                        "
table": "test",
                        "
oldkeys": {
                                "
keynames": ["id"],
                                "
keytypes": ["integer"],
                                "
keyvalues": [1]
                        }
                }
        ]
}

Kind在这里表示语句操作的类型。对于insert和update,会插入新行到表中,这个新行的数据也会被捕捉并发送输出。而update和delete,默认不设置表的REPLICA IDENTITY
属性的话,oldkeys就只会显示历史记录的key id。

REPLICA IDENTITY
有四个选项。

  • default,记录pk列
  • using index index_name,记录索引列。该索引必须是唯一索引、不能是部分索引、不能是可延迟索引并且只包括被标记成 not null的列。
  • full  记录行中所有列的旧值。
  • nothing 什么也不记录(这是系统表的默认值)
## 窗口2
alter table test REPLICA IDENTITY full;
INSERT INTO test (nameVALUES ('yibao');
UPDATE test SET name = 'mingyoushiren' WHERE id = 2;
DELETE FROM test WHERE id = 2;

## 窗口1
{
        "change": [
        ]
}
{
        "
change": [
                {
                        "
kind": "update",
                        "
schema": "public",
                        "
table": "test",
                        "
columnnames": ["id", "name"],
                        "
columntypes": ["integer", "character varying(30)"],
                        "
columnvalues": [2, "mingyoushiren"],
                        "
oldkeys": {
                                "
keynames": ["id", "name"],
                                "
keytypes": ["integer", "character varying(30)"],
                                "
keyvalues": [2, "yibao"]
                        }
                }
        ]
}
{
        "
change": [
                {
                        "
kind": "delete",
                        "
schema": "public",
                        "
table": "test",
                        "
oldkeys": {
                                "
keynames": ["id", "name"],
                                "
keytypes": ["integer", "character varying(30)"],
                                "
keyvalues": [2, "mingyoushiren"]
                        }
                }
        ]
}

我们把test表的REPLICA IDENTITY
属性修改成full之后,再次执行上述dml操作,可以看到这次把oldkeys全部都输出了。

后记

今天我们研究了逻辑解码的插件wal2json,下次我们来研究怎么把json输出转换成MQ消息,然后在进行消费(Consumer)。


文章转载自励志成为PostgreSQL大神,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论