暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何设置和运行 PostgreSQL 变更数据捕获

原创 Ellison 2022-08-05
4483

PostgreSQL 提供了一种逻辑解码方法,使基于日志的变更数据捕获成为可能。了解如何通过几个步骤设置和运行 Postgres CDC。


现代 Web 应用程序的架构由几个软件组件组成,例如仪表板、分析、数据库、数据湖、缓存、搜索等。

数据库通常是任何应用程序的核心部分。实时数据更新使不同的数据系统保持连续同步并快速响应新信息。那么如何让你的应用生态系统保持同步呢?这些其他组件如何获取有关数据库更改的信息?更改数据捕获CDC是指识别新数据或更改数据的任何解决方案。

这篇文章是关于 PostgreSQL CDC以及实现这一点的方法。
变更数据捕获 (CDC)是一种数据集成方法,用于检测、捕获和交付对数据库数据源所做的更改。

一般来说,基于 CDC 的数据集成包括以下步骤:

  1. 捕获源数据库中的更改数据。
  2. 将更改的数据转换为您的消费者可以接受的格式。
  3. 将数据发布到消费者或目标数据库。

PostgreSQL 提供了两种使 CDC 成为可能的内置方法:

  • 来自 事务日志、 PostgreSQL WAL,又名预写日志。
  • 数据库触发器。

让我们简要讨论一下使用事务日志 (WAL) 和触发器来捕获数据更改的优缺点。

触发器

基于触发器的方法涉及在数据库上创建审计触发器以捕获与 INSERT、UPDATE 和 DELETE 方法相关的所有事件。

触发器可以附加到表(分区或不分区)或视图。

触发器也可以触发 TRUNCATE 语句。如果发生触发事件,则在适当的时间调用触发器的函数来处理该事件。

  • 这种方法最重要的优点是所有这些都可以在 SQL 级别完成,这与事务日志不同。
  • 但是,触发器的使用对源数据库的性能有很大的影响,因为当对数据进行更改时,这些触发器需要在应用程序数据库上运行。

事务日志

另一方面,对于现代 DBMS,事务日志(PostgreSQL 的 WAL) 通常用于事务日志记录和 复制。

在 PostgreSQL 中,所有事务(如 INSERT、UPDATE 和 DELETE)都会在客户端收到事务结果之前写入 WAL。

  • 这种方法的优点是它不会以任何方式影响数据库的性能。
  • 它也不需要修改数据库表或应用程序。无需在源数据库中创建额外的表。
  • 基于日志的 CDC 通常被认为是捕获变更数据的最佳方法,适用于所有可能的场景,包括具有极高事务量的系统。
请注意,目前大多数 DDL 语句,如 CREATE、DROP、ALTER 都没有被跟踪。但是,TRUNCATE 命令位于逻辑复制流中。

如果您希望 Postgres 数据更改发生时逐行流式传输,则需要逻辑解码或 Postgres 逻辑复制功能。

使用 Postgres 逻辑解码

逻辑解码是 PostgreSQL 的基于日志的 CDC(逻辑复制)的正式名称。

逻辑解码使用 PostgreSQL 预写日志的内容来存储数据库中发生的所有活动。预写日志是一个内部日志,它在存储级别上描述数据库更改。

1.使用逻辑解码的第一步是在Postgres配置中设置如下参数 `postgresql.conf`

wal_level = logical
max_replication_slots = 5
max_wal_senders = 10
  • 设置wal_levellogical允许 WAL 记录逻辑解码所需的信息。
  • 确保您的max_replication_slots值等于或高于使用 WAL 的 PostgreSQL 连接器的数量加上您的数据库使用的其他复制槽的数量。
  • 确保max_wal_senders指定 WAL 的最大并发连接数的参数至少是逻辑复制槽数的两倍。例如,如果您的数据库总共使用 5 个复制槽,则该max_wal_senders值必须为 10 或更大。

重新启动 Postgres 服务器以应用更改。

