实验环境
- Kafka 中的数据来自于捕获的 Oracle 19C PDB 的变更数据,参考文章:在Docker环境上使用Debezium捕获Oracle 19C PDB中的变更数据到Kafka
- 准备 Kafka Connect JDBC Connector(连接器),本实验使用的版本是 10.4.1,下载地址:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
- 准备 PostgreSQL jdbc 驱动,Kafka Connect JDBC Connector里面包含了 PostgreSQL jdbc 驱动(postgresql-42.3.3.jar),如果想使用新版的驱动,也可以自行下载,下载地址:https://jdbc.postgresql.org/download.html
- 本文参考postgresql大神的文章:Debezium替换ogg的测试(三) 下游PG数据库实时同步,感谢分享。
启动 PostgreSQL 并创建测试库
PostgreSQL 的 Docker 仓库:https://hub.docker.com/_/postgres
# 创建一个数据持久化目录
mkdir -p /docker_data/postgres
chmod -R a+rwx /docker_data/postgres/
# 后台运行14.2版本的 PostgreSQL 数据库
docker run -d --name postgres \
-p 5432:5432 \
-e POSTGRES_PASSWORD=postgres \
-e PGDATA=/var/lib/postgresql/data/pgdata \
-v /docker_data/postgres:/var/lib/postgresql/data \
postgres:14.2
# 运行 psql 容器
[root@docker ~]# alias psql="docker run -it --rm --name psql postgres:14.2 psql -h 192.168.0.40 -U postgres -p 5432"
[root@docker ~]# psql
Password for user postgres:
psql (14.2 (Debian 14.2-1.pgdg110+1))
Type "help" for help.
postgres=# create database scott;
CREATE DATABASE
postgres=# \l
List of databases
Name | Owner | Encoding | Collate | Ctype | Access privileges
-----------+----------+----------+------------+------------+-----------------------
postgres | postgres | UTF8 | en_US.utf8 | en_US.utf8 |
scott | postgres | UTF8 | en_US.utf8 | en_US.utf8 |
template0 | postgres | UTF8 | en_US.utf8 | en_US.utf8 | =c/postgres +
| | | | | postgres=CTc/postgres
template1 | postgres | UTF8 | en_US.utf8 | en_US.utf8 | =c/postgres +
| | | | | postgres=CTc/postgres
(4 rows)
配置 JDBC Sink Connector
上传驱动和JDBC连接器
将下载的 postgresql JDBC 驱动和 Kafka Connect JDBC Connector(连接器) 上传服务器并复制到 connect 容器中
[root@docker ~]# ls -lrt
-rw-r--r--. 1 root root 1006732 Apr 17 22:12 postgresql-42.2.25.jar
-rw-r--r--. 1 root root 20208429 Apr 17 22:12 confluentinc-kafka-connect-jdbc-10.4.1.zip
# 上传 postgresql JDBC 驱动,如果使用 Kafka Connect JDBC Connector 自带的驱动可以忽略此处
docker cp postgresql-42.2.25.jar connect:/kafka/libs
# 上传 Kafka Connect JDBC Connector
unzip confluentinc-kafka-connect-jdbc-10.4.1.zip
chown -R 1001:1001 confluentinc-kafka-connect-jdbc-10.4.1
docker cp confluentinc-kafka-connect-jdbc-10.4.1 connect:/kafka/connect
# 重启 Kafka Connect 连接器
docker restart connect
查看现有连接器信息
- 安装 jq,用于格式化 JSON 格式
yum install -y wget wget https://mirrors.aliyun.com/epel/epel-release-latest-7.noarch.rpm rpm -ivh epel-release-latest-7.noarch.rpm yum install -y jq
- 查看当前存在哪些连接器
curl localhost:8083/connectors/ | jq
- 查看连接器的具体信息
curl localhost:8083/connectors/oracle-scott-connector | jq
配置连接目标端 PostgreSQL 的连接器
- 查看下目标端 PostgreSQL 容器的IP地址
[root@docker ~]# docker inspect postgres |grep IPAddress
"SecondaryIPAddresses": null,
"IPAddress": "172.17.0.7",
"IPAddress": "172.17.0.7",
- 编辑一个 JSON 文件,配置连接器信息
JDBC Sink Connector Configuration Properties: https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html#sink-config-options
[root@docker ~]# vi pgsql-scott-jdbc-sink.json
{
"name": "pgsql-scott-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://172.17.0.7:5432/scott?user=postgres&password=postgres&sslMode=require",
"tasks.max": "1",
"topics": "oracle19c.SCOTT.DEPT",
"table.name.format": "dept",
"dialect.name": "PostgreSqlDatabaseDialect",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true"
}
}
- 向 Kafka 连接器注册 JDBC Sink Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json
查看已注册的连机器信息
# 当前已注册的连接器
curl localhost:8083/connectors/ | jq
# 连接器的具体信息
curl localhost:8083/connectors/pgsql-scott-jdbc-sink | jq
- 查看连接器的运行状态
curl localhost:8083/connectors/pgsql-scott-jdbc-sink/status | jq
一个排错过程(可选择性跳过)
- 向 Kafka 连接器注册 JDBC Sink Connector 之后,连接器会自动连接到 PostgreSQL 上建表插入数据
- 查看连接器的运行状态发现有问题
- 查看 Kafka 连接器日志发现建表语句有问题
- 这才了解为啥要在连接器的配置信息中加 “table.name.format”: “dept”
- postgresql大神的文章:Debezium替换ogg的测试(三) 下游PG数据库实时同步 提供了两种更改连接器配置信息的方法,我直接使用暴力的方法
# 1.删除连接器
curl -v -X DELETE 192.168.0.40:8083/connectors/pgsql-scott-jdbc-sink
# 2.修改json文件
# 3.重新添加连接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json
验证目标端的数据
Oracle 端模拟业务
- INSERT
[root@docker ~]# sqlplus scott/scott@192.168.0.40:1521/pdbtt
SQL> insert into dept values (70,'BBBB','BB');
SQL> commit;
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。