暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

18 行代码搞定异构数据源同步,以 OpenMLDB 为例

SeaTunnel 2023-02-23
1028

点击蓝字 关注我们

Apache SeaTunnel PPMC & Committer 田超


导读

本文将以 OpenMLDB 为例,详细讲解如何使用 Apache SeaTunnel 进行异构数据源同步,以及如何使用 SeaTunnel API 快速接入数据源。


概要:

  1. Apache SeaTunnel 简介

  2. Apache SeaTunnel 原理和架构演进

  3. Apache SeaTunnel Source & Sink 开发

  4. OpenMLDB Source 连接器示例

  5. 如何快速参与贡献



SeaTunnel简介


SeaTunnel 是一个非常易用的支持海量数据同步的超高性能分布式数据集成平台,由国人主导并捐赠给 Apache 基金会。SeaTunnel 取名的灵感来自于小说三体中无坚不摧的水滴。

1

Apache SeaTunnel 特性


  • 简单易用,无需开发:SeaTunnel 做数据集成无需开发,只需要写入配置文件和配置项,即可快速生成一个数据同步任务;
  • 批流一体架构:既支持批任务,又支持流任务;
  • 异构多数据源支持:包括 HDFS、MySQL、Oracle、IoTDB 等,支持多异构数据源之间同步
  • 支持Spark2.4.X+、Flink1.13.X:不仅有自己的计算引擎 Zeta,也支持 Spark2.4.X+、Flink1.13.X 版本,企业内部使用这些引擎可以无缝接入 SeaTunnel;
  • 模块化和插件化,易于扩展:预留了丰富的接口更好地处理数据请求;
  • 自研计算引擎,减少部署成本:自研计算引擎 Zeta,2.3.0 版本中有了很大的改进,保证稳定性和性能,系统在 Java 环境下可以直接部署,减少部署成本;
  • 支持CDC:无缝接入 CDC,目前支持的 CDC数据源包括 MySQL、SQLServer 等。



原理和架构演进


接下来聊一聊 SeaTunnel 的原理和架构演进。

1

Apache SeaTunnel 核心理念


SeaTunnel 设计的核心是利用设计模式中的“控制翻转”或者叫“依赖注入”,主要概括为以下两点:
  1. 上层不依赖底层,两者都依赖抽象
  2. 流程代码与业务逻辑应该分离

对于整个数据处理过程,大致可以分为以下几个流程:输入 -> 转换 -> 输出,对于更复杂的数据处理,实质上也是这几种行为的组合:


2

Apache SeaTunnel 内核原理


SeaTunnel 将数据处理的各种行为抽象成 Plugin,并使用 Java SPI 技术进行动态注册,设计思路保证了框架的灵活扩展,在以上理论基础上,数据的转换与处理还需要做统一的抽象,譬如比较有名异构数据源同步工具 DataX,也同样对数据单条记录做了统一抽象。

在 SeaTunnel 架构体系中,由于背靠 Spark 和 Flink 两大分布式计算框架,框架已经为我们做好了数据源抽象的工作,Flink 的 DataStream、Spark 的 DataSet 已经是对接入数据源的高度抽象,在此基础上我们只需要在插件中处理这些数据抽象即可,同时借助于 Flink 和 Spark提供的 SQL 接口,还可以将每一次处理完的数据注册成表,方便用 SQL 进行处理,减少代码的开发量;在最新 SeaTunnel 的架构中,SeaTunnel 做了自己的类型抽象,实现了与引擎解耦的目的。

3

Apache SeaTunnel 架构演进


那么我们在最新的一个系统的架构当中,我们已经做了自己的类型抽象,实现了与 Spark 和 Flink 解偶。

从 2022 年发出 Proposal 到现在,我们已经重构了 SeaTunnel 的底层 API,并完成了从旧架 V1 到新架构 V2 的演进。

对于 V1 版本来讲,SeaTunnel 本质上是一个 ETL工具。而 V2 版本则向 ELT  的路线发展。