2.第二步,使用输出插件设置逻辑复制test_decoding

通过运行以下命令为要同步的数据库创建逻辑复制槽。

SELECT pg_create_logical_replication_slot('replication_slot', 'test_decoding');


注意: 每个复制槽 都有一个名称,可以包含小写字母、数字和下划线字符。

要验证插槽是否已成功创建,请运行以下命令。

SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;

3. 在下一步中,为所有表或仅特定表创建一个发布。如果您指定表,则稍后会在发布中添加或删除表。

CREATE PUBLICATION pub FOR ALL TABLES;


或者

CREATE PUBLICATION pub FOR TABLE table1, table2, table3;

您可以选择在发布中包含哪些操作。例如,以下发布仅包括table1.

CREATE PUBLICATION insert_update_only_pub FOR TABLE table1 WITH (publish = 'INSERT, UPDATE');


4. 验证您选择的表是否在发布中。

psql-stream=# SELECT * FROM pg_publication_tables WHERE pubname='pub';
Output
pubname | schemaname | tablename
---------+------------+-----------
pub     | public     | table1
pub     | public     | table2
pub     | public     | table3
(3 rows)


从那时起,我们的出版物pub跟踪数据库中所有表的更改psql-stream

5. 让我们创建一个抽象表t并用一些记录填充它。

create table t (id int, name text);
INSERT INTO t(id, name) SELECT g.id, k.name FROM generate_series(1, 10) as g(id), substr(md5(random()::text), 0, 25) as k(name);


结果,我们在表中有 10 条记录t

psql-stream=# SELECT count(*) FROM t;
count
-------
10
(1 row)


6. 最后,是时候检查我们的逻辑复制是否工作了。

在 PostgreSQL 控制台中运行以下命令以查看 Postgres WAL 条目。

SELECT * FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL);


结果,您会得到类似的结果:

    lsn    | xid  |                          data                          
-----------+------+--------------------------------------------------------
 0/19EA2C0 | 1045 | BEGIN 1045
 0/19EA2C0 | 1045 | table public.t: INSERT: id[integer]:1 name[text]:51459cbc211647e7b31c8720
 0/19EA300 | 1045 | table public.t: INSERT: id[integer]:2 name[text]:51459cbc211647e7b31c8720
 0/19EA340 | 1045 | table public.t: INSERT: id[integer]:3 name[text]:51459cbc211647e7b31c8720
 0/19EA380 | 1045 | table public.t: INSERT: id[integer]:4 name[text]:51459cbc211647e7b31c8720
 0/19EA3C0 | 1045 | table public.t: INSERT: id[integer]:5 name[text]:51459cbc211647e7b31c8720
 0/19EA400 | 1045 | table public.t: INSERT: id[integer]:6 name[text]:51459cbc211647e7b31c8720
 0/19EA440 | 1045 | table public.t: INSERT: id[integer]:7 name[text]:51459cbc211647e7b31c8720
 0/19EA480 | 1045 | table public.t: INSERT: id[integer]:8 name[text]:51459cbc211647e7b31c8720
 0/19EA4C0 | 1045 | table public.t: INSERT: id[integer]:9 name[text]:51459cbc211647e7b31c8720
 0/19EA500 | 1045 | table public.t: INSERT: id[integer]:10 name[text]:51459cbc211647e7b31c8720
 0/19EA5B0 | 1045 | COMMIT 1045
(13 rows)



pg_logical_slot_peek_changes是另一个 PostgreSQL 命令,用于在不使用 WAL 条目的情况下查看更改。pg_logical_slot_peek_changes所以多次调用每次都会返回相同的结果。

另一方面,  pg_logical_slot_get_changes  仅在第一次返回结果。以下调用pg_logical_slot_get_changes返回空结果集。这意味着当get命令执行时,结果会被提供和删除,这极大地增强了我们编写使用这些事件创建表副本的逻辑的能力。


7. 记得销毁一个不再需要阻止它消耗的插槽

