
PostgreSQL Extensions: wal2json

Original article by 贺晓群, 2021-11-16

Introduction

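wal2json is an output plugin for logical decoding. This means the plugin has access to the tuples produced by INSERT and UPDATE. It can also access the old row versions of UPDATE and DELETE, depending on the configured replica identity (REPLICA IDENTITY). Changes can be consumed through the streaming protocol (logical replication slots) or through a dedicated SQL API.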
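wal2json can produce two output formats:
  • Format version 1 (format-version=1) produces one JSON object per transaction. All new and old tuples of the transaction are contained in that object. Parameters can add further properties, such as the transaction timestamp, schema-qualified names, data types, and the transaction ID.
  • Format version 2 (format-version=2) produces one JSON object per tuple. JSON objects marking the start and end of each transaction are optional. Many other properties can be enabled; see the Parameters section.
There are two ways to obtain the changes (JSON objects) from wal2json: by calling SQL functions, or with the pg_recvlogical command-line tool.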
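The sketch below makes the difference between the two formats concrete. It splits one format-version 1 changeset into records shaped like format-version 2 output. It uses the field names that appear in the examples later in this article: kind, columnnames, and oldkeys for version 1, and action, columns, and identity for version 2. It is an illustration only and not part of wal2json. The helper names are hypothetical, and it assumes include-types is left at its default (true).

```python
import json

# Format-version 1 "kind" values mapped to format-version 2 single-letter "action" codes
KIND_TO_ACTION = {"insert": "I", "update": "U", "delete": "D", "message": "M"}


def _triples(names, types, values):
    """Zip parallel name/type/value arrays into v2-style column objects."""
    return [{"name": n, "type": t, "value": v} for n, t, v in zip(names, types, values)]


def v1_to_v2_records(changeset_json):
    """Split one format-version 1 changeset into format-version-2-shaped records.

    Assumes include-types is left at its default (true), so type arrays are present.
    """
    changes = json.loads(changeset_json)["change"]
    records = []
    for change in changes:
        action = KIND_TO_ACTION[change["kind"]]
        if action == "M":
            records.append({"action": "M", "transactional": change["transactional"],
                            "prefix": change["prefix"], "content": change["content"]})
            continue
        rec = {"action": action, "schema": change["schema"], "table": change["table"]}
        if "columnnames" in change:  # new tuple: INSERT and UPDATE
            rec["columns"] = _triples(change["columnnames"], change["columntypes"],
                                      change["columnvalues"])
        if "oldkeys" in change:  # old key: UPDATE and DELETE, depends on REPLICA IDENTITY
            old = change["oldkeys"]
            rec["identity"] = _triples(old["keynames"], old["keytypes"], old["keyvalues"])
        records.append(rec)
    # A changeset holding only non-transactional messages is not a transaction,
    # so only real transactions get the optional B (begin) / C (commit) markers.
    only_nontx = bool(changes) and all(
        c["kind"] == "message" and not c["transactional"] for c in changes)
    if not only_nontx:
        records = [{"action": "B"}] + records + [{"action": "C"}]
    return records
```

For example, a version 1 changeset containing one DELETE becomes three records: B, D (with an identity list built from oldkeys), and C.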

Installation

```sh
# The pg_config of the target PostgreSQL installation must come first in PATH
git clone https://github.com/eulerto/wal2json.git
cd wal2json
USE_PGXS=1 make
USE_PGXS=1 make install
```

Configuration

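```
# Edit postgresql.conf
wal_level = logical
# Adjust the following values to your environment
max_replication_slots = 2
max_wal_senders = 2
```

These settings take effect only after the server is restarted. On this PostgreSQL 10 system:

```sh
systemctl restart postgresql-10
```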
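On a running server, `SHOW wal_level;` reports the active value. The settings can also be checked offline in the configuration text, before restarting. The sketch below does this for the three settings above. It is a simplified parser and not PostgreSQL's own: it ignores include directives and `#` inside quoted values. The minimum values are assumptions for one slot read by one pg_recvlogical process. All names are hypothetical.

```python
import re

# What the configuration step requires. The minimums are assumptions for a single
# slot read by a single pg_recvlogical process; size them for your own workload.
REQUIRED = {"wal_level": "logical"}
MINIMUMS = {"max_replication_slots": 1, "max_wal_senders": 1}

# "name = value" or "name value"; postgresql.conf accepts both forms
SETTING_RE = re.compile(r"^([A-Za-z_][A-Za-z0-9_.]*)\s*=?\s*(.*)$")


def parse_conf(text):
    """Parse simple postgresql.conf lines into {name: value}; later lines win."""
    settings = {}
    for raw in text.splitlines():
        # Naive comment stripping: does not handle '#' inside quoted values
        line = raw.split("#", 1)[0].strip()
        match = SETTING_RE.match(line)
        if match:
            settings[match.group(1).lower()] = match.group(2).strip().strip("'")
    return settings


def check_logical_decoding(text):
    """Return a list of problems; an empty list means the settings look usable."""
    settings = parse_conf(text)
    problems = []
    for name, wanted in REQUIRED.items():
        found = settings.get(name)
        if found is None or found.lower() != wanted:
            problems.append(f"{name} should be {wanted!r}, found {found!r}")
    for name, minimum in MINIMUMS.items():
        found = settings.get(name)
        # An unset value falls back to the server default, so it is not flagged here
        if found is not None and int(found) < minimum:
            problems.append(f"{name} should be at least {minimum}, found {found}")
    return problems
```

A missing wal_level is reported, because the server default is not `logical`. A missing max_replication_slots or max_wal_senders is not reported, because their defaults are nonzero.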

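Parameters

The options below are passed to wal2json in two ways. With pg_recvlogical, use -o name=value. With the SQL functions pg_logical_slot_get_changes and pg_logical_slot_peek_changes, pass them as name/value argument pairs.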

  • include-xids: add xid to each changeset. Default is false.
  • include-timestamp: add timestamp to each changeset. Default is false.
  • include-schemas: add schema to each change. Default is true.
  • include-types: add type to each change. Default is true.
  • include-typmod: add modifier to types that have it (eg. varchar(20) instead of varchar). Default is true.
  • include-type-oids: add type oids. Default is false.
  • include-domain-data-type: replace domain name with the underlying data type. Default is false.
  • include-column-positions: add column position (pg_attribute.attnum). Default is false.
  • include-origin: add origin of a piece of data. Default is false.
  • include-not-null: add not null information as columnoptionals. Default is false.
  • include-default: add default expression. Default is false.
  • include-pk: add primary key information as pk. Column names and data types are included. Default is false.
  • pretty-print: add spaces and indentation to JSON structures. Default is false.
  • write-in-chunks: write after every change instead of every changeset. Default is false.
  • include-lsn: add nextlsn to each changeset. Default is false.
  • include-transaction: emit records denoting the start and end of each transaction. Default is true.
  • include-unchanged-toast (deprecated): Don’t use it. It is deprecated.
  • filter-origins: exclude changes from the specified origins. Default is empty, which means that no origin will be filtered. It is a comma-separated value.
  • filter-tables: exclude rows from the specified tables. Default is empty, which means that no table will be filtered. It is a comma-separated value. The tables should be schema-qualified. *.foo means table foo in all schemas, and bar.* means all tables in schema bar. Special characters (space, single quote, comma, period, asterisk) must be escaped with a backslash. Schema and table names are case-sensitive. Table "public"."Foo bar" should be specified as public.Foo\ bar.
  • add-tables: include only rows from the specified tables. Default is all tables from all schemas. It follows the same rules as filter-tables.
  • filter-msg-prefixes: exclude messages if their prefix is in the list. Default is empty, which means that no message will be filtered. It is a comma-separated value.
  • add-msg-prefixes: include only messages whose prefix is in the list. Default is all prefixes. It is a comma-separated value. wal2json applies filter-msg-prefixes before this parameter.
  • format-version: defines which format to use. Default is 1.
  • actions: define which operations will be sent. Default is all actions (insert, update, delete, and truncate). However, if you are using format-version 1, truncate is not enabled (backward compatibility).
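Option values are plain strings, and the filter-tables/add-tables escaping rules above are easy to get wrong by hand. The sketch below builds these values. It assumes only the rules listed above: space, single quote, comma, period, and asterisk are backslash-escaped, and a bare * acts as a wildcard. All function names are hypothetical.

```python
# Characters that must be backslash-escaped in filter-tables / add-tables entries
SPECIAL_CHARS = set(" ',.*")


def escape_identifier(name):
    """Backslash-escape wal2json's special characters in one schema or table name."""
    return "".join("\\" + ch if ch in SPECIAL_CHARS else ch for ch in name)


def table_entry(schema, table):
    """Build one schema-qualified entry; a bare '*' is kept as a wildcard."""
    schema_part = "*" if schema == "*" else escape_identifier(schema)
    table_part = "*" if table == "*" else escape_identifier(table)
    return f"{schema_part}.{table_part}"


def table_list(pairs):
    """Comma-separated value for add-tables or filter-tables."""
    return ",".join(table_entry(schema, table) for schema, table in pairs)


def recvlogical_args(options):
    """Plugin options as pg_recvlogical '-o name=value' arguments."""
    args = []
    for name, value in options.items():
        args += ["-o", f"{name}={value}"]
    return args


def sql_option_args(options):
    """Plugin options as the flat name, value, name, value ... list that the
    pg_logical_slot_*_changes functions take after slot name, upto_lsn, upto_nchanges."""
    flat = []
    for name, value in options.items():
        flat += [name, str(value)]
    return flat
```

For example, `table_list([("public", "Foo bar")])` yields `public.Foo\ bar`, the form given for filter-tables. Take care when such a value is embedded in an SQL string literal. With standard_conforming_strings on (the default), the backslash passes through unchanged, but a single quote must still be doubled for SQL.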

Examples

The following examples use both ways of consuming changes from wal2json: the pg_recvlogical command-line tool, and calls to SQL functions.

  • Listening for changes with pg_recvlogical:
```
# Session 1: create the slot, then start listening on it
test$ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
test$ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -

# Session 2: run the test statements
postgres=# CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE
postgres=# CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
CREATE TABLE
postgres=# BEGIN;
BEGIN
postgres=# INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT 0 1
postgres=# INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now());
INSERT 0 1
postgres=# INSERT INTO table1_with_pk (b, c) VALUES('Replication', now());
INSERT 0 1
postgres=# SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
 pg_logical_emit_message
-------------------------
 81/C4EE6D00
(1 row)

postgres=# DELETE FROM table1_with_pk WHERE a < 3;
DELETE 2
postgres=# SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you roll back the transaction');
 pg_logical_emit_message
-------------------------
 81/C4EE6E88
(1 row)

postgres=# INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir');
INSERT 0 1
-- This UPDATE is not added to the stream because the table has no primary key or replica identity
postgres=# UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir';
UPDATE 1
postgres=# COMMIT;
COMMIT

# Back in session 1, pg_recvlogical prints the following
# (the two empty changesets come from the two CREATE TABLE transactions):
{
  "change": [
  ]
}
{
  "change": [
  ]
}
{
  "change": [
    {
      "kind": "message",
      "transactional": false,
      "prefix": "wal2json",
      "content": "this non-transactional message will be delivered even if you roll back the transaction"
    }
  ]
}
WARNING: table "table1_without_pk" without primary key or replica identity is nothing
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "table1_with_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
      "columnvalues": [1, "Backup and Restore", "2021-11-16 20:04:11.786145"]
    }
    ,{
      "kind": "insert",
      "schema": "public",
      "table": "table1_with_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
      "columnvalues": [2, "Tuning", "2021-11-16 20:04:11.786145"]
    }
    ,{
      "kind": "insert",
      "schema": "public",
      "table": "table1_with_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
      "columnvalues": [3, "Replication", "2021-11-16 20:04:11.786145"]
    }
    ,{
      "kind": "message",
      "transactional": true,
      "prefix": "wal2json",
      "content": "this message will be delivered"
    }
    ,{
      "kind": "delete",
      "schema": "public",
      "table": "table1_with_pk",
      "oldkeys": {
        "keynames": ["a", "c"],
        "keytypes": ["integer", "timestamp without time zone"],
        "keyvalues": [1, "2021-11-16 20:04:11.786145"]
      }
    }
    ,{
      "kind": "delete",
      "schema": "public",
      "table": "table1_with_pk",
      "oldkeys": {
        "keynames": ["a", "c"],
        "keytypes": ["integer", "timestamp without time zone"],
        "keyvalues": [2, "2021-11-16 20:04:11.786145"]
      }
    }
    ,{
      "kind": "insert",
      "schema": "public",
      "table": "table1_without_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "numeric(5,2)", "text"],
      "columnvalues": [1, 2.34, "Tapir"]
    }
  ]
}

# Session 2: give table1_without_pk a replica identity and update it again
postgres=# ALTER TABLE table1_without_pk REPLICA IDENTITY FULL;
ALTER TABLE
postgres=# UPDATE table1_without_pk SET c = 'Tapir' WHERE c = 'Anta';
UPDATE 1

# Session 1 now prints:
{
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "table1_without_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "numeric(5,2)", "text"],
      "columnvalues": [1, 2.34, "Tapir"],
      "oldkeys": {
        "keynames": ["a", "b", "c"],
        "keytypes": ["integer", "numeric(5,2)", "text"],
        "keyvalues": [1, 2.34, "Anta"]
      }
    }
  ]
}

# Clean up the test data
# Session 1: stop pg_recvlogical with Ctrl+C, then drop the slot
test$ pg_recvlogical -d postgres --slot test_slot --drop-slot
# Session 2
postgres=# DROP TABLE table1_with_pk;
DROP TABLE
postgres=# DROP TABLE table1_without_pk;
DROP TABLE
```
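With pretty-print=1, pg_recvlogical spreads each changeset over several lines and writes the changesets back to back, so parsing line by line with json.loads fails. The sketch below splits such output into documents with `json.JSONDecoder.raw_decode` from the Python standard library. It assumes the output was saved with `-f` (for example `-f changes.json` instead of `-f -`) and contains only JSON. Server warnings such as the WARNING line above are assumed to go to stderr rather than into the file; if they do end up in the text, filter them out first. The helper names are hypothetical.

```python
import json

_decoder = json.JSONDecoder()


def iter_json_documents(text):
    """Yield each JSON document from text holding several documents back to back."""
    pos = 0
    while True:
        # raw_decode does not skip leading whitespace, so skip newlines/indentation here
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            return
        doc, pos = _decoder.raw_decode(text, pos)
        yield doc


def summarize(text):
    """Count changes per kind across all format-version 1 changesets in the text."""
    counts = {}
    for changeset in iter_json_documents(text):
        for change in changeset.get("change", []):
            counts[change["kind"]] = counts.get(change["kind"], 0) + 1
    return counts
```

Applied to the first batch of output above (without the WARNING line), `summarize` would report 4 inserts, 2 deletes, and 2 messages.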
  • Consuming changes through SQL functions (format version 1):
```
postgres=# CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE
postgres=# CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
CREATE TABLE
postgres=# SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
 ?column?
----------
 init
(1 row)

postgres=# BEGIN;
BEGIN
postgres=# INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT 0 1
postgres=# INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT 0 1
postgres=# INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
INSERT 0 1
postgres=# SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
 pg_logical_emit_message
-------------------------
 81/C4F33608
(1 row)

postgres=# DELETE FROM table2_with_pk WHERE a < 3;
DELETE 2
postgres=# SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you roll back the transaction');
 pg_logical_emit_message
-------------------------
 81/C4F33738
(1 row)

postgres=# INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
INSERT 0 1
-- This UPDATE is not added to the stream because the table has no primary key or replica identity
postgres=# UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
UPDATE 1
postgres=# COMMIT;
COMMIT
-- Consume the changes through the SQL function (format version 1)
postgres=# SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
WARNING: table "table2_without_pk" without primary key or replica identity is nothing
 data
----------------------------------------------------------------------------------------
{
  "change": [
    {
      "kind": "message",
      "transactional": false,
      "prefix": "wal2json",
      "content": "this non-transactional message will be delivered even if you roll back the transaction"
    }
  ]
}
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "table2_with_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
      "columnvalues": [1, "Backup and Restore", "2021-11-16 20:32:34.509156"]
    }
    ,{
      "kind": "insert",
      "schema": "public",
      "table": "table2_with_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
      "columnvalues": [2, "Tuning", "2021-11-16 20:32:34.509156"]
    }
    ,{
      "kind": "insert",
      "schema": "public",
      "table": "table2_with_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
      "columnvalues": [3, "Replication", "2021-11-16 20:32:34.509156"]
    }
    ,{
      "kind": "message",
      "transactional": true,
      "prefix": "wal2json",
      "content": "this message will be delivered"
    }
    ,{
      "kind": "delete",
      "schema": "public",
      "table": "table2_with_pk",
      "oldkeys": {
        "keynames": ["a", "c"],
        "keytypes": ["integer", "timestamp without time zone"],
        "keyvalues": [1, "2021-11-16 20:32:34.509156"]
      }
    }
    ,{
      "kind": "delete",
      "schema": "public",
      "table": "table2_with_pk",
      "oldkeys": {
        "keynames": ["a", "c"],
        "keytypes": ["integer", "timestamp without time zone"],
        "keyvalues": [2, "2021-11-16 20:32:34.509156"]
      }
    }
    ,{
      "kind": "insert",
      "schema": "public",
      "table": "table2_without_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "numeric(5,2)", "text"],
      "columnvalues": [1, 2.34, "Tapir"]
    }
  ]
}
(2 rows)

-- Clean up the test data
postgres=# SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
 ?column?
----------
 stop
(1 row)

postgres=# DROP TABLE table2_with_pk;
DROP TABLE
postgres=# DROP TABLE table2_without_pk;
```
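A consumer typically replays these changesets into some other store. The sketch below does this for format version 1. It assumes the default include-schemas and include-types settings, and uses in-memory dicts as stand-ins for the target tables. `apply_changeset` and its `key_columns` argument are hypothetical. The sketch also shows why UPDATE and DELETE need a primary key or a replica identity: the old row is located through `oldkeys`, and without them there is nothing to match on.

```python
import json


def apply_changeset(tables, changeset_json, key_columns):
    """Replay one format-version 1 changeset into in-memory tables.

    tables:      {"schema.table": {key tuple: row dict}}, updated in place
    key_columns: {"schema.table": [key column names]}; a hypothetical input the
                 caller supplies, because v1 INSERTs carry no key information
                 unless include-pk is enabled
    """
    for change in json.loads(changeset_json)["change"]:
        if change["kind"] == "message":
            continue  # logical decoding messages do not touch table rows
        name = f'{change["schema"]}.{change["table"]}'
        rows = tables.setdefault(name, {})
        keys = key_columns[name]
        old = change.get("oldkeys")
        if old is not None:  # UPDATE/DELETE: remove the row identified by its old key
            old_row = dict(zip(old["keynames"], old["keyvalues"]))
            rows.pop(tuple(old_row[k] for k in keys), None)
        if "columnnames" in change:  # INSERT/UPDATE: store the new row version
            new_row = dict(zip(change["columnnames"], change["columnvalues"]))
            rows[tuple(new_row[k] for k in keys)] = new_row
    return tables
```

Replaying the second changeset above with `key_columns={"public.table2_with_pk": ["a", "c"], "public.table2_without_pk": ["a"]}` would leave only row 3 in table2_with_pk and the Tapir row in table2_without_pk, matching the source tables.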
  • Consuming changes through SQL functions (format version 2):
```
postgres=# CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE
postgres=# CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
CREATE TABLE
postgres=# SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
 ?column?
----------
 init
(1 row)

postgres=# BEGIN;
BEGIN
postgres=# INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT 0 1
postgres=# INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
INSERT 0 1
postgres=# INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
INSERT 0 1
postgres=# SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
 pg_logical_emit_message
-------------------------
 81/C4F66138
(1 row)

postgres=# DELETE FROM table3_with_pk WHERE a < 3;
DELETE 2
postgres=# SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you roll back the transaction');
 pg_logical_emit_message
-------------------------
 81/C4F66268
(1 row)

postgres=# INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
INSERT 0 1
-- This UPDATE is not added to the stream because the table has no primary key or replica identity
postgres=# UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
UPDATE 1
postgres=# COMMIT;
COMMIT
postgres=# SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json');
WARNING: no tuple identifier for UPDATE in table "public"."table3_without_pk"
 data
----------------------------------------------------------------------------------------
 {"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you roll back the transaction"}
 {"action":"B"}
 {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2021-11-16 20:39:12.126043"}]}
 {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2021-11-16 20:39:12.126043"}]}
 {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2021-11-16 20:39:12.126043"}]}
 {"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}
 {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2021-11-16 20:39:12.126043"}]}
 {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2021-11-16 20:39:12.126043"}]}
 {"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]}
 {"action":"C"}
(10 rows)

postgres=# SELECT * FROM table3_with_pk;
 a |      b      |             c
---+-------------+----------------------------
 3 | Replication | 2021-11-16 20:39:12.126043
(1 row)

-- Fetch only the actions you need
postgres=# BEGIN;
BEGIN
postgres=# INSERT INTO table3_with_pk (b, c) VALUES('joan', now());
INSERT 0 1
postgres=# UPDATE table3_with_pk SET b = 'hexq' WHERE b = 'joan';
UPDATE 1
postgres=# COMMIT;
COMMIT
postgres=# SELECT * FROM table3_with_pk;
 a |      b      |             c
---+-------------+----------------------------
 3 | Replication | 2021-11-16 20:39:12.126043
 4 | hexq        | 2021-11-16 20:44:38.425181
(2 rows)

-- Fetch only INSERT actions (peek does not consume the changes, so they can be read again)
postgres=# SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'format-version', '2', 'actions', 'insert');
 data
----------------------------------------------------------------------------------------
 {"action":"B"}
 {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":4},{"name":"b","type":"character varying(30)","value":"joan"},{"name":"c","type":"timestamp without time zone","value":"2021-11-16 20:44:38.425181"}]}
 {"action":"C"}
(3 rows)

-- Fetch only UPDATE actions
postgres=# SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'format-version', '2', 'actions', 'update');
 data
----------------------------------------------------------------------------------------
 {"action":"B"}
 {"action":"U","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":4},{"name":"b","type":"character varying(30)","value":"hexq"},{"name":"c","type":"timestamp without time zone","value":"2021-11-16 20:44:38.425181"}],"identity":[{"name":"a","type":"integer","value":4},{"name":"c","type":"timestamp without time zone","value":"2021-11-16 20:44:38.425181"}]}
 {"action":"C"}
(3 rows)

-- Clean up: drop the slot so it no longer retains WAL, then drop the test tables
postgres=# SELECT pg_drop_replication_slot('test_slot');
postgres=# DROP TABLE table3_with_pk;
postgres=# DROP TABLE table3_without_pk;
```
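With format version 2, each row of `data` is one JSON document, and the B and C records delimit transactions. The sketch below regroups the rows into transactions. It assumes the `data` values have already been fetched as strings by some client, such as psql or a database driver (not shown). `group_transactions` and `action_summary` are hypothetical helpers.

```python
import json


def group_transactions(rows):
    """Group format-version 2 rows into transactions.

    Records between {"action":"B"} and {"action":"C"} form one group; records
    outside such a pair (e.g. non-transactional messages) become one-item groups.
    """
    groups, current = [], None
    for raw in rows:
        record = json.loads(raw)
        action = record["action"]
        if action == "B":
            current = []
        elif action == "C":
            if current is not None:
                groups.append(current)
            current = None
        elif current is not None:
            current.append(record)
        else:
            groups.append([record])
    return groups


def action_summary(groups):
    """One string of action letters per group, e.g. 'IIIMDDI'."""
    return ["".join(record["action"] for record in group) for group in groups]
```

Fed the ten rows returned by the first query above, `action_summary(group_transactions(rows))` gives `["M", "IIIMDDI"]`. The first group is the non-transactional message, which arrives outside any B/C pair. The second group is the transaction itself.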

Notes

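  • Monitor how each slot is being consumed. If consumption stops while the slot still exists, the slot keeps holding back WAL. That WAL cannot be removed automatically and keeps accumulating on disk.
  • Tables must have a primary key or a replica identity (REPLICA IDENTITY FULL or USING INDEX). Otherwise, UPDATE and DELETE changes are skipped with a warning. INSERTs are still emitted, as the examples show.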
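One way to monitor a slot is to compare its `restart_lsn` (from `SELECT slot_name, restart_lsn FROM pg_replication_slots;`) with the server's current position (`SELECT pg_current_wal_lsn();`, PostgreSQL 10 and later). The server can compute the difference itself with `pg_wal_lsn_diff`. The sketch below does the same arithmetic on the client side. It relies on the standard LSN text format, such as 81/C4EE6D00: the high and low 32 bits in hexadecimal. The alert threshold and all function names are hypothetical.

```python
def parse_lsn(text):
    """Convert a PostgreSQL LSN string such as '81/C4EE6D00' to a byte position.

    The part before '/' holds the high 32 bits and the part after it the low
    32 bits, both in hexadecimal.
    """
    hi, lo = text.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def retained_bytes(current_lsn, restart_lsn):
    """Approximate WAL volume a slot holds back: the distance from its
    restart_lsn to the server's current WAL position."""
    return parse_lsn(current_lsn) - parse_lsn(restart_lsn)


def slot_alert(current_lsn, restart_lsn, limit_bytes):
    """True if the slot holds back more WAL than limit_bytes (a hypothetical threshold)."""
    return retained_bytes(current_lsn, restart_lsn) > limit_bytes
```

For example, the two message LSNs from the first example, 81/C4EE6D00 and 81/C4EE6E88, are 392 bytes apart. Because WAL is removed in whole segments, the disk space actually retained is somewhat larger than this figure.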

References:
https://github.com/eulerto/wal2json
https://www.postgresql.org/docs/10/functions-admin.html#FUNCTIONS-REPLICATION

Last modified: 2021-11-17 21:23:05
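[Copyright notice] This article is original content by a Motianlun (modb.pro) user. Reposts must credit the source (Motianlun), the article link, and the author; otherwise the author and Motianlun reserve the right to pursue liability. To report suspected plagiarism or infringement on Motianlun, email contact@modb.pro with supporting evidence; confirmed content will be removed immediately.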