这里简单介绍一下这些概念。在数据集成领域中,数据处理有三个核心概念,也就是数据的抽取(E),数据的转换(T)和数据的加载(L)。

这三个字母代表了三个步骤。那么 SeaTunnel 是 ETL 架构,也就是支持把数据抽出来,在本地进行转换处理,或者清洗等一系列操作,再 load 到对应的目标端。

但这样的架构有一个不好的点,也就是说我们如果抽取的数据量过大,而且中间转换的步骤很频繁,或者是很复杂,那么整个数据集成工具会面临很大的压力,无法利用上下游source和sink 丰富的计算资源。

如果在转换数据之前的 Source 端就把数据处理好,那么抽取效率就变高了。所以,V2 的核心就是从 ETL 架构变成 ELT 架构,把 transform 的操作上推或者下移,减轻数据集成层的压力,同时提高数据集成效率。

基于整个架构和设计哲学的讨论,我们可以在https://github.com/apache/incubator-seatunnel/issues/1608 看到,如果有兴趣,可以去了解一下 SeaTunnel 架构演进的前世今生。

  • V1 架构

V1 架构中,SeaTunnel 的连接器和异构数据都是强依赖分布式计算引擎的,对于不同的计算引擎,会有不同的一个 API 层,连接器也都依赖着 Spark 和 Flink,已经开发好的连接器本质上也都是 Spark connector 和 Flink connecter。

接入数据之后,再去对接入进来的数据进行转换,然后再进行写出。这套设计哲学虽然代码开发量很少,而且很多细节都不需要考虑,因为现在开源的 Spark、Flink的  connecotor 都已经给我们解决了大多数的问题,但实际上这也是一种弊端。第一,强依赖计算引擎,我们无法做到解耦,而且每当计算引擎做大版本升级的时候,就需要进行大量的底层改造,难度比较大。

  • V2架构

基于这些痛点,我们对 V 2 版本进行了重构。首先,V2 版本有了自己的一套 API,也有了自己的一套数据类型,基于这个就可以去开发自己的连接器,而不依赖任何引擎,接入的每一条数据都是 SeaTunnelRow,通过翻译层,把 SeaTunnelRow push 到对应的计算引擎里。

从底层来讲,我们实现是统一的,为适配不同的引擎版本,只需要开发不同的翻译层即可。即使引擎进行了大规模的 API 升级改造,也只需要再新建翻译层就好了,老版本丝毫不影响,这也是 V2 版本最大的亮点。

除了这点之外,我们还正在 V2 架构上做对应的 web 服务化工作,目前社区的进度已经进行了 50%。计划分为四个方面,首先会有 job  IDE,也就是说会有一个 job 编辑的页面,会有智能提示、数据源管理等一系列高级功能。第二,是会有 job designer 高级设计版本功能,可以通过不同的数据源参数替换,或者根据不同的需求来去实现,由一个 template  生成多个作业。

同时,我们还在规划自己的调度。社区目前已经实现了  Apache DolphinScheduler 的接入,未来会接入自带的调度。

最后是要丰富指标收集系统和报警系统,去做好作业报警,作业指标的收集和推送,让用户可以更好地感知作业运行状况。

最后做一下总结,进行 V1 和 V2 架构的升级对比,到底我们做了哪些事情。


首先在引擎依赖方面,V1 强依赖 Spark 和 Flink,V2 无依赖,且自带计算引擎,直接去部署就可以用了。

第二,在连接器实现方面,V1 针对不同的引擎可能要实现多次,V2 我们用自己的API 就只需实现一遍。

第三,V1 引擎版本升级比较困难,因为底层 API 是与引擎高度耦合的,V2 引擎版本升级比较容易,因为我们已经和引擎进行解耦,针对不同版本开发不同的翻译层即可。连接器参数统一方面,V1 针对不同引擎会有不同的参数,这也是 V1 被吐槽的一个点, V2 解决了这个痛点,所有的参数和特性都是统一的,减少用户的使用和学习成本。

