Debezium 提供的 Docker 测试环境
- 最近才发现 Debezium 提供了一个使用 docker-compose 自动部署的测试环境,非常好用,根据自己的环境稍作修改,就可以测试很多场景。
- 下载地址:https://github.com/debezium/debezium-examples
- 使用教程:https://github.com/debezium/debezium-examples/tree/main/tutorial
下载解压,可以看到 tutorial 提供的测试用例
- Debezium 提供的测试环境主要是针对 MySQL 的,本文测试主要针对 PostgreSQL,根据 MySQL 的相关测试修改一下,就可以用于 PostgreSQL 的测试了。
[root@docker ~]# unzip debezium-examples-main.zip
[root@docker ~]# cd debezium-examples-main/tutorial
[root@docker tutorial]# ls -lrt
total 768
-rw-r--r--. 1 root root 602664 Apr 19 22:42 vitess-sharding-setup.png
drwxr-xr-x. 2 root root 30 Apr 19 22:42 secrets
-rw-r--r--. 1 root root 521 Apr 19 22:42 register-vitess.json
-rw-r--r--. 1 root root 538 Apr 19 22:42 register-sqlserver.json
-rw-r--r--. 1 root root 448 Apr 19 22:42 register-postgres.json
-rw-r--r--. 1 root root 582 Apr 19 22:42 register-oracle-logminer.json
-rw-r--r--. 1 root root 568 Apr 19 22:42 register-mysql.json
-rw-r--r--. 1 root root 637 Apr 19 22:42 register-mysql-ext-secrets.json
-rw-r--r--. 1 root root 860 Apr 19 22:42 register-mysql-avro.json
-rw-r--r--. 1 root root 878 Apr 19 22:42 register-mysql-apicurio.json
-rw-r--r--. 1 root root 1172 Apr 19 22:42 register-mysql-apicurio-converter-json.json
-rw-r--r--. 1 root root 1166 Apr 19 22:42 register-mysql-apicurio-converter-avro.json
-rw-r--r--. 1 root root 437 Apr 19 22:42 register-mongodb.json
-rw-r--r--. 1 root root 576 Apr 19 22:42 register-db2.json
-rw-r--r--. 1 root root 22923 Apr 19 22:42 README.md
-rw-r--r--. 1 root root 1955 Apr 19 22:42 docker-compose-zookeeperless-kafka.yaml
-rw-r--r--. 1 root root 1616 Apr 19 22:42 docker-compose-zookeeperless-kafka-combined.yaml
-rw-r--r--. 1 root root 885 Apr 19 22:42 docker-compose-vitess.yaml
-rw-r--r--. 1 root root 1119 Apr 19 22:42 docker-compose-sqlserver.yaml
-rw-r--r--. 1 root root 1082 Apr 19 22:42 docker-compose-postgres.yaml
-rw-r--r--. 1 root root 927 Apr 19 22:42 docker-compose-oracle.yaml
-rw-r--r--. 1 root root 887 Apr 19 22:42 docker-compose-mysql.yaml
-rw-r--r--. 1 root root 1068 Apr 19 22:42 docker-compose-mysql-ext-secrets.yml
-rw-r--r--. 1 root root 1671 Apr 19 22:42 docker-compose-mysql-avro-worker.yaml
-rw-r--r--. 1 root root 1391 Apr 19 22:42 docker-compose-mysql-avro-connector.yaml
-rw-r--r--. 1 root root 1036 Apr 19 22:42 docker-compose-mysql-apicurio.yaml
-rw-r--r--. 1 root root 43764 Apr 19 22:42 docker-compose-mysql-apicurio.png
-rw-r--r--. 1 root root 1094 Apr 19 22:42 docker-compose-mongodb.yaml
-rw-r--r--. 1 root root 1098 Apr 19 22:42 docker-compose-db2.yaml
-rw-r--r--. 1 root root 930 Apr 19 22:42 docker-compose-cassandra.yaml
drwxr-xr-x. 3 root root 36 Apr 19 22:42 debezium-with-oracle-jdbc
drwxr-xr-x. 3 root root 74 Apr 19 22:42 debezium-vitess-init
drwxr-xr-x. 2 root root 27 Apr 19 22:42 debezium-sqlserver-init
drwxr-xr-x. 4 root root 41 Apr 19 22:42 debezium-db2-init
drwxr-xr-x. 2 root root 141 Apr 19 22:42 debezium-cassandra-init
drwxr-xr-x. 2 root root 26 Apr 19 22:42 db2data
安装 docker-compose
- Docker Compose 是一个在 Docker 上运行多容器应用程序的工具。
- Docker Compose V2 是 Docker Compose 的主要升级版本,使用 Golang 完全重写的,V1 是用 Python 编写的。
- 在 Github 上的地址:https://github.com/docker/compose
- 本示例使用的 docker-compose 的下载地址:https://github.com/docker/compose/releases/download/v2.4.1/docker-compose-linux-x86_64
# 下载的二进制文件,给个可执行权限就可以直接运行,为了方便修改下文件名
[root@docker ~]# chmod +x docker-compose-linux-x86_64
[root@docker ~]# mv docker-compose-linux-x86_64 docker-compose
简单管理 docker-compose
# 帮助
/root/docker-compose -h
# 创建并启动项目中的所有容器
/root/docker-compose -f xxx.yaml up
# 停止并删除项目中的所有容器
/root/docker-compose -f xxx.yaml down
# 重启项目中的服务(单个容器),以下示例重启connect容器
/root/docker-compose -f xxx.yaml restart connect
# 列出项目中所有的容器
/root/docker-compose -f xxx.yaml ps
测试 Avro
Avro 有三种配置方式,第一种在 Kafka Connect Worker 配置,第二种在 Debezium 连接器上配置,第三种是使用 Apicurio 注册表
第一种配置方式:在 Kafka Connect Worker 配置
编辑 docker-compose 的配置文件
- tutorial 里面只有 MySQL 的 docker-compose-mysql-avro-worker.yaml,仿照这个写一个 PostgreSQL 的 docker-compose 配置文件
- 添加了一个 Kafka Web 管理工具
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-avro-worker.yaml
version: '2'
services:
zookeeper:
image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
schema-registry:
image: confluentinc/cp-schema-registry:7.0.1
ports:
- 8181:8181
- 8081:8081
environment:
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
links:
- zookeeper
connect:
image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- postgres
- schema-registry
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
kafkaui:
image: provectuslabs/kafka-ui:latest
ports:
- 8811:8080
links:
- kafka
environment:
- KAFKA_CLUSTERS_0_NAME=test
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
启动 docker-compose
- 启动 docker-compose,会相继启动 zookeeper、kafka、postgres、schema-registry、connect 和一个 Kafka 的 Web 管理的容器
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-worker.yaml up
注册 PostgreSQL connector
- 使用 Debezium tutorial 中自带的 register-postgres.json
# cd /root/debezium-examples-main/tutorial
# cat register-postgres.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
登录 Kafka Web 查看 Topics 的情况
- http://192.168.0.40:8811/
- 可以看到自动为源端的每个表创建的 Topics
- 可以看到自动为 schemas 创建的 Topics _schemas
- 可以看到每条消息的 key 和 value 都是二进制的
配置网络
- 本实验的源端是 PostgreSQL,目标端是 Oracle 19C PDB,Debezium 提供了 PostgreSQL 的 Docker 镜像,但是没有 Oracle 的镜像。
- 在 Docker 上安装 Oracle 参考:使用Docker装一个Oracle 19C的单机测试环境
- 使用 docker-compose 部署的环境会建立一个默认的网络,名称为 docker-compose.yml 所在目录名称小写形式加上 “_default”,这里就是 tutorial_default。
- 在 Docker 上安装的 Oracle 使用的是 Docker 的默认网络(bridge),因此它与 docker-compose 部署的环境在网络上是相互隔离的。
- 为了让 docker-compose 部署后的 connect 容器能与 Oracle 相连通,需要在 connect 容器上添加 Docker 的默认网络。
# 先使用 docker ps 查看 tutorial-connect-1 容器的 CONTAINER ID (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4
# docker inspect tutorial-connect-1 |grep IPAddress
"SecondaryIPAddresses": null,
"IPAddress": "172.17.0.3",
"IPAddress": "172.17.0.3",
"IPAddress": "172.26.0.3",
注册一个消费者连接器
- 消费者连接器使用的是 Kafka Connect JDBC,消费到 Oracle 19C PDB 中
- Debezium 提供的 connect 容器中没有 Kafka Connect JDBC,需要自行下载并上传,重启 connect 容器
# 上传 Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# 使用 docker-compose 重启 connect 服务
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-worker.yaml restart connect
- 编辑消费者的连接器并注册到 Kafka Connect
[root@docker ~]# cat oracle-jdbc-sink.json
{
"name": "oracle-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//10.16.0.1:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "ORDERS",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink.json
- 查看消费的数据
SQL> desc INVENTORY.ORDERS;
SQL> select * from INVENTORY.ORDERS;
停止并删除容器,清理测试环境
# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-avro-worker.yaml down
第二种配置方式:在 Debezium 连接器上配置
编辑 docker-compose 的配置文件
- tutorial 里面只有 MySQL 的 docker-compose-mysql-avro-connector.yaml,仿照这个写一个 PostgreSQL 的 docker-compose 配置文件
- 添加了一个 Kafka Web 管理工具
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-avro-connector.yaml
version: '2'
services:
zookeeper:
image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
schema-registry:
image: confluentinc/cp-schema-registry:7.0.1
ports:
- 8181:8181
- 8081:8081
environment:
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
links:
- zookeeper
connect:
image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- postgres
- schema-registry
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
kafkaui:
image: provectuslabs/kafka-ui:latest
ports:
- 8811:8080
links:
- kafka
environment:
- KAFKA_CLUSTERS_0_NAME=test
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
启动 docker-compose
- 启动 docker-compose,会相继启动 zookeeper、kafka、postgres、schema-registry、connect 和一个 Kafka 的 Web 管理的容器
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml up
注册 PostgreSQL connector
- tutorial 里面只有 MySQL 的 register-mysql-avro.json,仿照这个写一个 PostgreSQL 的配置文件
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-avro.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-avro.json
查看 customers schema
curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1
curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
- schema-registry 容器还自带一个可以读取 Avro 消息的控制台消费者(kafka-avro-console-consumer):
# cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml exec schema-registry /usr/bin/kafka-avro-console-consumer \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--property schema.registry.url=http://schema-registry:8081 \
--topic dbserver1.inventory.customers
配置网络
- 详细说明见 上一种配置方式
# 先使用 docker ps 查看 tutorial-connect-1 容器的 CONTAINER ID (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4
注册一个消费者连接器
- 消费者连接器使用的是 Kafka Connect JDBC,消费到 Oracle 19C PDB 中
- Debezium 提供的 connect 容器中没有 Kafka Connect JDBC,需要自行下载并上传,重启 connect 容器
# 上传 Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# 使用 docker-compose 重启 connect 服务
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml restart connect
- 编辑消费者的连接器并注册到 Kafka Connect
[root@docker tutorial]# cat oracle-jdbc-sink-avro.json
{
"name": "oracle-jdbc-sink-avro",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "ORDERS",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-avro.json
- 查看消费的数据
SQL> desc INVENTORY.ORDERS;
SQL> select * from INVENTORY.ORDERS;
停止并删除容器,清理测试环境
# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-avro-connector.yaml down
第三种配置方式:使用 Apicurio 注册表
Apicurio Registry 是一个开源的 API 和 schema 注册表,其用途之一是存储 Kafka 记录的 schema。它提供:
- 它自己的原生 Avro 转换器和 Protobuf 序列化器
- 将其 schema 导出到注册表的 JSON 转换器
- 与 IBM 或 Confluent 等其他 schema 注册表的兼容层; 它可以与 Confluent Avro 转换器一起使用。
编辑 docker-compose 的配置文件
- tutorial 里面只有 MySQL 的 docker-compose-mysql-apicurio.yaml,仿照这个写一个 PostgreSQL 的 docker-compose 配置文件
- 添加了一个 Kafka Web 管理工具
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-apicurio.yaml
version: '2'
services:
zookeeper:
image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
apicurio:
image: apicurio/apicurio-registry-mem:2.0.0.Final
ports:
- 8080:8080
connect:
image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- postgres
- apicurio
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- ENABLE_APICURIO_CONVERTERS=true
kafkaui:
image: provectuslabs/kafka-ui:latest
ports:
- 8811:8080
links:
- kafka
environment:
- KAFKA_CLUSTERS_0_NAME=test
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
启动 docker-compose
- 启动 docker-compose,会相继启动 zookeeper、kafka、postgres、apicurio、connect 和一个 Kafka 的 Web 管理的容器
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml up
注册 PostgreSQL connector (Apicurio - JSON 格式)
- tutorial 里面只有 MySQL 的 register-mysql-apicurio-converter-json.json,仿照这个写一个 PostgreSQL 的配置文件
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio-converter-json.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio-converter-json.json
注册 PostgreSQL connector (Apicurio - Avro 格式)
- tutorial 里面只有 MySQL 的 register-mysql-apicurio-converter-avro.json,仿照这个写一个 PostgreSQL 的配置文件
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio-converter-avro.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio-converter-avro.json
注册 PostgreSQL connector (Confluent - Avro 格式)
- tutorial 里面只有 MySQL 的 register-mysql-apicurio.json,仿照这个写一个 PostgreSQL 的配置文件
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6",
"value.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio.json
查看 customers schema
# Apicurio - JSON 格式和 Avro 格式
curl -X GET http://localhost:8080/apis/registry/v2/groups/default/artifacts/dbserver1.inventory.customers-value | jq .
# Confluent - Avro 格式
curl -X GET http://localhost:8080/apis/ccompat/v6/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
- 可以使用 kafka 容器自带的控制台消费者(kafka-console-consumer.sh)读取消息:
# Apicurio - JSON 格式
# cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
查看数据消息时会注意到,消息中仅包含 payload 部分,而不包含 schema 部分,因为 schema 已经外部化到注册表中。
查看 Topics
- Apicurio - JSON
- Apicurio - Avro
- Confluent - Avro
配置网络
- 详细说明见 上一种配置方式
# 先使用 docker ps 查看 tutorial-connect-1 容器的 CONTAINER ID (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4
注册一个消费者连接器
- 消费者连接器使用的是 Kafka Connect JDBC,消费到 Oracle 19C PDB 中
- Debezium 提供的 connect 容器中没有 Kafka Connect JDBC,需要自行下载并上传,重启 connect 容器
# 上传 Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# 使用 docker-compose 重启 connect 服务
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml restart connect
- 编辑消费者的连接器并注册到 Kafka Connect (Apicurio - JSON 格式)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-json.json
{
"name": "oracle-jdbc-sink-apicurio-json",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "ORDERS",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-json.json
- 消费端没走通,报错:Tolerance exceeded in error handler
- 编辑消费者的连接器并注册到 Kafka Connect (Apicurio - Avro 格式)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-avro.json
{
"name": "oracle-jdbc-sink-apicurio-avro",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "ORDERS",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-avro.json
- 编辑消费者的连接器并注册到 Kafka Connect (Confluent - Avro 格式)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-avro2.json
{
"name": "oracle-jdbc-sink-apicurio-avro2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "ORDERS",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6",
"value.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-avro2.json
停止并删除容器,清理测试环境
# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-apicurio.yaml down
最后修改时间:2022-04-27 19:52:15
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。