一个问题困扰我好长时间,重建同名连接器不能重新执行一致性快照
我在测试 Debezium Connector for Oracle ,将 Oracle 中的数据同步到 Kafka 中,创建了一个源端连接器,用于捕获 Oracle 中的数据,连接器的名称为 snapshot-mode-initial,出于一些测试的原因,我删除了这个连接器,并且删除了这个连接器在 Kafka 创建的所有 Topic,再次使用相同的连接器名称(snapshot-mode-initial)创建连接器时,数据不能重新初始化,以下是测试示例:
- 创建源端连接器,连接器命名 snapshot-mode-initial
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "initial",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"snapshot.mode": "initial",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
- 连接器创建并自动启动后执行一致性快照,进行数据初始化,自动在Kafka中创建了以下几个 Topic
[kafka@839c4a43b889 ~]$ bin/kafka-topics.sh --bootstrap-server kafka:9092 --list __consumer_offsets my_connect_configs my_connect_offsets my_connect_statuses initial initial.SCOTT.DEPT initial.SCOTT.EMP initial.SCOTT.SALGRADE schema-changes.initial
- 出于测试原因,删除了连接器和Topic
# 删除连接器
curl -s -X DELETE localhost:8083/connectors/snapshot-mode-initial
# 删除Topic
docker exec -it connect bash
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.SCOTT.DEPT
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.SCOTT.EMP
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.SCOTT.SALGRADE
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic schema-changes.initial
- 重建连接器,还以为可以重新创建 Topic 并初始化数据呢,结果报出以下错误
# 重新注册连接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
# Kafka Connect 日志报以下错误
2022-04-20T12:13:45.826691242Z 2022-04-20 12:13:45,826 ERROR || WorkerSourceTask{id=snapshot-mode-initial-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
2022-04-20T12:13:45.826696807Z io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to SCHEMA_ONLY_RECOVERY
- 没想象中的重新初始化数据,确报出找不到 history topic,这里的 history topic 就是注册连接器时配置的 “database.history.kafka.topic”: “schema-changes.initial”,至于是干什么用的,这里先不提了,然而 schema-changes.initial topic 已经被我 delete 了,这可以证明一点,新注册的连接器还在用之前的信息。
那么如果重新执行一致性快照呢?–两种方法
- 第一种方法:重新注册一个其他名称的连接器,例如:snapshot-mode-initial2,但是这个方法总是有一种不舒适的感觉。
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial2",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "initial",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"snapshot.mode": "initial",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
- 第二种方法:如标题所说,删除连接器上的 committed offsets,这种方法需要使用 kafkacat
如何删除连接器上的 committed offsets,没有测试成功,不知道啥情况
- 有时在进行实验时(或者当连接器在开始时配置错误时),有必要删除连接器偏移(offsets)以从干净状态开始。
- 第一步是找出包含连接器偏移量的主题(Topic)的名称,这是在 offset.storage.topic 选项中配置的,一般默认是 my_connect_offsets,也可以在 Kafka Connect 日志找到。
- 下一步是找出给定连接器的最后一个偏移量,存储它的键,并确定用于存储偏移量的分区,需要使用 kafkacat 工具。
# Docker 运行 kafkacat
alias kafkacat='docker run --rm edenhill/kcat:1.7.1 kcat'
kafkacat -b 172.17.0.5:9092 -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
连接器 snapshot-mode-initial,分区号是13,但是 {“snapshot_scn”:“14208424”,“snapshot”:true,“scn”:“14208424”,“snapshot_completed”:true} 不像是最后的偏移量,这像是记录第一次启动源端连接器执行的一致性快照的信息,执行快照的scn已经快照是否执行完成等。
{“commit_scn”:“14854345”,“transaction_id”:null,“snapshot_scn”:“14676262”,“scn”:“14844029”}像是最后的偏移量。
- 修改连接器的信息,应停止连接器并且发出以下命令:
# 停止连接器,不知道咋停止单个连接器,还是先删了吧
curl -s -X DELETE localhost:8083/connectors/snapshot-mode-initial
# 修改连接器的信息,不起作用
echo '["snapshot-mode-initial",{"server":"initial"}]#' | kafkacat -P -Z -b 172.17.0.5:9092 -t my_connect_offsets -K# -p 13
echo '["snapshot-mode-initial",{"server":"initial"}]#{"snapshot_scn":"14208424","snapshot":false,"scn":"14208424","snapshot_completed":false}' |kafkacat -P -b 172.17.0.5:9092 -t my_connect_offsets -K# -p 13
- 查找了好多资料,还是没有搞明白,太难了,这个问题先放着吧,但是我怀疑可能跟 Debezium Connector for Oracle 连接器有关系,因为记录偏移量的 Topic my_connect_offsets 是由连接器管理,而 Debezium Connector for Oracle 在第一次启动执行完成一致性快照后只支持增量快照,如果使用 Debezium Connector for MySQL 可能会成功。
- 参考文章:
https://debezium.io/documentation/faq/#how_to_change_the_offsets_of_the_source_database
https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/
https://soojong.tistory.com/entry/Source-Connector-Offset-%EC%B4%88%EA%B8%B0%ED%99%94-%ED%95%98%EA%B8%B0
https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/
https://github.com/edenhill/kcat/issues/371
https://issues.redhat.com/browse/DBZ-4820
最后修改时间:2022-04-21 22:11:43
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。