暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

数据迁移大师:Oracle GoldenGate到Kafka的高效同步策略

1.1 概述

目标:

通过OGG 软件将Oracle数据库表通过给KAFKA 平台,通过KAFKA平台解析JSON文件,在进行大数据库平台展现。

环境:

源端规划

当源端为 Oracle 数据库时,通常的做法是将 OGG 部署于源端的 Oracle 数据库服务器 上;也可以部署到相应的 ADG 环境或者单独的主机(要求与源 数据库的操作系统平台相同)。

目标端规划

当目标端的产品为 O GG for Big Data 产品时,目前官方认证过并支持的平台有如下的四种平台:Linux、Windows、IBM-AIX、Solaries、HP-UNIX 基于稳定性考虑,必须选择 O GG for Big Data 产品安装在其中的一种平台上。

下载方式

高版本:Oracle GoldenGate Downloads 历史版本:http://edelivery.oracle.com/osdc/faces/Home.jspx

1.2 概述

1.2.1 文件系统要求

OGG 的安装和运行需要使用文件系统来存放文件。对文件系统的要求主要有:

使用专用的文件系统,比如挂载到 ogg 目录下,通过考虑高可用可以选用集中存储。

文件系统的容量主要与所要复制的数据量相关。如果不确定要复制的数据量,建议从 1 6GB 到 32GB 起步。

1.2.2 源端到目标端主机的网络连接

OGG 的数据复制操作要求源端主机和目标端主机之间的网络是连通的,未必必须放开特定端口的防火墙限制(如果有防火墙限制)。当前的要求是:打开源与目标主机之间的 7890 – 7850 端口的防火墙限制。

1.2.3 创建操作系统用户

建议将 OGG 产品安装以及运行于专用的操作系统用户,这样可以提高维护的安全性。可以通过下面的步骤来创建操作系统用户 ogg 。

- - 对于没有 O racle 数据库的环境(本文即 K afka 环境)

> Linux:

#groupadd oinstall

# useradd -g oinstall -G oinstall  -d /home/ogg ogg

复制

1.3 在源端(Oracle库, Linux 平台)安装 OGG

1.3.1 安装 J DK 1.8

OGG for Big Data 的运行需要 J AVA 1. 8 ,可以安装 J DK 1.8 或者 J RE 1.8 , JDK 1.7 不满足要求,考虑到现有环境对JDK1.7 的依赖考虑,我们在ogg用户下面单独配置一个JDK1.8的环境变量:

[ogg@rac2 soft]$ ls -lrt
总用量568456
drwxr-xr-x 3 ogg oinstall      40964月162024 fbo_ggs_Linux_x64_shiphome
-rw-r--r--1 ogg oinstall      13965月102024 OGG-12.3.0.1.4-README.txt
-rw-r--r--1 ogg oinstall    2935665月102024 OGG_WinUnix_Rel_Notes_12.3.0.1.4.pdf
-rw-r--r--1 ogg oinstall 1467298277月1701:08 jdk-8u421-linux-x64.tar.gz
-rw-r--r--1 ogg oinstall 3398376117月1720:00 V975837-01.zip
-rw-r--r--1 ogg oinstall  952121747月1720:00 V979723-01.zip
[ogg@rac2 soft]$cd /ogg/jdk1.8
[ogg@rac2 jdk1.8]$ pwd
/ogg/jdk1.8

[ogg@rac2 ~]$ cat .bash_profile
ORACLE_SID=rac2;export ORACLE_SID
ORACLE_BASE=/u01/app/oracle;export ORACLE_BASE
ORACLE_HOME=$ORACLE_BASE/product/11.2.0/db_1;export ORACLE_HOME

export PATH
export JAVA_HOME=/ogg/jdk1.8
export JRE_HOME=/ogg/jdk1.8/jre
export CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$ORACLE_HOME/bin:$ORACLE_HOME/OPatch:$PATH
LD_LIBRARY_PATH=$ORACLE_HOME/lib
LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$ORACLE_HOME/oracm/lib
LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:/lib:/usr/lib:/usr/local/lib
export LD_LIBRARY_PATH
[ogg@rac2 ~]$ java -version
java version "1.8.0_421"
Java(TM) SE RuntimeEnvironment(build 1.8.0_421-b09)
JavaHotSpot(TM)64-BitServer VM (build 25.421-b09, mixed mode)
[ogg@rac2 ~]$