最后,自定义分片逻辑上,在 V1 架构下,因为是强依赖 Spark 和 Flink 的connector,底层都已经实现好了一些分片逻辑,数据抽取效率不高,而且分片逻辑也改不了,除非改 Spark 的源码,再重新编译。对于这个痛点, V2  可以自定义分片逻辑,我们可以自由地发挥自己的想象力,数据抽取效率也是可控的,我们可以根据不同的业务需求进行修改。

  • V2架构新特性

架构升级之后,V2 架构有了一些新特性。



SeaTunnel Engine性能测试


我们还与 DataX 进行了一个性能测试对比 ,在相同的千万级别作业量下,我们对比了不同并发、堆内存下的表现,结果显示 SeaTunnel Engine 的用时明显小于 DataX,尤其在 200 兆小内存的情况下,SeaTunnel Engine表现非常好,性能比DataX 提升了30~40%。



Source&Sink开发


接下来介绍一下整个 Source 和 Sink 的开发流程,以及接口都有哪些。

首先是 API 的相关模块,大家如果想要去开发,可以去拉一下源码先看一下。

1

连接器API相关模块




你会用到的接口都在 SeaTunnel API 包这里,主要需要去看 Sink 和 Source 这两个包, Source 包里边包括你要去开发一个 Source Connector 需要用到的一些类,Sink 包包含开发 Sink Connectors 用到一些类。

2

连接器API数据抽象


SeaTunnel 连接器 V2 对数据做了统一的抽象,在所有的 Source 连接器和 Sink 连接器中,处理的都是 SeaTunnelRow 类型数据,同时 SeaTunnel 也对内设置了数据类型规范,所有通过Source 接入进来的数据会被对应的连接器转化为 SeaTunnelRow 送到下游。

这块会要进行数据映射,大家首先要了解自己要要实现的Sink 或 Source Connector 中有哪些类型是支持的,避免开发过程中踩坑。

SeaTunnel 目前支持这多种 Type 类型,包括 ArrayType,LocalTimeType,MapType 等。一些基础类型包含在 BasicType 中,如 String 等,大家如果想去开发,可以直接在源码去看。

3

连接器 API Common



接下来聊一下 API Common 模块,也就是我们刚才聊到的所有数据插有一些插件中的公共特性,比如比较重要的 PluginIdentifierInterface,也就是插件唯一标识。以及SeaTunnelContext,即插件的应用上下文,会保存一些源数据信息,如 Schema,当前并行度等。还有 SeaTunnelPluginLifeCycle,即 SeaTunnel 插件的生命周期。

4

连接器 API Source


接下来是如何开发 Source API。


首先我们有一个枚举来标识你开发的连接器到底是流式的还是批式的,也就是标注数据是否有界。如果是流式,就是 UnBoundedness,批式为 Boundedness。

下面这 4 个类是比较重要的类,第一个是SeaTunnelSource,这是 source 插件的基类,所有的 Source 插件都应该继承这个类,实现里面的方法。

第二是 SourceReader,这是 Source 插件里真正接入数据的类,因为我们需要在 Reader 里写对应的读取数据的逻辑。

SourceSplit,因为 Source 支持分片,刚才也聊到开放的自定义分片逻辑接口,如果用户想要开发 Source 插件,支持多分片并行,你就需要实现 SourceSplit,可以保存分片信息。

最后一个比较重要的类是 SourceSplitEnumerator,这是一个分片器,通过算法把分片分发到 Reader 中,真正实现 Reader 并行读取。

5

连接器 API Sink




Sink API 开发和 Source API 一样,会有一个基类 SeaTunnelSink,每个 Sink 插件都需要去继承某一个 Sink 的类。

第二个是 SinkAggregatedCommitter,用于处理 SinkWriter#prepareCommit 返回的数据信息,包含需要提交的事务信息等,但是会在单个节点一起处理,这样可以避免阶段二部分失败导致状态不一致的问题。
 
第三个是 SinkCommitter,用于处理SinkWriter#prepareCommit返回的数据信息,包含需要提交的事务信息。
 
最后是 SinkWriter 类,也是 Sink 插件真正写入数据的类。



OpenMLDB Source示例


