暂无图片
暂无图片
2
暂无图片
暂无图片
暂无图片

在Docker环境上使用Kafka Connect JDBC将变更数据从Kafka应用到Oracle 19C PDB

原创 张玉龙 2022-04-22
2600

image.png

实验环境

启动 Oracle 19C 数据库,创建一个测试用户

参考文章:使用Docker装一个Oracle 19C的单机测试环境

# sqlplus sys/oracle@192.168.0.40:1521/pdbtt as sysdba SQL> create user inventory identified by inventory; SQL> grant connect,resource,create view to inventory; SQL> grant unlimited tablespace to inventory;

配置 JDBC Sink Connector

上传驱动和JDBC连接器

将下载的 Oracle JDBC 驱动和 Kafka Connect JDBC Connector(连接器) 上传服务器并复制到 connect 容器中

[root@docker ~]# ls -lrt -rw-r--r--. 1 root root 4458107 Apr 17 22:12 ojdbc8-19.14.0.0.jar -rw-r--r--. 1 root root 20208429 Apr 17 22:12 confluentinc-kafka-connect-jdbc-10.4.1.zip # 上传 Oracle JDBC 驱动,如果使用 Kafka Connect JDBC Connector 自带的驱动可以忽略此处 docker cp ojdbc8-19.14.0.0.jar connect:/kafka/libs # 上传 Kafka Connect JDBC Connector unzip confluentinc-kafka-connect-jdbc-10.4.1.zip chown -R 1001:1001 confluentinc-kafka-connect-jdbc-10.4.1 docker cp confluentinc-kafka-connect-jdbc-10.4.1 connect:/kafka/connect # 重启 Kafka Connect 连接器 docker restart connect

查看现有连接器信息

  • 安装 jq,用于格式化 JSON 格式
yum install -y wget wget https://mirrors.aliyun.com/epel/epel-release-latest-7.noarch.rpm rpm -ivh epel-release-latest-7.noarch.rpm yum install -y jq
  • 查看当前存在哪些连接器
curl -s localhost:8083/connectors/ | jq

image.png

  • 查看连接器的具体信息
curl -s localhost:8083/connectors/snapshot-mode-initial | jq

image.png

配置连接目标端 Oracle 19C PDB 的连接器

  • 查看下目标端 Oracle 容器的IP地址
[root@docker ~]# docker inspect ora19c |grep IPAddress "SecondaryIPAddresses": null, "IPAddress": "172.17.0.2", "IPAddress": "172.17.0.2",
[root@docker ~]# vi oracle-jdbc-sink.json { "name": "oracle-jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt", "connection.user": "inventory", "connection.password": "inventory", "tasks.max": "1", "topics": "initial.inventory.orders", "table.name.format": "orders", "dialect.name": "OracleDatabaseDialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true" } }
  • 向 Kafka 连接器注册 JDBC Sink Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-jdbc-sink.json

查看已注册的连机器信息

# 当前已注册的连接器 curl -s localhost:8083/connectors/ | jq # 连接器的具体信息 curl -s localhost:8083/connectors/oracle-jdbc-sink | jq

image.png

  • 查看连接器的运行状态
curl -s localhost:8083/connectors/oracle-jdbc-sink/status | jq

image.png

遇到第一个问题,但是这个问题是由方法解决的,后面再说吧

  • 向 Kafka 连接器注册 JDBC Sink Connector 之后,连接器会自动连接到 Oracle PDB 上建表插入数据,但是自动建的表名上带有双引号,强制将表名转为了小写。
    image.png
  • 查看 Oracle PDB 表中的数据
    image.png

验证目标端的数据

image.png

Oracle 端模拟业务

  • INSERT
postgres=# select * from inventory.orders; id | order_date | purchaser | quantity | product_id -------+------------+-----------+----------+------------ 10001 | 2016-01-16 | 1001 | 1 | 102 10002 | 2016-01-17 | 1002 | 2 | 105 10003 | 2016-02-19 | 1002 | 2 | 106 10004 | 2016-02-21 | 1003 | 1 | 107 (4 rows) postgres=# insert into inventory.orders values (11001,now(),1003,1,102); INSERT 0 1

image.png

  • UPDATE
postgres=# select * from inventory.orders; id | order_date | purchaser | quantity | product_id -------+------------+-----------+----------+------------ 10001 | 2016-01-16 | 1001 | 1 | 102 10002 | 2016-01-17 | 1002 | 2 | 105 10003 | 2016-02-19 | 1002 | 2 | 106 10004 | 2016-02-21 | 1003 | 1 | 107 11001 | 2022-04-22 | 1003 | 1 | 102 (5 rows) postgres=# update inventory.orders set quantity=2 where id=11001; UPDATE 1

image.png

  • DELETE
postgres=# select * from inventory.orders; id | order_date | purchaser | quantity | product_id -------+------------+-----------+----------+------------ 10001 | 2016-01-16 | 1001 | 1 | 102 10002 | 2016-01-17 | 1002 | 2 | 105 10003 | 2016-02-19 | 1002 | 2 | 106 10004 | 2016-02-21 | 1003 | 1 | 107 11001 | 2022-04-22 | 1003 | 2 | 102 (5 rows) postgres=# delete from inventory.orders where id = 11001; DELETE 1

遇到第二个问题,不同步 DELETE 操作

  • 源端 PostgreSQL 执行 DELETE 操作,发现目标端 Oracle 没有同步执行 DELETE 操作,查看 Kafka Connect 日志发现这是有参数限制的呀
    image.png
    tasks 也失败了
    image.png

  • 更新 sink 连接器,配置 delete.enabled 和 pk.mode

[root@docker ~]# vi oracle-jdbc-sink_update.json { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt", "connection.user": "inventory", "connection.password": "inventory", "tasks.max": "1", "topics": "initial.inventory.orders", "table.name.format": "orders", "dialect.name": "OracleDatabaseDialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true", "delete.enabled": "true", "pk.mode": "record_key" } curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/oracle-jdbc-sink/config -d @oracle-jdbc-sink_update.json

tasks 恢复正常
image.png
DELETE 操作也同步到目标端
image.png

查看消费者组

image.png

最后修改时间:2022-04-27 16:18:06
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
1人已赞赏
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论