复制

1. 4 在目标端( Kafka , L inux 平台)安装 OGG for Big Data

1.4.1 安装 JDK 1.8

方法按照上述描述进行操作,可以解决同一系统对软件多版本的依赖

1.4.2 安装 OGG for Big Data 产品

说明:对于低版本 的 OGG f or Big Data 产品,我们可以通过解压缩安装的方式,不需要使用 OUI 以及 G lobal Inventory 机制,也没有 opatch 命令(比如通过 opatch 列出所安装的单个补丁)。首先使用 ogg 用户创建存放 O GG for Big Data 产品的目录。最好部署在一个独立的文件系统上,具有至少 16GB 以上的可用存储空间。

cd  /data/ogg
mkdir ogg_home1

复制

这里通过将 OGG for Big Data 补丁包进行解压缩的方式,即完成软件的安装。该补丁包的应用,使 OGG 的版本为 12.3.2.1.6 (Build 001) 。

c d  /data/ogg/ogg_home
unzip V979723-01.zip
tar xvf  OGG_BigData_Linux_x64_12.3.2.1.1.tar

复制

为 ogg 用户设置环境变量,以满足运行 OGG 的要求.

[ogg@node3 ogg_home]$ ./ggsci
./ggsci: error while loading shared libraries: libjvm.so: cannot open shared object file:No such file or directory
[ogg@node3 ogg_home]$ ldd ggsci
        linux-vdso.so.1=>(0x00007ffcd7dd3000)
        librt.so.1=>/lib64/librt.so.1(0x00007fc6e1ffb000)
        libdl.so.2=>/lib64/libdl.so.2(0x00007fc6e1df3000)
        libgglog.so =>/data/ogg/ogg_home/./libgglog.so (0x00007fc6e19a3000)
        libggutil.so =>/data/ogg/ogg_home/./libggutil.so (0x00007fc6e177b000)
        libggrepo.so =>/data/ogg/ogg_home/./libggrepo.so (0x00007fc6e148b000)
        libdb-6.1.so =>/data/ogg/ogg_home/./libdb-6.1.so (0x00007fc6e1073000)
        liblmdb.so =>/data/ogg/ogg_home/./liblmdb.so (0x00007fc6e0e5b000)
        libggperf.so =>/data/ogg/ogg_home/./libggperf.so (0x00007fc6e0c1b000)
        libggparam.so =>/data/ogg/ogg_home/./libggparam.so (0x00007fc6df3f3000)
        libicui18n.so.56=>/data/ogg/ogg_home/./libicui18n.so.56(0x00007fc6def2b000)
        libicuuc.so.56=>/data/ogg/ogg_home/./libicuuc.so.56(0x00007fc6deb2b000)
        libicudata.so.56=>/data/ogg/ogg_home/./libicudata.so.56(0x00007fc6dccc3000)
        libpthread.so.0=>/lib64/libpthread.so.0(0x00007fc6dcaa3000)
        libxerces-c-3.1.so =>/data/ogg/ogg_home/./libxerces-c-3.1.so (0x00007fc6dc413000)
        libantlr3c.so =>/data/ogg/ogg_home/./libantlr3c.so (0x00007fc6dc1f3000)
        libjvm.so =>not found
        libggnnzitp.so =>/data/ogg/ogg_home/./libggnnzitp.so (0x00007fc6db77b000)
        libm.so.6=>/lib64/libm.so.6(0x00007fc6db473000)
        libc.so.6=>/lib64/libc.so.6(0x00007fc6db0a3000)
/lib64/ld-linux-x86-64.so.2(0x00007fc6e2203000)
        libnsl.so.1=>/lib64/libnsl.so.1(0x00007fc6dae83000)
        libstdc++.so.6=>/lib64/libstdc++.so.6(0x00007fc6dab7b000)
        libgcc_s.so.1=>/lib64/libgcc_s.so.1(0x00007fc6da963000)

复制

在使用 JDK 目录设置了 LD_LIBRARY_PATH 环境变量后,要检查该环境变量指向的目录中是否具有以下两个库文件,这两个文件为 OGG 运行所必须:

