
Capturing Change Data from an Oracle 19c PDB into Kafka with Debezium on Docker

Original article by 张玉龙, 2022-04-17

Lab Environment

Start ZooKeeper

# Run in the background
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:1.9
# Follow the ZooKeeper logs in real time
docker logs -f -t --tail 10 zookeeper

Start Kafka

# Run in the background
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9
# Follow the Kafka logs in real time
docker logs -f -t --tail 10 kafka
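Before moving on, it can help to confirm that ZooKeeper (port 2181) and Kafka (port 9092) are accepting connections. The following is a minimal Python sketch, assuming the ports published by the two `docker run` commands above. `wait_for_port` and its parameters are hypothetical helper names, not part of Debezium or Kafka. A successful TCP connect only shows that the port is open, not that the broker has finished starting.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Example calls for this article's host (assumed reachable from where the script runs):
# wait_for_port("192.168.0.40", 2181)   # ZooKeeper
# wait_for_port("192.168.0.40", 9092)   # Kafka
```

The container logs shown above remain the better source for diagnosing a failed start; the port check is only a quick gate for scripts.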

Start the Oracle 19c Database

[oracle@ora11g ~]$ sqlplus scott/scott@192.168.0.40:1521/pdbtt
SQL> set line 100 pages 100
SQL> select * from scott.emp;
     EMPNO ENAME      JOB              MGR HIREDATE         SAL       COMM     DEPTNO
---------- ---------- --------- ---------- --------- ---------- ---------- ----------
      7369 SMITH      CLERK           7902 17-DEC-80        800                    20
      7499 ALLEN      SALESMAN        7698 20-FEB-81       1600        300         30
      7521 WARD       SALESMAN        7698 22-FEB-81       1250        500         30
      7566 JONES      MANAGER         7839 02-APR-81       2975                    20
      7654 MARTIN     SALESMAN        7698 28-SEP-81       1250       1400         30
      7698 BLAKE      MANAGER         7839 01-MAY-81       2850                    30
      7782 CLARK      MANAGER         7839 09-JUN-81       2450                    10
      7788 SCOTT      ANALYST         7566 19-APR-87       3000                    20
      7839 KING       PRESIDENT            17-NOV-81       5000                    10
      7844 TURNER     SALESMAN        7698 08-SEP-81       1500          0         30
      7876 ADAMS      CLERK           7788 23-MAY-87       1100                    20
      7900 JAMES      CLERK           7698 03-DEC-81        950                    30
      7902 FORD       ANALYST         7566 03-DEC-81       3000                    20
      7934 MILLER     CLERK           7782 23-JAN-82       1300                    10
14 rows selected.

Configure the Oracle 19c Database

  • Make sure the database is running in archive log mode; connect through the CDB service
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
SQL> archive log list;
Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            /opt/oracle/oradata/ORCL/archive_logs
Oldest online log sequence     5
Next log sequence to archive   7
Current log sequence           7
  • Enable minimal supplemental logging; connect through the CDB service
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
-- Enable minimal supplemental logging
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Switch to the PDB and enable supplemental logging for the tables
SQL> alter session set container=pdbtt;
ALTER TABLE scott.DEPT ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE scott.EMP ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE scott.BONUS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE scott.SALGRADE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
  • Create the tablespace for the LogMiner user in both the CDB and the PDB
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCL/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
-- Switch to the PDB and create the tablespace there as well
SQL> alter session set container=pdbtt;
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCL/PDBTT/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  • Create the LogMiner user in the CDB and grant the required privileges
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL;
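The per-table supplemental logging statements in the steps above all follow one pattern, so for a longer table list they could be generated rather than typed. A minimal Python sketch, assuming the same `scott` schema and table names as above; `supplemental_log_statements` is a hypothetical helper name. The script only prints SQL text and does not connect to Oracle.

```python
from typing import List


def supplemental_log_statements(schema: str, tables: List[str]) -> List[str]:
    """Build one ALTER TABLE ... ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS per table."""
    return [
        f"ALTER TABLE {schema}.{table} ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;"
        for table in tables
    ]


# The four SCOTT tables used in this article.
for statement in supplemental_log_statements("scott", ["DEPT", "EMP", "BONUS", "SALGRADE"]):
    print(statement)
```

The printed statements would still need to be run inside the PDB (after `alter session set container=pdbtt;`), exactly as in the manual step.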

Test LogMiner at the Database Level

[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
SQL> select member from v$logfile;
MEMBER
--------------------------------------------------------------------------------
/opt/oracle/oradata/ORCL/redo03.log
/opt/oracle/oradata/ORCL/redo02.log
/opt/oracle/oradata/ORCL/redo01.log
SQL> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/ORCL/redo01.log',dbms_logmnr.new);
SQL> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/ORCL/redo02.log',dbms_logmnr.addfile);
SQL> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/ORCL/redo03.log',dbms_logmnr.addfile);
SQL> execute dbms_logmnr.start_logmnr(options=>dbms_logmnr.dict_from_online_catalog+dbms_logmnr.committed_data_only);
SQL> select sql_redo,sql_undo from v$logmnr_contents where table_name like '%DEPT%' and OPERATION='INSERT';
SQL_REDO
----------------------------------------------------------------------------------------------------
SQL_UNDO
---------------------------------------------------------------------------------------------------------------
insert into "SCOTT"."DEPT"("DEPTNO","DNAME","LOC") values ('10','ACCOUNTING','NEW YORK');
delete from "SCOTT"."DEPT" where "DEPTNO" = '10' and "DNAME" = 'ACCOUNTING' and "LOC" = 'NEW YORK' and ROWID = 'AAAR1DAAMAAAACDAAA';
insert into "SCOTT"."DEPT"("DEPTNO","DNAME","LOC") values ('20','RESEARCH','DALLAS');
delete from "SCOTT"."DEPT" where "DEPTNO" = '20' and "DNAME" = 'RESEARCH' and "LOC" = 'DALLAS' and ROWID = 'AAAR1DAAMAAAACDAAB';
insert into "SCOTT"."DEPT"("DEPTNO","DNAME","LOC") values ('30','SALES','CHICAGO');
delete from "SCOTT"."DEPT" where "DEPTNO" = '30' and "DNAME" = 'SALES' and "LOC" = 'CHICAGO' and ROWID = 'AAAR1DAAMAAAACDAAC';
insert into "SCOTT"."DEPT"("DEPTNO","DNAME","LOC") values ('40','OPERATIONS','BOSTON');
delete from "SCOTT"."DEPT" where "DEPTNO" = '40' and "DNAME" = 'OPERATIONS' and "LOC" = 'BOSTON' and ROWID = 'AAAR1DAAMAAAACDAAD';
-- The following statement ends the LogMiner session
SQL> execute dbms_logmnr.end_logmnr;

Start Kafka Connect

# Run in the background
docker run -d --name connect \
  -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  --link zookeeper:zookeeper \
  --link kafka:kafka \
  quay.io/debezium/connect:1.9
# Follow the Kafka Connect logs in real time
docker logs -f -t --tail 10 connect

Debezium Oracle connector

  • Copy the Oracle JDBC driver into the connect container and restart it
[root@docker ~]# mv ojdbc8-19.3.0.0.jar ojdbc8.jar
[root@docker ~]# docker cp ojdbc8.jar connect:/kafka/libs
[root@docker ~]# docker restart connect
  • Prepare the Debezium Oracle connector configuration file
    The file can simply be created on the Docker host, because the connect container exposes a REST API for managing Debezium connectors.
    In this setup, database.hostname had to be set to the container's internal IP address (172.17.0.3 here); otherwise the connector could not be registered.
[root@docker ~]# vi oracle-scott-connector.json
{
  "name": "oracle-scott-connector",
  "config": {
    "connector.class" : "io.debezium.connector.oracle.OracleConnector",
    "database.hostname" : "172.17.0.3",
    "database.port" : "1521",
    "database.user" : "c##dbzuser",
    "database.password" : "dbz",
    "database.dbname" : "ORCL",
    "database.pdb.name" : "PDBTT",
    "database.server.name" : "oracle19c",
    "tasks.max" : "1",
    "schema.include.list": "SCOTT",
    "database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
  • Register the Debezium Oracle connector with Kafka Connect
[root@docker ~]# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-scott-connector.json
HTTP/1.1 201 Created
Date: Sat, 16 Apr 2022 17:03:12 GMT
Location: http://192.168.0.40:8083/connectors/oracle-scott-connector
Content-Type: application/json
Content-Length: 534
Server: Jetty(9.4.43.v20210629)
{"name":"oracle-scott-connector","config":{"connector.class":"io.debezium.connector.oracle.OracleConnector","database.hostname":"172.17.0.3","database.port":"1521","database.user":"c##dbzuser","database.password":"dbz","database.dbname":"ORCL","database.pdb.name":"PDBTT","database.server.name":"oracle19c","tasks.max":"1","schema.include.list":"SCOTT","database.history.kafka.bootstrap.servers":"192.168.0.40:9092","database.history.kafka.topic":"schema-changes.inventory","name":"oracle-scott-connector"},"tasks":[],"type":"source"}
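The configuration file and the registration call could also be produced from a script. The following is a minimal sketch using only the Python standard library: it builds the same JSON payload as oracle-scott-connector.json and prepares the equivalent POST to the Connect REST API. The function names `build_connector_config` and `build_register_request` are hypothetical; the host names, ports and credentials are the ones used in this article. The request is constructed but deliberately not sent.

```python
import json
import urllib.request


def build_connector_config(db_host: str, kafka_bootstrap: str) -> dict:
    """Return the registration payload used in this article as a Python dict."""
    return {
        "name": "oracle-scott-connector",
        "config": {
            "connector.class": "io.debezium.connector.oracle.OracleConnector",
            "database.hostname": db_host,
            "database.port": "1521",
            "database.user": "c##dbzuser",
            "database.password": "dbz",
            "database.dbname": "ORCL",
            "database.pdb.name": "PDBTT",
            "database.server.name": "oracle19c",
            "tasks.max": "1",
            "schema.include.list": "SCOTT",
            "database.history.kafka.bootstrap.servers": kafka_bootstrap,
            "database.history.kafka.topic": "schema-changes.inventory",
        },
    }


def build_register_request(connect_url: str, payload: dict) -> urllib.request.Request:
    """Prepare (but do not send) the POST that registers a connector."""
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors/",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


payload = build_connector_config("172.17.0.3", "192.168.0.40:9092")
request = build_register_request("http://192.168.0.40:8083", payload)
print(request.get_method(), request.full_url)

# To actually register the connector (requires a reachable Connect worker):
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Keeping the payload in one function makes it easy to change database.hostname when the container's internal IP address differs from 172.17.0.3.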

Verify the Captured Data

  • Enter the connect container and run the following commands
[root@docker ~]# docker exec -it connect bash
[kafka@839c4a43b889 ~]$ bin/kafka-topics.sh --list --bootstrap-server kafka:9092
__consumer_offsets
my_connect_configs
my_connect_offsets
my_connect_statuses
oracle19c
oracle19c.SCOTT.DEPT
oracle19c.SCOTT.EMP
oracle19c.SCOTT.SALGRADE
schema-changes.inventory
[kafka@839c4a43b889 ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic schema-changes.inventory --from-beginning
[kafka@839c4a43b889 ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic oracle19c.SCOTT.DEPT --from-beginning
  • DDL
    [Screenshot of the DDL events]

  • DML
    [Screenshot of the DML events]
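The topic list above shows the naming pattern: one topic named after database.server.name (oracle19c) plus one topic per captured table in the form server.SCHEMA.TABLE. A minimal Python sketch of that pattern follows; `TableTopic`, `table_topic` and `parse_table_topic` are hypothetical names, and the parser assumes that neither the schema nor the table name contains a dot.

```python
from typing import NamedTuple, Optional


class TableTopic(NamedTuple):
    server: str
    schema: str
    table: str


def table_topic(server: str, schema: str, table: str) -> str:
    """Compose a topic name in the server.SCHEMA.TABLE form seen in the listing."""
    return f"{server}.{schema}.{table}"


def parse_table_topic(topic: str) -> Optional[TableTopic]:
    """Split a topic name into its three parts, or return None if it does not fit."""
    parts = topic.split(".")
    if len(parts) != 3:
        return None
    return TableTopic(*parts)


# Topic names copied from the kafka-topics.sh output in this article.
topics = [
    "__consumer_offsets", "my_connect_configs", "my_connect_offsets",
    "my_connect_statuses", "oracle19c", "oracle19c.SCOTT.DEPT",
    "oracle19c.SCOTT.EMP", "oracle19c.SCOTT.SALGRADE", "schema-changes.inventory",
]
parsed = [parse_table_topic(name) for name in topics]
captured = [t for t in parsed if t is not None and t.server == "oracle19c"]
print([t.table for t in captured])  # ['DEPT', 'EMP', 'SALGRADE']
```

A consumer could use `table_topic("oracle19c", "SCOTT", "DEPT")` to derive the `--topic` argument instead of hard-coding it.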

Simulating Business Activity

  • INSERT
    [Screenshot]
    [Screenshot]
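Each message on a table topic is a Debezium change event whose envelope carries `before`, `after` and an `op` code (`c` for create, `u` for update, `d` for delete, `r` for a snapshot read). The sketch below summarises such an event in Python. The sample event is hand-written for illustration and is not output captured from this environment; its column values are assumptions, and `summarize_change_event` is a hypothetical helper name. The function accepts events both with and without the outer "payload" wrapper that the JSON converter adds when schemas are enabled.

```python
import json
from typing import Any, Dict

# Debezium operation codes mapped to readable labels.
OPERATIONS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT READ"}


def summarize_change_event(raw: str) -> Dict[str, Any]:
    """Extract the operation and the before/after row images from one event."""
    event = json.loads(raw)
    # With schemas enabled in the JSON converter the data sits under "payload".
    payload = event.get("payload", event)
    op = payload.get("op")
    return {
        "operation": OPERATIONS.get(op, op),
        "before": payload.get("before"),
        "after": payload.get("after"),
    }


# Hand-written sample, NOT output captured from the article's environment.
sample = json.dumps({
    "payload": {
        "before": None,
        "after": {"DEPTNO": 50, "DNAME": "IT", "LOC": "BEIJING"},
        "op": "c",
    }
})
print(summarize_change_event(sample))
```

For an INSERT the `before` image is empty and `after` holds the new row; for a DELETE it is the other way round.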

Viewing Kafka Messages with kafka-ui

kafka-ui (Open-Source Web GUI for Apache Kafka Management): https://github.com/provectus/kafka-ui

docker run -p 8811:8080 \
  -e KAFKA_CLUSTERS_0_NAME=oracle-scott-connector \
  -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=192.168.0.40:9092 \
  -d provectuslabs/kafka-ui:latest

Open the web UI at http://192.168.0.40:8811/

[Screenshots of the kafka-ui web interface]

Last modified: 2022-04-18 09:29:42
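[Copyright notice] This article is original content by a 墨天轮 user. When reposting, you must credit the source (墨天轮), the article link, and the author; otherwise the author and 墨天轮 reserve the right to pursue liability. If you find content on 墨天轮 that you suspect is plagiarized or infringing, please report it by email to contact@modb.pro with supporting evidence; once verified, 墨天轮 will remove the content immediately.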
