
Syncing PostgreSQL to Kafka with OGG Microservices (Full + Incremental)

DB宝 2022-07-28


Environment Preparation

PostgreSQL Environment

-- Create a dedicated network
docker network create --subnet=172.72.6.0/24 pg-network

-- PostgreSQL
docker rm -f lhrpg
docker run -d --name lhrpg -h lhrpg \
   -p 64320:5432 --net=pg-network --ip 172.72.6.34 \
   -e POSTGRES_PASSWORD=lhr \
   -e TZ=Asia/Shanghai \
   postgres:14.2


psql -U postgres -h 192.168.1.35 -p 64320


create database lhrdb;
\c lhrdb
create table t1(id int primary key);
create table t2(id int primary key);
create schema ogg;


-- Changing wal_level requires a database restart (pg_reload_conf() alone is not enough)
alter system set wal_level='logical';
select pg_reload_conf();

docker restart lhrpg


sysbench /usr/share/sysbench/oltp_common.lua --db-driver=pgsql \
--pgsql-host=172.72.6.34 --pgsql-port=5432 \
--pgsql-user=postgres --pgsql-password=lhr --pgsql-db=lhrdb \
--table-size=100 --tables=10 --threads=10 \
--events=999999999 --time=60 prepare


psql -U postgres -h 192.168.1.35 -p 64320 -d lhrdb
lhrdb=# \dt
          List of relations
 Schema |   Name   | Type  |  Owner   
--------+----------+-------+----------
 public | sbtest1  | table | postgres
 public | sbtest10 | table | postgres
 public | sbtest2  | table | postgres
 public | sbtest3  | table | postgres
 public | sbtest4  | table | postgres
 public | sbtest5  | table | postgres
 public | sbtest6  | table | postgres
 public | sbtest7  | table | postgres
 public | sbtest8  | table | postgres
 public | sbtest9  | table | postgres
 public | t1       | table | postgres
 public | t2       | table | postgres
(12 rows)

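To confirm that wal_level actually took effect after the restart, a quick check can be run (not part of the original walkthrough):

psql -U postgres -h 192.168.1.35 -p 64320 -c "show wal_level;"
-- expected output: logical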

Target-Side Kafka Environment

docker pull lhrbest/kafka:3.2.0


docker rm -f lhrkafka
docker run -itd --name lhrkafka -h lhrkafka \
  --net=pg-network --ip 172.72.6.44 \
  -p 9092:9092 -p 2181:2181 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/kafka:3.2.0 \
  /usr/sbin/init

docker exec -it lhrkafka bash


-- Start ZooKeeper and Kafka (already running by default in this image)
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

[root@lhrkafka /]# jps
161 QuorumPeerMain
162 Kafka
1127 Jps
[root@lhrkafka /]# ps -ef|grep java
root         161       1  7 14:20 ?        00:00:03 java -Xmx512M -Xms512M -server -XX:+UseG1GC ... (classpath abridged) ... org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/kafka/config/zookeeper.properties
root         162       1 30 14:20 ?        00:00:14 java -Xmx1G -Xms1G -server -XX:+UseG1GC ... (classpath abridged) ... kafka.Kafka /usr/local/kafka/config/server.properties
root        1167     961  0 14:20 pts/1    00:00:00 grep --color=auto java
[root@lhrkafka /]# netstat -tulnp | grep java
tcp        0      0 0.0.0.0:2181            0.0.0.0:*               LISTEN      161/java
tcp        0      0 0.0.0.0:9999            0.0.0.0:*               LISTEN      162/java
tcp        0      0 0.0.0.0:37691           0.0.0.0:*               LISTEN      161/java
tcp        0      0 0.0.0.0:40831           0.0.0.0:*               LISTEN      162/java
tcp        0      0 0.0.0.0:38977           0.0.0.0:*               LISTEN      162/java
tcp        0      0 0.0.0.0:9092            0.0.0.0:*               LISTEN      162/java


By default Kafka listens on port 9092 and ZooKeeper on port 2181.
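Since all the containers share the pg-network, broker reachability from the OGG containers can be probed with plain bash (a sketch; it assumes bash is available in the container):

bash -c 'cat < /dev/null > /dev/tcp/172.72.6.44/9092' && echo "Kafka 9092 reachable"
bash -c 'cat < /dev/null > /dev/tcp/172.72.6.44/2181' && echo "ZooKeeper 2181 reachable"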

Kafka logs:

tail -f /usr/local/kafka/logs/server.log


As a quick test, create a topic named test on the broker and produce a few messages:

-- Producer
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>hello
>world


-- In another session, start a console consumer on the test topic; the messages arrive:
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic test --from-beginning
hello
world


-- List all topics on this broker
/usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092

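The console producer auto-creates the test topic on first write. To create it explicitly beforehand, something like the following works (partition and replication-factor values are illustrative):

/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic test --partitions 1 --replication-factor 1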

Source-Side OGG for PostgreSQL Microservices Environment

-- OGG host
docker pull lhrbest/ogg213mapg:v1.0

docker rm -f lhrogg213mapg
docker run -d --name lhrogg213mapg -h lhrogg213mapg \
  --net=pg-network --ip 172.72.6.100 \
  -p 9391:3389 -p 29000-29005:9000-9005 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/ogg213mapg:v1.0 \
  /usr/sbin/init


docker exec -it lhrogg213mapg bash


-- OGG MA: ODBC data source for the PostgreSQL database
cat > /ogg213c/ogg_ma/odbc.ini <<"EOF"
[ODBC Data Sources]
PGDSN=DataDirect 13 PostgreSQL Wire Protocol

[ODBC]
IANAAppCodePage=106
InstallDir=/ogg213c/ogg_ma

[PGDSN]
Driver=/ogg213c/ogg_ma/lib/GGpsql25.so
#Driver=/usr/lib64/psqlodbcw.so
Description=DataDirect 13 PostgreSQL Wire Protocol
Database=lhrdb
HostName=172.72.6.34
PortNumber=5432
LogonID=postgres
Password=lhr

EOF


su - pg
adminclient
CONNECT http://127.0.0.1:9000 deployment deploy213 as oggadmin password lhr


Open http://192.168.1.35:29001 (username oggadmin, password lhr).

Create the database credential (USERIDALIAS) and add TRANDATA.
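These two steps are done in the web console (screenshots omitted). For reference, the adminclient equivalents would look roughly like this; the alias and domain names match the ones used later, but this is a sketch rather than the author's exact commands, and wildcard support in ADD TRANDATA should be verified for your OGG release:

ALTER CREDENTIALSTORE ADD USER postgres PASSWORD lhr ALIAS PG1 DOMAIN OGGMA
DBLOGIN USERIDALIAS PG1 DOMAIN OGGMA
ADD TRANDATA public.*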

Target-Side OGG for Big Data Microservices Environment

docker pull lhrbest/ogg214mabigdata:v1.0

docker rm -f lhrogg214mabigdata
docker run -d --name lhrogg214mabigdata -h lhrogg214mabigdata \
  --net=pg-network --ip 172.72.6.101 \
  -p 9191:3389 -p 9000-9005:9000-9005 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/ogg214mabigdata:v1.0 \
  /usr/sbin/init


docker exec -it lhrogg214mabigdata bash


-- Configure the Kafka handler
vi /ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
gg.handler.kafkahandler.schemaTopicName=LHR_OGG

-- Point the Kafka producer at the target broker
vi /ogg214c/ogg_deploy/etc/conf/ogg/custom_kafka_producer.properties
bootstrap.servers=172.72.6.44:9092

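Only the two edited lines are shown above. For orientation, the Kafka handler section of kafka.props typically contains properties like the following; the values here are illustrative (chosen to match the behaviour seen later: one topic per table plus the LHR_OGG schema topic), not a dump of the author's file:

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.kafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.schemaTopicName=LHR_OGG
gg.handler.kafkahandler.mode=op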

Open http://192.168.1.35:9001 (username oggadmin, password lhr).

Full Synchronization (Initial Load)

Note: during this phase the source must be quiesced; no new data should be generated.

Create the Initial-Load Extract on the Source

EXTRACT ext0
SETENV(PGCLIENTENCODING = "UTF8")
SETENV(ODBCINI="/ogg213c/ogg_ma/odbc.ini")
SOURCEDB PGDSN USERIDALIAS PG1, DOMAIN OGGMA
EXTFILE ./dirdat/e0, PURGE
TABLE public.*;

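The initial-load Extract is created and started from the Administration Service web console (screenshot omitted). The adminclient equivalent would be roughly the following sketch, not taken from the original:

DBLOGIN USERIDALIAS PG1 DOMAIN OGGMA
ADD EXTRACT ext0, SOURCEISTABLE
START EXTRACT ext0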

Checking the Extract report confirms the data has been extracted to the local trail file.

Inspect the trail file at the OS level:

[root@lhrogg213mapg /]# cd /ogg213c/ogg_deploy/var/lib/data/dirdat
[root@lhrogg213mapg dirdat]# ll
total 272
-rw-r----- 1 oracle oinstall 278395 Jul 25 09:42 e0000000
[root@lhrogg213mapg dirdat]# ll -h
total 272K
-rw-r----- 1 oracle oinstall 272K Jul 25 09:42 e0000000
[root@lhrogg213mapg dirdat]#


Copy the trail file to the target with scp:

scp /ogg213c/ogg_deploy/var/lib/data/dirdat/e0000000 root@172.72.6.101:/ogg214c/ogg_deploy/var/lib/data/dirdat/

cd /ogg214c/ogg_deploy/var/lib/data/
chown -R oracle:oinstall ./dirdat/


Target-Side Initial Load into Kafka

REPLICAT rep0
targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
end runtime
map public.*, target public.*;


Once the load finishes, the Replicat stops automatically.

Checking the Full-Sync Result

-- Consume all historical messages from the schema topic
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic LHR_OGG --from-beginning


-- List all topics on this broker
/usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092

-- Topic details
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server  localhost:9092 --describe --topic LHR_OGG


Each table gets its own topic:

[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
__consumer_offsets
test
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
LHR_OGG
__consumer_offsets
sbtest1
sbtest10
sbtest2
sbtest3
sbtest4
sbtest5
sbtest6
sbtest7
sbtest8
sbtest9
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic sbtest1 --from-beginning | wc -l
^CProcessed a total of 100 messages


The full load is complete.
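To check per-topic record counts without interrupting a console consumer, the end offsets can be queried; a sketch (GetOffsetShell still ships with this Kafka release, though its flags vary between versions):

/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic sbtest1 --time -1
-- prints topic:partition:end-offset, e.g. sbtest1:0:100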

Incremental Synchronization

Configure the Replication Slot

Before configuring real-time capture from PostgreSQL, a replication slot must be registered.

su - oracle
adminclient
CONNECT http://127.0.0.1:9001 deployment deploy213 as oggadmin password lhr
dblogin useridalias PG1 DOMAIN OGGMA
REGISTER EXTRACT ext1


Without a registered replication slot, the Extract fails with OGG-25374: Oracle GoldenGate Capture for PostgreSQL, EXT1.prm: The replication slot 'ext1_eaa1c3d574a94c47' for group 'EXT1' does not exist in the database 'lhrdb'.

The full session:

[root@lhrogg213mapg dirdat]# su - oracle
Last login: Fri Dec  3 10:58:49 CST 2021 on pts/0
[oracle@lhrogg213mapg ~]$ adminclient
Oracle GoldenGate Administration Client for PostgreSQL
Version 21.3.0.0.0 OGGCORE_21.3.0.0.0_PLATFORMS_210728.1047

Copyright (C) 1995, 2021, Oracle and/or its affiliates. All rights reserved.

Oracle Linux 7, x64, 64bit (optimized) on Aug  4 2021 19:52:45
Operating system character set identified as UTF-8.

OGG (not connected) 1> CONNECT http://127.0.0.1:9001 deployment deploy213 as oggadmin password lhr

OGG (http://127.0.0.1:9001 deploy213) 2> dblogin useridalias PG1 DOMAIN OGGMA
Successfully logged into database.

OGG (http://127.0.0.1:9001 deploy213 as PG1@lhrdb) 3> REGISTER EXTRACT ext1
2022-07-25T02:19:38Z  INFO    OGG-25355  Successfully created replication slot 'ext1_eaa1c3d574a94c47' for Extract group 'EXT1' in database 'lhrdb'.

OGG (http://127.0.0.1:9001 deploy213 as PG1@lhrdb) 4>

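On the PostgreSQL side, the slot created by REGISTER EXTRACT can be verified directly (a quick check, not in the original):

lhrdb=# select slot_name, plugin, slot_type, active from pg_replication_slots;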

PostgreSQL-Side Extract Configuration

extract ext1
SETENV(PGCLIENTENCODING = "UTF8")
SETENV(ODBCINI="/ogg213c/ogg_ma/odbc.ini")
SOURCEDB PGDSN USERIDALIAS PG1, DOMAIN OGGMA
exttrail ./dirdat/e1
IGNOREREPLICATES
table public.*;

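As with the other processes, ext1 is added and started from the web console. The adminclient equivalent would be roughly the following (a sketch, not taken from the original):

ADD EXTRACT ext1, TRANLOG, BEGIN NOW
ADD EXTTRAIL ./dirdat/e1, EXTRACT ext1
START EXTRACT ext1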

Configure the Distribution Service on the Source

Log in at http://192.168.1.35:29002 and add a distribution path with the following source and target:

Source: trail://172.72.6.100:9002/services/v2/sources?trail=./dirdat/e1
Target: ogg://172.72.6.101:9003/services/v2/targets?trail=./dirdat/e1


At this point, the Big Data deployment automatically adds the corresponding Receiver Service path.

The trail files have arrived on the target:

[root@lhrogg214mabigdata dirdat]# ll
total 276
-rwxrwxrwx 1 oracle oinstall 278395 Jul 25 09:51 e0000000
-rw-r----- 1 oracle oinstall   1534 Jul 25 10:31 e1000000000
[root@lhrogg214mabigdata dirdat]# pwd
/ogg214c/ogg_deploy/var/lib/data/dirdat


Kafka-Side Apply (Replicat) Configuration

OGG for Big Data supports many targets, including data warehouses, Cassandra, HBase, HDFS, JDBC, Kafka, and Kafka Connect.

REPLICAT rep1
targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
map public.*, target public.*;

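The Replicat is created from the web console of the Big Data deployment. For reference only, the classic-style commands would look roughly like this (a sketch; syntax and checkpoint requirements should be verified against your adminclient version):

ADD REPLICAT rep1, EXTTRAIL ./dirdat/e1
START REPLICAT rep1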

Incremental Test

lhrdb=# insert into t1 values (1),(2);
INSERT 0 2
lhrdb=# delete from sbtest1 where id<=1;
DELETE 1
lhrdb=#


Source side:

Distribution service:

Kafka side:

Received at the command line:

[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic t1
public.t1I42022-07-25 10:34:36.05799942022-07-25 10:34:40.889000(00000000000000001820
public.t1I42022-07-25 10:34:36.05799942022-07-25 10:34:41.005000(00000000000000001922


As shown, changes are being synchronized incrementally.

Viewing Kafka Data with Kafka Manager (CMAK)

Reference: https://www.xmmup.com/kafkatuxingguanligongjucmakkafka-manageranzhuangjishiyong.html

docker pull registry.cn-hangzhou.aliyuncs.com/lhrbest/kafkamanager_cmak:3.0.0.6
-- tag the image so the short name used below resolves locally
docker tag registry.cn-hangzhou.aliyuncs.com/lhrbest/kafkamanager_cmak:3.0.0.6 lhrbest/kafkamanager_cmak:3.0.0.6

docker rm -f lhrkafkamanager
docker run -itd --name lhrkafkamanager -h lhrkafkamanager \
  --net=pg-network --ip 172.72.6.45 \
  -p 9100:9000  \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/kafkamanager_cmak:3.0.0.6 \
  /usr/sbin/init

docker exec -it lhrkafkamanager bash

-- Web login URL: http://192.168.1.35:9100/


Summary

1. When configuring the Distribution Service, pay attention to the location of the dirdat directory.

2. The Distribution Service uses port 9002; the Receiver Service uses port 9003.

3. If the Replicat process starts without errors but applies nothing, check whether the owner (schema) in the parameter file's MAP statement is wrong.