下面我以 OpenMLDB Source 开发为例,讲解一下 SeaTunnel Connector 接入的流程。由于时间和工人工作比较忙,我暂时只开发了 Source,后面我会讲如何为项目做贡献,大家如果有兴趣的话可以参与到共建当中。

Source 实现其实很简单,总共涉及 7 个类,就完全实现了整个流程。


OpenMLDB 的 Java SDK 还是比较强大的,使用这个 Java SDK 可以快速接入到 Apache SeaTunnel 中,可以执行用户的 SQL 语句获得数据,并且可以通过 SQL 语句的预执行获取到Schema,这样就可以快速转换成 Apache SeaTunnel 内部的 SeaTunnelRow,这是一个非常好的功能。

所以说,借助于好的功能,我们就可以快速实现 Source 接入。

在 Config 信息下,整个连接器的配置参数都包含在这里,可以把这些参数封装成一个 Parameters 类来更好地进行传递。

OpenMldbSqlExecutor,因为 OpenMLDB 官方 Java SDK 比较推荐全局Excutor,所以在这个类中,我包装了所有的资源,对异常进行统一。

对于 Source,我只实现了两个类,OpenMldbSource 基类和 OpenMldbSourceReader。

OpenMldbSourceFactory 一个工具类,来实例化 Source插件,大家如果有兴趣可以去看一下整个代码的一个示例。

目前,OpenMLDB 这块还没有支持分片读取,如果大家有兴趣的话,可以再继续贡献,实现分片抽数的 feature,更好地去 SeaTunnel 进行集成。

目前 Apache SeaTunnel 已经完整支持 OpenMLDB 的所有数据类型,如果有兴趣的同学可以从 https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb 查看详细实现。

OpenMLDB Source 配置


那么接下来来说一下整个source的一些配置,我们现在目前支持某些模式,那么对于OpenMLDB来讲,它支持单机版又支持集群,那么支持集群的话就免不了会需要用到一些中间件管理一些集群的状态。那么这块的话会有一个cluster mode,代表你连的 OpenMLDB到底是个集群模式,还是不是个集群模式。

那么第二个参数我们要传一个SQL,就是我们要去抽数,我们要从哪张表上去抽数

那么第三个是一个database,这块database来讲也是依据我们一个OpenMLDB sdk的要求,要求SQL来去执行的时候SQL中不能写数据库名,SQL中只允许写表名

host和port, host和port其实来讲,只有单机版才会需要,连集群版的话,只需要提供Zookeeper连接信息。

那么同时我们因为底层它走的是HTTP协议,所以说它会有一个request,HTTP request timeout或者HTTP session timeout的参数来去让用户去进行控制,比如说这个表很大,对于OpenMLDB来说计算压力也很大,你可以设置一个超时间来去让你去等待。

