暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

OGG实现Oracle to kafka数据同步

IT那活儿 2021-06-24
2113
1. 创建ogg账号,用来部署ogg软件
useradd -u 1300 -d home/ogg ogg
usermod -a -G oinstall ogg
2. 创建kafka账号部署zooper和kafka

useradd -u 1400 -d home/kafka kafka

3.  源端oracle数据库创建测试表

create table ogg.t1        ( ID1 number(8),          ID2 number(8),          info varchar(10),          constraint pk_t1 primary key(ID1) using index        );

4.  源端ogg安装

下载for oracle ogg安装包:

下载地址:https://www.oracle.com/middleware/technologies/goldengate-downloads.html

本环境下载的版本如下:

191004_fbo_ggs_Linux_x64_shiphome.zip

解压,修改安装文件

vi Disk1/responseoggcore.rsp

修改如下参数:

INSTALL_OPTION=ORA12c    ---数据库版本
SOFTWARE_LOCATION=/ogg2  --安装目录
START_MANAGER=NO
UNIX_GROUP_NAME=oinstall
静默安装:

./runInstaller  -silent -responseFile ./response/oggcore.rsp

5.  目标端ogg安装

https://www.oracle.com/middleware/technologies/goldengate-downloads.html

本环境下载的版本如下:

OGG_BigData_Linux_x64_19.1.0.0.5.zip

6.  安装zookeeper

1)下载安装包

下载地址:

https://mirrors.bfsu.edu.cn/apache/zookeeper

本环境下载的版本如下:

apache-zookeeper-3.7.0.tar.gz

2)安装启动

解压安装包,修改配置文件


vi zooper/apache-zookeeper-3.7.0-bin/conf/zoo.cfgtickTime = 2000dataDir = zooper/apache-zookeeper-3.7.0-bin/dataclientPort = 2181initLimit = 5syncLimit = 2/zooper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start$ JMX enabled by default$ Using config: Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg$ Starting zookeeper ... STARTED

显示STARTED后,表示zookeeper已启动完成。

7.  安装kafka

1)下载安装包

下载地址:http://kafka.apache.org/downloads

本环境下载的版本如下:kafka_2.12-2.7.0.tgz

2)安装启动

解压安装包,修改配置文件:


vi kafka/kafka_2.12-2.7.0/config/server.propertiesbroker.id=0listeners=PLAINTEXT://postgresql1:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/tmp/kafka-logsnum.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=postgresql1:2181zookeeper.connection.timeout.ms=18000group.initial.rebalance.delay.ms=0/kafka/kafka_2.12-2.7.0/bin/kafka-server-start.sh kafka/kafka_2.12-2.7.0/config/server.properties

3)创建topic

cd kafka/kafka_2.12-2.7.0/bin

bin目录下:

./kafka-topics.sh --create --zookeeper postgresql1:2181 --replication-factor 1 --partitions 1 --topic cyltopic

参数说明:

–zookeeper:指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样 –replication-factor:指定副本数量 –partitions:指定分区数量 –topic:主题名称

查看topic

./kafka-topics.sh --list --zookeeper postgresql1:2181

创建一个生产者

bin目录下:

./kafka-console-producer.sh --broker-list postgresql1:9092 --topic cyltopic

创建一个消费者

bin目录下:

./kafka-console-consumer.sh --bootstrap-server postgresql1:9092 --topic cyltopic --from-beginning

8.  源端ogg配置

1)抽取进程配置

添加抽取进程组:


add extract ext_test, TRANLOG, BEGIN NOWadd exttrail ./dirdat/te, EXTRACT ext_test, MEGABYTES 200

配置参数:


EXTRACT ext_test--Setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")USERID ogg, PASSWORD xxxxxxxxxxxxxxxxxxgettruncatesDISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 1024DBOPTIONS ALLOWUNUSEDCOLUMNREPORTCOUNT EVERY 1 MINUTES, RATEFETCHOPTIONS NOUSESNAPSHOTEXTTRAIL ./dirdat/teWILDCARDRESOLVE DYNAMICGETUPDATEBEFORESNOCOMPRESSUPDATESNOCOMPRESSDELETESdynamicresolutiontable ogg.t1;

