准备工作
本文以mysql作为demo,同步mysql里面的数据到kafka消息中间件,我们以docker compose构建zk,kafka,mysql,debezium-connect相关服务
docker-compose zookeeper kafka mysql debezium-connect
使用docker-compose启动服务
编写docker-compose.yaml
version: '3'
services:
zookeeper:
container_name: zookeeper
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
- '2888:2888'
- '3888:3888'
networks:
- dbz-net
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
container_name: kafka
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
- '9093:9093'
depends_on:
- zookeeper
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
# - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
networks:
- dbz-net
db-mysql:
container_name: db-mysql
image: quay.io/debezium/example-mysql:2.0
ports:
- "63306:3306"
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
networks:
- dbz-net
connect:
container_name: connect
image: quay.io/debezium/connect:nightly
ports:
- "8083:8083"
depends_on:
- kafka
- db-mysql
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
# connector相关信息持久化到kafka指定的topic
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- ENABLE_DEBEZIUM_SCRIPTING=true
# 这个选项是启用restful api的插件,以供debezium-ui服务使用
- CONNECT_REST_EXTENSION_CLASSES=io.debezium.kcrestextension.DebeziumConnectRestExtension
networks:
- dbz-net
debezium-ui:
container_name: debezium-ui
image: quay.io/debezium/debezium-ui:nightly
ports:
- "8080:8080"
environment:
- KAFKA_CONNECT_URIS=http://connect:8083
depends_on:
- connect
networks:
- dbz-net
networks:
#定义网络
dbz-net:
external: false
我们使用docker-compose起一组服务,有zk,kafka,mysql,connect,debezium-ui等服务,处理相互的依赖关系。
启动服务 在docker-compose.yaml这个文件所在路径执行命令
docker compose up -d
等待镜像下载完成并启动服务即可
docker compose ps
NAME COMMAND SERVICE STATUS PORTS
connect "/docker-entrypoint.…" connect running 0.0.0.0:8083->8083/tcp
db-mysql "docker-entrypoint.s…" db-mysql running 0.0.0.0:63306->3306/tcp
debezium-ui "/deployments/run-ja…" debezium-ui running 0.0.0.0:8080->8080/tcp
kafka "/docker-entrypoint.…" kafka running 0.0.0.0:9092->9092/tcp
服务启动成功之后,我们就可以添加debezium-connector配置了,我们可以在web界面上添加,可以通过接口直接添加提前准备好的配置,生产环境配置我更喜欢使用配置文件进行配置。

mysql配置文件
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
// 由于使用docker-compose编排,connect服务可以通过服务名访问mysql,这里可以是域名和ip
"database.hostname": "db-mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
// mysql集群的唯一id,建议设置,v2.0版本不设置配置校验无法通过
"database.server.id": "10000",
// 这个配置很重要,有些默认topic和它有关
"database.server.name": "mysql_demo",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
// 服务启动后会保存一份表结构到这个topic
"database.history.kafka.topic": "schema-changes.inventory",
// 快照方式,一般新创建的connector需要进行全量同步需要这个选项
"snapshot.mode": "initial",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"decimal.handing.mode": "double",
"transforms": "snapshotasinsert",
"transforms.snapshotasinsert.type": "io.debezium.connector.mysql.transforms.ReadToInsertEvent"
}
}
使用命令注册connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @mysql.json
执行完毕之后返回http status为201即为成功,我们也可以通过web界面进行查看。

查看启动后初始化的消息 这里我们可以使用offset explorer工具很方便的查看kafka中的消息内容。

前三个topic是debezium持久化connector信息用的,内部使用。mysql_demo这个topic是用于投递目标数据库schema变更事件的消息,mysql_demo.*.*相关topic是每个表对应数据变更,默认每个表一个topic,我们前面有提到mysql_demo其实是配置里面database.server.name属性,所以topic是serverName.databaseName.tableName的形式。
当然同步的数据可以同步到单个topic或者按照database投递到指定topic都是可以,这个依赖tranformation属性,下一章节将专门介绍。
schema-changes.inventory这个topic是通过database.history.kafka.topic属性设置,是connector内部用于记录schema结构的消息。
本文主要讲述了debezium connector的构建流程,后续的章节会介绍消息内容,将数据变更通过flinksql写入其他存储中,后续讲解flinkCDC模块也会基于debezium进行介绍。