使用 SeaTunnel 从 OpenMLDB 中抽数,只需要配几个参数即可,总共不超过 30 行就可以实现数据写入目标终端。
    env {
    job.name = “openmldb_to_console”
    job.mode = “BATCH”
    }


    source {
    OpenMldb {
    host = "172.17.0.2"
    port = 6527
    sql = "select * from demo_table1"
    database = "demo_db"
    cluster_mode = false
    }
    }


    sink {
    Console {}
    }

    那么详细的一些插件的用法,我在这贴了一个地址,大家可以去看一下,
    https://seatunnel.apache.org/docs/2.3.0/connector-v2/source/OpenMldb

    Sink 很快就会上线,大家拭目以待。
    机器学习应用

    SeaTunnel 在机器学习上的应用。

    SeaTunnel 把批数据和流数据统一了进来,可以用一套工具接入不同类型的数据源,机器学习所需要的模型训所需要的数据有可能是在线日志,或者是离线数据,聚合数据等实时数据流。

    对于不同的数据源,企业之前需要用不同的批流处理数据,并做融合处理再推到模型训练平台,让算法工程师们使用,这是一个架构的弊端。而 SeaTunnel 就完美地在机器学习这块发挥了很大的作用,通过批流一体的架构接入不同的数据源,为算法同学提供更好的数据帮助。

    可以参考 OPPO 机器学习平台的生产实践运维难,迭代难,oppo 智能推荐样本中心应用 SeaTunnel 后如何实现智能推荐在线学习?



    如何快速参与贡献


    如何快速参与贡献呢?


    大家也看到,SeaTunnel 的接口开发和连接器开发非常简单,Source 只需要实现 2 个类,Sink 只需要实现 3 个类,你只需要专注于连接器处理流程就好。

    这是我自己的个人私货经验分享,我在 SeaTunnel 社区算得上是一只“老鸟”了,也贡献了不少代码,这是我总结出来的一些经验,供参考。


    如果说你又是一个开源爱好者,而且社区现在做的功能和你本身工作是相关联的,你就可以快速参与进来,你会从社区的一个 owner,慢慢发展成一名 committer 或者 PMC Member。

    第二点,关注项目 issue,会有需求的讨论。如果你是开源新手,对项目了解不深,没有很好的切入点,可以找到一些 good first issue,来快速上手任务。

    第三个要点,一定要尝试用源码编译整个项目。参与代码贡献,需要保证你的代码可以可以被编译运行起来。提的 PR 也要有质量,经过测试的。做到这些的前提,第一步是要知道如何把一个项目从源码就变成可执行文件。所以说,学习项目的编译是贡献的第一步。

    第四点,我们要去安装 Check Style的插件,因为项目需要有统一的代码风格,要符合社区的规范。安装这个插件可以快速让你融入规范中。

    第五,做 Dependencies 自测,因为 Apache 开源项目有比较严格的 license,所以如果你的开发依赖了外部的科研项目或者商业 SDK,就需要进行声明,保证合法合规性。

    第六,修改项目代码模板,因为 Apache 项目需要证明版权,所以每一个代码文件都会有一个 License 声明归基金会所有。目前,我们使用 Apache License 2.0,使用代码模板的功能,新建一个接口,或者新建一个文件就可以去快速给生成  License,免去手动添加,提高你的开发效率。

    第七,熟练 git 技能,因为开源社区贡献者在众多,在同一个分支上进行开发免不了冲突发生,这时需要用到 git 高级功能,所以一定要熟悉 git 的技能,搞清楚你当前的分支对别人有没有影响,如果别人对你有影响了,你要去怎么解决这个事情。

    最后一点回归到主题,就是我们要脸皮够厚,不要怕错。开源是一件很振奋人心的事情,开源不仅给了我技术上的成长,我也认识到了很多志同道合的朋友。我也是从一个菜鸟慢慢成长为到现在一个老鸟,第一次贡献我也踩了很多坑,但社区的文化包容,社区伙伴们很细心,很热爱给我提供意见和帮助,review 我的代码。

    如果说你有一颗想要贡献的心,那么你只需要去迈出你的第一步,学习我刚才提到这些点,那么你就完全可以具备做贡献的条件。


    Apache SeaTunnel


    Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

    仓库地址: 
    https://github.com/apache/incubator-seatunnel

    网址:
    https://seatunnel.apache.org/

    Proposal:
    https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelPro

    Apache SeaTunnel(Incubating)  下载地址:
    https://seatunnel.apache.org/download
     
    衷心欢迎更多人加入!

    我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

    我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

    提交问题和建议:
    https://github.com/apache/incubator-seatunnel/issues

    贡献代码:
    https://github.com/apache/incubator-seatunnel/pulls

    订阅社区开发邮件列表 : 
    dev-subscribe@seatunnel.apache.org

    开发邮件列表:
    dev@seatunnel.apache.org

    加入 Slack:
    https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

    关注 Twitter: 
    https://twitter.com/ASFSeaTunnel

    往期推荐




    技术详解 | SeaTunnel WAL 机制实现内存数据持久化存储




    如何使用 SeaTunnel 同步 MySQL 数据到 Hive




    SeaTunnel 社区 Committer 迎新!



    分享、点赞、在看,给个3连击呗!

    文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

    评论