Test environment
- Debezium version 1.9 (2022-04-05)
- Debezium Tested Versions
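- Oracle is a single-instance 19.3 database
- Reference documentation for this test: https://debezium.io/documentation/reference/1.9/
- Architecture: change data capture based on Debezium
Start Zookeeper
# Run in the background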
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:1.9
# Follow the zookeeper logs in real time
docker logs -f -t --tail 10 zookeeper
Start Kafka
# Run in the background
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9
# Follow the kafka logs in real time
docker logs -f -t --tail 10 kafka
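Before moving on, it can help to confirm that both containers actually accept connections on their published ports. The following is a minimal sketch, assuming the ports are published on the Docker host as in the commands above; `wait_for_port` is a hypothetical helper written for this article, not part of Debezium or Kafka.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect only proves the port is open,
            # not that the service behind it is fully initialised.
            with socket.create_connection((host, port), timeout=2.0):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.5)
```

For example, `wait_for_port("192.168.0.40", 2181)` and `wait_for_port("192.168.0.40", 9092)` would check Zookeeper and Kafka on the Docker host used in this article. The container logs remain the authoritative place to look if a port stays closed.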
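Start the Oracle 19c database
- Reference article: "Installing a single-instance Oracle 19c test environment with Docker" (使用Docker装一个Oracle 19C的单机测试环境)
- Create the SCOTT test user and data in the 19c database.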
[oracle@ora11g ~]$ sqlplus scott/scott@192.168.0.40:1521/pdbtt
SQL> set line 100 pages 100
SQL> select * from scott.emp;
     EMPNO ENAME      JOB              MGR HIREDATE         SAL       COMM     DEPTNO
---------- ---------- --------- ---------- --------- ---------- ---------- ----------
      7369 SMITH      CLERK           7902 17-DEC-80        800                    20
      7499 ALLEN      SALESMAN        7698 20-FEB-81       1600        300         30
      7521 WARD       SALESMAN        7698 22-FEB-81       1250        500         30
      7566 JONES      MANAGER         7839 02-APR-81       2975                    20
      7654 MARTIN     SALESMAN        7698 28-SEP-81       1250       1400         30
      7698 BLAKE      MANAGER         7839 01-MAY-81       2850                    30
      7782 CLARK      MANAGER         7839 09-JUN-81       2450                    10
      7788 SCOTT      ANALYST         7566 19-APR-87       3000                    20
      7839 KING       PRESIDENT            17-NOV-81       5000                    10
      7844 TURNER     SALESMAN        7698 08-SEP-81       1500          0         30
      7876 ADAMS      CLERK           7788 23-MAY-87       1100                    20
      7900 JAMES      CLERK           7698 03-DEC-81        950                    30
      7902 FORD       ANALYST         7566 03-DEC-81       3000                    20
      7934 MILLER     CLERK           7782 23-JAN-82       1300                    10
14 rows selected.
Configure the Oracle 19c database
- Make sure the database is running in archive log mode; log in through the CDB service
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
SQL> archive log list;
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /opt/oracle/oradata/ORCL/archive_logs
Oldest online log sequence 5
Next log sequence to archive 7
Current log sequence 7
- Enable minimal supplemental logging; log in through the CDB service
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
-- Enable minimal supplemental logging
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Switch to the PDB and enable supplemental logging for the tables
SQL> alter session set container=pdbtt;
ALTER TABLE scott.DEPT ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE scott.EMP ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE scott.BONUS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE scott.SALGRADE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
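The four ALTER TABLE statements above follow one pattern: every captured table gets (ALL) COLUMNS supplemental logging so that the redo records carry the full row image. When more tables are added to the capture list, the statements can be generated rather than typed. This is only a sketch; `supplemental_log_statements` is a hypothetical helper, and it assumes the schema and table names are plain identifiers that need no quoting.

```python
from __future__ import annotations


def supplemental_log_statements(schema: str, tables: list[str]) -> list[str]:
    """Build one ALTER TABLE ... ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS statement per table."""
    return [
        f"ALTER TABLE {schema}.{table} ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;"
        for table in tables
    ]


# Print the statements for the four SCOTT tables used in this article;
# the output is meant to be pasted into a sqlplus session connected to the PDB.
for statement in supplemental_log_statements("scott", ["DEPT", "EMP", "BONUS", "SALGRADE"]):
    print(statement)
```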
- Create the tablespace used by the LogMiner user in both the CDB and the PDB
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCL/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
-- Switch to the PDB and create the tablespace
SQL> alter session set container=pdbtt;
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCL/PDBTT/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
- Create the LogMiner user in the CDB and grant the required privileges
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL;
Test LogMiner at the database level
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
SQL> select member from v$logfile;
MEMBER
--------------------------------------------------------------------------------
/opt/oracle/oradata/ORCL/redo03.log
/opt/oracle/oradata/ORCL/redo02.log
/opt/oracle/oradata/ORCL/redo01.log
SQL> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/ORCL/redo01.log',dbms_logmnr.new);
SQL> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/ORCL/redo02.log',dbms_logmnr.addfile);
SQL> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/ORCL/redo03.log',dbms_logmnr.addfile);
SQL> execute dbms_logmnr.start_logmnr(options=>dbms_logmnr.dict_from_online_catalog+dbms_logmnr.committed_data_only);
SQL> select sql_redo,sql_undo from v$logmnr_contents where table_name like '%DEPT%' and OPERATION='INSERT';
SQL_REDO
----------------------------------------------------------------------------------------------------
SQL_UNDO
---------------------------------------------------------------------------------------------------------------
insert into "SCOTT"."DEPT"("DEPTNO","DNAME","LOC") values ('10','ACCOUNTING','NEW YORK');
delete from "SCOTT"."DEPT" where "DEPTNO" = '10' and "DNAME" = 'ACCOUNTING' and "LOC" = 'NEW YORK' and ROWID = 'AAAR1DAAMAAAACDAAA';
insert into "SCOTT"."DEPT"("DEPTNO","DNAME","LOC") values ('20','RESEARCH','DALLAS');
delete from "SCOTT"."DEPT" where "DEPTNO" = '20' and "DNAME" = 'RESEARCH' and "LOC" = 'DALLAS' and ROWID = 'AAAR1DAAMAAAACDAAB';
insert into "SCOTT"."DEPT"("DEPTNO","DNAME","LOC") values ('30','SALES','CHICAGO');
delete from "SCOTT"."DEPT" where "DEPTNO" = '30' and "DNAME" = 'SALES' and "LOC" = 'CHICAGO' and ROWID = 'AAAR1DAAMAAAACDAAC';
insert into "SCOTT"."DEPT"("DEPTNO","DNAME","LOC") values ('40','OPERATIONS','BOSTON');
delete from "SCOTT"."DEPT" where "DEPTNO" = '40' and "DNAME" = 'OPERATIONS' and "LOC" = 'BOSTON' and ROWID = 'AAAR1DAAMAAAACDAAD';
-- The following statement ends the LogMiner session
SQL> execute dbms_logmnr.end_logmnr;
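The session above registers the first redo log with dbms_logmnr.new, which starts a fresh log list, and every further log with dbms_logmnr.addfile, then starts mining with the online catalog as dictionary. The sketch below generates the same sequence of sqlplus commands from the members returned by v$logfile. `logminer_commands` is a hypothetical helper for illustration; it only builds text and does not connect to the database.

```python
from __future__ import annotations


def logminer_commands(members: list[str]) -> list[str]:
    """Return the sqlplus commands that load the given redo logs and start LogMiner."""
    if not members:
        raise ValueError("at least one redo log member is required")
    commands = []
    for index, member in enumerate(members):
        # The first file opens a new log list, the remaining files are appended to it.
        option = "dbms_logmnr.new" if index == 0 else "dbms_logmnr.addfile"
        commands.append(f"execute dbms_logmnr.add_logfile('{member}',{option});")
    commands.append(
        "execute dbms_logmnr.start_logmnr("
        "options=>dbms_logmnr.dict_from_online_catalog+dbms_logmnr.committed_data_only);"
    )
    return commands
```

After querying v$logmnr_contents, the session still has to be closed with `execute dbms_logmnr.end_logmnr;` as shown above.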
Start Kafka Connect
# Run in the background
docker run -d --name connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
quay.io/debezium/connect:1.9
# Follow the Kafka Connect logs in real time
docker logs -f -t --tail 10 connect
Debezium Oracle connector
- Download the ojdbc8.jar JDBC driver
https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.3.0.0/ojdbc8-19.3.0.0.jar
- Upload the driver to the connect container, then restart the connect container
[root@docker ~]# mv ojdbc8-19.3.0.0.jar ojdbc8.jar
[root@docker ~]# docker cp ojdbc8.jar connect:/kafka/libs
[root@docker ~]# docker restart connect
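- Prepare the Debezium Oracle connector configuration file
Creating the configuration file on the Docker host is sufficient, because the connect container exposes a REST API for managing Debezium connectors.
database.hostname must be set to the container-internal IP address (172.17.0.3 in this setup); otherwise the connector cannot be added.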
[root@docker ~]# vi oracle-scott-connector.json
{
"name": "oracle-scott-connector",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.3",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "oracle19c",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
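The same configuration can be produced from a short script, which makes it easier to change the host, PDB, or schema list without editing JSON by hand. This is a minimal sketch that reuses exactly the property names and values from the file above; `build_oracle_connector` and `dump_connector` are hypothetical helpers, and the empty-value check is an added convenience, not something Debezium requires.

```python
from __future__ import annotations

import json


def build_oracle_connector(name: str, hostname: str, pdb_name: str,
                           schemas: list[str], bootstrap_servers: str) -> dict:
    """Return the registration payload for the Debezium 1.9 Oracle connector as a dict."""
    config = {
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "database.hostname": hostname,
        "database.port": "1521",
        "database.user": "c##dbzuser",
        "database.password": "dbz",
        "database.dbname": "ORCL",
        "database.pdb.name": pdb_name,
        "database.server.name": "oracle19c",
        "tasks.max": "1",
        # schema.include.list is a comma-separated list.
        "schema.include.list": ",".join(schemas),
        "database.history.kafka.bootstrap.servers": bootstrap_servers,
        "database.history.kafka.topic": "schema-changes.inventory",
    }
    missing = [key for key, value in config.items() if not value]
    if missing:
        raise ValueError(f"empty connector settings: {missing}")
    return {"name": name, "config": config}


def dump_connector(payload: dict) -> str:
    """Serialise the payload in the layout used by oracle-scott-connector.json."""
    return json.dumps(payload, indent=2)
```

Calling `dump_connector(build_oracle_connector("oracle-scott-connector", "172.17.0.3", "PDBTT", ["SCOTT"], "192.168.0.40:9092"))` yields a document equivalent to the file above.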
- Register the Debezium Oracle connector with Kafka Connect
[root@docker ~]# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-scott-connector.json
HTTP/1.1 201 Created
Date: Sat, 16 Apr 2022 17:03:12 GMT
Location: http://192.168.0.40:8083/connectors/oracle-scott-connector
Content-Type: application/json
Content-Length: 534
Server: Jetty(9.4.43.v20210629)
{"name":"oracle-scott-connector","config":{"connector.class":"io.debezium.connector.oracle.OracleConnector","database.hostname":"172.17.0.3","database.port":"1521","database.user":"c##dbzuser","database.password":"dbz","database.dbname":"ORCL","database.pdb.name":"PDBTT","database.server.name":"oracle19c","tasks.max":"1","schema.include.list":"SCOTT","database.history.kafka.bootstrap.servers":"192.168.0.40:9092","database.history.kafka.topic":"schema-changes.inventory","name":"oracle-scott-connector"},"tasks":[],"type":"source"}
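The curl call above is a plain POST to the Kafka Connect REST API, so it can also be issued from a script. The sketch below only builds the request and the status URL with the Python standard library; `registration_request` and `status_url` are hypothetical helper names, and the base URL is assumed to be the one used in this article (http://192.168.0.40:8083).

```python
import json
import urllib.request


def registration_request(connect_url: str, payload: dict) -> urllib.request.Request:
    """Build the POST /connectors request that registers a connector."""
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def status_url(connect_url: str, name: str) -> str:
    """Return the URL of the connector status endpoint, GET /connectors/{name}/status."""
    return f"{connect_url.rstrip('/')}/connectors/{name}/status"
```

Passing the request to `urllib.request.urlopen` sends it; a successful registration answers with 201 Created, as in the curl output above. Because the creation response still shows an empty task list, fetching the status URL afterwards is a reasonable way to see whether the connector and its task are actually running.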
Verify the captured data
- Enter the connect container and run:
[root@docker ~]# docker exec -it connect bash
[kafka@839c4a43b889 ~]$ bin/kafka-topics.sh --list --bootstrap-server kafka:9092
__consumer_offsets
my_connect_configs
my_connect_offsets
my_connect_statuses
oracle19c
oracle19c.SCOTT.DEPT
oracle19c.SCOTT.EMP
oracle19c.SCOTT.SALGRADE
schema-changes.inventory
[kafka@839c4a43b889 ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic schema-changes.inventory --from-beginning
[kafka@839c4a43b889 ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic oracle19c.SCOTT.DEPT --from-beginning
- DDL
- DML
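The messages printed by kafka-console-consumer.sh are JSON documents. A Debezium change event wraps the row images in an envelope with the fields before, after, source and op, where op is c for an insert, u for an update, d for a delete and r for a snapshot read. The sketch below reduces one message to those essentials. `summarize_change_event` is a hypothetical helper, and it assumes the JSON converter is in use, so the envelope sits either at the top level or under a "payload" key.

```python
import json

# Debezium operation codes mapped to readable names.
OPERATIONS = {"r": "snapshot read", "c": "insert", "u": "update", "d": "delete"}


def summarize_change_event(raw: str) -> dict:
    """Extract operation, table and row images from one Debezium change event value."""
    event = json.loads(raw)
    # With schemas enabled, the JSON converter nests the envelope under "payload".
    payload = event.get("payload", event)
    source = payload.get("source") or {}
    op = payload.get("op")
    return {
        "operation": OPERATIONS.get(op, op),
        "table": f"{source.get('schema')}.{source.get('table')}",
        "before": payload.get("before"),
        "after": payload.get("after"),
    }
```

Feeding it each line read from the oracle19c.SCOTT.DEPT topic gives a compact view of what changed. The exact encoding of column values in before and after depends on the connector's converter settings, so the helper passes them through untouched.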
Simulate business activity
- INSERT
View the messages in Kafka with kafka-ui
kafka-ui: Open-Source Web GUI for Apache Kafka Management: https://github.com/provectus/kafka-ui
docker run -p 8811:8080 \
-e KAFKA_CLUSTERS_0_NAME=oracle-scott-connector \
-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=192.168.0.40:9092 \
-d provectuslabs/kafka-ui:latest
Open the web UI: http://192.168.0.40:8811/
Last modified: 2022-04-18 09:29:42
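[Copyright notice] This article is original content by a 墨天轮 (modb.pro) user. Reposts must credit the source (墨天轮), the article link, and the author; otherwise the author and 墨天轮 reserve the right to pursue liability. To report suspected plagiarism or infringement on 墨天轮, send an email with supporting evidence to contact@modb.pro; verified content will be removed immediately.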