暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

Kafka Connect -- How to remove committed offsets for a connector?

原创 张玉龙 2022-04-21
2833

一个问题困扰我好长时间,重建同名连接器不能重新执行一致性快照

我在测试 Debezium Connector for Oracle ,将 Oracle 中的数据同步到 Kafka 中,创建了一个源端连接器,用于捕获 Oracle 中的数据,连接器的名称为 snapshot-mode-initial,出于一些测试的原因,我删除了这个连接器,并且删除了这个连接器在 Kafka 创建的所有 Topic,再次使用相同的连接器名称(snapshot-mode-initial)创建连接器时,数据不能重新初始化,以下是测试示例:

  • 创建源端连接器,连接器命名 snapshot-mode-initial
[root@docker ~]# cat oracle-snapshot-mode.json { "name": "snapshot-mode-initial", "config": { "connector.class" : "io.debezium.connector.oracle.OracleConnector", "database.hostname" : "172.17.0.2", "database.port" : "1521", "database.user" : "c##dbzuser", "database.password" : "dbz", "database.dbname" : "ORCL", "database.pdb.name" : "PDBTT", "database.server.name" : "initial", "tasks.max" : "1", "schema.include.list": "SCOTT", "snapshot.mode": "initial", "database.history.kafka.bootstrap.servers" : "192.168.0.40:9092", "database.history.kafka.topic": "schema-changes.initial" } } curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
  • 连接器创建并自动启动后执行一致性快照,进行数据初始化,自动在Kafka中创建了以下几个 Topic
[kafka@839c4a43b889 ~]$ bin/kafka-topics.sh --bootstrap-server kafka:9092 --list __consumer_offsets my_connect_configs my_connect_offsets my_connect_statuses initial initial.SCOTT.DEPT initial.SCOTT.EMP initial.SCOTT.SALGRADE schema-changes.initial

image.png

  • 出于测试原因,删除了连接器和Topic
# 删除连接器 curl -s -X DELETE localhost:8083/connectors/snapshot-mode-initial # 删除Topic docker exec -it connect bash bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.SCOTT.DEPT bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.SCOTT.EMP bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.SCOTT.SALGRADE bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic schema-changes.initial
  • 重建连接器,还以为可以重新创建 Topic 并初始化数据呢,结果报出以下错误
# 重新注册连接器 curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json # Kafka Connect 日志报以下错误 2022-04-20T12:13:45.826691242Z 2022-04-20 12:13:45,826 ERROR || WorkerSourceTask{id=snapshot-mode-initial-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask] 2022-04-20T12:13:45.826696807Z io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to SCHEMA_ONLY_RECOVERY

image.png

  • 没想象中的重新初始化数据,确报出找不到 history topic,这里的 history topic 就是注册连接器时配置的 “database.history.kafka.topic”: “schema-changes.initial”,至于是干什么用的,这里先不提了,然而 schema-changes.initial topic 已经被我 delete 了,这可以证明一点,新注册的连接器还在用之前的信息。

那么如果重新执行一致性快照呢?–两种方法

  • 第一种方法:重新注册一个其他名称的连接器,例如:snapshot-mode-initial2,但是这个方法总是有一种不舒适的感觉。
[root@docker ~]# cat oracle-snapshot-mode.json { "name": "snapshot-mode-initial2", "config": { "connector.class" : "io.debezium.connector.oracle.OracleConnector", "database.hostname" : "172.17.0.2", "database.port" : "1521", "database.user" : "c##dbzuser", "database.password" : "dbz", "database.dbname" : "ORCL", "database.pdb.name" : "PDBTT", "database.server.name" : "initial", "tasks.max" : "1", "schema.include.list": "SCOTT", "snapshot.mode": "initial", "database.history.kafka.bootstrap.servers" : "192.168.0.40:9092", "database.history.kafka.topic": "schema-changes.initial" } } curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
  • 第二种方法:如标题所说,删除连接器上的 committed offsets,这种方法需要使用 kafkacat

如何删除连接器上的 committed offsets,没有测试成功,不知道啥情况

  • 有时在进行实验时(或者当连接器在开始时配置错误时),有必要删除连接器偏移(offsets)以从干净状态开始。
  • 第一步是找出包含连接器偏移量的主题(Topic)的名称,这是在 offset.storage.topic 选项中配置的,一般默认是 my_connect_offsets,也可以在 Kafka Connect 日志找到。
    image.png
    image.png
  • 下一步是找出给定连接器的最后一个偏移量,存储它的键,并确定用于存储偏移量的分区,需要使用 kafkacat 工具。
# Docker 运行 kafkacat alias kafkacat='docker run --rm edenhill/kcat:1.7.1 kcat' kafkacat -b 172.17.0.5:9092 -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'

image.png
连接器 snapshot-mode-initial,分区号是13,但是 {“snapshot_scn”:“14208424”,“snapshot”:true,“scn”:“14208424”,“snapshot_completed”:true} 不像是最后的偏移量,这像是记录第一次启动源端连接器执行的一致性快照的信息,执行快照的scn已经快照是否执行完成等。
{“commit_scn”:“14854345”,“transaction_id”:null,“snapshot_scn”:“14676262”,“scn”:“14844029”}像是最后的偏移量。

  • 修改连接器的信息,应停止连接器并且发出以下命令:
# 停止连接器,不知道咋停止单个连接器,还是先删了吧 curl -s -X DELETE localhost:8083/connectors/snapshot-mode-initial # 修改连接器的信息,不起作用 echo '["snapshot-mode-initial",{"server":"initial"}]#' | kafkacat -P -Z -b 172.17.0.5:9092 -t my_connect_offsets -K# -p 13 echo '["snapshot-mode-initial",{"server":"initial"}]#{"snapshot_scn":"14208424","snapshot":false,"scn":"14208424","snapshot_completed":false}' |kafkacat -P -b 172.17.0.5:9092 -t my_connect_offsets -K# -p 13
最后修改时间:2022-04-21 22:11:43
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论