cd ~
v i .bash_profile
# For JDK 1.8
export JAVA_HOME=/data/jdk1.8
export PATH=$JAVA_HOME/bin:$PATH
export LD_LIBRARY_PATH=/usr/java/jdk1.8.0_192-amd64/jre/lib/amd64/server
export OGG_HOME=/data/ogg/ogg_home

复制

退出 ogg 用户当前的 ssh 会话并重新连接,使环境变量的设置生效,随后运行 ggsci 命令,确认可以正常执行:

[ogg@node3 ~]$ cd /data/ogg/ogg_home/
[ogg@node3 ogg_home]$ ./ggsci
OracleGoldenGateforBigData
Version12.3.2.1.1(Build005)
OracleGoldenGateCommandInterpreter
Version12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305
Linux, x64,64bit(optimized),Generic on Jul13201800:46:09
Operating system character set identified as UTF-8.
Copyright(C)1995,2018,Oracleand/or its affiliates.All rights reserved.

复制

1.5 在源库( O racle 数据库)的检查与配置

为满足 OGG 的工作条件,必须对源数据库进行必要的设置。本节将检查相关的配置项,并对其中不满足的进行设置。

1.5.1 在数据库上启用对 O GG 的支持

对于 Oracle 11.2.0.4 以及更高版本,必须显式设数据库以启用对 OGG 的支持,这个设置要求对于抽取和复制的全部模式(经典和集成)都是必须的。

SQL>alter system set enable_goldengate_replication = true scope = both sid = '*';
SQL>show parameter enable_goldengate_replication

复制

1.5.2 数据库的归档模式与附加日志

OGG 要求数据库必须运行于归档日志模式,并且启用附加日志的支持。以下用来检查数据库的归档模式以及附加日志的配置情况。说明:Oracle 官方推荐在数据库级别启用 F orce logging 模式。为减少全库级别启用 F orce logging 后一些不必要的 Redo 日志量,本文的做法是在表空间级别启用 Force logging 。【可选操作】在数据库级别启用强制日志 如果 上面 “ force_logging ”字段的返回值为“ YES ”,则已经启用 , 否则执行 下面的 SQL :

SQL> alter database force logging;
--重新执行上面的检查SQL,确认返回值为“YES”。
回退操作
SQL> alter database no force logging;
【必选操作】在数据库级别打开最小附加日志
如果上面“ supplemental_log_data_min ”字段的返回值为“ YES ”,则已经启用,否则执行:
SQL> alter database add supplemental log data;
--如果如果出现行锁,可以取消重新自行
--在全部R AC 节点上执行日志文件的切换
SQL>  alter system switch logfile;
--重新执行上面的检查SQL,确认返回值为“YES”。
回退操作
SQL>  alter database drop  supplemental log data;

复制

1.5.3 创建用于运行 OGG 的数据库用户

OGG 的运行需要一个数据库用户, O GG 通过该用户建立到数据库的连接,通常采用一个专用的用户。创建 OGG 需要的数据库用户 执行下面的 SQL 以创建 OGG 的数据库用户 ogg (也可以使用其它名称)。

- -(可选)为 o gg 用户创建专用的表空间:
SQL>  create tablespace oggtbs datafile '+ASM'size 30G autoextend off;
SQL>  create user ogg identified by"xxxxx"default tablespace ts_ogg_data;
--检查并设置密码修改策略,使得口令不需要经常修改:
SQL>  alter user ogg profile PROFILE_APP;
--赋予 ogg 必须的基本权限以及运行 O GG 需要的其它权限:
SQL>  grant connect, resource ,unlimited tablespace to ogg ;
SQL>  grant create session   to ogg ;
SQL>  grant alter system,select any dictionary to ogg;
SQL>  execute dbms_goldengate_auth.grant_admin_privilege('ogg'   );

复制

1.5.4 表空间的 F orce Logging

建议将源端所有需要数据复制的表存放在单独的表空间上,并为这些表空间启用 force logging 。启用业务数据表空间的 F orce Logging 执行下面的 SQL 以创建 OGG 的数据库用户 ogg (也可以使用其它名称)。

1.6 在源端( Oracle 数据库)配置 OGG

1.6.1 创建登录数据库的用户证书

