暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Debezium替换ogg的测试(三) 下游PG数据库实时同步

前言

最近7-1忙着各种值班,没时间写东西,第三篇拖更了许久。今天终于抽点时间把这个系列完成了,没有太监。

Kafka到PostgreSQL

其实从kafka到PostgreSQL很简单,我们要理解的一点是kafka既可以push也可以poll。不需要我们重复发明轮子,我们只需要写好配置文件在把它调起来就可以了。

研究了一番,发现目标端都是使用JDBC Connector来实现的。那么我们也采用JDBC Connector的方案。要使用JDBC需要先下载Kafka Connect JDBC 连接器, 它是由Confluent开发的。

我们需要做两件事。

  1. 把PostgreSQL jdbc驱动放在/kafka/libs目录下。我用的是postgresql-42.2.14.jar。下载地址:https://jdbc.postgresql.org/download.html。
  2. 下载Kafka Connect JDBC 连接器放入到插件目录下。下载地址:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

做完上述操作,我们接下来就来进行配置。今天我们要使用RESTAPI进行配置,我们先查看我们的配置情况。

curl localhost:8083/connectors/ | jq

如图所示,我们可以看到我们Oracle数据库配置的连接器testoracledb,可以进一步查看详细的配置。

curl localhost:8083/connectors/testoracledb | jq

这里可以看到它的type是source,这就代表它是源端。

那么目标端我们先配置一个jdbc-sink.json文件。内容如下:

{  "name""jdbc-sink",  "config": {    "connector.class""io.confluent.connect.jdbc.JdbcSinkConnector",         "tasks.max""1",      "topics""12cdb.HR.S1",      "dialect.name""PostgreSqlDatabaseDialect",          "table.name.format""s1",          "connection.url""jdbc:postgresql://192.168.56.170:5432/postgres?user=postgres&password=Sqlite123&sslMode=require",          "transforms""unwrap",          "transforms.unwrap.type""io.debezium.transforms.ExtractNewRecordState",          "transforms.unwrap.drop.tombstones""false",      "auto.create""true"    }}

然后把这个文件POST进去。

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @jdbc-sink.json

使用RESTAPI POST成功之后,我们可以查看现在的状态。

curl localhost:8083/connectors/jdbc-sink | jq     

这里出现了一个jdbc-sink的连接器。查看jdbc-sink的状态。

curl localhost:8083/connectors/jdbc-sink | jq

下游数据库应用这里的type就必须是sink。可以查看它的工作状态。

curl localhost:8083/connectors/jdbc-sink/status | jq

这里都显示RUNNING就是正常的。

测试效果

我们在Oracle里面插入记录。

查看我们的PostgreSQL中是否存在这条记录。

配置有问题如何修改

当然可能会出现各种各样的配置问题,那么如何修改呢?

暴力的方法可以直接删除连接器,修改完json文件再添加连接器。这只适用于一开始就配置失败的情况。

1.删除连接器curl -v -X DELETE localhost:8083/connectors/jdbc-sink 2.修改json文件3.重新添加连接器curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @jdbc-sink.json

还可以选择再创建一个jdbc-sink.put的配置文件,修改你的配置,比如这里增加delete.enabled参数

{    "connector.class""io.confluent.connect.jdbc.JdbcSinkConnector",        "tasks.max""1",    "topics""12cdb.HR.S1",    "dialect.name""PostgreSqlDatabaseDialect",        "table.name.format""s1",        "connection.url""jdbc:postgresql://192.168.56.170:5432/postgres?user=postgres&password=Sqlite123",     "transforms""unwrap",        "transforms.unwrap.type""io.debezium.transforms.ExtractNewRecordState",        "transforms.unwrap.drop.tombstones""false",    "auto.create""true",    "delete.enabled""true" }

然后执行更新连接器配置操作。

curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/jdbc-sink/config -d @jdbc-sink.put

更新完成后执行重启就行了。

 curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/jdbc-sink/restart

后记

整个Debezium替换ogg的测试文章已经完成,后续我准备在生产割接环境上进行测试。这条路是能走通的,但是肯定有一堆的坑要填埋。

文章转载自励志成为postgresql大神,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论