环境准备

PG环境
1-- 创建专用网络
2docker network create --subnet=172.72.6.0/24 pg-network
3
4-- PG
5docker rm -f lhrpg
6docker run -d --name lhrpg -h lhrpg \
7 -p 64320:5432 --net=pg-network --ip 172.72.6.34 \
8 -e POSTGRES_PASSWORD=lhr \
9 -e TZ=Asia/Shanghai \
10 postgres:14.2
11
12
13
14psql -U postgres -h 192.168.1.35 -p 64320
15
16
17create database lhrdb;
18\c lhrdb
19create table t1(id int primary key);
20create table t2(id int primary key);
21create schema ogg;
22
23
24-- 需要重启库
25alter system set wal_level='logical';
26select pg_reload_conf();
27
28
29docker restart lhrpg
30
31
32
33
34
35sysbench /usr/share/sysbench/oltp_common.lua --db-driver=pgsql \
36--pgsql-host=172.72.6.34 --pgsql-port=5432 \
37--pgsql-user=postgres --pgsql-password=lhr --pgsql-db=lhrdb \
38--table-size=100 --tables=10 --threads=10 \
39--events=999999999 --time=60 prepare
40
41
42
43psql -U postgres -h 192.168.1.35 -p 64320 -d lhrdb
44lhrdb=# \dt
45 List of relations
46 Schema | Name | Type | Owner
47--------+----------+-------+----------
48 public | sbtest1 | table | postgres
49 public | sbtest10 | table | postgres
50 public | sbtest2 | table | postgres
51 public | sbtest3 | table | postgres
52 public | sbtest4 | table | postgres
53 public | sbtest5 | table | postgres
54 public | sbtest6 | table | postgres
55 public | sbtest7 | table | postgres
56 public | sbtest8 | table | postgres
57 public | sbtest9 | table | postgres
58 public | t1 | table | postgres
59 public | t2 | table | postgres
60(12 rows)复制
目标端kafka环境
1docker pull lhrbest/kafka:3.2.0
2
3
4docker rm -f lhrkafka
5docker run -itd --name lhrkafka -h lhrkafka \
6 --net=pg-network --ip 172.72.6.44 \
7 -p 9092:9092 -p 2181:2181 \
8 -v /sys/fs/cgroup:/sys/fs/cgroup \
9 --privileged=true lhrbest/kafka:3.2.0 \
10 /usr/sbin/init
11
12docker exec -it lhrkafka bash
13
14
15-- 启动(默认已启动)
16/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
17/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
18
19[root@lhrkafka /]# jps
20161 QuorumPeerMain
21162 Kafka
221127 Jps
23[root@lhrkafka /]# ps -ef|grep java
24root 161 1 7 14:20 ? 00:00:03 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka/bin/../libs/connect-api-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-json-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-client-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-transforms-3.2.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-core-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.12.6.1.jar:/usr/local/kafka/bin/../libs/jackson-dataformat-csv-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-datatype-jdk8-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-scala_2.13-2.12.6.jar:/usr/local/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/local/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/local/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/local/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/local/kafka/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka/bin/../libs/jersey-client-2.34.jar:/usr/local/kafka/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/local/kafka/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-continuation-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-security-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-ajax-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jline-3.21.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/jose4j-0.7.9.jar:/usr/local/kafka/bin/../libs/kafka_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-clients-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-metadata-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-raft-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-server-common-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-shell-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-api-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-tools-3.2.0.jar:/usr/local/kafka/bin/../libs/lz4-java-1.8.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.8.4.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/usr/local/kafka/bin/../libs/netty-buffer-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-codec-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-handler-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-resolver-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-tcnative-classes-2.0.46.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-classes-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-unix-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/local/kafka/bin/../libs/paranamer-2.8.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.3.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.12.jar:/usr/local/kafka/bin/../libs/reload4j-1.2.19.jar:/usr/local/kafka/bin/../libs/rocksdbjni-6.29.4.1.jar:/usr/local/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/usr/local/kafka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.13.8.jar:/usr/local/kafka/bin/../libs/scala-logging_2.13-3.9.4.jar:/usr/local/kafka/bin/../libs/scala-reflect-2.13.8.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.36.jar:/usr/local/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/usr/local/kafka/bin/../libs/trogdor-3.2.0.jar:/usr/local/kafka/bin/../libs/zookeeper-3.6.3.jar:/usr/local/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/kafka/config/zookeeper.properties
25root 162 1 30 14:20 ? 00:00:14 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999 -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka/bin/../libs/connect-api-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-json-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-client-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-transforms-3.2.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-core-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.12.6.1.jar:/usr/local/kafka/bin/../libs/jackson-dataformat-csv-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-datatype-jdk8-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-scala_2.13-2.12.6.jar:/usr/local/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/local/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/local/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/local/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/local/kafka/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka/bin/../libs/jersey-client-2.34.jar:/usr/local/kafka/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/local/kafka/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-continuation-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-security-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-ajax-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jline-3.21.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/jose4j-0.7.9.jar:/usr/local/kafka/bin/../libs/kafka_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-clients-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-metadata-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-raft-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-server-common-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-shell-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-api-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-tools-3.2.0.jar:/usr/local/kafka/bin/../libs/lz4-java-1.8.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.8.4.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/usr/local/kafka/bin/../libs/netty-buffer-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-codec-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-handler-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-resolver-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-tcnative-classes-2.0.46.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-classes-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-unix-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/local/kafka/bin/../libs/paranamer-2.8.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.3.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.12.jar:/usr/local/kafka/bin/../libs/reload4j-1.2.19.jar:/usr/local/kafka/bin/../libs/rocksdbjni-6.29.4.1.jar:/usr/local/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/usr/local/kafka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.13.8.jar:/usr/local/kafka/bin/../libs/scala-logging_2.13-3.9.4.jar:/usr/local/kafka/bin/../libs/scala-reflect-2.13.8.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.36.jar:/usr/local/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/usr/local/kafka/bin/../libs/trogdor-3.2.0.jar:/usr/local/kafka/bin/../libs/zookeeper-3.6.3.jar:/usr/local/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar kafka.Kafka /usr/local/kafka/config/server.properties
26root 1167 961 0 14:20 pts/1 00:00:00 grep --color=auto java
27[root@lhrkafka /]# netstat -tulnp | grep java
28tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 161/java
29tcp 0 0 0.0.0.0:9999 0.0.0.0:* LISTEN 162/java
30tcp 0 0 0.0.0.0:37691 0.0.0.0:* LISTEN 161/java
31tcp 0 0 0.0.0.0:40831 0.0.0.0:* LISTEN 162/java
32tcp 0 0 0.0.0.0:38977 0.0.0.0:* LISTEN 162/java
33tcp 0 0 0.0.0.0:9092 0.0.0.0:* LISTEN 162/java复制
kafka默认占用9092端口,ZK默认占用2181端口。
kafka日志:
1tailf /usr/local/kafka/logs/server.log
复制
测试一下,在服务器上创建一个topic为test,然后生产几条信息:
1-- 生产者
2/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
3>hello
4>world
5
6
7
8-- 在另一台机器上,开启消费者控制台,监听test的topic,发现可以收到数据
9/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
10hello
11word
12
13
14
15
16-- 查看当前服务器中的所有 topic
17/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092复制
源端OGG for PG微服务环境
1-- OGG机器
2docker pull lhrbest/ogg213mapg:v1.0
3
4docker rm -f lhrogg213mapg
5docker run -d --name lhrogg213mapg -h lhrogg213mapg \
6 --net=pg-network --ip 172.72.6.100 \
7 -p 9391:3389 -p 29000-29005:9000-9005 \
8 -v /sys/fs/cgroup:/sys/fs/cgroup \
9 --privileged=true lhrbest/ogg213mapg:v1.0 \
10 /usr/sbin/init
11
12
13docker exec -it lhrogg213mapg bash
14
15
16
17-- OGGMA
18cat > /ogg213c/ogg_ma/odbc.ini <<"EOF"
19[ODBC Data Sources]
20PGDSN=DataDirect 13 PostgreSQL Wire Protocol
21
22[ODBC]
23IANAAppCodePage=106
24InstallDir=/ogg213c/ogg_ma
25
26[PGDSN]
27Driver=/ogg213c/ogg_ma/lib/GGpsql25.so
28#Driver=/usr/lib64/psqlodbcw.so
29Description=DataDirect 13 PostgreSQL Wire Protocol
30Database=lhrdb
31HostName=172.72.6.34
32PortNumber=5432
33LogonID=postgres
34Password=lhr
35
36EOF
37
38
39su - pg
40adminclient
41CONNECT http://127.0.0.1:9000 deployment deploy213 as oggadmin password lhr复制
访问:http://192.168.1.35:29001 ,用户名:oggadmin,密码:lhr
创建身份证明、添加trandata