SELECT pg_drop_replication_slot('replication_slot');



输出插件

我们已经讨论过test_decodingPostgres 9.4+ 上可用的输出插件。虽然创建为输出插件的示例,但如果您的消费者支持它,它仍然很有用。

与插件一起,PostgreSQL 原生附带了test_decoding另一个插件。从 Postgres 10 开始可用。一些消费者支持它进行解码(例如 Debezium)。pgoutputpgoutput

运行以下命令以根据pgoutput上述步骤 2 创建插件。

SELECT * FROM pg_create_logical_replication_slot('replication_slot', 'pgoutput');

以下命令使用类似于步骤 6 中描述的数据更改。

psql-stream=# SELECT * FROM pg_logical_slot_peek_binary_changes('replication_slot', null, null, 'proto_version', '1', 'publication_names', 'pub');
    lsn    | xid  |                                           data                                           
-----------+------+------------------------------------------------------------------------------------------
 0/19A15F8 | 1038 | \x4200000000019a1d9000027de20a91a0ea0000040e
 0/19A15F8 | 1038 | \x52000080387075626c69630074006400020169640000000017ffffffff006e616d650000000019ffffffff
 0/19A15F8 | 1038 | \x49000080384e0002740000000234306e
 0/19A1890 | 1038 | \x49000080384e0002740000000234316e
 0/19A1910 | 1038 | \x49000080384e0002740000000234326e
 0/19A1990 | 1038 | \x49000080384e0002740000000234336e
 0/19A1A10 | 1038 | \x49000080384e0002740000000234346e
 0/19A1A90 | 1038 | \x49000080384e0002740000000234356e
 0/19A1B10 | 1038 | \x49000080384e0002740000000234366e
 0/19A1B90 | 1038 | \x49000080384e0002740000000234376e
 0/19A1C10 | 1038 | \x49000080384e0002740000000234386e
 0/19A1C90 | 1038 | \x49000080384e0002740000000234396e
 0/19A1DC0 | 1038 | \x430000000000019a1d9000000000019a1dc000027de20a91a0ea
(13 rows)

在这里您可以注意到结果以二进制格式返回。pgoutput插件产生二进制输出。

wal2json是另一个流行的逻辑解码输出插件。

这是wal2json插件的示例输出

json

  1. {
  2. "change":[
  3. {
  4. "kind":"insert",
  5. "schema":"public",
  6. "table":"t",
  7. "columnnames":[
  8. "id",
  9. "name"
  10. ],
  11. "columntypes":[
  12. "integer",
  13. "character varying(255)"
  14. ],
  15. "columnvalues":[
  16. 1,
  17. ""
  18. ]
  19. }
  20. ]
  21. }
  22. {
  23. "change":[
  24. {
  25. "kind":"update",
  26. "schema":"public",
  27. "table":"t",
  28. "columnnames":[
  29. "id",
  30. "name"
  31. ],
  32. "columntypes":[
  33. "integer",
  34. "character varying(255)"
  35. ],
  36. "columnvalues":[
  37. 1,
  38. "New Value"
  39. ],
  40. "oldkeys":{
  41. "keynames":[
  42. "id"
  43. ],
  44. "keytypes":[
  45. "integer"
  46. ],
  47. "keyvalues":[
  48. 1
  49. ]
  50. }
  51. }
  52. ]
  53. }
  54. {
  55. "change":[
  56. {
  57. "kind":"delete",
  58. "schema":"public",
  59. "table":"t",
  60. "oldkeys":{
  61. "keynames":[
  62. "id"
  63. ],
  64. "keytypes":[
  65. "integer"
  66. ],
  67. "keyvalues":[
  68. 1
  69. ]
  70. }
  71. }
  72. ]
  73. }
  74.       

关于老虎机的重要提示