将 OGG 登录数据库的用户名、口令等以证书的形式保存到系统中,这样在配置文件以及执行管理任务时可以不需要输入口令,从而简化系统的配置,增加系统的安全性。创建用户证书 执行下面的命令。

GGSCI (rac2) 13>  add credentialstore

GGSCI (rac2) 13>  alter CredentialStore add user ogg, password xxxxx alias user_ogg
Credential store altered.

GGSCI (rac2) 13> dblogin useridalias user_ogg;
Successfully logged into database.

复制

1.6.2 配置并启动 O GG m gr 进程

配置并启动 OGG 的 m anager 进程。

GGSCI (rac2)14> edit param mgr

PORT 7809

dynamicportlist 7810-7820

userid ogg@db_ogg,password ogg

autorestart er *,retries 5,waitminutes 3

purgeoldextracts ./dirdat/*,usecheckpoints,minkeepdays 3

lagreporthours 2

laginfominutes 30

lagcriticalminutes 45

GGSCI (rac2) 15> start mgr
Manager started.

GGSCI (rac2) 17> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING

复制

1.6.4 对复制表执行附加日志设置

对于 UPDATE 操作, OGG 的数据复制机制要求必须将相关行的主键(或者唯一索引字段)的值记录到 Redo 中,即使这些字段的值在 U PDATE 操作中没有被更改。对于源端任何一条数据的 UPDATE 操作, OGG 会使用主键字段的值作为 W HERE 条件,在目标端对一行记录执行修改,这样可以避免错误的更新多行记录。为实现上述目标,必须对复制表进行附加日志的设置,从而使 Oracle 数据库可以将主键以及唯一字段的值也写入到 Redo 中。本节将执行这个操作,这是通过 OGG 的 add trandata 命令完成的。为所有需要进行复制的表执行 add trandata 命令。

GGSCI (rac2) 18>dblogin UserIdAlias user_ogg
add trandata owner.table_name

检查表的 SUPPLEMENTAL_LOG是否开启:

info trandata owner.table_name

复制

1.6.5 配置抽取进程

抽取进程负责从源库的 Redo 中将变更的数据捕获并写入到 Trail 文件中。创建抽取进程 在源端创建抽取进程 e_kafka1 。

register extract e_kafka1 database

add extract e_kafka1, integrated tranlog, begin now

add exttrail ./dirdat/k1, extract e_kafka1, MegaBytes 256

复制

编辑抽取进程的参数,将参数信息将保存于 ./dirprm/ e_kafka1 .prm 文件中。

edit params e _ kafka1
Extract e_kafka1

UserIDAlias user_ogg

DiscardFile./dirrpt/e_kafka1.dsc,Append,MegaBytes128

ExtTrail./dirdat/k1

DBOptionsAllowUnusedColumn

GetTruncates

LogAllSupCols

UpdateRecordFormat compact

WarnLongTrans2h,CheckInterval300s

TranLogOptionsIntegratedParam( max_sga_size 1024,Parallelism2)

obey ./dirprm/tabmap;


vi  ./dirprm/tabmap

table owner.table_name;

ggsci>   start *

复制

可以使用 info all 检查 e_ kafka1 , p_ kafka1 进程状态是否为 RUNNING

如果状态不是 RUNNING ,则可以使用以下命令查看报错原因,再根据错误原因具体分析和处理:

ggsci > view report e _ kafka1
复制

1.6.6 配置传输进程

在源端创建数据泵进程 pump1 :

add extract p _ kafka1, exttrailsource ./dirdat/k1

add rmttrail ./dirdat/k1, extract p _ kafka1, Megabytes 2 56

复制

用来将 trail 文件的存放位置修改在 NAS目录上:

alter pump1 exttrailsource /nasdir/zhzy/ogg/dirdat/k1
复制

编辑 pump1 进程参数设置,配置信息将保存于 ./dirprm/pump1.prm 文件中。

ggsci>   edit params p _ kafka1
复制

pump1 进程的参数配置如下:

Extract p _ kafka1

Passthru

RmtHost xx.xx.xx.xx, MgrPort 7809

RmtTrail ./dirdat/ k 1

 

obey ./dirprm/tabmap;

复制

1.6.7 启动抽取和传输进程

在源端启动数据抽取进程和数据泵进程:

ggsci>   start *
复制

使用 stats 命令对抽取和传输进行进行检查:

stats ext1

stats pump1

复制

1.7 在目标端( Kafka )的检查与配置

1.7.1 配置并启动 OGG mgr 进程

配置并启动 OGG 的 manager 进程。

./ggsci
create subdirs

edit param mgr


Port7809

DynamicPortList7810-7850

--PurgeOldExtracts./dirdat/*, UseCheckpoints, MinKeepDays 7

-- AutoRestart ER *, Retries 5, WaitMinutes 10, ResetMinutes 60

LagInfoMinutes 3

LagCriticalMinutes 30

LagReportHours 1


start mgr

复制

1.1.2 配置并启动 OGG 复制进程

配置并启动 OGG 的复制进程。

cd $OGG_HOME

ggsci

add replicat r_kafka1 exttrail ./dirdat/k1

复制

编辑 r```_kafka1 进程的参数文件:

edit param r_kafka1



Replicat r_kafka1

GetEnv(JAVA_HOME)

GetEnv(CLASSPATH)

GetEnv(PATH)

GetEnv(LD_LIBRARY_PATH)

TargetDBLibFile libggjava.so Setproperty=dirprm/kafka_handler1.props

ReportCountEvery1Minutes,Rate

GroupTransOps10000



Map*.*,Target *.*;

复制

编辑复制进程 r_kafka1 所使用的 Kafka Handler 的属性文件:

将安装文件中自带的示例配置文件拷贝到 d irprm 目录,并在此基础上执行修改

cd $OGG_HOME/AdapterExamples/big-data/kafka

cp kafka.props $OGG_HOME/dirprm/kafka_handler1 .props

cp custom_kafka_producer.properties $OGG_HOME/dirprm/kafka_producer1.props

cd $OGG_HOME/dirprm



v i kafka_handler1.props

gg.handlerlist            = kafkahandler

gg.handler.kafkahandler.type    = kafka

# The following selects the message key using the concatenated primary keys

gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}

gg.handler.kafkahandler.BlockingSend=false

gg.handler.kafkahandler.includeTokens=false

goldengate.userexit.writers=javawriter

javawriter.stats.display=TRUE

javawriter.stats.full=TRUE

gg.log=log4j

gg.log.level=TRACE

gg.report.time=30sec

javawriter.bootoptions=-Xmx512m-Xms32m-Djava.class.path=ggjava/ggjava.jar

gg.handler.kafkahandler.KafkaProducerConfigFile= kafka_producer1.props

gg.classpath  = dirprm/:/usr/lib/kafka/libs/*

gg.handler.kafkahandler.mode             = op

gg.handler.kafkahandler.topicMappingTemplate    = test_illegal

gg.handler.kafkahandler.format            = json

复制

根据情况决定是否 启用 trace

gg.log.level = trace
复制

编辑 Kafka Handler 所使用的连接 Kafka 相关的属性文件 配置文件 custom_kafka_producer.properties 中指定了连接到 K afka 的配置:

cd dirprm

vi kafka1_producer.properties

acks=1

reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

# 100KB per partition

batch.size=16384

linger.ms=0

## Added by ZhangHua

bootstrap.servers   = xxx.xxx.xxx.xxx:9000, xxx.xxx.xxx.xxx:9000, xxx.xxx.xxx.xxx:9000, xxx.xxx.xxx.xxx:9000

max.request.size    =5024000

send.buffer.bytes   = 5024000

复制

启动复制进程

start r_kafka1
复制

可以使用 info all 检查复制进程的状态是否为 RUNNING ;如果状态不是 RUNNING ,则可以使用以下命令查看报错原因:

view report r_kafka1
复制

再根据错误原因具体分析和处理。

相关学习链接:

GGSCI: Unable To Open Credential Store. Error Code 29,231 After Applying RDBMS APR-2021 Patch While Using FIPS. (Doc ID 2782150.1)

Replicat Abends With OGG-15051 Java Or JNI Exception: When Using Coludera Ggclasspath And Error creating bean with name 'userExitDataSource' defined in class path resource (Doc ID 2802307.1)


文章转载自小周的数据库进阶之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论