暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用OGG传统模式将Oracle同步到kafka(全量+增量)

DB宝 2022-07-22
671

环境准备

Oracle环境

 1-- 创建专用网络
2docker network create --subnet=172.72.7.0/24 ora-network
3
4
5
6
7-- oracle 压测工具
8docker pull lhrbest/lhrdbbench:1.0 
9
10docker rm -f lhrdbbench
11docker run -d --name lhrdbbench -h lhrdbbench \
12  --net=ora-network --ip 172.72.7.33 \
13  -v /sys/fs/cgroup:/sys/fs/cgroup \
14  --privileged=true lhrbest/lhrdbbench:1.0 \
15  /usr/sbin/init
16
17
18
19
20-- Oracle 12c
21docker rm -f lhrora1221
22docker run -itd --name lhrora1221 -h lhrora1221 \
23  --net=ora-network --ip 172.72.7.34 \
24  -p 1526:1521 -p 3396:3389 \
25  --privileged=true \
26  lhrbest/oracle_12cr2_ee_lhr_12.2.0.1:2.0 init
27
28
29
30
31-- oracle数据库配置
321.开启数据库归档--如果没有开启
332.开启数据库级别附加日志--如果没有开始最小附加日志
343.开启强制日志--如果没有开启强制日志
354.设置ENABLE_GOLDENGATE_REPLICAT参数为TRUE
365.创建OGG用户包括包括源端用户、目标端用户以及OGG抽取用户
37
38
39alter database add supplemental log data;
40alter database add supplemental log data (all) columns;
41alter database force logging;
42alter system set enable_goldengate_replication=TRUE;
43
44select name,supplemental_log_data_min , force_logging, log_mode from v$database;
45
46
47alter system set streams_pool_size = 128M;
48alter system set sga_max_size = 2scope=spfile;
49alter system set sga_target = 2scope=spfile;
50alter system set pga_aggregate_target=1g;
51startup force
52
53
54-- OGG管理用户
55CREATE USER ogg identified by lhr;
56GRANT DBA to ogg;
57grant SELECT ANY DICTIONARY to ogg;
58GRANT EXECUTE ON SYS.DBMS_LOCK TO ogg;
59grant select any transaction to ogg;
60grant select any table to ogg;
61grant flashback any table to ogg;
62grant alter any table to ogg;
63
64exec dbms_goldengate_auth.grant_admin_privilege('OGG','*',TRUE); 
65
66
67-- 业务用户
68CREATE USER lhr identified by lhr;
69alter user lhr identified by lhr;
70GRANT DBA to lhr ;
71grant SELECT ANY DICTIONARY to lhr;
72GRANT EXECUTE ON SYS.DBMS_LOCK TO lhr;
73
74
75
76-- 启动监听
77vi /u01/app/oracle/product/12.2.0.1/dbhome_1/network/admin/listener.ora
78lsnrctl start
79lsnrctl status

Oracle数据初始化

  1-- 源端数据初始化