使用插槽时请记住以下几点:

  • 每个插槽只有一个输出插件(您选择哪一个)。
  • 每个插槽仅提供来自一个数据库的更改。
  • 一个数据库可以有多个插槽。
  • 每个数据更改通常在每个插槽中发出一次。
  • 但是当 Postgres 实例重新启动时,插槽可能会重新发出更改。消费者必须处理这种情况。
  • 未使用的插槽对 Postgres 实例的可用性构成威胁。Postgres 将为这些未使用的更改保存所有 WAL 文件。这可能导致存储溢出。

PostgreSQL WAL 消费者

消费者是可以摄取 Postgres 逻辑解码流的任何应用程序。  pg_recvlogical是一个 PostgreSQL 应用程序,它可以管理槽并使用槽中的流。它包含在 Postgres 发行版中,因此它可能已经随 PostgreSQL 一起安装。

Golang 示例代码

以下 Golang 代码示例展示了如何开始创建您自己的 Postgress WAL 使用者。它使用 PostgreSQL-10.x 逻辑复制从源数据库流式传输数据库更改(解码的 WAL 消息)。


  1. package main

  2. import (
  3.     "context"
  4.     "fmt"
  5.     "os"
  6.     "os/signal"
  7.     "strings"
  8.     "time"

  9.     "github.com/jackc/pgconn"
  10.     "github.com/jackc/pglogrepl"
  11.     "github.com/jackc/pgproto3/v2"
  12. )

  13. // Note that runtime parameter "replication=database" in connection string is obligatory
  14. // replicaiton slot will not be created if replication=database is omitted

  15. const CONN = "postgres://postgres:postgres@localhost/psql-streamer?replication=database"
  16. const SLOT_NAME = "replication_slot"
  17. const OUTPUT_PLUGIN = "pgoutput"
  18. const INSERT_TEMPLATE = "create table t (id int, name text);"

  19. var Event = struct {
  20.     Relation string
  21.     Columns []string
  22. }{}

  23. func main() {
  24.     ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
  25.     defer cancel()
  26.     conn, err := pgconn.Connect(ctx, CONN)
  27.     if err != nil {
  28.         panic(err)
  29.     }
  30.     defer conn.Close(ctx)

  31.     // 1. Create table
  32.     if _, err := conn.Exec(ctx, INSERT_TEMPLATE).ReadAll(); err != nil {
  33.         fmt.Errorf("failed to create table: %v", err)
  34.     }

  35.     // 2. ensure publication exists
  36.     if _, err := conn.Exec(ctx, "DROP PUBLICATION IF EXISTS pub;").ReadAll(); err != nil {
  37.         fmt.Errorf("failed to drop publication: %v", err)
  38.     }

  39.     if _, err := conn.Exec(ctx, "CREATE PUBLICATION pub FOR ALL TABLES;").ReadAll(); err != nil {
  40.         fmt.Errorf("failed to create publication: %v", err)
  41.     }

  42.     // 3. create temproary replication slot server
  43.     if _, err = pglogrepl.CreateReplicationSlot(ctx, conn, SLOT_NAME, OUTPUT_PLUGIN, pglogrepl.CreateReplicationSlotOptions{Temporary: true}); err != nil {
  44.         fmt.Errorf("failed to create a replication slot: %v", err)
  45.     }

  46.     var msgPointer pglogrepl.LSN
  47.     pluginArguments := []string{"proto_version '1'", "publication_names 'pub'"}

  48.     // 4. establish connection
  49.     err = pglogrepl.StartReplication(ctx, conn, SLOT_NAME, msgPointer, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
  50.     if err != nil {
  51.         fmt.Errorf("failed to establish start replication: %v", err)
  52.     }

  53.     var pingTime time.Time
  54.     for ctx.Err() != context.Canceled {
  55.         if time.Now().After(pingTime) {
  56.             if err = pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: msgPointer}); err != nil {
  57.                 fmt.Errorf("failed to send standby update: %v", err)
  58.             }
  59.             pingTime = time.Now().Add(10 * time.Second)
  60.             //fmt.Println("client: please standby")
  61.         }

  62.         ctx, cancel := context.WithTimeout(ctx, time.Second*10)
  63.         defer cancel()

  64.         msg, err := conn.ReceiveMessage(ctx)
  65.         if pgconn.Timeout(err) {
  66.             continue
  67.         }
  68.         if err != nil {
  69.             fmt.Errorf("something went wrong while listening for message: %v", err)
  70.         }

  71.         switch msg := msg.(type) {
  72.         case *pgproto3.CopyData:
  73.             switch msg.Data[0] {
  74.             case pglogrepl.PrimaryKeepaliveMessageByteID:
  75.             //    fmt.Println("server: confirmed standby")

  76.             case pglogrepl.XLogDataByteID:
  77.                 walLog, err := pglogrepl.ParseXLogData(msg.Data[1:])
  78.                 if err != nil {
  79.                     fmt.Errorf("failed to parse logical WAL log: %v", err)
  80.                 }

  81.                 var msg pglogrepl.Message
  82.                 if msg, err = pglogrepl.Parse(walLog.WALData); err != nil {
  83.                     fmt.Errorf("failed to parse logical replication message: %v", err)
  84.                 }
  85.                 switch m := msg.(type) {
  86.                 case *pglogrepl.RelationMessage:
  87.                     Event.Columns = []string{}
  88.                     for _, col := range m.Columns {
  89.                         Event.Columns = append(Event.Columns, col.Name)
  90.                     }
  91.                     Event.Relation = m.RelationName
  92.                 case *pglogrepl.InsertMessage:
  93.                     var sb strings.Builder
  94.                     sb.WriteString(fmt.Sprintf("INSERT %s(", Event.Relation))
  95.                     for i := 0; i < len(Event.Columns); i++ {
  96.                         sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.Tuple.Columns[i].Data)))
  97.                     }
  98.                     sb.WriteString(")")
  99.                     fmt.Println(sb.String())
  100.                 case *pglogrepl.UpdateMessage:
  101.                     var sb strings.Builder
  102.                     sb.WriteString(fmt.Sprintf("UPDATE %s(", Event.Relation))
  103.                     for i := 0; i < len(Event.Columns); i++ {
  104.                         sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.NewTuple.Columns[i].Data)))
  105.                     }
  106.                     sb.WriteString(")")
  107.                     fmt.Println(sb.String())
  108.                 case *pglogrepl.DeleteMessage:
  109.                     var sb strings.Builder
  110.                     sb.WriteString(fmt.Sprintf("DELETE %s(", Event.Relation))
  111.                     for i := 0; i < len(Event.Columns); i++ {
  112.                         sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.OldTuple.Columns[i].Data)))
  113.                     }
  114.                     sb.WriteString(")")
  115.                     fmt.Println(sb.String())
  116.                 case *pglogrepl.TruncateMessage:
  117.                     fmt.Println("ALL GONE (TRUNCATE)")
  118.                 }
  119.             }
  120.         default:
  121.             fmt.Printf("received unexpected message: %T", msg)
  122.         }
  123.     }
  124. }



此代码仅记录传入事件,但在生产环境中,您可以轻松地将它们发送到消息队列或目标数据库。

结论

PostgreSQL 中的逻辑解码为其他应用程序组件提供了一种有效的方式来与您的 Postgres 数据库中的数据更改保持同步。

传统上,一直使用拉通知模型,其中每个应用程序组件以一定的时间间隔查询 Postgres。 逻辑编码使用推送通知模型,其中 Postgres 会在每次更改发生时立即通知应用程序的其他部分。

现在可以在几毫秒内将数据更改事件发送给消费者,而无需查询数据库。通过逻辑解码,PostgreSQL 数据库成为现代动态实时应用程序的核心部分。


原文标题:How to Set Up and Run PostgreSQL Change Data Capture

原文作者:Dmitry Narizhnykh

原文地址:https://dzone.com/articles/postgresql-change-data-capture















「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论