Question: how can DELETE operations be synchronized for tables without a primary key?
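- For Kafka to propagate a DELETE to the target database, a tombstone event is required: a Kafka message whose Key is non-null and whose Value is null.
- For a table with a primary key, the Kafka message Key is built from the table's primary key column(s).
- For a table without a primary key, the Kafka message Key is null by default, so DELETE messages are skipped.
- For tables without a primary key, the Debezium connector provides the configuration property message.key.columns, which builds the Kafka message Key from one or more specified columns. The specified columns must never contain null values, much as OGG (Oracle GoldenGate) uses all columns as the key for tables without a primary key.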
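The key-selection rule above can be sketched in Python as follows. This is a simplified model, not Debezium's implementation: `parse_key_columns`, `message_key` and `tombstone` are hypothetical helpers, table names are matched exactly (Debezium also accepts regular expressions), and multiple tables in message.key.columns are assumed to be separated by semicolons, using the `inventory.orders3:id,product_id` form that appears later in this article.

```python
from typing import Dict, List, Optional, Tuple


def parse_key_columns(spec: str) -> Dict[str, List[str]]:
    """Parse 'schema.table:col1,col2;schema.table2:col3' into {table: [columns]}.

    Assumption: exact table-name matching only; Debezium also accepts regexes.
    """
    result: Dict[str, List[str]] = {}
    for entry in (e.strip() for e in spec.split(";")):
        if not entry:
            continue
        table, _, cols = entry.partition(":")
        result[table.strip()] = [c.strip() for c in cols.split(",") if c.strip()]
    return result


def message_key(table: str, row: dict, pk_columns: List[str],
                key_spec: str = "") -> Optional[dict]:
    """Return the Kafka message Key for a row, or None when no Key can be built."""
    # A message.key.columns entry, if present, takes the place of the primary key;
    # otherwise the primary key columns are used.
    columns = parse_key_columns(key_spec).get(table) or pk_columns
    if not columns:
        return None  # no primary key and no message.key.columns entry: null Key
    key = {c: row[c] for c in columns}
    if any(v is None for v in key.values()):
        # A null key column cannot identify the row, hence the "no nulls" requirement.
        raise ValueError(f"null key column in {table}: {key}")
    return key


def tombstone(key: Optional[dict]) -> Tuple[dict, None]:
    """A tombstone is a record with a non-null Key and a null Value."""
    if key is None:
        raise ValueError("a tombstone needs a non-null Key")
    return (key, None)
```

For example, with message.key.columns set to inventory.orders3:id,product_id, the row (11001, ..., 102) gets the Key {"id": 11001, "product_id": 102}, so a tombstone carrying that Key can follow its DELETE; a keyless table with no entry gets a null Key and no tombstone is possible.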
- Source: PostgreSQL
postgres=# create database test_dml;
postgres=# \c test_dml
test_dml=# create schema inventory;
test_dml=# CREATE TABLE inventory.orders (
id integer NOT NULL,
order_date date NOT NULL,
purchaser integer NOT NULL,
quantity integer NOT NULL,
product_id integer NOT NULL
);
test_dml=# ALTER TABLE ONLY inventory.orders ADD CONSTRAINT orders_pkey PRIMARY KEY (id);
test_dml=# insert into inventory.orders values (10001,now(),1001,1,102);
insert into inventory.orders values (10002,now(),1002,2,105);
insert into inventory.orders values (10003,now(),1003,2,106);
insert into inventory.orders values (10004,now(),1004,1,107);
- Target: Oracle 19c PDB
# sqlplus sys/oracle@ as sysdba
SQL> create user test identified by test;
SQL> grant connect,resource,create view to test;
SQL> grant unlimited tablespace to test;
-- Date-column conversion is an open issue, to be investigated later (order_date is declared as number for now)
SQL> CREATE TABLE test.orders (
id number NOT NULL,
order_date number NOT NULL,
purchaser number NOT NULL,
quantity number NOT NULL,
product_id number NOT NULL
);
SQL> alter table test.orders add constraint orders_pkey primary key(id);
# Initial configuration of the source connector that captures changes
[root@docker tutorial]# cat register-postgres-key.json
{
  "name": "inventory-connector-key",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "test_dml",
    "database.server.name": "test_dml",
    "snapshot.mode": "always",
    "schema.include.list": "inventory",
    "slot.name": "test_dml_slot"
  }
}
curl -s -X DELETE localhost:8083/connectors/inventory-connector-key
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-key.json
# Initial configuration of the sink connector that writes to the target
[root@docker tutorial]# cat oracle-testdml-sink.json
{
  "name": "oracle-testdml-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:oracle:thin:@//",
    "connection.user": "test",
    "connection.password": "test",
    "tasks.max": "1",
    "topics": "test_dml.inventory.orders",
    "table.name.format": "ORDERS",
    "quote.sql.identifiers": "never",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
curl -s -X DELETE localhost:8083/connectors/oracle-testdml-sink
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-testdml-sink.json
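The connectors above are re-created with a DELETE followed by a POST. As an alternative, the Kafka Connect REST API also accepts `PUT /connectors/{name}/config`, which creates the connector if it does not exist and replaces its configuration if it does. A minimal standard-library Python sketch, assuming Kafka Connect listens on `localhost:8083` as in the curl commands; `load_connector_file` and `put_connector_request` are hypothetical helpers:

```python
import json
import urllib.request
from typing import Tuple

# Assumption: Kafka Connect REST endpoint, as in the curl commands above.
CONNECT_URL = "http://localhost:8083"


def load_connector_file(path: str) -> Tuple[str, dict]:
    """Read a file shaped like oracle-testdml-sink.json: {"name": ..., "config": {...}}."""
    with open(path, encoding="utf-8") as f:
        doc = json.load(f)
    return doc["name"], doc["config"]


def put_connector_request(name: str, config: dict) -> urllib.request.Request:
    """Build PUT /connectors/{name}/config; unlike POST, the body is the bare config map."""
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )
```

To send it: `urllib.request.urlopen(put_connector_request(*load_connector_file("oracle-testdml-sink.json")))`. Because PUT is idempotent, the separate DELETE step is not needed just to change the configuration.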
insert into inventory.orders values (11001,now(),1003,1,102);
- By default, the Kafka message Key is the table's primary key.
- before: the old value of the row. INSERT has no old value, while UPDATE and DELETE record one; when the source is PostgreSQL, however, the old value is only recorded if the table's REPLICA IDENTITY is configured (described below).
- after: the new value of the row; for DELETE, after is NULL.
- op: the event type: c = create, u = update, d = delete, r = read (snapshots only), t = truncate, m = message
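A small Python sketch of how a consumer might read the before, after and op fields of an event Value. The dicts stand in for the payload of a Debezium JSON message with made-up values, and `summarize` is a hypothetical helper, not part of Debezium:

```python
from typing import Optional

# Debezium op codes, as listed above.
OP_NAMES = {"c": "create", "u": "update", "d": "delete",
            "r": "read", "t": "truncate", "m": "message"}


def summarize(value: Optional[dict]) -> str:
    """Describe one event Value (the payload part of the JSON message)."""
    if value is None:
        return "tombstone"  # null Value: there is no before/after/op at all
    op = OP_NAMES.get(value.get("op"), "unknown")
    before, after = value.get("before"), value.get("after")
    if op in ("create", "read"):
        return f"{op}: after={after}"  # no old value for inserts and snapshot reads
    if op == "update":
        return f"update: before={before} after={after}"
    if op == "delete":
        return f"delete: before={before}"  # after is null for deletes
    return op
```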
update inventory.orders set quantity=2 where id=11001;
- before contains no previous values because REPLICA IDENTITY has not been configured for the table.
delete from inventory.orders where id = 11001;
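- A DELETE produces two events in Kafka. One of them has a NULL Value: this is the tombstone event, which deletes the row on the target and so propagates the DELETE.
- In before, every column except the primary key is 0, because REPLICA IDENTITY has not been configured for the table.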
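The two-record DELETE and the sink-side transform configured earlier (`transforms.unwrap.drop.tombstones=false`) can be modelled as below. This is a rough sketch, assuming ExtractNewRecordState's default delete handling in the Debezium 1.9 release seen in the logs, which drops the delete event itself; with drop.tombstones=false the tombstone then reaches the JDBC sink. The function names are hypothetical.

```python
from typing import List, Optional, Tuple

Record = Tuple[dict, Optional[dict]]  # (Key, Value)


def delete_records(key: dict, before: dict) -> List[Record]:
    """A source DELETE yields two records with the same Key."""
    return [
        (key, {"op": "d", "before": before, "after": None}),  # the delete event
        (key, None),                                          # the tombstone
    ]


def unwrap(records: List[Record], drop_tombstones: bool = True) -> List[Record]:
    """Rough model of ExtractNewRecordState with default delete handling (drop).

    drop_tombstones mirrors transforms.unwrap.drop.tombstones (default true).
    """
    out: List[Record] = []
    for key, value in records:
        if value is None:
            if not drop_tombstones:
                out.append((key, None))  # tombstone passed through to the sink
        elif value["op"] == "d":
            continue  # the delete event itself is dropped
        else:
            out.append((key, value["after"]))  # flattened to the new row state
    return out
```

With the default drop.tombstones=true nothing at all would reach the sink for the DELETE, which is why the sink configuration sets it to false.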
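- REPLICA IDENTITY is a PostgreSQL-specific, table-level setting that only takes effect when logical replication is used.
- It controls what information is written to the WAL to identify the rows affected by UPDATE or DELETE.
- Whenever an UPDATE or DELETE occurs, the REPLICA IDENTITY setting determines which previous column values, if any, are available.
- Four possible values:
  - DEFAULT: records the old values of the primary key columns, if any. This is the default for non-system tables.
  - USING INDEX index_name: records the old values of the columns covered by the named index.
  - FULL: records the old values of all columns in the row.
  - NOTHING: records no old values. This is the default for system tables.
- Under the DEFAULT setting, if a table has no primary key, the connector does not emit UPDATE or DELETE events for it; for such a table it emits only CREATE events.
- How to set it: ALTER TABLE table_name REPLICA IDENTITY { DEFAULT | USING INDEX index_name | FULL | NOTHING };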
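A simplified Python model of which old values PostgreSQL makes available under each setting. It is consistent with what was observed above (before is null for the UPDATE, and only the key is present for the DELETE, with the other columns showing up as 0), but it is only a sketch: `before_image` is hypothetical, and it assumes that under DEFAULT or USING INDEX the old key is logged for an UPDATE only when a key column changes.

```python
from typing import List, Optional


def before_image(op: str, old_row: dict, identity: str, pk: List[str],
                 index_cols: Optional[List[str]] = None,
                 key_changed: bool = False) -> Optional[dict]:
    """Old column values logged for an UPDATE or DELETE (simplified model).

    identity is one of "DEFAULT", "USING INDEX", "FULL", "NOTHING".
    """
    if identity == "FULL":
        return dict(old_row)  # every column of the old row
    identity_cols = {"DEFAULT": pk, "USING INDEX": index_cols or [], "NOTHING": []}
    if identity not in identity_cols:
        raise ValueError(f"unknown REPLICA IDENTITY: {identity}")
    cols = identity_cols[identity]
    if not cols:
        return None  # nothing identifies the old row
    if op == "update" and not key_changed:
        return None  # assumption: old key only logged for UPDATE when it changed
    return {c: old_row[c] for c in cols}
```

Under this model, a table without a primary key left at DEFAULT has no old values at all, which matches the connector skipping its UPDATE and DELETE events.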
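- The sink connector uses the default configuration from the <Default connector configuration> section at the beginning of this article. Three properties matter here, with defaults delete.enabled=false, pk.mode=none and insert.mode=insert.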
Problem 1: DELETE operations are not synchronized
- Error reported in the Kafka Connect log:
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink' is configured with 'delete.enabled=false' and 'pk.mode=none'
and therefore requires records with a non-null Struct value and non-null Struct schema,
but found record at (topic='test_dml.inventory.orders',partition=0,offset=7,timestamp=1651152793683)
with a null value and null value schema.
- DELETE cannot be synchronized; the sink connector needs "delete.enabled": "true" and "pk.mode": "record_key".
- delete.enabled defaults to false, and pk.mode defaults to none.
ORA-00001: unique constraint (TEST.SYS_C007736) violated
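- Both the source and target tables have a primary key, and every row on the target was replicated from the source, so why is the unique constraint violated?
- I will set that question aside for now; the fix is to add "insert.mode": "upsert" on top of the Problem 1 settings.
- insert.mode defaults to insert.
- The INSERT, UPDATE and DELETE operations above are now synchronized correctly.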
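The cause of the ORA-00001 error is left open above. Purely as an illustration, assuming the violation comes from a row for an existing key being written to the target a second time (for example re-read by a new snapshot, since snapshot.mode is always; this is an assumption, not a diagnosis), this toy Python model shows why insert.mode=upsert tolerates that while insert does not. `TargetTable` is hypothetical and `DuplicateKeyError` only stands in for ORA-00001.

```python
class DuplicateKeyError(Exception):
    """Stands in for ORA-00001: unique constraint violated."""


class TargetTable:
    """Toy target table keyed by primary key (hypothetical)."""

    def __init__(self) -> None:
        self.rows: dict = {}

    def insert(self, key, row: dict) -> None:
        # insert.mode=insert: a plain INSERT fails if the key already exists
        if key in self.rows:
            raise DuplicateKeyError(f"unique constraint violated for key {key}")
        self.rows[key] = row

    def upsert(self, key, row: dict) -> None:
        # insert.mode=upsert: insert when missing, otherwise overwrite
        self.rows[key] = row
```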
- For a table without a primary key, messages written to Kafka have a null Key by default.
insert into inventory.orders2 values (11001,now(),1003,1,102);
- The INSERT is synchronized normally, but the UPDATE and DELETE records are skipped. With PostgreSQL as the source, this is where configuring the table's REPLICA IDENTITY becomes important.
update inventory.orders2 set quantity=2 where id=11001;
tutorial-connect-1 | 2022-04-28 13:41:53,005 WARN Postgres|test_dml|streaming no new values found for table '{ key : null, value : {"name" : "test_dml.inventory.orders2.Value", "type" : "STRUCT", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "order_date", "index" : "1", "schema" : {"name" : "io.debezium.time.Date", "type" : "INT32", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "purchaser", "index" : "2", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "quantity", "index" : "3", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "product_id", "index" : "4", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}]} }'
from update message at 'Struct{version=1.9.0.Final,connector=postgresql,name=test_dml,ts_ms=1651153312826,db=test_dml,sequence=["152831320","152831376"],schema=inventory,table=orders2,txId=833,lsn=152831376}';
skipping record [io.debezium.relational.RelationalChangeRecordEmitter]
delete from inventory.orders2 where id = 11001;
tutorial-connect-1 | 2022-04-28 13:43:12,205 WARN Postgres|test_dml|streaming no old values found for table '{ key : null, value : {"name" : "test_dml.inventory.orders2.Value", "type" : "STRUCT", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "order_date", "index" : "1", "schema" : {"name" : "io.debezium.time.Date", "type" : "INT32", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "purchaser", "index" : "2", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "quantity", "index" : "3", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "product_id", "index" : "4", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}]} }'
from delete message at 'Struct{version=1.9.0.Final,connector=postgresql,name=test_dml,ts_ms=1651153391920,db=test_dml,sequence=["152831512","152831800"],schema=inventory,table=orders2,txId=834,lsn=152831800}';
skipping record [io.debezium.relational.RelationalChangeRecordEmitter]
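- After REPLICA IDENTITY is configured for the table, UPDATE and DELETE on the table without a primary key reach Kafka normally.
- Reinitialize the environment: delete the source connector, delete the topics, re-register the connector, and keep the table without a primary key in its original state.
- Sink connector: add "delete.enabled": "true", "pk.mode": "record_key", "insert.mode": "upsert" to the default configuration from the <Default connector configuration> section at the beginning of this article.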
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=true' and 'pk.mode=record_key'
and therefore requires records with a non-null key and non-null Struct or primitive key schema,
but found record at (topic='test_dml.inventory.orders2',partition=0,offset=0,timestamp=1651157942687)
with a null key and null key schema.
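- Sink connector: add "delete.enabled": "false", "pk.mode": "record_key", "insert.mode": "upsert" to the default configuration from the <Default connector configuration> section at the beginning of this article.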
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=false' and 'pk.mode=record_key'
and therefore requires records with a non-null key and non-null Struct or primitive key schema,
but found record at (topic='test_dml.inventory.orders2',partition=0,offset=0,timestamp=1651157942687)
with a null key and null key schema.
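- Sink connector: add "delete.enabled": "false", "pk.mode": "none", "insert.mode": "upsert" to the default configuration from the <Default connector configuration> section at the beginning of this article.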
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Write to table '"ORDERS2"' in UPSERT mode requires key field names to be known, check the primary key configuration
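- Sink connector: add "delete.enabled": "false", "pk.mode": "record_value", "pk.fields": "id", "insert.mode": "upsert" to the default configuration from the <Default connector configuration> section at the beginning of this article.
# INSERT and UPDATE both work, but DELETE is still not supported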
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=false' and 'pk.mode=record_value'
and therefore requires records with a non-null Struct value and non-null Struct schema,
but found record at (topic='test_dml.inventory.orders2',partition=0,offset=9,timestamp=1651160261336)
with a null value and null value schema.
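- Problem 1: the Key of a table without a primary key is null, so "delete.enabled": "true", "pk.mode": "record_key" cannot be used.
- Change to "pk.mode": "record_value" and add "pk.fields": "id", so that the sink takes its key from a field in the Value.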
{
  "error_code": 400,
  "message": "Connector configuration is invalid and contains the following 2 error(s):
Deletes are only supported for pk.mode record_key
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}
"delete.enabled": "true" can only be combined with "pk.mode": "record_key"; to synchronize DELETE, the Key must be non-null.
Problem 2: "insert.mode": "upsert" requires a primary key; for a table without one, configure "pk.mode": "record_value", "pk.fields": "id".
Problem 3: "delete.enabled": "false", "pk.mode": "record_value", "pk.fields": "id", "insert.mode": "upsert" still does not support DELETE; to synchronize DELETE, the Key must be non-null.
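The rules behind these errors can be summarized as a small Python model. It is derived only from the error messages quoted in this article, not from the JDBC sink connector's source, and `SinkError`, `validate_config` and `write_record` are hypothetical names:

```python
from typing import Optional


class SinkError(Exception):
    """Stands in for the ConnectException / 400 responses quoted above."""


def validate_config(cfg: dict) -> None:
    """Check applied when the connector is registered (the 400 response above)."""
    if cfg.get("delete.enabled", "false") == "true" and cfg.get("pk.mode", "none") != "record_key":
        raise SinkError("Deletes are only supported for pk.mode record_key")


def write_record(cfg: dict, key: Optional[dict], value: Optional[dict]) -> str:
    """Return the action taken for one record, or raise like the runtime errors above."""
    validate_config(cfg)
    pk_mode = cfg.get("pk.mode", "none")
    deletes = cfg.get("delete.enabled", "false") == "true"
    insert_mode = cfg.get("insert.mode", "insert")
    if pk_mode == "record_key" and key is None:
        raise SinkError("requires records with a non-null key")  # keyless table
    if value is None:
        if not deletes:
            raise SinkError("requires records with a non-null Struct value")  # tombstone
        return "delete"
    if insert_mode == "upsert" and pk_mode == "none":
        raise SinkError("UPSERT mode requires key field names to be known")
    return insert_mode
```

In this model the only configuration that accepts a tombstone is delete.enabled=true with pk.mode=record_key, and that in turn needs a non-null Key.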
The message.key.columns property described in the next section solves all of these problems.
Debezium provides the message.key.columns property for tables without a primary key
[root@docker tutorial]# cat register-postgres-key.json
{
  "name": "inventory-connector-key",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "test_dml",
    "database.server.name": "test_dml",
    "snapshot.mode": "always",
    "table.include.list": "inventory.orders3",
    "slot.name": "test_dml_slot",
    "message.key.columns": "inventory.orders3:id,product_id"
  }
}
curl -s -X DELETE localhost:8083/connectors/inventory-connector-key
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-key.json
[root@docker tutorial]# cat oracle-testdml-sink.json
{
  "name": "oracle-testdml-sink3",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:oracle:thin:@//",
    "connection.user": "test",
    "connection.password": "test",
    "tasks.max": "1",
    "topics": "test_dml.inventory.orders3",
    "table.name.format": "ORDERS3",
    "quote.sql.identifiers": "never",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "insert.mode": "upsert"
  }
}
curl -s -X DELETE localhost:8083/connectors/oracle-testdml-sink3
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-testdml-sink.json
insert into inventory.orders3 values (11001,now(),1003,1,102);
update inventory.orders3 set quantity=2 where id=11001;
delete from inventory.orders3 where id = 11001;
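For orders3, the whole path from these three statements to the target can be sketched in Python. This assumes that each event carries id and product_id (for the DELETE this means their old values must be available, which, per the REPLICA IDENTITY discussion above, depends on the table's setting), that ExtractNewRecordState drops the delete event and forwards the tombstone, and that the sink runs with pk.mode=record_key, insert.mode=upsert and delete.enabled=true. All function names are hypothetical.

```python
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

# From message.key.columns = inventory.orders3:id,product_id
KEY_COLUMNS: List[str] = ["id", "product_id"]
Record = Tuple[dict, Optional[dict]]  # (Key, Value)


def key_of(row: dict) -> dict:
    return {c: row[c] for c in KEY_COLUMNS}


def sink_records(changes: Iterable[Tuple[str, dict]]) -> Iterator[Record]:
    """What the sink receives after ExtractNewRecordState (drop.tombstones=false)."""
    for op, row in changes:
        if op in ("c", "u"):
            yield key_of(row), row  # flattened new row state
        elif op == "d":
            yield key_of(row), None  # delete event dropped, tombstone kept


def apply_to_target(target: Dict[tuple, dict], records: Iterable[Record]) -> Dict[tuple, dict]:
    """pk.mode=record_key, insert.mode=upsert, delete.enabled=true."""
    for key, value in records:
        pk = tuple(key[c] for c in KEY_COLUMNS)
        if value is None:
            target.pop(pk, None)  # delete the row matching (id, product_id)
        else:
            target[pk] = value  # upsert by (id, product_id)
    return target
```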