2)投递进程配置

添加投递进程组


ADD EXTRACT dpe_test, EXTTRAILSOURCE ./dirdat/teADD RMTTRAIL ./dirdat/te, EXTRACT dpe_test, MEGABYTES 200

置参数:


EXTRACT dpe_testPASSTHRURMTHOST 192.168.**.***, MGRPORT 7809RMTTRAIL ./dirdat/teDYNAMICRESOLUTIONTABLE *.*;

9.  def文件生成

ggsci>edit param defgen内容DEFSFILE dirdef/source.def, PURGEUSERID ogg, PASSWORD xxxxxxxxxxxxxxxxxxTABLE ogg.t1 ;[ogg@postgresql1 ogg2]$ ./defgen paramfile dirprm/defgen.prm --shell命令

将生成的source.def文件拷贝到目标端dirdef目录下

10.  目标端ogg配置

1)配置参数文件

拷贝ogg安装目录下示例参数文件


ls -trl ogg/AdapterExamples/big-data/kafka-rw-r--r--. 1 ogg ogg 332 Jan 15 2020 rkafka.prm-rw-r--r--. 1 ogg ogg 261 Jan 15 2020 custom_kafka_producer.properties-rw-r--r--. 1 ogg ogg 1133 Apr 6 2020 kafka.propscp ogg/AdapterExamples/big-data/kafka/* ogg/dirprm

cd /ogg/dirprm

vi kafka.props


gg.handlerlist = kafkahandlergg.handler.kafkahandler.type = kafkagg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties#gg.handler.kafkahandler.TopicName =cyltopicgg.handler.kafkahandler.topicMappingTemplate=cyltopicgg.handler.kafkahandler.format =avro_opgg.handler.kafkahandler.format=delimitedtextgg.handler.kafkahandler.format.fieldDelimiter=|gg.handler.kafkahandler.SchemaTopicName=mycyltopicgg.handler.kafkahandler.BlockingSend =falsegg.handler.kafkahandler.includeTokens=falsegg.handler.kafkahandler.format.pkUpdateHandling=updategg.handler.kafkahandler.mode =op#gg.handler.kafkahandler.maxGroupSize =100, 1Mb#gg.handler.kafkahandler.minGroupSize =50, 500Kb  goldengate.userexit.timestamp=utcgoldengate.userexit.writers=javawriterjavawriter.stats.display=TRUEjavawriter.stats.full=TRUE gg.log=log4jgg.log.level=INFO gg.report.time=30sec #Sample gg.classpath for Apache Kafkagg.classpath=dirprm/:/kafka/kafka_2.12-2.7.0/libs/*#Sample gg.classpath for HDP#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/* javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

vi custom_kafka_producer.properties


bootstrap.servers=postgresql1:9092acks=1compression.type=gzipreconnect.backoff.ms=1000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializerkey.serializer=org.apache.kafka.common.serialization.ByteArraySerializer# 100KB per partitionbatch.size=102400linger.ms=10000

2)创建复制进程

add replicat rep_test, exttrail ./dirdat/te


REPLICAT rep_testTARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.propsSOURCEDEFS dirdef/source.defREPORTCOUNT EVERY 1 MINUTES, RATEGROUPTRANSOPS 10000MAP ogg.*, TARGET ogg.*;


11.  数据测试

1)源端数据更新


beginfor i in 1..1000loop   insert /*+append*/ into ogg.t1 values(i,i,'name2-'||i) ;   end loop;   commit;end;/

目标端kafka数据查询:

./kafka-console-consumer.sh --bootstrap-server postgresql1:9092 --topic cyltopic --from-beginning

END


更多精彩干货分享

点击下方名片关注

IT那活儿

文章转载自IT那活儿,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论