暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

在Docker环境上使用Kafka Connect JDBC将变更数据从Kafka应用到PostgreSQL

原创 张玉龙 2022-04-18
2060

实验环境

启动 PostgreSQL 并创建测试库

PostgreSQL 的 Docker 仓库:https://hub.docker.com/_/postgres

# 创建一个数据持久化目录 mkdir -p /docker_data/postgres chmod -R a+rwx /docker_data/postgres/ # 后台运行14.2版本的 PostgreSQL 数据库 docker run -d --name postgres \ -p 5432:5432 \ -e POSTGRES_PASSWORD=postgres \ -e PGDATA=/var/lib/postgresql/data/pgdata \ -v /docker_data/postgres:/var/lib/postgresql/data \ postgres:14.2 # 运行 psql 容器 [root@docker ~]# alias psql="docker run -it --rm --name psql postgres:14.2 psql -h 192.168.0.40 -U postgres -p 5432" [root@docker ~]# psql Password for user postgres: psql (14.2 (Debian 14.2-1.pgdg110+1)) Type "help" for help. postgres=# create database scott; CREATE DATABASE postgres=# \l List of databases Name | Owner | Encoding | Collate | Ctype | Access privileges -----------+----------+----------+------------+------------+----------------------- postgres | postgres | UTF8 | en_US.utf8 | en_US.utf8 | scott | postgres | UTF8 | en_US.utf8 | en_US.utf8 | template0 | postgres | UTF8 | en_US.utf8 | en_US.utf8 | =c/postgres + | | | | | postgres=CTc/postgres template1 | postgres | UTF8 | en_US.utf8 | en_US.utf8 | =c/postgres + | | | | | postgres=CTc/postgres (4 rows)

配置 JDBC Sink Connector

上传驱动和JDBC连接器

将下载的 postgresql JDBC 驱动和 Kafka Connect JDBC Connector(连接器) 上传服务器并复制到 connect 容器中

[root@docker ~]# ls -lrt -rw-r--r--. 1 root root 1006732 Apr 17 22:12 postgresql-42.2.25.jar -rw-r--r--. 1 root root 20208429 Apr 17 22:12 confluentinc-kafka-connect-jdbc-10.4.1.zip # 上传 postgresql JDBC 驱动,如果使用 Kafka Connect JDBC Connector 自带的驱动可以忽略此处 docker cp postgresql-42.2.25.jar connect:/kafka/libs # 上传 Kafka Connect JDBC Connector unzip confluentinc-kafka-connect-jdbc-10.4.1.zip chown -R 1001:1001 confluentinc-kafka-connect-jdbc-10.4.1 docker cp confluentinc-kafka-connect-jdbc-10.4.1 connect:/kafka/connect # 重启 Kafka Connect 连接器 docker restart connect

查看现有连接器信息

  • 安装 jq,用于格式化 JSON 格式
yum install -y wget wget https://mirrors.aliyun.com/epel/epel-release-latest-7.noarch.rpm rpm -ivh epel-release-latest-7.noarch.rpm yum install -y jq
  • 查看当前存在哪些连接器
curl localhost:8083/connectors/ | jq

image.png

  • 查看连接器的具体信息
curl localhost:8083/connectors/oracle-scott-connector | jq

image.png

配置连接目标端 PostgreSQL 的连接器

  • 查看下目标端 PostgreSQL 容器的IP地址
[root@docker ~]# docker inspect postgres |grep IPAddress "SecondaryIPAddresses": null, "IPAddress": "172.17.0.7", "IPAddress": "172.17.0.7",
[root@docker ~]# vi pgsql-scott-jdbc-sink.json { "name": "pgsql-scott-jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:postgresql://172.17.0.7:5432/scott?user=postgres&password=postgres&sslMode=require", "tasks.max": "1", "topics": "oracle19c.SCOTT.DEPT", "table.name.format": "dept", "dialect.name": "PostgreSqlDatabaseDialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true" } }
  • 向 Kafka 连接器注册 JDBC Sink Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json

查看已注册的连机器信息

# 当前已注册的连接器 curl localhost:8083/connectors/ | jq # 连接器的具体信息 curl localhost:8083/connectors/pgsql-scott-jdbc-sink | jq

image.png

  • 查看连接器的运行状态
curl localhost:8083/connectors/pgsql-scott-jdbc-sink/status | jq

image.png

一个排错过程(可选择性跳过)

  • 向 Kafka 连接器注册 JDBC Sink Connector 之后,连接器会自动连接到 PostgreSQL 上建表插入数据
  • 查看连接器的运行状态发现有问题
    image.png
  • 查看 Kafka 连接器日志发现建表语句有问题
    image.png
  • 这才了解为啥要在连接器的配置信息中加 “table.name.format”: “dept”
    image.png
  • postgresql大神的文章:Debezium替换ogg的测试(三) 下游PG数据库实时同步 提供了两种更改连接器配置信息的方法,我直接使用暴力的方法
# 1.删除连接器 curl -v -X DELETE 192.168.0.40:8083/connectors/pgsql-scott-jdbc-sink # 2.修改json文件 # 3.重新添加连接器 curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json

验证目标端的数据

image.png

Oracle 端模拟业务

  • INSERT
[root@docker ~]# sqlplus scott/scott@192.168.0.40:1521/pdbtt SQL> insert into dept values (70,'BBBB','BB'); SQL> commit;

image.png

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论