useradd -u 1400 -d home/kafka kafka
▼▼▼create table ogg.t1 ( ID1 number(8), ID2 number(8), info varchar(10), constraint pk_t1 primary key(ID1) using index );
下载for oracle ogg安装包:
下载地址:https://www.oracle.com/middleware/technologies/goldengate-downloads.html
本环境下载的版本如下:
191004_fbo_ggs_Linux_x64_shiphome.zip

解压,修改安装文件
vi Disk1/responseoggcore.rsp
修改如下参数:
./runInstaller -silent -responseFile ./response/oggcore.rsp
https://www.oracle.com/middleware/technologies/goldengate-downloads.html
本环境下载的版本如下:
OGG_BigData_Linux_x64_19.1.0.0.5.zip

1)下载安装包
https://mirrors.bfsu.edu.cn/apache/zookeeper
本环境下载的版本如下:
apache-zookeeper-3.7.0.tar.gz
2)安装启动
解压安装包,修改配置文件
▼▼▼vi zooper/apache-zookeeper-3.7.0-bin/conf/zoo.cfgtickTime = 2000dataDir = zooper/apache-zookeeper-3.7.0-bin/dataclientPort = 2181initLimit = 5syncLimit = 2/zooper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start$ JMX enabled by default$ Using config: Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg$ Starting zookeeper ... STARTED

显示STARTED后,表示zookeeper已启动完成。
1)下载安装包
下载地址:http://kafka.apache.org/downloads
本环境下载的版本如下:kafka_2.12-2.7.0.tgz
2)安装启动
解压安装包,修改配置文件:
▼▼▼vi kafka/kafka_2.12-2.7.0/config/server.propertiesbroker.id=0listeners=PLAINTEXT://postgresql1:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/tmp/kafka-logsnum.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=postgresql1:2181zookeeper.connection.timeout.ms=18000group.initial.rebalance.delay.ms=0/kafka/kafka_2.12-2.7.0/bin/kafka-server-start.sh kafka/kafka_2.12-2.7.0/config/server.properties
3)创建topic
cd kafka/kafka_2.12-2.7.0/bin
bin目录下:
./kafka-topics.sh --create --zookeeper postgresql1:2181 --replication-factor 1 --partitions 1 --topic cyltopic
参数说明:
–zookeeper:指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样 –replication-factor:指定副本数量 –partitions:指定分区数量 –topic:主题名称
查看topic
./kafka-topics.sh --list --zookeeper postgresql1:2181

创建一个生产者
bin目录下:
./kafka-console-producer.sh --broker-list postgresql1:9092 --topic cyltopic

创建一个消费者
bin目录下:
./kafka-console-consumer.sh --bootstrap-server postgresql1:9092 --topic cyltopic --from-beginning

1)抽取进程配置
添加抽取进程组:
▼▼▼add extract ext_test, TRANLOG, BEGIN NOWadd exttrail ./dirdat/te, EXTRACT ext_test, MEGABYTES 200
配置参数:
▼▼▼EXTRACT ext_test--Setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")USERID ogg, PASSWORD xxxxxxxxxxxxxxxxxxgettruncatesDISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 1024DBOPTIONS ALLOWUNUSEDCOLUMNREPORTCOUNT EVERY 1 MINUTES, RATEFETCHOPTIONS NOUSESNAPSHOTEXTTRAIL ./dirdat/teWILDCARDRESOLVE DYNAMICGETUPDATEBEFORESNOCOMPRESSUPDATESNOCOMPRESSDELETESdynamicresolutiontable ogg.t1;
2)投递进程配置
添加投递进程组
▼▼▼ADD EXTRACT dpe_test, EXTTRAILSOURCE ./dirdat/teADD RMTTRAIL ./dirdat/te, EXTRACT dpe_test, MEGABYTES 200
配置参数:
▼▼▼EXTRACT dpe_testPASSTHRURMTHOST 192.168.**.***, MGRPORT 7809RMTTRAIL ./dirdat/teDYNAMICRESOLUTIONTABLE *.*;
▼▼▼
ggsci>edit param defgen内容DEFSFILE dirdef/source.def, PURGEUSERID ogg, PASSWORD xxxxxxxxxxxxxxxxxxTABLE ogg.t1 ;[ogg@postgresql1 ogg2]$ ./defgen paramfile dirprm/defgen.prm --shell命令

将生成的source.def文件拷贝到目标端dirdef目录下
1)配置参数文件
拷贝ogg安装目录下示例参数文件
▼▼▼ls -trl ogg/AdapterExamples/big-data/kafka-rw-r--r--. 1 ogg ogg 332 Jan 15 2020 rkafka.prm-rw-r--r--. 1 ogg ogg 261 Jan 15 2020 custom_kafka_producer.properties-rw-r--r--. 1 ogg ogg 1133 Apr 6 2020 kafka.propscp ogg/AdapterExamples/big-data/kafka/* ogg/dirprm
cd /ogg/dirprm
vi kafka.props
▼▼▼gg.handlerlist = kafkahandlergg.handler.kafkahandler.type = kafkagg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties#gg.handler.kafkahandler.TopicName =cyltopicgg.handler.kafkahandler.topicMappingTemplate=cyltopicgg.handler.kafkahandler.format =avro_opgg.handler.kafkahandler.format=delimitedtextgg.handler.kafkahandler.format.fieldDelimiter=|gg.handler.kafkahandler.SchemaTopicName=mycyltopicgg.handler.kafkahandler.BlockingSend =falsegg.handler.kafkahandler.includeTokens=falsegg.handler.kafkahandler.format.pkUpdateHandling=updategg.handler.kafkahandler.mode =op#gg.handler.kafkahandler.maxGroupSize =100, 1Mb#gg.handler.kafkahandler.minGroupSize =50, 500Kb goldengate.userexit.timestamp=utcgoldengate.userexit.writers=javawriterjavawriter.stats.display=TRUEjavawriter.stats.full=TRUE gg.log=log4jgg.log.level=INFO gg.report.time=30sec #Sample gg.classpath for Apache Kafkagg.classpath=dirprm/:/kafka/kafka_2.12-2.7.0/libs/*#Sample gg.classpath for HDP#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/* javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
vi custom_kafka_producer.properties
▼▼▼bootstrap.servers=postgresql1:9092acks=1compression.type=gzipreconnect.backoff.ms=1000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializerkey.serializer=org.apache.kafka.common.serialization.ByteArraySerializer# 100KB per partitionbatch.size=102400linger.ms=10000
2)创建复制进程
add replicat rep_test, exttrail ./dirdat/te
▼▼▼REPLICAT rep_testTARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.propsSOURCEDEFS dirdef/source.defREPORTCOUNT EVERY 1 MINUTES, RATEGROUPTRANSOPS 10000MAP ogg.*, TARGET ogg.*;
1)源端数据更新
▼▼▼beginfor i in 1..1000loop insert /*+append*/ into ogg.t1 values(i,i,'name2-'||i) ; end loop; commit;end;/
目标端kafka数据查询:
./kafka-console-consumer.sh --bootstrap-server postgresql1:9092 --topic cyltopic --from-beginning


更多精彩干货分享
点击下方名片关注
IT那活儿





