
Testing How Debezium Replicates DML Operations from the Source Database

Original article by 张玉龙 (Zhang Yulong), 2022-04-29


Question: how can DELETE operations be replicated for tables without a primary key?

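  • To propagate a DELETE through Kafka to the target database, a tombstone event is needed: a Kafka message whose key is non-null and whose value is null.
  • For a table with a primary key, the Kafka message key is built from the table's primary-key column(s).
  • For a table without a primary key, the Kafka message key is null by default, so DELETE messages are skipped.
  • For tables without a primary key, the Debezium connector provides the message.key.columns property, which builds the Kafka message key from one or more specified columns. The chosen column(s) must never contain NULL, much as OGG (Oracle GoldenGate) uses all columns as the key for tables without a primary key.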
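The key rules above can be condensed into a toy shell model. It only illustrates the article's reasoning and is not Debezium code: the helper name `describe_delete` is hypothetical, and a table is reduced to its primary-key columns plus an optional message.key.columns setting.

```shell
# Toy model of the message-key rules above (hypothetical helper, not Debezium code).
# $1: primary-key column(s) of the table, "" if the table has none
# $2: columns configured via message.key.columns, "" if not set
describe_delete() {
    pk_cols=$1
    custom_cols=$2
    if [ -n "$custom_cols" ]; then
        key_cols=$custom_cols   # message.key.columns replaces the default key
    else
        key_cols=$pk_cols       # default: the primary-key column(s)
    fi
    if [ -n "$key_cols" ]; then
        # non-null key: the DELETE can be followed by a tombstone (key set, value null)
        echo "key=$key_cols delete=tombstone"
    else
        # null key: nothing identifies the row, so the DELETE cannot be replicated
        echo "key=null delete=not-replicated"
    fi
}
```

For example, `describe_delete id ""` models the orders table, `describe_delete "" ""` a table without a primary key, and `describe_delete "" "id,product_id"` the same table once message.key.columns is set.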

Initial test environment

  • Source: PostgreSQL
```sql
postgres=# create database test_dml;
postgres=# \c test_dml
test_dml=# create schema inventory;
test_dml=# CREATE TABLE inventory.orders (
  id integer NOT NULL,
  order_date date NOT NULL,
  purchaser integer NOT NULL,
  quantity integer NOT NULL,
  product_id integer NOT NULL
);
test_dml=# ALTER TABLE ONLY inventory.orders ADD CONSTRAINT orders_pkey PRIMARY KEY (id);
test_dml=# insert into inventory.orders values (10001,now(),1001,1,102);
insert into inventory.orders values (10002,now(),1002,2,105);
insert into inventory.orders values (10003,now(),1003,2,106);
insert into inventory.orders values (10004,now(),1004,1,107);
```
  • Target: Oracle 19c PDB
```sql
# sqlplus sys/oracle@192.168.0.40:1521/pdbtt as sysdba
SQL> create user test identified by test;
SQL> grant connect,resource,create view to test;
SQL> grant unlimited tablespace to test;
-- Converting date/time columns is an open issue to be investigated later, so order_date is declared as number here
SQL> CREATE TABLE test.orders (
  id number NOT NULL,
  order_date number NOT NULL,
  purchaser number NOT NULL,
  quantity number NOT NULL,
  product_id number NOT NULL
);
SQL> alter table test.orders add constraint orders_pkey primary key(id);
```

Default connector configuration

```shell
# Initial configuration of the connector that captures source data
[root@docker tutorial]# cat register-postgres-key.json
{
  "name": "inventory-connector-key",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "test_dml",
    "database.server.name": "test_dml",
    "snapshot.mode": "always",
    "schema.include.list": "inventory",
    "slot.name": "test_dml_slot"
  }
}
curl -s -X DELETE localhost:8083/connectors/inventory-connector-key
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-key.json

# Initial configuration of the connector that writes to the target
[root@docker tutorial]# cat oracle-testdml-sink.json
{
  "name": "oracle-testdml-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
    "connection.user": "test",
    "connection.password": "test",
    "tasks.max": "1",
    "topics": "test_dml.inventory.orders",
    "table.name.format": "ORDERS",
    "quote.sql.identifiers": "never",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
curl -s -X DELETE localhost:8083/connectors/oracle-testdml-sink
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-testdml-sink.json
```
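Every test in this article repeats the same delete-then-register pair of curl calls, so it can help to wrap them in shell functions. A minimal sketch, assuming the Kafka Connect REST API is reachable at localhost:8083 as above; the function names `register_connector` and `connector_status` and the `CONNECT_URL` variable are hypothetical. The `/connectors/<name>/status` endpoint belongs to the Kafka Connect REST API and reports connector and task state, including the stack trace of a failed task.

```shell
# Hypothetical helpers wrapping the Kafka Connect REST calls used above.
# CONNECT_URL defaults to the address used in this article.
CONNECT_URL=${CONNECT_URL:-http://localhost:8083}

# Re-register a connector: delete it if it exists, then POST its JSON config.
# $1: connector name, $2: path to the JSON config file
register_connector() {
    curl -s -X DELETE "$CONNECT_URL/connectors/$1" >/dev/null
    curl -i -X POST -H "Accept:application/json" \
         -H "Content-Type:application/json" \
         "$CONNECT_URL/connectors/" -d @"$2"
}

# Show connector and task state (for example RUNNING or FAILED).
# $1: connector name
connector_status() {
    curl -s "$CONNECT_URL/connectors/$1/status"
}
```

For example, `register_connector oracle-testdml-sink oracle-testdml-sink.json` followed by `connector_status oracle-testdml-sink` shows whether the sink task is running or has failed, without scanning the whole Connect log.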

First, DML replication for a table with a primary key


insert into inventory.orders values (11001,now(),1003,1,102);


  • By default, the Kafka message key uses the table's primary-key column.


  • before: the row's old values. An INSERT has no old values; UPDATE and DELETE do record them, but when the source is PostgreSQL, old values are only recorded if the table's REPLICA IDENTITY is configured (described below).
  • after: the row's new values. For a DELETE, after is NULL.
  • op: the event type: c = create, u = update, d = delete, r = read (applies only to snapshots), t = truncate, m = message.
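When inspecting events in the topic, the one-letter op codes can be translated with a small lookup. An illustrative sketch; the function name `op_name` is hypothetical, and the codes are exactly the ones listed above.

```shell
# Translate a Debezium "op" code into a readable operation name (illustrative helper).
op_name() {
    case "$1" in
        c) echo "create" ;;
        u) echo "update" ;;
        d) echo "delete" ;;
        r) echo "read (snapshot)" ;;
        t) echo "truncate" ;;
        m) echo "message" ;;
        *) echo "unknown" ;;
    esac
}
```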
update inventory.orders set quantity=2 where id=11001;


  • before contains no old data because the table's REPLICA IDENTITY has not been configured.
delete from inventory.orders where id = 11001;


  • A DELETE produces two events in Kafka. One of them has a NULL value: this is the tombstone event, which deletes the record on the target and thereby replicates the DELETE.


  • In before, every column except the primary key is 0, because the table's REPLICA IDENTITY has not been configured.


REPLICA IDENTITY

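  • A PostgreSQL-specific, table-level setting that only takes effect when logical replication is used.
  • It controls what information about changes to the table is written to the WAL, so that the rows affected by UPDATE or DELETE events can be identified.
  • Whenever an UPDATE or DELETE occurs, the REPLICA IDENTITY setting determines what information, if any, is available about the previous values of the affected table columns.
  • Four possible values:
    • DEFAULT: records the old values of the primary-key columns, if any. This is the default for non-system tables.
    • USING INDEX index_name: records the old values of the columns covered by the named index.
    • FULL: records the old values of all columns in the row.
    • NOTHING: records no old values. This is the default for system tables.
  • With DEFAULT, if a table has no primary key, the connector emits no UPDATE or DELETE events for it; for such a table the connector emits only CREATE events.
  • How to set it: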
ALTER TABLE ONLY inventory.orders REPLICA IDENTITY FULL;
  • UPDATE: with FULL, the event's before field now contains the old values of every column.


  • DELETE: before likewise contains the complete old row.

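To check which of these settings a table currently has, PostgreSQL stores it in the relreplident column of the pg_class catalog as a single letter (d, i, f, n). A sketch assuming psql is on the PATH and can reach the database; both function names are hypothetical, and you may need to add -h/-U options for your environment.

```shell
# Translate pg_class.relreplident into the REPLICA IDENTITY names listed above.
replica_identity_name() {
    case "$1" in
        d) echo "DEFAULT" ;;
        i) echo "USING INDEX" ;;
        f) echo "FULL" ;;
        n) echo "NOTHING" ;;
        *) echo "unknown" ;;
    esac
}

# Print the current setting of one table (hypothetical helper; needs psql).
# $1: table name such as inventory.orders, $2: database name
# The table name is pasted into the SQL text, so only pass trusted input.
show_replica_identity() {
    code=$(psql -At -d "$2" -c "SELECT relreplident FROM pg_class WHERE oid = '$1'::regclass")
    replica_identity_name "$code"
}
```

After the ALTER TABLE above, `show_replica_identity inventory.orders test_dml` should print FULL.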

Consumption on the target side

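  • The target connector uses the default configuration from the <Default connector configuration> section at the start of this article. Three parameters matter here; their defaults are delete.enabled=false, pk.mode=none, and insert.mode=insert.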

Problem 1: DELETE operations are not replicated

  • The Kafka Connect log shows the following error:
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException: Sink connector 'oracle-testdml-sink' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='test_dml.inventory.orders',partition=0,offset=7,timestamp=1651152793683) with a null value and null value schema.
  • DELETE operations cannot be replicated with these defaults; add "delete.enabled": "true" and "pk.mode": "record_key" to the sink connector.
  • delete.enabled defaults to false, and pk.mode defaults to none.

Problem 2: unique constraint violated?

ORA-00001: unique constraint (TEST.SYS_C007736) violated
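  • Both the source and target tables have primary keys, and every row on the target was replicated from the source, so why is the unique constraint violated?
  • The cause is set aside here (a plausible explanation: in insert mode the sink issues a plain INSERT for every record, including UPDATE events and rows re-read by snapshot.mode=always, so a key that already exists on the target is inserted again). The fix is to add "insert.mode": "upsert" on top of the Problem 1 changes.
  • insert.mode defaults to insert.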


  • The INSERT, UPDATE, and DELETE operations above are now replicated correctly.
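The working sink configuration for the table with a primary key is the default one plus the three parameters from Problems 1 and 2. A sketch of a shell helper that emits such a file; the name `sink_config` is hypothetical, and the connection details are copied from this article's test environment.

```shell
# Emit a JDBC sink config like oracle-testdml-sink.json with the parameters
# from Problems 1 and 2 added (hypothetical helper).
# $1: connector name, $2: topic, $3: target table name
sink_config() {
    cat <<EOF
{
  "name": "$1",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
    "connection.user": "test",
    "connection.password": "test",
    "tasks.max": "1",
    "topics": "$2",
    "table.name.format": "$3",
    "quote.sql.identifiers": "never",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "insert.mode": "upsert"
  }
}
EOF
}
```

Usage: `sink_config oracle-testdml-sink test_dml.inventory.orders ORDERS > oracle-testdml-sink.json`, then re-register the connector with the same curl calls as before.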

Next, DML replication for a table without a primary key


  • For a table without a primary key, messages written to Kafka have a null key by default.
insert into inventory.orders2 values (11001,now(),1003,1,102);


  • INSERT replicates normally, but the UPDATE and DELETE records are skipped. With a PostgreSQL source, this shows why configuring the table's REPLICA IDENTITY matters.
update inventory.orders2 set quantity=2 where id=11001;
tutorial-connect-1 | 2022-04-28 13:41:53,005 WARN Postgres|test_dml|streaming no new values found for table '{ key : null, value : {"name" : "test_dml.inventory.orders2.Value", "type" : "STRUCT", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "order_date", "index" : "1", "schema" : {"name" : "io.debezium.time.Date", "type" : "INT32", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "purchaser", "index" : "2", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "quantity", "index" : "3", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "product_id", "index" : "4", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}]} }' from update message at 'Struct{version=1.9.0.Final,connector=postgresql,name=test_dml,ts_ms=1651153312826,db=test_dml,sequence=["152831320","152831376"],schema=inventory,table=orders2,txId=833,lsn=152831376}'; skipping record [io.debezium.relational.RelationalChangeRecordEmitter]
delete from inventory.orders2 where id = 11001;
tutorial-connect-1 | 2022-04-28 13:43:12,205 WARN Postgres|test_dml|streaming no old values found for table '{ key : null, value : {"name" : "test_dml.inventory.orders2.Value", "type" : "STRUCT", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "order_date", "index" : "1", "schema" : {"name" : "io.debezium.time.Date", "type" : "INT32", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "purchaser", "index" : "2", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "quantity", "index" : "3", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "product_id", "index" : "4", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}]} }' from delete message at 'Struct{version=1.9.0.Final,connector=postgresql,name=test_dml,ts_ms=1651153391920,db=test_dml,sequence=["152831512","152831800"],schema=inventory,table=orders2,txId=834,lsn=152831800}'; skipping record [io.debezium.relational.RelationalChangeRecordEmitter]
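Since these skipped UPDATE and DELETE events only show up as WARN lines, they are easy to miss. A sketch that counts them per table from Connect log output on stdin; it relies only on the `skipping record` text and the `table=...` field visible in the warnings above, and the function name `count_skipped` is hypothetical.

```shell
# Count "skipping record" warnings per table in Kafka Connect log text read from stdin.
# Relies on the "table=<name>" field inside the source Struct of each warning.
count_skipped() {
    grep 'skipping record' | grep -o 'table=[A-Za-z0-9_]*' | sort | uniq -c
}
```

Assuming the Connect container is named tutorial-connect-1, as the log prefix suggests: `docker logs tutorial-connect-1 2>&1 | count_skipped`.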

REPLICA IDENTITY

  • Once REPLICA IDENTITY is configured, UPDATE and DELETE operations on the table without a primary key reach Kafka normally.
ALTER TABLE ONLY inventory.orders2 REPLICA IDENTITY FULL;
  • UPDATE: the event now reaches Kafka, with the full old row in before.


  • DELETE: the event also reaches Kafka, but its message key is still null.


Consumption on the target side

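  • Reinitialize the environment: delete the source connector, delete the topics, re-register the connector, and keep the table without a primary key in its original state.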
  • On top of the default configuration from the <Default connector configuration> section, add "delete.enabled": "true", "pk.mode": "record_key", "insert.mode": "upsert":
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException: Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='test_dml.inventory.orders2',partition=0,offset=0,timestamp=1651157942687) with a null key and null key schema.
  • On top of the default configuration, add "delete.enabled": "false", "pk.mode": "record_key", "insert.mode": "upsert":
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException: Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='test_dml.inventory.orders2',partition=0,offset=0,timestamp=1651157942687) with a null key and null key schema.
  • On top of the default configuration, add "delete.enabled": "false", "pk.mode": "none", "insert.mode": "upsert":
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException: Write to table '"ORDERS2"' in UPSERT mode requires key field names to be known, check the primary key configuration
  • On top of the default configuration, add "delete.enabled": "false", "pk.mode": "record_value", "pk.fields": "id", "insert.mode": "upsert":
# INSERT and UPDATE both work, but DELETE is still not supported
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException: Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=false' and 'pk.mode=record_value' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='test_dml.inventory.orders2',partition=0,offset=9,timestamp=1651160261336) with a null value and null value schema.

Problems with tables without a primary key

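  • Problem 1: the key of a table without a primary key is null, so "delete.enabled": "true" with "pk.mode": "record_key" cannot be used.
  • Switching to "pk.mode": "record_value" and adding "pk.fields": "id", so that the key is taken from a field in the value, is rejected while "delete.enabled" is still "true":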
{ "error_code": 400, "message": "Connector configuration is invalid and contains the following 2 error(s): Deletes are only supported for pk.mode record_key You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`" }
  • "delete.enabled": "true" can only be combined with "pk.mode": "record_key"; to replicate DELETE operations, the key must be non-null.

  • Problem 2: "insert.mode": "upsert" requires key fields; for a table without a primary key you must configure "pk.mode": "record_value" and "pk.fields": "id".

  • Problem 3: "delete.enabled": "false", "pk.mode": "record_value", "pk.fields": "id", "insert.mode": "upsert" still does not support DELETE; to replicate DELETE operations, the key must be non-null.

  • The message.key.columns parameter described in the next section solves all of these problems.
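The combinations tried above can be summarized as a decision table. The following toy model reproduces the outcome of each attempt in this article; it is not the JDBC sink connector's actual validation code, and the function name `check_sink` and its argument convention are hypothetical.

```shell
# Toy model of the JDBC sink outcomes observed in this article (not Confluent code).
# $1 delete.enabled   $2 pk.mode   $3 insert.mode
# $4 "null" if the record key is null   $5 "null" if the record value is null
check_sink() {
    if [ "$1" = "true" ] && [ "$2" != "record_key" ]; then
        echo "invalid config: deletes need pk.mode=record_key"
    elif [ "$3" = "upsert" ] && [ "$2" = "none" ]; then
        echo "error: upsert needs key fields"
    elif [ "$2" = "record_key" ] && [ "$4" = "null" ]; then
        echo "error: null key"
    elif [ "$5" = "null" ] && [ "$1" != "true" ]; then
        echo "error: null value (tombstone) with delete.enabled=false"
    elif [ "$5" = "null" ]; then
        echo "ok: delete"          # tombstone with a usable key
    else
        echo "ok: $3"              # normal row written with the chosen insert.mode
    fi
}
```

For instance, `check_sink true record_key upsert null value` gives the null-key error seen for orders2, while a table whose key comes from message.key.columns, `check_sink true record_key upsert key null`, yields `ok: delete`.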

Debezium's message.key.columns parameter for tables without a primary key


```shell
[root@docker tutorial]# cat register-postgres-key.json
{
  "name": "inventory-connector-key",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "test_dml",
    "database.server.name": "test_dml",
    "snapshot.mode": "always",
    "table.include.list": "inventory.orders3",
    "slot.name": "test_dml_slot",
    "message.key.columns": "inventory.orders3:id,product_id"
  }
}
curl -s -X DELETE localhost:8083/connectors/inventory-connector-key
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-key.json

[root@docker tutorial]# cat oracle-testdml-sink.json
{
  "name": "oracle-testdml-sink3",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
    "connection.user": "test",
    "connection.password": "test",
    "tasks.max": "1",
    "topics": "test_dml.inventory.orders3",
    "table.name.format": "ORDERS3",
    "quote.sql.identifiers": "never",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "insert.mode": "upsert"
  }
}
curl -s -X DELETE localhost:8083/connectors/oracle-testdml-sink3
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-testdml-sink.json
```
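A message.key.columns entry has the form `<fully-qualified table>:<column>,<column>`, and entries for several tables are separated by semicolons. Because the chosen columns must never be NULL, it is worth checking the existing data before relying on them. A sketch with two hypothetical helpers: one builds the entry, the other builds a query counting rows where any chosen column is NULL.

```shell
# Build one message.key.columns entry, e.g. "inventory.orders3:id,product_id".
# $1: fully-qualified table, remaining args: key columns (hypothetical helper)
key_columns_entry() {
    tbl=$1; shift
    cols=$(printf '%s,' "$@")       # "id,product_id,"
    echo "$tbl:${cols%,}"           # drop the trailing comma
}

# Build a query that counts rows whose chosen key columns contain NULL;
# it should return 0 before these columns are used as the message key.
null_key_check_sql() {
    tbl=$1; shift
    cond=$(printf '%s IS NULL OR ' "$@")
    echo "SELECT count(*) FROM $tbl WHERE ${cond% OR };"
}
```

Usage, assuming psql can reach the source database: `psql -d test_dml -c "$(null_key_check_sql inventory.orders3 id product_id)"`.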


insert into inventory.orders3 values (11001,now(),1003,1,102);


update inventory.orders3 set quantity=2 where id=11001;


delete from inventory.orders3 where id = 11001;


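[Copyright notice] This article is original content by a Motianlun (modb.pro) user. When reposting, you must credit the source (Motianlun), the article link, the author, and other basic information; otherwise the author and Motianlun reserve the right to pursue liability. If you find suspected plagiarism or infringing content on Motianlun, please report it by email to contact@modb.pro with supporting evidence; once verified, Motianlun will promptly remove the content.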
