暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

5分钟搞定主流关系型数据库到 Kudu 实时数据同步-CloudCanal 实战

ClouGence 2021-10-11
973

简述

Kudu 是 Cloudera 开源的新型列式存储系统,对实时数据分析支撑良好,同时具备实时 增删改 能力,也是 Apache Hadoop 生态圈成员。

本文主要介绍 CloudCanal 迁移同步关系型数据库(MySQL、Oracle、PostgreSQL) 数据到 Kudu 的能力, 从技术方案和使用角度介绍 CloudCanal 如何达成此类能力。

技术点

可选择的方案

市面上主流迁移同步数据到 Kudu 有三种选择

  • 依赖 SQL 层

  • 选用类似 DataX 、Canal 这类开源中间件,同时 Kudu 搭建上层 SQL 引擎如 Hive、Impala ,构建数据迁移同步系统

  • Kafka Connector

  • 选用 Kafka + Flume + Kudu 实现数据同步

  • 直连

  • 如 StreamSets 直接将数据从源端数据库写入目标端 Kudu 存储

各方案特点

  • 依赖 SQL 层

  • Hive 基于 MapReduce 架构,基本上很难实时同步,配合 DataX 可进行数据迁移, Impala 基于 MPP 架构,配合 DataX 和 Canal (需要自主消费) 可实现迁移和准实时同步

  • Kafka Connector

  • 具备数据堆积能力,适合实时数据中转和攒批写入

  • 直连

  • 路径短,依赖少,实时性好,适合在线场景

CloudCanal 的直连实现

  • 建表

List<ColumnSchema> columns = new ArrayList(2);
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
List<String> rangeKeys = new ArrayList<>();
rangeKeys.add("key");
Schema schema = new Schema(columns);
client.createTable(tableName, schema, new CreateTableOptions().setRangePartitionColumns(rangeKeys));

复制

    Kudu 由于存储引擎限制,每张表必须要指定 Partition Column。

    被设置为 Partition 的列不允许 update,如若修改 Partition Column 列的值。需要删除整行数据重新插入。

  • 数据写入

  • Kudu 数据写入 Kudu 支持 InsertUpdateDeleteUpsert 四种操作。

KuduTable table = client.openTable(tableName);
KuduSession session = client.newSession();
session.setTimeoutMillis(60000);
for (int i = 0; i < 3; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addInt(0, i);
row.addString(1, "value " + i);
session.apply(insert);
}
session.flush();
session.close();

复制

    Kudu 1.6 之前,不支持 Decimal。因此源端数据类型如果是浮点数只可以选择:float、double、int、string 来承载。

    Kudu 1.15 开始,提供了 varchar 类型可以与 数据库的 varchar 相互对应。在此之前应当选择 string 类型。

举个"栗子"

准备 CLOUDCANAL

  • SAAS 版参考自建机器客户端安装文章

    (https://doc.clougence.com/docs/en/add_worker_self_maintain)

  • 社区版参考文章 docker 安装文章(https://www.askcug.com/topic/75)

添加数据源

  • 登录 CloudCanal 平台

  • 数据源管理 -> 添加数据源

  • 选择 自建数据源 ,并填写相关数据库信息,其中 网络地址 请按提示带上端口号

    • kudu-client 默认访问的是 7051 端口,并非提供 Web 界面的 8051

    • 如果 Kudu 是集群化部署,那么在配置网络地址时需要填写集群 ip:port 列表用 英文逗号 间隔

创建任务

  • 任务管理管理 -> 创建任务

  • 高级选项要求 Kudu 至少 3 副本, 可根据 Kudu 集群情况来修改对应配置

  • 点击 下一步


  • 选择 数据同步

  • 点击 下一步


  • 选择要同步的表和操作

  • 点击 下一步


  • 确认创建


  • 任务自动进行结构迁移、全量迁移、增量同步,稳定运行

  • 结构迁移跑完之后,Kudu 控制台可看到对应表


  • 使用 Impala 来查询位于 Kudu 中的数据,外表建表语句如下,创建完毕之后使用正常 SQL 语句查询即可

CREATE EXTERNAL TABLE `canal_test_case_column_default` STORED AS KUDU
TBLPROPERTIES(
'kudu.table_name' = 'my-j52hri3880d6dka.canal_test_case.column_default',
'kudu.master_addresses' = '192.168.0.254:7051')

复制
  • 增量同步过程中对源端数据进行增删改,数据可实时同步到 Kudu

能力和限制

  • 支持 MySQL、PostgreSQL、Oracle 作为源端到 Kudu 的同步

  • 支持主键变更同步,转换为 Kudu 删除在插入

  • 对端 Kudu 要求 1.6 版本

  • 不支持源端 无主键表

  • 不支持 DDL 增量同步

总结

本文简单介绍了 CloudCanal 如何将主流关系型数据库迁移同步数据到 Kudu 的能力。各位小伙伴,如果觉得还不错,请点赞、评论加转发吧。

更多精彩

社区快讯

  • 我们创建 CloudCanal 微信粉丝群啦,在里面,你可以得到最新版本发布信息和资源链接,你能看到其他用户一手评测、使用情况,你更能得到热情的问题解答,当然你还可以给我们提需求和问题。快快加入吧。


  • 扫描下方二维码,添加我们小助手微信拉您进群,接头语(“CloudCanal yyds”)

文章转载自ClouGence,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论