实验环境
- Debezium 版本 1.9 (2022-04-05)
- Debezium Tested Versions
- PostgreSQL 为单机部署,版本 14.2
- 本测试参考文档:https://debezium.io/documentation/reference/1.9/
- 基于 Debezium 的变更数据捕获的架构:
- https://github.com/debezium/docker-images
启动 Zookeeper
# 后台运行
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper
# 实时查看 zookeeper 的日志信息
docker logs -f -t --tail 10 zookeeper
启动 Kafka
# 后台运行
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka
# 实时查看 kafka 的日志信息
docker logs -f -t --tail 10 kafka
启动 PostgreSQL 14.2,使用 Debezium 提供的示例镜像,镜像中自带了一个用于测试的 schema:inventory
# 创建一个数据持久化目录
mkdir -p /docker_data/postgres
chmod -R a+rwx /docker_data/postgres/
# 后台运行 14.2 版本的 PostgreSQL 数据库
docker run -d --name postgres \
-p 5432:5432 \
-e POSTGRES_PASSWORD=postgres \
-e PGDATA=/var/lib/pgdata \
-v /docker_data/postgres:/var/lib/pgdata \
quay.io/debezium/example-postgres
# 运行 psql 容器
[root@docker ~]# alias psql='docker run -it --rm --name psql debezium/example-postgres psql -h 192.168.0.40 -U postgres -p 5432'
[root@docker ~]# psql
Password for user postgres:
psql (14.2 (Debian 14.2-1.pgdg110+1))
Type "help" for help.
postgres=# select version();
version
-----------------------------------------------------------------------------------------------------------------------------
PostgreSQL 14.2 (Debian 14.2-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit
(1 row)
postgres=# \l
List of databases
Name | Owner | Encoding | Collate | Ctype | Access privileges
-----------+----------+----------+------------+------------+-----------------------
postgres | postgres | UTF8 | en_US.utf8 | en_US.utf8 |
template0 | postgres | UTF8 | en_US.utf8 | en_US.utf8 | =c/postgres +
| | | | | postgres=CTc/postgres
template1 | postgres | UTF8 | en_US.utf8 | en_US.utf8 | =c/postgres +
| | | | | postgres=CTc/postgres
(3 rows)
postgres=# \dn
List of schemas
Name | Owner
-----------+----------
inventory | postgres
public | postgres
(2 rows)
postgres=# \dt inventory.*
List of relations
Schema | Name | Type | Owner
-----------+------------------+-------+----------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
(6 rows)
postgres=# select schemaname,relname,n_live_tup from pg_stat_user_tables;
schemaname | relname | n_live_tup
------------+------------------+------------
inventory | customers | 4
inventory | products_on_hand | 9
inventory | orders | 4
inventory | products | 9
inventory | spatial_ref_sys | 8500
inventory | geom | 3
(6 rows)
看看这个 debezium 提供的 PostgreSQL 镜像中都做了哪些配置
- pg_hba.conf
[root@docker ~]# cd /docker_data/postgres/
[root@docker postgres]# cat pg_hba.conf
host all all all scram-sha-256
host replication postgres 0.0.0.0/0 trust
- postgresql.conf
listen_addresses = '*'
shared_preload_libraries = 'decoderbufs,wal2json'
wal_level = logical
max_wal_senders = 4
#wal_keep_segments = 4
#wal_sender_timeout = 60s
max_replication_slots = 4
启动 Kafka Connect
# 后台运行
docker run -d --name connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link postgres:postgres \
quay.io/debezium/connect
# 实时查看 Kafka Connect 的日志信息
docker logs -f -t --tail 10 connect
Debezium PostgreSQL connector
- 准备 Debezium PostgreSQL connector 配置文件
将配置文件创建在 docker 宿主机上即可,connect 容器开放了 REST API 来管理 Debezium 的连接器
[root@docker ~]# vi pgsql-inventory-connector.json
{
"name": "pgsql-inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"database.server.name": "pgsql",
"slot.name": "inventory_slot",
"table.include.list": "inventory.orders,inventory.products",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered",
"plugin.name": "pgoutput"
}
}
- 通过 Kafka Connect 的 REST API 注册 Debezium PostgreSQL connector
[root@docker ~]# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-inventory-connector.json
HTTP/1.1 201 Created
Date: Fri, 22 Apr 2022 00:08:47 GMT
Location: http://192.168.0.40:8083/connectors/pgsql-inventory-connector
Content-Type: application/json
Content-Length: 551
Server: Jetty(9.4.43.v20210629)
{"name":"pgsql-inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","database.server.name":"pgsql","slot.name":"inventory_slot","table.include.list":"inventory.orders,inventory.products","publication.name":"dbz_inventory_connector","publication.autocreate.mode":"filtered","plugin.name":"pgoutput","name":"pgsql-inventory-connector"},"tasks":[],"type":"source"}
使用 kafka-ui 核对捕获到的数据
kafka-ui:Open-Source Web GUI for Apache Kafka Management:https://github.com/provectus/kafka-ui
docker run -p 8811:8080 \
-e KAFKA_CLUSTERS_0_NAME=oracle-scott-connector \
-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=192.168.0.40:9092 \
-d provectuslabs/kafka-ui:latest
网页登录:http://192.168.0.40:8811/
模拟业务
- INSERT
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
(4 rows)
postgres=# insert into inventory.orders values (11001,now(),1003,1,102);
INSERT 0 1
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
11001 | 2022-04-22 | 1003 | 1 | 102
(5 rows)
- UPDATE
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
11001 | 2022-04-22 | 1003 | 1 | 102
(5 rows)
postgres=# update inventory.orders set quantity=2 where id=11001;
UPDATE 1
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
11001 | 2022-04-22 | 1003 | 2 | 102
(5 rows)
- DELETE
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
11001 | 2022-04-22 | 1003 | 2 | 102
(5 rows)
postgres=# delete from inventory.orders where id = 11001;
DELETE 1
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
(4 rows)
一个问题:"schema.include.list" 捕获的表不全
当连接器属性配置为 "schema.include.list": "inventory" 时,按预期应该捕获 schema inventory 中的所有表,但测试发现少捕获了一张 spatial_ref_sys 表,原因暂时没有弄清楚。使用的连接器配置如下:
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial",
"slot.name": "initial_slot",
"schema.include.list": "inventory",
"snapshot.mode": "initial",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
}
监控 PostgreSQL 的复制槽
postgres=# select * from pg_replication_slots;
最后修改时间:2022-04-27 16:17:38
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。