Reference documentation
https://debezium.io/documentation/reference/1.9/configuration/avro.html
Background
Debezium connectors run inside the Kafka Connect framework and capture every row-level change in a database by producing change event records to Kafka topics. Before a Debezium connector writes a change event record to a Kafka topic, the captured record has to be converted into a form Kafka can store. Kafka Connect ships with a JSON converter that serializes record keys and values to JSON, but by default the JSON converter embeds the message schema in every record, which makes each record very verbose.
- Key
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int8",
        "optional": false,
        "field": "DEPTNO"
      }
    ],
    "optional": false,
    "name": "ora19c.SCOTT.DEPT.Key"
  },
  "payload": {
    "DEPTNO": 10
  }
}
- Values
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int8",
            "optional": false,
            "field": "DEPTNO"
          },
          {
            "type": "string",
            "optional": true,
            "field": "DNAME"
          },
          {
            "type": "string",
            "optional": true,
            "field": "LOC"
          }
        ],
        "optional": true,
        "name": "ora19c.SCOTT.DEPT.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int8",
            "optional": false,
            "field": "DEPTNO"
          },
          {
            "type": "string",
            "optional": true,
            "field": "DNAME"
          },
          {
            "type": "string",
            "optional": true,
            "field": "LOC"
          }
        ],
        "optional": true,
        "name": "ora19c.SCOTT.DEPT.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "string",
            "optional": true,
            "field": "scn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_scn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "lcr_position"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.oracle.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "ora19c.SCOTT.DEPT.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "DEPTNO": 10,
      "DNAME": "ACCOUNTING",
      "LOC": "NEW YORK"
    },
    "source": {
      "version": "1.9.0.Final",
      "connector": "oracle",
      "name": "ora19c",
      "ts_ms": 1650415436277,
      "snapshot": "true",
      "db": "PDBTT",
      "sequence": null,
      "schema": "SCOTT",
      "table": "DEPT",
      "txId": null,
      "scn": "6853525",
      "commit_scn": null,
      "lcr_position": null
    },
    "op": "r",
    "ts_ms": 1650415436280,
    "transaction": null
  }
}
The above is how a single record is stored in Kafka. Because both the key and the value embed the message schema, each message is extremely verbose, which adds storage and network-transfer overhead. What the message schema is actually used for is not yet clear to me, but Debezium does allow disabling it, via the following two parameters:
key.converter.schemas.enable
value.converter.schemas.enable
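For reference, these flags only take effect when the JSON converter is in use; a minimal sketch of the relevant worker settings (the converter classes shown are the Kafka Connect defaults):
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Embed the message schema in every record (defaults to true)
key.converter.schemas.enable=true
value.converter.schemas.enable=true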
- Testing shows that in some scenarios, disabling the record schema causes problems on the consumer side, so the Avro converter is worth considering instead. Here is one problem case:
A consumer-side problem after disabling the schema with Kafka JSON serialization
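As an alternative, the Debezium documentation referenced at the top of this article describes serializing change events with the Avro converter backed by a schema registry, which keeps messages compact without dropping the schema. A minimal sketch of the worker settings; the registry address here is an assumption for illustration:
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
# Assumed address of a Confluent Schema Registry instance
key.converter.schema.registry.url=http://192.168.0.40:8081
value.converter.schema.registry.url=http://192.168.0.40:8081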
Disabling the message schema via configuration
- Set the parameter values in the Kafka Connect worker configuration file (config/connect-distributed.properties):
# Kafka Connect runs in a Docker container that has no vi editor, so the file can be edited from the Docker host instead
# Find where the container's config directory is mounted on the Docker host
[root@docker ~]# docker inspect connect |grep "/kafka/config" -B1
"Source": "/var/lib/docker/volumes/11bb601aeeb27d02bfa1d238bbbf157dfb5b813a42a1af33b1a79ec25df0ad75/_data",
"Destination": "/kafka/config",
# Edit the connect-distributed.properties file and change the parameters
[root@docker ~]# cd /var/lib/docker/volumes/11bb601aeeb27d02bfa1d238bbbf157dfb5b813a42a1af33b1a79ec25df0ad75/_data
[root@docker _data]# ll
total 68
-rw-r--r--. 1 1001 1001 906 Apr 19 14:51 connect-console-sink.properties
-rw-r--r--. 1 1001 1001 909 Apr 19 14:51 connect-console-source.properties
-rw-r--r--. 1 1001 1001 5608 Apr 19 14:52 connect-distributed.properties
-rw-r--r--. 1 1001 1001 883 Apr 19 14:51 connect-file-sink.properties
-rw-r--r--. 1 1001 1001 881 Apr 19 14:51 connect-file-source.properties
-rw-r--r--. 1 1001 1001 2103 Apr 19 14:51 connect-log4j.properties
-rw-r--r--. 1 1001 1001 2540 Apr 19 14:51 connect-mirror-maker.properties
-rw-r--r--. 1 1001 1001 2262 Apr 19 14:51 connect-standalone.properties
-rw-r--r--. 1 1001 1001 1221 Apr 19 14:51 consumer.properties
drwxr-xr-x. 2 1001 1001 102 Apr 19 14:51 kraft
-rw-rw-r--. 1 1001 1001 850 Apr 19 10:03 log4j.properties
-rw-r--r--. 1 1001 1001 1925 Apr 19 14:51 producer.properties
-rw-r--r--. 1 1001 1001 6849 Apr 19 14:51 server.properties
-rw-r--r--. 1 1001 1001 1032 Apr 19 14:51 tools-log4j.properties
-rw-r--r--. 1 1001 1001 1169 Apr 19 14:51 trogdor.conf
-rw-r--r--. 1 1001 1001 1205 Apr 19 14:51 zookeeper.properties
[root@docker _data]# vi connect-distributed.properties
key.converter.schemas.enable=false
value.converter.schemas.enable=false
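If editing through the volume path is inconvenient, docker cp can round-trip the file instead; a sketch using the /kafka/config path seen in the docker inspect output above:
# Copy the file out of the container, edit it, then copy it back
[root@docker ~]# docker cp connect:/kafka/config/connect-distributed.properties .
[root@docker ~]# vi connect-distributed.properties
[root@docker ~]# docker cp connect-distributed.properties connect:/kafka/config/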
- Restart the Kafka Connect container so the parameters take effect
# docker restart connect
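Before creating new connectors, it is worth waiting until the worker's REST API responds again; a quick check against the same address used by the curl calls below:
# An empty JSON array (or a list of existing connectors) means the worker is ready
[root@docker ~]# curl -s 192.168.0.40:8083/connectors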
Create a new connector and observe the effect
When creating a new connector, note that the name, database.server.name, and database.history.kafka.topic parameters should each be kept unique.
[root@docker ~]# cat oracle-scott-connector.json
{
  "name": "oracle-scott-connector2",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "database.hostname": "172.17.0.2",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "ORCL",
    "database.pdb.name": "PDBTT",
    "database.server.name": "ora1",
    "tasks.max": "1",
    "schema.include.list": "SCOTT",
    "database.history.kafka.bootstrap.servers": "192.168.0.40:9092",
    "database.history.kafka.topic": "schema-changes.inventory1"
  }
}
[root@docker ~]# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-scott-connector.json
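The Key and Value below were read from the change topic; a sketch of the command, assuming the broker container is named kafka, the Kafka scripts live under /kafka/bin, and the topic follows Debezium's <database.server.name>.<schema>.<table> naming:
# Print both the record key and value for every message from the beginning of the topic
[root@docker ~]# docker exec -it kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.0.40:9092 \
    --topic ora1.SCOTT.DEPT \
    --property print.key=true \
    --from-beginning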
- Key
{
  "DEPTNO": 10
}
- Values
{
  "before": null,
  "after": {
    "DEPTNO": 10,
    "DNAME": "ACCOUNTING",
    "LOC": "NEW YORK"
  },
  "source": {
    "version": "1.9.0.Final",
    "connector": "oracle",
    "name": "ora1",
    "ts_ms": 1650414625783,
    "snapshot": "true",
    "db": "PDBTT",
    "sequence": null,
    "schema": "SCOTT",
    "table": "DEPT",
    "txId": null,
    "scn": "6732490",
    "commit_scn": null,
    "lcr_position": null
  },
  "op": "r",
  "ts_ms": 1650414625799,
  "transaction": null
}
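If a downstream consumer has to tolerate both formats (for example while the setting is being rolled out), the envelope can be unwrapped defensively; a sketch with jq, assuming jq is available on the host and each message value is one JSON document per line:
# Extract .payload when the schema envelope is present, otherwise pass the message through
[root@docker ~]# docker exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.0.40:9092 \
    --topic ora1.SCOTT.DEPT --from-beginning \
  | jq 'if (type == "object" and has("payload")) then .payload else . end'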
- A comparison of the storage size of the same data in Kafka: the first screenshot shows the topic with the schema enabled, the second with the schema disabled.
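The screenshots are not reproduced here, but the per-partition size of each topic can also be read directly from the broker; a sketch with kafka-log-dirs.sh, comparing the schema-enabled topic (ora19c.SCOTT.DEPT) with the schema-disabled one (ora1.SCOTT.DEPT):
# Reports the log directory and size in bytes for each partition of the listed topics
[root@docker ~]# docker exec kafka /kafka/bin/kafka-log-dirs.sh \
    --bootstrap-server 192.168.0.40:9092 \
    --describe --topic-list ora19c.SCOTT.DEPT,ora1.SCOTT.DEPT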