目标端OGG for bigdata微服务环境
1docker pull lhrbest/ogg214mabigdata:v1.0
2
3docker rm -f lhrogg214mabigdata
4docker run -d --name lhrogg214mabigdata -h lhrogg214mabigdata \
5 --net=pg-network --ip 172.72.6.101 \
6 -p 9191:3389 -p 9000-9005:9000-9005 \
7 -v /sys/fs/cgroup:/sys/fs/cgroup \
8 --privileged=true lhrbest/ogg214mabigdata:v1.0 \
9 /usr/sbin/init
10
11
12docker exec -it lhrogg214mabigdata bash
13
14
15-- 配置kafka参数
16vi /ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
17gg.handler.kafkahandler.schemaTopicName=LHR_OGG
18
19
20vi /ogg214c/ogg_deploy/etc/conf/ogg/custom_kafka_producer.properties
21bootstrap.servers=172.72.6.44:9092复制
访问:http://192.168.1.35:9001 ,用户名:oggadmin,密码:lhr
全量同步
注意:在此阶段,源端需要停业务,不能产生新数据。
源端创建初始化加载

1EXTRACT ext0
2SETENV(PGCLIENTENCODING = "UTF8")
3SETENV(ODBCINI="/ogg213c/ogg_ma/odbc.ini")
4SOURCEDB PGDSN USERIDALIAS PG1, DOMAIN OGGMA
5EXTFILE ./dirdat/e0 , PURGE
6TABLE public.*;复制