2/usr/local/swingbench/bin/oewizard  -s -create -c /usr/local/swingbench/wizardconfigs/oewizard.xml -create \
3-version 2.0  -cs //172.72.7.34/lhrsdb  -dba "sys as sysdba" -dbap lhr -dt thin \
4-ts users -u lhr -p lhr -allindexes  -scale 0.0001  -tc 16 -v -cl
5
6
7col TABLE_NAME format a30
8SELECT a.table_name,a.num_rows FROM dba_tables a where a.OWNER='LHR' ;
9select object_type,count(*) from dba_objects where owner='LHR' group by object_type;
10select object_type,status,count(*) from dba_objects where owner='LHR' group by object_type,status;
11select sum(bytes)/1024/1024 from dba_segments where owner='LHR';
12
13-- 检查键是否正确:https://www.xmmup.com/ogg-01296-biaoyouzhujianhuoweiyijiandanshirengranshiyongquanbulielaijiexixing.html
14-- 否则OGG启动后,会报错:OGG-01296、OGG-06439、OGG-01169 Encountered an update where all key columns for target table LHR.ORDER_ITEMS are not present.
15select owner, constraint_name, constraint_type, status, validated 
16from dba_constraints 
17where owner='LHR' 
18and VALIDATED='NOT VALIDATED';
19
20select 'alter table lhr.'||TABLE_NAME||' enable validate constraint '||CONSTRAINT_NAME||';' 
21from dba_constraints 
22where owner='LHR'
23and VALIDATED='NOT VALIDATED';
24
25
26-- 删除外键
27SELECT 'ALTER TABLE  LHR.'|| D.TABLE_NAME ||' DROP constraint '|| D.CONSTRAINT_NAME||';' 
28FROM DBA_constraints d where  owner='LHR' and d.CONSTRAINT_TYPE='R';
29
30
31
32
33sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb
34
35@/oggoracle/demo_ora_create.sql
36@/oggoracle/demo_ora_insert.sql
37
38
39SQL> select * from tcustmer;
40
41CUST NAME                           CITY                 ST
42---- ------------------------------ -------------------- --
43WILL BG SOFTWARE CO.                SEATTLE              WA
44JANE ROCKY FLYER INC.               DENVER               CO
45
46
47-- 创建2个clob和blob类型的表
48sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb  @/oggoracle/demo_ora_lob_create.sql
49exec testing_lobs;
50select * from lhr.TSRSLOB;
51
52drop table IMAGE_LOB;
53CREATE TABLE IMAGE_LOB (
54   T_ID VARCHAR2 (5NOT NULL,
55   T_IMAGE BLOB,
56   T_CLOB  CLOB 
57   );
58
59-- 插入blob文件  
60CREATE OR REPLACE DIRECTORY D1 AS '/home/oracle/';
61grant all on DIRECTORY  D1 TO PUBLIC;
62CREATE OR REPLACE NONEDITIONABLE PROCEDURE IMG_INSERT(TID      VARCHAR2,
63                                                      FILENAME VARCHAR2,
64                                                      name VARCHAR2) AS
65    F_LOB BFILE;
66    B_LOB BLOB;
67BEGIN
68    INSERT INTO IMAGE_LOB
69        (T_ID, T_IMAGE,T_CLOB)
70    VALUES
71        (TID, EMPTY_BLOB(),nameRETURN T_IMAGE INTO B_LOB;
72    F_LOB := BFILENAME('D1', FILENAME);
73    DBMS_LOB.FILEOPEN(F_LOB, DBMS_LOB.FILE_READONLY);
74    DBMS_LOB.LOADFROMFILE(B_LOB, F_LOB, DBMS_LOB.GETLENGTH(F_LOB));
75    DBMS_LOB.FILECLOSE(F_LOB);
76    COMMIT;
77END;
78/
79
80
81BEGIN
82    IMG_INSERT('1','1.jpg','xmmup.com');
83    IMG_INSERT('2','2.jpg','www.xmmup.com');
84 END;
85/
86
87
88select * from IMAGE_LOB;
89
90
91
92
93
94-----  oracle所有表
95SQL> select * from tab;
96
97TNAME                          TABTYPE  CLUSTERID
98------------------------------ ------- ----------
99ADDRESSES                      TABLE
100CARD_DETAILS                   TABLE
101CUSTOMERS                      TABLE
102IMAGE_LOB                      TABLE
103INVENTORIES                    TABLE
104LOGON                          TABLE
105ORDERENTRY_METADATA            TABLE
106ORDERS                         TABLE
107ORDER_ITEMS                    TABLE
108PRODUCTS                       VIEW
109PRODUCT_DESCRIPTIONS           TABLE
110PRODUCT_INFORMATION            TABLE
111PRODUCT_PRICES                 VIEW
112TCUSTMER                       TABLE
113TCUSTORD                       TABLE
114TSRSLOB                        TABLE
115TTRGVAR                        TABLE
116WAREHOUSES                     TABLE
117
11818 rows selected.
119
120
121
122SELECT COUNT(*) FROM LHR.ADDRESSES                      UNION ALL
123SELECT COUNT(*) FROM LHR.CARD_DETAILS                   UNION ALL
124SELECT COUNT(*) FROM LHR.CUSTOMERS                      UNION ALL
125SELECT COUNT(*) FROM LHR.IMAGE_LOB                      UNION ALL
126SELECT COUNT(*) FROM LHR.INVENTORIES                    UNION ALL
127SELECT COUNT(*) FROM LHR.LOGON                          UNION ALL
128SELECT COUNT(*) FROM LHR.ORDERENTRY_METADATA            UNION ALL
129SELECT COUNT(*) FROM LHR.ORDERS                         UNION ALL
130SELECT COUNT(*) FROM LHR.ORDER_ITEMS                    UNION ALL
131SELECT COUNT(*) FROM LHR.PRODUCT_DESCRIPTIONS           UNION ALL
132SELECT COUNT(*) FROM LHR.PRODUCT_INFORMATION            UNION ALL
133SELECT COUNT(*) FROM LHR.TCUSTMER                       UNION ALL
134SELECT COUNT(*) FROM LHR.TCUSTORD                       UNION ALL
135SELECT COUNT(*) FROM LHR.TSRSLOB                        UNION ALL
136SELECT COUNT(*) FROM LHR.TTRGVAR                        UNION ALL
137SELECT COUNT(*) FROM LHR.WAREHOUSES
138;
139
140  COUNT(*)
141----------
142       150
143       150
144       100
145         2
146    900724
147       239
148         4
149       143
150       773
151      1000
152      1000
153         2
154         2
155         1
156         0
157      1000
158
15916 rows selected.

最终,在Oracle端共包括16张表,2个视图,其中2个表TSRSLOB和IMAGE_LOB包括了blob和clob字段。

目标端kafka环境

 1docker pull lhrbest/kafka:3.2.0
2
3
4docker rm -f lhrkafka
5docker run -itd --name lhrkafka -h lhrkafka \
6  --net=ora-network --ip 172.72.7.44 \
7  -p 9092:9092 -p 2181:2181 \
8  -v /sys/fs/cgroup:/sys/fs/cgroup \
9  --privileged=true lhrbest/kafka:3.2.0 \
10  /usr/sbin/init
11
12docker exec -it lhrkafka bash
13
14
15-- 启动(默认已启动)
16/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
17/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
18
19[root@lhrkafka /]# jps
20161 QuorumPeerMain
21162 Kafka
221127 Jps
23[root@lhrkafka /]# ps -ef|grep java
24root         161       1  7 14:20 ?        00:00:03 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka/bin/../libs/connect-api-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-json-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-client-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-transforms-3.2.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-core-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.12.6.1.jar:/usr/local/kafka/bin/../libs/jackson-dataformat-csv-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-datatype-jdk8-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-scala_2.13-2.12.6.jar:/usr/local/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/local/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/local/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/local/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/local/kafka/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka/bin/../libs/jersey-client-2.34.jar:/usr/local/kafka/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/local/kafka/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-continuation-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-security-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-ajax-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jline-3.21.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/jose4j-0.7.9.jar:/usr/local/kafka/bin/../libs/kafka_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-clients-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-metadata-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-raft-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-server-common-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-shell-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-api-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-tools-3.2.0.jar:/usr/local/kafka/bin/../libs/lz4-java-1.8.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.8.4.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/usr/local/kafka/bin/../libs/netty-buffer-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-codec-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-handler-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-resolver-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-tcnative-classes-2.0.46.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-classes-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-unix-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/local/kafka/bin/../libs/paranamer-2.8.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.3.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.12.jar:/usr/local/kafka/bin/../libs/reload4j-1.2.19.jar:/usr/local/kafka/bin/../libs/rocksdbjni-6.29.4.1.jar:/usr/local/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/usr/local/kafka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.13.8.jar:/usr/local/kafka/bin/../libs/scala-logging_2.13-3.9.4.jar:/usr/local/kafka/bin/../libs/scala-reflect-2.13.8.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.36.jar:/usr/local/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/usr/local/kafka/bin/../libs/trogdor-3.2.0.jar:/usr/local/kafka/bin/../libs/zookeeper-3.6.3.jar:/usr/local/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/kafka/config/zookeeper.properties
25root         162       1 30 14:20 ?        00:00:14 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999 -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka/bin/../libs/connect-api-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-json-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-client-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-transforms-3.2.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-core-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.12.6.1.jar:/usr/local/kafka/bin/../libs/jackson-dataformat-csv-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-datatype-jdk8-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-scala_2.13-2.12.6.jar:/usr/local/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/local/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/local/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/local/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/local/kafka/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka/bin/../libs/jersey-client-2.34.jar:/usr/local/kafka/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/local/kafka/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-continuation-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-security-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-ajax-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jline-3.21.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/jose4j-0.7.9.jar:/usr/local/kafka/bin/../libs/kafka_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-clients-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-metadata-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-raft-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-server-common-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-shell-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-api-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-tools-3.2.0.jar:/usr/local/kafka/bin/../libs/lz4-java-1.8.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.8.4.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/usr/local/kafka/bin/../libs/netty-buffer-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-codec-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-handler-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-resolver-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-tcnative-classes-2.0.46.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-classes-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-unix-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/local/kafka/bin/../libs/paranamer-2.8.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.3.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.12.jar:/usr/local/kafka/bin/../libs/reload4j-1.2.19.jar:/usr/local/kafka/bin/../libs/rocksdbjni-6.29.4.1.jar:/usr/local/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/usr/local/kafka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.13.8.jar:/usr/local/kafka/bin/../libs/scala-logging_2.13-3.9.4.jar:/usr/local/kafka/bin/../libs/scala-reflect-2.13.8.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.36.jar:/usr/local/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/usr/local/kafka/bin/../libs/trogdor-3.2.0.jar:/usr/local/kafka/bin/../libs/zookeeper-3.6.3.jar:/usr/local/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar kafka.Kafka /usr/local/kafka/config/server.properties
26root        1167     961  0 14:20 pts/1    00:00:00 grep --color=auto java
27[root@lhrkafka /]# netstat -tulnp | grep java
28tcp        0      0 0.0.0.0:2181            0.0.0.0:*               LISTEN      161/java            
29tcp        0      0 0.0.0.0:9999            0.0.0.0:*               LISTEN      162/java            
30tcp        0      0 0.0.0.0:37691           0.0.0.0:*               LISTEN      161/java            
31tcp        0      0 0.0.0.0:40831           0.0.0.0:*               LISTEN      162/java            
32tcp        0      0 0.0.0.0:38977           0.0.0.0:*               LISTEN      162/java            
33tcp        0      0 0.0.0.0:9092            0.0.0.0:*               LISTEN      162/java 

kafka默认占用9092端口,ZK默认占用2181端口。

kafka日志:

1tailf /usr/local/kafka/logs/server.log

测试一下,在服务器上创建一个topic为test,然后生产几条信息:

 1-- 生产者
2/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
3>hello
4>world
5
6
7
8-- 在另一台机器上,开启消费者控制台,监听test的topic,发现可以收到数据
9/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic test --from-beginning
10hello
11word

源端OGG for  oracle环境

OGG下载地址:https://www.oracle.com/middleware/technologies/goldengate-downloads.html

 1-- OGG机器,同时包含oracle和bigdata
2docker rm -f lhrogg21all
3docker run -d --name lhrogg21all -h lhrogg21all \
4  --net=ora-network --ip 172.72.7.7 \
5  -p 39391:3389 -p 37809-37819:7809-7819 \
6  -v /sys/fs/cgroup:/sys/fs/cgroup \
7  --privileged=true lhrbest/ogg21all:v5.0 \
8  /usr/sbin/init
9
10docker exec -it lhrogg21all bash
11
12
13su - oracle
14ogg
15
16
17
18add credentialstore
19alter credentialstore add user ogg@172.72.7.34/lhrsdb, password lhr alias ora12c
20INFO CREDENTIALSTORE
21
22dblogin useridalias ora12c
23sqlplus ogg/lhr@172.72.7.34/lhrsdb
24
25
26dblogin useridalias ora12c
27ADD SCHEMATRANDATA LHR
28INFO  SCHEMATRANDATA LHR
29list tables LHR.*
30
31GGSCI (lhrogg21all as ogg@lhrsdb) 6> INFO  SCHEMATRANDATA LHR
32
332022-06-28 15:26:46  INFO    OGG-06480  Schema level supplemental loggingexcluding non-validated keysis enabled on schema "LHR".
34
352022-06-28 15:26:46  INFO    OGG-01980  Schema level supplemental logging is enabled on schema "LHR" for all scheduling columns.
36
372022-06-28 15:26:46  INFO    OGG-10462  Schema "LHR" have 16 prepared tables for instantiation.
38
39GGSCI (lhrogg21all as ogg@lhrsdb) 7list tables LHR.*
40"LHR"."ADDRESSES"
41"LHR"."CARD_DETAILS"
42"LHR"."CUSTOMERS"
43"LHR"."IMAGE_LOB"
44"LHR"."INVENTORIES"
45"LHR"."LOGON"
46"LHR"."ORDERENTRY_METADATA"
47"LHR"."ORDERS"
48"LHR"."ORDER_ITEMS"
49"LHR"."PRODUCTS"
50"LHR"."PRODUCT_DESCRIPTIONS"
51"LHR"."PRODUCT_INFORMATION"
52"LHR"."PRODUCT_PRICES"
53"LHR"."TCUSTMER"
54"LHR"."TCUSTORD"
55"LHR"."TSRSLOB"
56"LHR"."TTRGVAR"
57"LHR"."WAREHOUSES"
58
59Found 18 tables matching list criteria.
60
61
62
63-- 有2个是视图,后期需要排除掉
64tableexclude LHR.PRODUCTS;
65tableexclude LHR.PRODUCT_PRICES;
66
67
68GGSCI (lhrogg21all as ogg@lhrsdb) 10> info all
69
70Program     Status      Group       Lag at Chkpt  Time Since Chkpt
71
72MANAGER     RUNNING                                           
73JAGENT      STOPPED                                           
74PMSRVR      STOPPED

目标端OGG for  bigdata环境

OGG下载地址:https://www.oracle.com/middleware/technologies/goldengate-downloads.html

1docker exec -it lhrogg21all bash
2su - bigdata
3ogg
4
5
6-- 解决OGG-01201:Error reported by MGR : Access denied
7edit params mgr
8port 8809
9ACCESSRULE, PROG *, IPADDR *, ALLOW

全量同步

注意:在此阶段,源端需要停业务,不能产生新数据。

Oracle端配置

OGG初始化可以将数据直接输入目标端,也可以先抽取到本地,然后再输入目标端,这里我们直接同步到目标端的kafka里,如下:

 1-- oracle端
2edit params ext0
3EXTRACT ext0
4USERIDALIAS ora12c
5rmthost 127.0.0.1,mgrport 8809
6rmttask replicat,group rep0
7tableexclude LHR.PRODUCTS;
8tableexclude LHR.PRODUCT_PRICES;
9TABLE LHR.*;
10
11add extract ext0 ,sourceistable
12
13
14-- 启动mgr
15start mgr

  • SOURCEISTABLE指示Extract直接从源表中读取完整的记录。

kafka端配置

 1edit params rep0
2REPLICAT rep0
3targetdb libfile libggjava.so set property=./dirprm/kafka.props
4REPLACEBADCHAR SKIP
5SOURCECHARSET OVERRIDE GBK
6map LHR.*, target LHR.*;
7
8
9add replicat rep0 ,specialrun
10
11
12start mgr
13
14
15
16-- 配置kafka参数
17vi /oggbigdata/dirprm/kafka.props
18gg.handler.kafkahandler.schemaTopicName=LHR_OGG
19
20
21vi  /oggbigdata/dirprm/custom_kafka_producer.properties
22bootstrap.servers=172.72.7.44:9092

SPECIALRUN –将replicat设定为一次性运行,不需要checkpoint

END RUNTIME –当load完成后终结replicat

gg.handler.kafkahandler.topicMappingTemplate:kafka topic名称的映射,指定topic名称,也可以通过占位符的方式,例如${tableName},每一张表对应一个topic。

gg.handler.kafkahandler.format:传输文件的格式,支持json,xml,avro_op等。

gg.handler.kafkahandler.mode:传输模式,op为一次SQL传输一次,tx为一次事务传输一次。

gg.classpath:须指定相应的lib路径。

全量同步

 1-- 直接启动源端ext0即可
2start ext0
3info ext0
4view report ext0
5
6
7
8-- 查看所有历史数据
9/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic LHR_OGG --from-beginning
10
11
12
13-- 查看当前服务器中的所有 topic
14/usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
15
16
17-- topic详情
18/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server  localhost:9092 --describe --topic LHR_OGG

ext0的日志:

  1GGSCI (lhrogg21all) 11> info ext0
2
3Extract    EXT0      Last Started 2022-07-21 15:23   Status STOPPED
4Checkpoint Lag       Not Available
5Log Read Checkpoint  Table LHR.WAREHOUSES
6                     2022-07-21 15:24:31  Record 1000
7Task                 SOURCEISTABLE
8
9
10GGSCI (lhrogg21all) 12> view report ext0
11
12
132022-07-21 15:23:41  INFO    OGG-01017  Wildcard resolution set to IMMEDIATE because SOURCEISTABLE Extract is used.
14。。。。。。。。。。。。。
15***********************************************************************
16*                   ** Run Time Statistics **                         *
17***********************************************************************
18
19
20Report at 2022-07-21 15:24:31 (activity since 2022-07-21 15:23:42)
21
22Output to rep0:
23
24From table LHR.ADDRESSES:
25       #                   inserts:       281
26       #                   updates:         0
27       #                   deletes:         0
28       #                   upserts:         0
29       #                  discards:         0
30From table LHR.CARD_DETAILS:
31       #                   inserts:       281
32       #                   updates:         0
33       #                   deletes:         0
34       #                   upserts:         0
35       #                  discards:         0
36From table LHR.CUSTOMERS:
37       #                   inserts:       230
38       #                   updates:         0
39       #                   deletes:         0
40       #                   upserts:         0
41       #                  discards:         0
42From table LHR.EMP:
43       #                   inserts:         9
44       #                   updates:         0
45       #                   deletes:         0
46       #                   upserts:         0
47       #                  discards:         0
48From table LHR.IMAGE_LOB:
49       #                   inserts:         4
50       #                   updates:         0
51       #                   deletes:         0
52       #                   upserts:         0
53       #                  discards:         0
54From table LHR.INVENTORIES:
55       #                   inserts:    900724
56       #                   updates:         0
57       #                   deletes:         0
58       #                   upserts:         0
59       #                  discards:         0
60From table LHR.LOGON:
61       #                   inserts:       575
62       #                   updates:         0
63       #                   deletes:         0
64       #                   upserts:         0
65       #                  discards:         0
66From table LHR.ORDERENTRY_METADATA:
67       #                   inserts:         4
68       #                   updates:         0
69       #                   deletes:         0
70       #                   upserts:         0
71       #                  discards:         0
72From table LHR.ORDERS:
73       #                   inserts:       424
74       #                   updates:         0
75       #                   deletes:         0
76       #                   upserts:         0
77       #                  discards:         0
78From table LHR.ORDER_ITEMS:
79       #                   inserts:      1642
80       #                   updates:         0
81       #                   deletes:         0
82       #                   upserts:         0
83       #                  discards:         0
84From table LHR.PRODUCT_DESCRIPTIONS:
85       #                   inserts:      1000
86       #                   updates:         0
87       #                   deletes:         0
88       #                   upserts:         0
89       #                  discards:         0
90From table LHR.PRODUCT_INFORMATION:
91       #                   inserts:      1000
92       #                   updates:         0
93       #                   deletes:         0
94       #                   upserts:         0
95       #                  discards:         0
96From table LHR.TCUSTMER:
97       #                   inserts:         2
98       #                   updates:         0
99       #                   deletes:         0
100       #                   upserts:         0
101       #                  discards:         0
102From table LHR.TCUSTORD:
103       #                   inserts:         2
104       #                   updates:         0
105       #                   deletes:         0
106       #                   upserts:         0
107       #                  discards:         0
108From table LHR.TSRSLOB:
109       #                   inserts:         1
110       #                   updates:         0
111       #                   deletes:         0
112       #                   upserts:         0
113       #                  discards:         0
114From table LHR.WAREHOUSES:
115       #                   inserts:      1000
116       #                   updates:         0
117       #                   deletes:         0
118       #                   upserts:         0
119       #                  discards:         0
120
121  Bytes output             86255971

rep0的日志:

  1GGSCI (lhrogg21all) 13> info rep0
2
3Replicat   REP0      Initialized  2022-07-21 15:17   Status STARTING
4Checkpoint Lag       00:00:00 (updated 00:07:16 ago)
5Process ID           1255
6Log Read Checkpoint  Not Available
7Task                 SPECIALRUN
8
9
10GGSCI (lhrogg21all) 14> info rep0
11
12Replicat   REP0      Initialized  2022-07-21 15:17   Status STOPPED
13Checkpoint Lag       00:00:00 (updated 00:07:35 ago)
14Log Read Checkpoint  Not Available
15Task                 SPECIALRUN
16
17
18GGSCI (lhrogg21all) 15> view report rep0
19
20
21***********************************************************************
22                    Oracle GoldenGate for Big Data
23                    Version 21.4.0.0.0 (Build 002)
24
25                      Oracle GoldenGate Delivery
26   Version 21.4.0.0.0 OGGCORE_21.4.0.0.0OGGRU_PLATFORMS_211022.1803
27 Oracle Linux 7, x64, 64bit (optimized), Generic  on Oct 22 2021 23:37:40
28
29Copyright (C) 1995, 2021, Oracle and/or its affiliates. All rights reserved.
30
31                    Starting at 2022-07-21 15:23:43
32***********************************************************************
33。。。。。
34***********************************************************************
35*                   ** Run Time Statistics **                         *
36***********************************************************************
37
38
39Report at 2022-07-21 15:24:36 (activity since 2022-07-21 15:23:52)
40
41From table LHR.ADDRESSES to LHR.ADDRESSES:
42       #                   inserts:       281
43       #                   updates:         0
44       #                   deletes:         0
45       #                   upserts:         0
46       #                  discards:         0
47From table LHR.CARD_DETAILS to LHR.CARD_DETAILS:
48       #                   inserts:       281
49       #                   updates:         0
50       #                   deletes:         0
51       #                   upserts:         0
52       #                  discards:         0
53From table LHR.CUSTOMERS to LHR.CUSTOMERS:
54       #                   inserts:       230
55       #                   updates:         0
56       #                   deletes:         0
57       #                   upserts:         0
58       #                  discards:         0
59From table LHR.EMP to LHR.EMP:
60       #                   inserts:         9
61       #                   updates:         0
62       #                   deletes:         0
63       #                   upserts:         0
64       #                  discards:         0
65From table LHR.IMAGE_LOB to LHR.IMAGE_LOB:
66       #                   inserts:         4
67       #                   updates:         0
68       #                   deletes:         0
69       #                   upserts:         0
70       #                  discards:         0
71From table LHR.INVENTORIES to LHR.INVENTORIES:
72       #                   inserts:    900724
73       #                   updates:         0
74       #                   deletes:         0
75       #                   upserts:         0
76       #                  discards:         0
77From table LHR.LOGON to LHR.LOGON:
78       #                   inserts:       575
79       #                   updates:         0
80       #                   deletes:         0
81       #                   upserts:         0
82       #                  discards:         0
83From table LHR.ORDERENTRY_METADATA to LHR.ORDERENTRY_METADATA:
84       #                   inserts:         4
85       #                   updates:         0
86       #                   deletes:         0
87       #                   upserts:         0
88       #                  discards:         0
89From table LHR.ORDERS to LHR.ORDERS:
90       #                   inserts:       424
91       #                   updates:         0
92       #                   deletes:         0
93       #                   upserts:         0
94       #                  discards:         0
95From table LHR.ORDER_ITEMS to LHR.ORDER_ITEMS:
96       #                   inserts:      1642
97       #                   updates:         0
98       #                   deletes:         0
99       #                   upserts:         0
100       #                  discards:         0
101From table LHR.PRODUCT_DESCRIPTIONS to LHR.PRODUCT_DESCRIPTIONS:
102       #                   inserts:      1000
103       #                   updates:         0
104       #                   deletes:         0
105       #                   upserts:         0
106       #                  discards:         0
107From table LHR.PRODUCT_INFORMATION to LHR.PRODUCT_INFORMATION:
108       #                   inserts:      1000
109       #                   updates:         0
110       #                   deletes:         0
111       #                   upserts:         0
112       #                  discards:         0
113From table LHR.TCUSTMER to LHR.TCUSTMER:
114       #                   inserts:         2
115       #                   updates:         0
116       #                   deletes:         0
117       #                   upserts:         0
118       #                  discards:         0
119From table LHR.TCUSTORD to LHR.TCUSTORD:
120       #                   inserts:         2
121       #                   updates:         0
122       #                   deletes:         0
123       #                   upserts:         0
124       #                  discards:         0
125From table LHR.TSRSLOB to LHR.TSRSLOB:
126       #                   inserts:         1
127       #                   updates:         0
128       #                   deletes:         0
129       #                   upserts:         0
130       #                  discards:         0
131From table LHR.WAREHOUSES to LHR.WAREHOUSES:
132       #                   inserts:      1000
133       #                   updates:         0
134       #                   deletes:         0
135       #                   upserts:         0
136       #                  discards:         0
137
138
1392022-07-21 15:24:36  INFO    OGG-25701  The file caching thread was shutdown. Thread ID: 140267906610944.
140。。。。。。

一张表一个主题,如下:

 1[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
2__consumer_offsets
3test
4[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
5ADDRESSES
6CARD_DETAILS
7CUSTOMERS
8EMP
9IMAGE_LOB
10INVENTORIES
11LHR_OGG
12LOGON
13ORDERENTRY_METADATA
14ORDERS
15ORDER_ITEMS
16PRODUCT_DESCRIPTIONS
17PRODUCT_INFORMATION
18TCUSTMER
19TCUSTORD
20TSRSLOB
21WAREHOUSES
22__consumer_offsets
23test
24[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092 | wc -l
2519
26[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server  localhost:9092 --describe --topic WAREHOUSES
27Topic: WAREHOUSES       TopicId: HR3273rMTK6JsQt8OTjKNA PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
28        Topic: WAREHOUSES       Partition: 0    Leader: 0       Replicas: 0     Isr: 0
29[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic WAREHOUSES --from-beginning | wc -l       
30^CProcessed a total of 1000 messages

数据已全量同步完成。

增量同步

oracle

 1-- Oracle端
2ADD EXTRACT ext1 INTEGRATED TRANLOG BEGIN NOW
3ADD EXTTRAIL ./dirdat/e1 EXTRACT ext1
4
5dblogin useridalias ora12c
6REGISTER EXTRACT ext1 DATABASE
7
8
9edit params ext1
10EXTRACT ext1
11USERIDALIAS ora12c
12TRANLOGOPTIONS FETCHPARTIALLOB
13EXTTRAIL ./dirdat/e1
14TABLE LHR.*;
15
16
17-- 启动ext1
18start ext1
19
20GGSCI (tmp as ogg@lhrsdb) 56> info all
21
22Program     Status      Group       Lag at Chkpt  Time Since Chkpt
23
24MANAGER     RUNNING                                           
25JAGENT      STOPPED                                           
26PMSRVR      STOPPED                                           
27EXTRACT     RUNNING     EXT1        00:00:00      00:00:00 

kafka

 1edit param rep1
2REPLICAT rep1
3targetdb libfile libggjava.so set property=./dirprm/kafka.props
4map LHR.*, target LHR.*;
5
6
7add replicat rep1 exttrail /oggoracle/dirdat/e1
8
9
10start rep1
11
12
13GGSCI (lhrogg21all) 23> info all
14
15Program     Status      Group       Lag at Chkpt  Time Since Chkpt
16
17MANAGER     RUNNING                                           
18REPLICAT    RUNNING     REP1        00:00:00      00:00:00 

增量测试

 1-- 数据库
2LHR@lhrsdb> delete from lhr.WAREHOUSES where rownum<=2;
3
42 rows deleted.
5
6LHR@lhrsdb> commit;
7
8Commit complete.
9
10
11
12-- 源端OGG
13GGSCI (lhrogg21all as ogg@lhrsdb) 44> stats ext1,total,table LHR.WAREHOUSES
14
15Sending STATS request to Extract group EXT1 ...
16
17Start of statistics at 2022-07-21 15:36:12.
18
19Output to ./dirdat/e1:
20
21Extracting from LHR.WAREHOUSES to LHR.WAREHOUSES:
22
23*** Total statistics since 2022-07-21 15:35:48 ***
24    Total inserts                              0.00
25    Total updates                              0.00
26    Total deletes                              2.00
27    Total upserts                              0.00
28    Total discards                             0.00
29    Total operations                           2.00
30
31End of statistics.
32
33
34-- 目标端OGG
35GGSCI (lhrogg21all) 24> stats rep1,total,table LHR.WAREHOUSES
36
37Sending STATS request to Replicat group REP1 ...
38
39Start of statistics at 2022-07-21 15:36:50.
40
41Replicating from LHR.WAREHOUSES to LHR.WAREHOUSES:
42
43*** Total statistics since 2022-07-21 15:35:50 ***
44    Total inserts                              0.00
45    Total updates                              0.00
46    Total deletes                              2.00
47    Total upserts                              0.00
48    Total discards                             0.00
49    Total operations                           2.00
50
51End of statistics.
52
53
54
55-- kafka
56[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic WAREHOUSES 
57LHR.WAREHOUSESD42022-07-21 15:35:46.00000042022-07-21 15:35:50.651000(000000000000000019659B
58LHR.WAREHOUSESD42022-07-21 15:35:46.00000042022-07-21 15:35:50.769000(00000000000000002125OetJMt

使用kafka manager查看kafka数据

参考:https://www.xmmup.com/kafkatuxingguanligongjucmakkafka-manageranzhuangjishiyong.html

 1docker pull registry.cn-hangzhou.aliyuncs.com/lhrbest/kafkamanager_cmak:3.0.0.6
2
3docker rm -f lhrkafkamanager
4docker run -itd --name lhrkafkamanager -h lhrkafkamanager \
5  --net=ora-network --ip 172.72.7.45 \
6  -p 9100:9000  \
7  -v /sys/fs/cgroup:/sys/fs/cgroup \
8  --privileged=true lhrbest/kafkamanager_cmak:3.0.0.6 \
9  /usr/sbin/init
10
11docker exec -it lhrkafkamanager bash
12
13web登陆地址:http://192.168.1.35:9100/

总结

1、从OGG 12.2开始不再需要表定义文件文件

2、由于源端和目标端的OGG在同一台主机,所以请注意mgr端口和dirdat的位置。


文章转载自DB宝,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论