暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Oracle DB实时分析之OGG+Kafka架构(第二弹)

火火日记 2020-11-10
1327

继上周OGG+Kafka架构内容,本周继续完成后半部分(7~11)的开发内容。

系统架构

开发流程

  1. 启动Oracle数据库

  2. Oracle数据库启用归档日志

  3. 创建ggadmin用户

  4. 创建ESHOP Schema

  5. 初始化GoldenGate Classic

  6. 创建GoldenGate Extract

  7. 安装并运行Apache Kafka

  8. 安装GoldenGate for Big Data

  9. 启动GoldenGate for Big Data Manager

  10. 创建Data Pump

  11. 将事务发布到Kafka


7. 安装并运行Apache Kafka

在这一步骤,我们将完成整体架构中如下部分的工作

首先从虚拟机的桌面环境中打开Firefox并下载Apache Kafka(kafka_2.11-2.1.1.tgz)

https://kafka.apache.org/downloads

然后打开一个Linux shell并重置CLASSPATH环境变量(在BigDataLite-4.11虚拟机中设置的当前值会和Kafka产生冲突)

declare -x CLASSPATH=""

复制

切换到Download目录后,解压缩压缩包,启动ZooKeeper和Kafka

cd ~/Downloads
tar zxvf kafka_2.11-2.1.1.tgz
cd kafka_2.11-2.1.1
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
./bin/kafka-server-start.sh -daemon config/server.properties

复制

我们可以通过启动echo stats | nc localhost 2181来检查ZooKeeper是否正常

我们也可以通过echo dump | nc localhost 2181 | grep brokers来判断Kafka是否正常启动

8. 安装GoldenGate for Big Data

同理在这一部分,我们需要通过虚拟机的火狐浏览器下载Oracle GoldenGate for Big Data 12c(Oracle GoldenGate for Big Data 12.3.2.1.0),下载时需要Oracle帐户(免费)

https://www.oracle.com/cn/database/technology/goldengate-downloads.html

*假如在Linux环境中下载出现问题时,建议用mac或者Windows下载好Zip文件后,用SCP把文件同步到虚拟机中

下载完毕后,通过如下代码进行安装

cd ~/Downloads
unzip OGG_BigData_Linux_x64_12.3.2.1.0.zip
cd ..
mkdir ogg-bd-poc2
cd ogg-bd-poc2
tar xvf ../Downloads/OGG_BigData_Linux_x64_12.3.2.1.0.tar

复制

就这样,GoldenGate for Big Data 12c被安装在了/home/oracle/ogg-bd-poc2文件夹中

9. 启动GoldenGate for Big Data Manager

在这一步骤,我们将完成整体架构中如下部分的工作

打开GoldenGate for Big Data CLI

cd ~/ogg-bd-poc2
./ggsci

复制

需要更改管理器端口,否则会与之前启动的与GoldenGate (classic)管理器发生冲突

在GoldenGate for Big Data CLI运行如下代码

create subdirs
edit params mgr

复制

在vi编辑界面输入如下内容

PORT 27801

复制

然后保存内容,退出vi,返回CLI,完成GoldenGate for Big Data manager监听端口27081的修改

start mgr

复制

10. 创建Data Pump

在这一步骤,我们将完成整体架构中如下部分的工作

接下来我们需要创建在GoldenGate中创建数据泵(Data pump)。数据泵是一个数据提取进程,它会实时监控源端OGG的trail log,然后实时地将任何更改信息推到目标端OOG实例的Trial log中

按照如下顺序进行数据泵创建

cd /u01/ogg
./ggsci
edit params pmpeshoo

复制

在vi编辑页面中加入以下内容

EXTRACT pmpeshoo
USERID ggadmin,PASSWORD ggadmin
SETENV (ORACLE_SID='orcl')
-- GoldenGate for Big Data address/port:
RMTHOST localhost, MGRPORT 27801
RMTTRAIL ./dirdat/bb
PASSTHRU
-- The "tokens" part it is useful for writing in the Kafka messages
-- the Transaction ID and the database Change Serial Number
TABLE orcl.eshop.*, tokens(txid = @GETENV('TRANSACTION', 'XID'), csn = @GETENV('TRANSACTION', 'CSN'));

复制

保存内容并退出vi


接下来我们要通过GoldenGate CLI注册并启动数据泵

dblogin userid ggadmin password ggadmin

复制

add extract pmpeshoo, exttrailsource ./dirdat/aa begin now
add rmttrail ./dirdat/bb extract pmpeshoo
start pmpeshoo

复制

通过从GoldenGate CLI运行以下命令来检查数据泵的状态,其中view report pmpeshop为查看数据泵日志

info pmpeshoo
view report pmpeshoo

复制

接下来我们可以在源表中插入新的模拟数据,来确认下数据泵是否正常工作

首先登陆数据库

sqlplus eshop/eshop@ORCL

复制

执行下方SQL脚本创建一个新的模拟客户订单数据

INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)
VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA03', SYSDATE, 'SHIPPING', SYSTIMESTAMP);


INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)
VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Inside Out', 1);


COMMIT;

复制

可确认源表两行数据已成功插入

在GoldenGate(Classic) CLI运行如下代码

stats pmpeshoo

复制

可确认已成功插入变更记录数据

11. 将事务发布到Kafka

在这一步骤,我们将完成整体架构中如下部分的工作

最后,我们将在GoldenGate中为BigData创建一个replicat process,以便往Kafka Topic中插入实时业务数据。replicat进程会从trail log中读取事务的增删改操作,并将此信息转换为JSON格式后传递给Kafka


下面在/home/oracle/ogg-bd-poc2 /dirprm路径下创建一个名为eshop_kafkaconnect.properties的文件。文件内输入如下代码

# File: /home/oracle/ogg-bd-poc/dirprm/eshop_kafkaconnect.properties
# -----------------------------------------------------------


# address/port of the Kafka broker
bootstrap.servers=localhost:9092
acks=1


#JSON Converter Settings
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false


#Adjust for performance
buffer.memory=33554432
batch.size=16384
linger.ms=0


# This property fix a start-up error as explained by Oracle Support here:
# https://support.oracle.com/knowledge/Middleware/2455697_1.html
converter.type=key

复制

在同一个文件夹中,创建一个名为eshop_kc.props的文件,文件内输入如下代码

# File: /home/oracle/ogg-bd-poc2/dirprm/eshop_kc.props
# ---------------------------------------------------
gg.handlerlist=kafkaconnect


#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=eshop_kafkaconnect.properties
gg.handler.kafkaconnect.mode=tx


#The following selects the topic name based only on the schema name
gg.handler.kafkaconnect.topicMappingTemplate=CDC-${schemaName}


#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}


#The formatter properties
gg.handler.kafkaconnect.messageFormatting=op
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.iso8601Format=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
gg.handler.kafkaconnect.includeTableName=true
gg.handler.kafkaconnect.includeOpType=true
gg.handler.kafkaconnect.includeOpTimestamp=true
gg.handler.kafkaconnect.includeCurrentTimestamp=true
gg.handler.kafkaconnect.includePosition=true
gg.handler.kafkaconnect.includePrimaryKeys=true
gg.handler.kafkaconnect.includeTokens=true


goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE


gg.log=log4j
gg.log.level=INFO


gg.report.time=30sec


# Apache Kafka Classpath
# Put the path of the "libs" folder inside the Kafka home path
gg.classpath=/home/oracle/Downloads/kafka_2.11-2.1.1/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm

复制

重新打开GoldenGate for Big Data CLI

cd ~/ogg-bd-poc2
./ggsci

复制

开始创建replicat进程

edit params repeshoq

复制

在vi界面输入如下代码

REPLICAT repeshoq
TARGETDB LIBFILE libggjava.so SET property=dirprm/eshop_kc.props
GROUPTRANSOPS 1000
MAP orcl.eshop.*, TARGET orcl.eshop.*;

复制

保存并退出vi

将replicat与trail log(bb)进行关联,并启动replicat进程

add replicat repeshoq, exttrail ./dirdat/bb
start repeshoq

复制

确认replicat是否正常工作,其中view report repeshoq是查看日志的模块

info repeshoq
view report repeshoq

复制

接下来在Oracle 数据库源表中插入数据,确认数据是否实时同步到来kafka Topic中

首先在Linux shell中登陆到数据库

sqlplus eshop/eshop@ORCL

复制

插入数据语句

INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)
VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA03', SYSDATE, 'DELIVERED', SYSTIMESTAMP);


INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)
VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Cars 3', 2);


COMMIT;

复制

可以确认数据已成功插入源表中

在GoldenGate for Big Data CLI中确认数据是否同步到了replicat进程中

./ggsci
stats repeshoq

复制

可以看到数据同步是正常的

下面我们可以访问Kafka内部,具体查看是否有一个名为CDC-ESHOP的Topic创建成功

cd ~/Downloads/kafka_2.11-2.1.1/bin
./kafka-topics.sh --list --zookeeper localhost:2181

复制

可以看到已经成功创建了CDC-ESHOP Topic

同时我们可以通过如下代码,访问存储在CDC-ESHOP Topic之内的数据

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CDC-ESHOP --from-beginning

复制

通过下图我们可以清晰的看到,源库源表中新插入的数据都已经实时的同步到了Kafka队列中

如果现在的输出结果看的不清楚,我们可以通过安装jq来进行查看



sudo yum -y install jq
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CDC-ESHOP --from-beginning | jq .

复制


至此我们完成了从Oracle Dababase 12C源库数据到Kafka的实时数据同步,后续我们可以用Python、Java或者FlinkSQL写Kafka的消费程序,实时进行数据处理和计算后落到结果表中


谢谢大家!

文章转载自火火日记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论