查询报告,说明数据已经传输到目标端了,如下:

进入OS查询:
1[root@lhrogg213mapg /]# cd /ogg213c/ogg_deploy/var/lib/data/dirdat
2[root@lhrogg213mapg dirdat]# ll
3total 272
4-rw-r----- 1 oracle oinstall 278395 Jul 25 09:42 e0000000
5[root@lhrogg213mapg dirdat]# ll -h
6total 272K
7-rw-r----- 1 oracle oinstall 272K Jul 25 09:42 e0000000
8[root@lhrogg213mapg dirdat]#复制
使用scp将文件传递到目标端
1scp /ogg213c/ogg_deploy/var/lib/data/dirdat/e0000000 root@172.72.6.101:/ogg214c/ogg_deploy/var/lib/data/dirdat/
2
3cd /ogg214c/ogg_deploy/var/lib/data/dirdat/
4chown oracle.oinstall -R ./dirdat/复制
目标端kafka数据全量初始化

1REPLICAT rep0
2targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
3end runtime
4map public.*, target public.*;复制

运行完后,自动停止:

全量同步结果检查
1-- 查看所有历史数据
2/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic LHR_OGG --from-beginning
3
4
5
6-- 查看当前服务器中的所有 topic
7/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
8
9
10-- topic详情
11/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic LHR_OGG复制
一张表一个主题,如下:
1[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
2__consumer_offsets
3test
4[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
5LHR_OGG
6__consumer_offsets
7sbtest1
8sbtest10
9sbtest2
10sbtest3
11sbtest4
12sbtest5
13sbtest6
14sbtest7
15sbtest8
16sbtest9
17[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic sbtest1 --from-beginning | wc -l
18^CProcessed a total of 100 messages复制
数据已全量同步完成。
增量同步
配置复制槽
在配置PostgreSQL实时同步之前,需要先配置复制槽。
1su - oracle
2adminclient
3CONNECT http://127.0.0.1:9001 deployment deploy213 as oggadmin password lhr
4dblogin useridalias PG1 DOMAIN OGGMA
5REGISTER EXTRACT ext1复制
若不配置复制槽,会报错:OGG-25374
Oracle GoldenGate Capture for PostgreSQL, EXT1.prm: The replication slot 'ext1_eaa1c3d574a94c47' for group 'EXT1' does not exist in the database 'lhrdb'.
过程:
1[root@lhrogg213mapg dirdat]# su - oracle
2Last login: Fri Dec 3 10:58:49 CST 2021 on pts/0
3[oracle@lhrogg213mapg ~]$ adminclient
4Oracle GoldenGate Administration Client for PostgreSQL
5Version 21.3.0.0.0 OGGCORE_21.3.0.0.0_PLATFORMS_210728.1047
6
7Copyright (C) 1995, 2021, Oracle and/or its affiliates. All rights reserved.
8
9Oracle Linux 7, x64, 64bit (optimized) on Aug 4 2021 19:52:45
10Operating system character set identified as UTF-8.
11
12OGG (not connected) 1> CONNECT http://127.0.0.1:9001 deployment deploy213 as oggadmin password lhr
13
14OGG (http://127.0.0.1:9001 deploy213) 2> dblogin useridalias PG1 DOMAIN OGGMA
15Successfully logged into database.
16
17OGG (http://127.0.0.1:9001 deploy213 as PG1@lhrdb) 3> REGISTER EXTRACT ext1
182022-07-25T02:19:38Z INFO OGG-25355 Successfully created replication slot 'ext1_eaa1c3d574a94c47' for Extract group 'EXT1' in database 'lhrdb'.
19
20OGG (http://127.0.0.1:9001 deploy213 as PG1@lhrdb) 4>复制
PG端配置


1extract ext1
2SETENV(PGCLIENTENCODING = "UTF8" )
3SETENV(ODBCINI="/ogg213c/ogg_ma/odbc.ini" )
4SOURCEDB PGDSN USERIDALIAS PG1, DOMAIN OGGMA
5exttrail ./dirdat/e1
6IGNOREREPLICATES
7table public.*;复制

源端配置数据分发服务
登陆:http://192.168.1.35:29002

1trail://172.72.6.100:9002/services/v2/sources?trail=./dirdat/e1
2ogg://172.72.6.101:9003/services/v2/targets?trail=./dirdat/e1复制

此时,bigdata会自动添加接收方服务:

文件已传输到目标端:
1[root@lhrogg214mabigdata dirdat]# ll
2total 276
3-rwxrwxrwx 1 oracle oinstall 278395 Jul 25 09:51 e0000000
4-rw-r----- 1 oracle oinstall 1534 Jul 25 10:31 e1000000000
5[root@lhrogg214mabigdata dirdat]# pwd
6/ogg214c/ogg_deploy/var/lib/data/dirdat复制
kafka端应用配置

目标端选项较多,包括:Warehouse、Cassandra、HBase、HDFS、JDBC、Kafka和Kafka Connect等。
1REPLICAT rep1
2targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
3map public.*, target public.*;复制


增量测试
1lhrdb=# insert into t1 values (1),(2);
2INSERT 0 2
3lhrdb=# delete from sbtest1 where id<=1;
4DELETE 1
5lhrdb=#复制
源端:

数据分发:

kafka端:

命令行接收:
1[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic t1
2public.t1I42022-07-25 10:34:36.05799942022-07-25 10:34:40.889000(00000000000000001820
3public.t1I42022-07-25 10:34:36.05799942022-07-25 10:34:41.005000(00000000000000001922复制
可见,数据会增量同步的。
使用kafka manager查看kafka数据
参考:https://www.xmmup.com/kafkatuxingguanligongjucmakkafka-manageranzhuangjishiyong.html
1docker pull registry.cn-hangzhou.aliyuncs.com/lhrbest/kafkamanager_cmak:3.0.0.6
2
3docker rm -f lhrkafkamanager
4docker run -itd --name lhrkafkamanager -h lhrkafkamanager \
5 --net=ora-network --ip 172.72.6.45 \
6 -p 9100:9000 \
7 -v /sys/fs/cgroup:/sys/fs/cgroup \
8 --privileged=true lhrbest/kafkamanager_cmak:3.0.0.6 \
9 /usr/sbin/init
10
11docker exec -it lhrkafkamanager bash
12
13web登陆地址:http://192.168.1.35:9100/复制


总结
1、配置数据分发服务时,需要注意dirdat的位置
2、分发是9002端口,接收是9003端口。
3、若replicate进程启动不报错,但是不应用,检查是否参数文件的owner写错了?