Debezium 的 Oracle 连接器在第一次启动时,默认会执行数据库的初始一致性快照,相当于导出全量数据再导入到Kafka。
可以通过设置连接器配置属性 snapshot.mode (默认:initial) 的值来自定义连接器创建快照的方式。
当 snapshot.mode 设置为 (默认:initial) 时,连接器完成以下任务来创建快照:
- 确定要捕获的表
- 获取每个要捕获表的 ROW SHARE MODE 锁,以防止在创建快照期间更改表结构,Debezium 持有锁的时间很短。
- 从数据库的 Redo 日志中读取当前系统更改号(SCN)位置。
- 捕获所有相关表的表结构。
- 释放 步骤2 中获得的锁。
- 在 步骤3 中读取的 SCN 位置扫描所有相关的数据库表(SELECT * FROM … AS OF SCN 123),为每一行生成一个 READ 事件,然后将事件记录写入到 Kafka 主题(topic)。
- 在连接器偏移(offsets)中记录快照的成功完成。
执行创建快照的进程开始后,如果进程因连接器故障、重新平衡或其他原因而中断,连接器重启后快照进程也会重新启动。连接器完成初始一致性快照后,它会继续从 步骤3 中读取的 SCN 位置进行流式传输,以免丢失任何数据。如果连接器由于某种原因再次停止,则在连接器重新启动后,它将从之前停止的位置继续恢复数据的流式传输。
snapshot.mode 支持的参数配置,这个参数只在连接器在第一次启动时起作用
参数值 | 描述 |
---|---|
initial(默认) | 连接器执行数据库的初始一致性快照,快照完成后,连接器开始为后续数据库更改流式传输事件记录。 |
initial_only | 连接器只执行数据库的初始一致性快照,不允许捕获任何后续更改的事件。 |
schema_only | 连接器只捕获所有相关表的表结构,不捕获初始数据,但是会同步后续数据库的更改记录 |
schema_only_recovery | 设置此选项可恢复丢失或损坏的数据库历史主题(database.history.kafka.topic)。 |
snapshot.mode = initial
- 向 Kafka Connect 注册并启动一个新的 Debezium Oracle Connector,添加选项 “snapshot.mode”: “initial”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "initial",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"snapshot.mode": "initial",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
-
源端数据库里的数据情况,SCOTT 用户下有5张表,其中一张表BONUS里面没有数据
-
Debezium Oracle Connector 第一次启动后执行初始一致性快照,将全量数据导出转换写入到Kafka,可以看到 Kafka 中为每张有数据的表建了一个 Topics,Topics 的名称格式是 <serverName.schemaName.tableName>,其中表 BONUS 没有数据,所以没有建立 Topics,但是在 Topics initial 中记录的表的DDL语句,同时还建立了一个历史Topics schema-changes.initial,表里的每条数据在 Kafka 中存储为一条消息(Messages),以下截图也可以看出每个 Topics 的 Messages 个数与 Oracle 中表的数据行数一致。
-
源端数据库执行 DML 操作,自动同步到 Kafka
SQL> insert into dept values (50,'AAAA','A');
SQL> commit;
SQL> update dept set DNAME='BBBB' where DEPTNO=50;
SQL> commit;
SQL> delete from dept where DEPTNO=50;
SQL> commit;
snapshot.mode = initial_only
- 向 Kafka Connect 注册并启动一个新的 Debezium Oracle Connector,添加选项 “snapshot.mode”: “initial_only”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial-only",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "initial-only",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"snapshot.mode": "initial_only",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial_only"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
-
源端数据库里的数据情况
-
Debezium Oracle Connector 第一次启动后执行初始一致性快照
-
源端数据库执行 DML 操作,此时不会同步一致性快照以后的变更数据
SQL> insert into dept values (60,'BBBB','B');
SQL> commit;
- 可以看到 snapshot.mode = initial 的表已经同步数据了,但是 snapshot.mode = initial_only 的表并没有同步数据
- 连接器的状态还是RUNNING
curl -s -X GET localhost:8083/connectors/snapshot-mode-initial-only/status | jq
- Topics my_connect_offsets 中也记录了快照信息
snapshot.mode = schema_only
- 向 Kafka Connect 注册并启动一个新的 Debezium Oracle Connector,添加选项 “snapshot.mode”: “schema_only”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-schema-only",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "schema-only",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"snapshot.mode": "schema_only",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.schema_only"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
- 源端数据库里的数据情况
- Debezium Oracle Connector 第一次启动后不会执行初始一致性快照,只将表的DDL表结构写入到Kafka,可以看到 Kafka 的没有存放数据的 Topics,只有一个存放表的DDL的 Topics
[kafka@4c24d79ab670 ~]$ bin/kafka-topics.sh --bootstrap-server kafka:9092 --list |grep schema-only schema-only
- 源端数据库执行 DML 操作,自动同步到 Kafka
SQL> insert into dept values (70,'CCCC','C');
SQL> commit;
snapshot.mode = schema_only_recovery
当 database.history.kafka.topic 被删除了,可以使用 snapshot.mode = schema_only_recovery 来恢复
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic schema-changes.initial
- 向 Kafka Connect 注册并启动一个新的 Debezium Oracle Connector,添加选项 “snapshot.mode”: “schema_only_recovery”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "initial",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"snapshot.mode": "SCHEMA_ONLY_RECOVERY",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial"
}
}
# 删除连接器
curl -s -X DELETE localhost:8083/connectors/snapshot-mode-initial
# 重新注册连接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
总结一点
功能可以实现,但是捕获延迟很严重,变更一条记录要好长时间才能捕获到,不知道是配置的问题还是连接器本身的问题,就是感觉对Oracle的兼容不是太友好。