暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

debezium系列-实操篇

程序学社 2022-06-05
2529


准备工作

本文以mysql作为demo,同步mysql里面的数据到kafka消息中间件,我们以docker compose构建zk,kafka,mysql,debezium-connect相关服务

  • docker-compose
  • zookeeper
  • kafka
  • mysql
  • debezium-connect

使用docker-compose启动服务

编写docker-compose.yaml

version: '3'
services:
  zookeeper:
    container_name: zookeeper
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
      - '2888:2888'
      - '3888:3888'
    networks:
      - dbz-net
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    container_name: kafka
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
      - '9093:9093'
    depends_on:
      - zookeeper
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      # - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    networks:
      - dbz-net
  db-mysql:
    container_name: db-mysql
    image: quay.io/debezium/example-mysql:2.0
    ports:
      - "63306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    networks:
      - dbz-net
  connect:
    container_name: connect
    image: quay.io/debezium/connect:nightly
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - db-mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      # connector相关信息持久化到kafka指定的topic
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - ENABLE_DEBEZIUM_SCRIPTING=true
      # 这个选项是启用restful api的插件,以供debezium-ui服务使用
      - CONNECT_REST_EXTENSION_CLASSES=io.debezium.kcrestextension.DebeziumConnectRestExtension
    networks:
      - dbz-net
  debezium-ui:
    container_name: debezium-ui
    image: quay.io/debezium/debezium-ui:nightly
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CONNECT_URIS=http://connect:8083
    depends_on:
      - connect
    networks:
      - dbz-net
networks:
#定义网络
  dbz-net:
    external: false

我们使用docker-compose起一组服务,有zk,kafka,mysql,connect,debezium-ui等服务,处理相互的依赖关系。

启动服务 在docker-compose.yaml这个文件所在路径执行命令

docker compose up -d

等待镜像下载完成并启动服务即可

docker compose ps
NAME                COMMAND                  SERVICE             STATUS              PORTS
connect             "/docker-entrypoint.…"   connect             running             0.0.0.0:8083->8083/tcp
db-mysql            "docker-entrypoint.s…"   db-mysql            running             0.0.0.0:63306->3306/tcp
debezium-ui         "/deployments/run-ja…"   debezium-ui         running             0.0.0.0:8080->8080/tcp
kafka               "/docker-entrypoint.…"   kafka               running             0.0.0.0:9092->9092/tcp

服务启动成功之后,我们就可以添加debezium-connector配置了,我们可以在web界面上添加,可以通过接口直接添加提前准备好的配置,生产环境配置我更喜欢使用配置文件进行配置。

mysql配置文件

{
    "name""mysql-connector",
    "config": {
        "connector.class""io.debezium.connector.mysql.MySqlConnector",
        "tasks.max""1",
        // 由于使用docker-compose编排,connect服务可以通过服务名访问mysql,这里可以是域名和ip
        "database.hostname""db-mysql",
        "database.port""3306",
        "database.user""debezium",
        "database.password""dbz",
        // mysql集群的唯一id,建议设置,v2.0版本不设置配置校验无法通过
        "database.server.id""10000",
        // 这个配置很重要,有些默认topic和它有关
        "database.server.name""mysql_demo",
        "database.include.list""inventory",
        "database.history.kafka.bootstrap.servers""kafka:9092",
        // 服务启动后会保存一份表结构到这个topic
        "database.history.kafka.topic""schema-changes.inventory",
        // 快照方式,一般新创建的connector需要进行全量同步需要这个选项
        "snapshot.mode""initial",
        "key.converter.schemas.enable"false,
        "value.converter.schemas.enable"false,
        "decimal.handing.mode""double",
        "transforms""snapshotasinsert",
        "transforms.snapshotasinsert.type""io.debezium.connector.mysql.transforms.ReadToInsertEvent"
    }
}

使用命令注册connector

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @mysql.json

执行完毕之后返回http status为201即为成功,我们也可以通过web界面进行查看。

查看启动后初始化的消息 这里我们可以使用offset explorer工具很方便的查看kafka中的消息内容。

前三个topic是debezium持久化connector信息用的,内部使用。mysql_demo这个topic是用于投递目标数据库schema变更事件的消息,mysql_demo.*.*相关topic是每个表对应数据变更,默认每个表一个topic,我们前面有提到mysql_demo其实是配置里面database.server.name属性,所以topic是serverName.databaseName.tableName的形式。

当然同步的数据可以同步到单个topic或者按照database投递到指定topic都是可以,这个依赖tranformation属性,下一章节将专门介绍。

schema-changes.inventory这个topic是通过database.history.kafka.topic属性设置,是connector内部用于记录schema结构的消息。

本文主要讲述了debezium connector的构建流程,后续的章节会介绍消息内容,将数据变更通过flinksql写入其他存储中,后续讲解flinkCDC模块也会基于debezium进行介绍。


文章转载自程序学社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论