本文将通过一个简单的例子分享一下如何使用 DataX 数据同步工具做 OceanBase 数据库的读取和写入。DataX 是一款由阿里开源并被广泛使用的异构数据同步工具,开发技术栈主要是 Python 和 Java,DataX 的性能和灵活性已经在各种场景下得到了广泛的验证,大家可以在项目上放心使用。
1. 如何获取 DataX
您可以访问 https://github.com/alibaba/DataX 获取已经构建好的发布包,也可以下载源码自行构建,DataX 采用插件体系,可以通过自行扩展插件来适配自己的数据库产品。这里强烈建议大家从 Github 下载最新的 DataX,重新构建最新版本,因为一些从其他途径获得的预编译版本,进行 OceanBase 同步测试时会出现 writer multipoints 错误。
DataX 默认已经提供了 OceanBase 的读写插件,大家可以通过以下路径查询插件 jar 包是否存在。
如果您想只测试 OceanBase,可以通过以下步骤做一个快速打包,只编译 OB 的 reader 和 writer。
-- 注销 pom.xml 中 reader 和 writer 部分,跟 OceanBase 无关的插件 -- 执行如下命令,编译打包 mvn -U clean package assembly:assembly -Dmaven.test.skip=true -- 生成的 datax 文件在 target 目录下,可以直接运行测试使用
2. OceanBase 同步环境准备
为了方便测试,我们需要提前安装好一套 OceanBase 数据库,并创建如下测试表:
--创建测试源端表
obclient [oceanbase]> use test;
Database changed
obclient [test]> create table t_in(id int,name text);
Query OK, 0 rows affected (0.079 sec)
--创建测试目标端表
obclient [test]> create table t_out(id int,name text);
Query OK, 0 rows affected (0.040 sec)
--插入若干测试数据
obclient [test]> insert into t_in values(1,'a'),(2,'b'),(3,'c');
Query OK, 3 rows affected (0.013 sec)
Records: 3 Duplicates: 0 Warnings: 0
......部分步骤省略
obclient [test]> insert into t_in select * from t_in;
Query OK, 96 rows affected (0.002 sec)
Records: 96 Duplicates: 0 Warnings: 0
obclient [test]> select count(*) from t_in;
+----------+
| count(*) |
+----------+
| 192 |
+----------+
1 row in set (0.001 sec)
3. DataX 同步配置
因为 DataX 不提供图形化界面,所以我们只能通过编写 json 文件,使用 python 调用执行。下面是根据我的当前配置编写的 json 配置文件,供大家参考:
{
"job": {
"setting": {
"speed": {
"channel": 4
},
"errorLimit": {
"record": 0,
"percentage": 0.1
}
},
"content": [
{
"reader": {
"name": "oceanbasev10reader",
"parameter": {
"username": "root",
"password": "observer",
"column": [
"*"
],
"connection": [
{
"table": [
"t_in"
],
"jdbcUrl": ["jdbc:oceanbase://10.211.55.84:2881/test"]
}
]
}
},
"writer": {
"name": "oceanbasev10writer",
"parameter": {
"obWriteMode": "insert",
"column": [
"*"
],
"preSql": [
"truncate table t_out"
],
"connection": [
{
"jdbcUrl": "jdbc:oceanbase://10.211.55.84:2881/test",
"table": [
"t_out"
]
}
],
"username": "root",
"password":"observer",
"writerThreadCount":10,
"batchSize": 1000,
"memstoreThreshold": "0.9"
}
}
}
]
}
}
4. 执行同步,查看结果
我们将上面的 json 存入文件,例如 ob.json,然后通过如下命令方式调用:
$ python datax.py obt.json
正常情况下,执行成功会返回如下信息:
5. 写在最后
本文整体篇幅不长,但是如果要成功完成这个示例,大家一定要仔细看我上面提到的注意点,比如:
- 尽量选用最新的 DataX 软件,旧版本 writer 存在问题同步可能报
datax java.lang.NumberFormatException: multiple points
错误 - DataX 配置文件中的 password 不允许为空,如果你的 OB 初始化后没有做密码配置,请使用语句
ALTER USER 'username' IDENTIFIED BY 'password';
重置一下密码
DataX 和 OceanBase 都是阿里系的产品,整体来看适配做的特别好,同步性能上也比较可观,希望我的分享能够帮助到大家,谢谢。