暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

代码演示 | SeaTunnel 连接器极简开发流程

SeaTunnel 2022-07-28
5517
   点亮 ⭐️ Star · 照亮开源之路


GitHub:https://github.com/apache/incubator-seatunnel



在 7 月 24 日 Apache SeaTunnel(Incubating)&Apache Doris 联合 Meetup 上,白鲸开源高级工程师,Apache SeaTunnel contributor 刘黎为大家带来一次轻松的分享,主要分享如何在 SeaTunnel 中快速地开发一个连接器。


今天的分享主要分为四个部分:
  • 什么是 Connector
  • 如何接入数据源和目标
  • 代码演示如何实现一个 Connector
  • 目前支持的源和目标

什么是 Connector


Connector 是接入数据源的具体实现,由 Source 和 Sink 组成。Source 负责从源端读取数据,例如 MysqlSource、HDFSSource ,DorisSource,TXTSource 等;Sink 负责将读取的数据写入目标端,例如 MySQLSink、ClickHouseSink、HudiSink 等。通过 Source 和 Sink 的配合完成数据的搬运,也就是数据的同步。


当然,不同的 Source 和 Sink 是可以相互配合的,比如可以使用 MySQL 的 Source 配合 Doris 的 Sink,就能完成数据从 MySQL 同步到 Doris,或者从 MySQL source 和 HDFS Sink 就能完成从 MySQL 读入数据,然后写入 HDFS。

如何接入数据源和目标


如何接入 Source

DS


先来看一下如何接入 Source,即如何实现一个 Source,以及接入 Source 需要实现哪些核心接口。

最简单的 Source 就是一个单并发的 Source。如果一个 Source 不支持状态存储和一些高级功能,在这样一个简单的单并发Source,我们要实现哪些接口呢?

首先在 Source这里面,我们需要用 getBoundedness 去标识 Source 支持实时的还是离线的,还是都能支持。createReader 创建一个 Reader,这个 Reader 主要的作用就是读取数据的具体实现。在 Reader 里面,其实我们只用实现一个方法就可以,那就是 pollNext,通过这个方法把读取到的数据进行发送。单并发 source 其实就是这么简单。

如果需要并发读取,我们需要额外实现哪些接口呢?


在并发读取里面,我们会引入一个新的成员,Enumerator。我们在 Source 中实现 createEnumerator,它的主要作用是创建一个枚举器,用于将任务拆分为 split,并下发给 reader。例如一个任务可以拆分为 4 个 split,如果是两并发,就会对应两个 Reader, 拆分好的四个 split 可以发两个给 reader1,再发两个给 reader2。如果并发数更多,比如有 4 个并发,那就创建4 个 Reader,刚好用对应创建好的 4 个 split 进行并发读取,提升读取效率。

Enumerator 里有一个对应的接口,addSplitsBack,将我们拆分好的 split 发送到对应的 reader。通过这个方法我们可以指定 reader 的 ID。

与之对应的,reader 里面有一个 addSplits 用于接收枚举器下发下来的 split,进行数据读取。

总结一下,在并发读取的情况下,我们需要一个枚举器,在枚举器里实现任务的拆分,将拆分好的 split 下发给 reader,reader 接收这个 split 并用于读取。

如果我们需要支持断点续传和精确一次语义,那我们需要额外实现哪些接口呢?
 
如果是断点续传,那我们需要状态的保存和通过状态的恢复,在 Source 里面,我们就需要去实现一个 restoreEnumerator。这个方法的具体的含义,是通过状态恢复一个枚举器,并恢复Split。与之对应,我们需要在这个枚举器里实现一个 snapshotState,在我们做 checkpoint 的时候,用于保存当前枚举器的状态和失败恢复。同时 reader 里面一样的,也会有一个 snapshotState 方法,用于保存Reader的Split状态,在发生失败重启的时候,通过保存的状态恢复一个枚举器,split 恢复后,就能从失败的地方继续读取数据和传入数据。

精确一次语义,其实需要 source 支持数据重放,比如 Kafka 、Pulsar 等,可以通过Offset进行重放。同时还需要 Sink 是两阶段提交的,即在这两种 source 和 Sink 的配合下,就能实现精确一次语义。

如何接入 Sink

DS


再来看一下如何接入 Sink,Sink 需要实现哪些接口呢?

其实 Sink 相对来说是更简单。对于并发 Sink,不支持状态存储或不支持两阶段提交的情况下,就是非常简单的 Sink。Sink 是不区分流同步和批同步的,因为不仅是 Sink,实际上整个 SeaTunnel API 体系就是批流一体的。

首先我们需要实现一个 createWriter,Writer 用于数据写出。在 Writer 里实现一个方法 write,通过这个方法将数据写出到目标库。


如图所示,如果设置两并发,那引擎会去调两次 createWriter,生成两个 Writer,引擎会向这两个 writer 喂数据,通过 write 方法把数据写到目标端。
进阶一点,比如我们需要支持状态存储和两阶段提交,需要额外实现哪些接口呢?首先我们引入了一个新的成员,Commiter,它的主要作用是用于第二阶段的提交。


在 Sink 里,因为是状态存储的,需要做到通过状态恢复 Writer 所以需要实现  restoreWriter。

因为我们引入了一个新的成员 Committer,在 Sink 中就需要实现一个 createCommitter,通过这个方法创建一个 Committer 用于第二阶段提交/回滚。

在这种情况下,Writer 又需要额外实现哪些接口呢?因为是两阶段提交,第一阶段提交其实是放到 Writer 里面做的,需要实现 prepareCommit 方法,主要用于第一阶段提交。此外,我们支持状态存储和失败恢复,所以还需要一个 snapshotState,在 checkpoint 时做快照,通过保存状态用于失败恢复场景。

这里比较核心的是 Committer。其主要用于第二阶段的提交和回滚操作。对应的流程大致是,我们需往数据库写数据,引擎在做 checkpoint 时会触发第一阶段提交,那么 Writer 就需要去做 prepare commit。同时它会返回一个 commitInfo 给引擎,引擎去判断是否所有 Writer 的第一阶段提交都成功,如果都成功,引擎会通过 commit 方法,进行 commit 实际提交。

对于 MySQL 来说,第一阶段提交只是保存一个事务 ID,并把事务 ID 给 commit,该事务 ID 是提交还是回滚由引擎决定。

如何实现 Connector

DS


讲完 Source 和 Sink,再来看如何接入一个数据源,实现自己的 Connector。

首先,我们需要搭建 Connector 的开发环境。

必要环境


1、Java 1.8\11
2、Maven
3、IntelliJ IDEA

Windows 用户需要额外下载 gitbash(https://gitforwindows.org/)

有了这些东西之后,就可以通过 git clone 的方式 去下载 SeaTunnel 源码。

下载SeaTunnel源码
1、git clone https://github.com/apache/incubator-seatunnel.git
2、cd incubator-seatunnel

SeaTunnel 工程结构


然后我们再通过 IDE 打开,会看到如图所示的目录结构:


目录大致分为几大部分:

1、connectors-v2  
      新连接器的具体实现会放在这个 module 下

2、connector-v2-dist 
      新连接器的翻译层,将连接器翻译成具体的引擎实现,而不用在对应的引擎下做实现,例如 Spark、Flink、ST-Engine。其中,ST-Engine 是社区最近正在紧锣密鼓实现中的“大活”,值得期待。

3、examples 
      这个包提供单机本地运行方法,方便实现连接器过程中用于调试

4、e2e 
      这个包下放的是连接器的  e2e 测试

接下来看如何创建一个 connector(基于新连接器),步骤如下:

1、在 seatunnel-connectors-v2 目录下新建一个 module,命名为 connector-{连接器名}

2、pom 文件可以参考已有连接器的 pom 文件,并在父 model 的 pom 文件中添加当前子 model

3、新建两个 package,分别对应 source 和 sink
      package org.apache.seatunnel.connectors.seatunnel.{连接器名}.source
      package org.apache.seatunnel.connectors.seatunnel.{连接器名}.sink

以 mysocket 为例如图:


开发连接器去做一些实现,在实现过程中,如果需要调试,可以使用 example模块进行本地调试,这个模块主要提供了 Flink 和 Spark 的本地运行环境。

可以看到,example 模块下有 seatunnel-flink-connector-v2-example 等。
那么如何使用它们呢?

例如在 Flink 上进行调试步骤如下:

在 seatunnel-flink-connector-v2-example 模块下的操作:

      1、pom.xml 中添加连接器的依赖

      2、在 resources/examples 下添加任务配置文件

      3、在 SeaTunnelApiExample main 方法中配置文件

      4、运行 main 方法即可

代码演示


代码演示以 DingTalk 为主。

参考视频 🕖📺 19:35-37:10
实现效果:


现阶段支持的新连接器


截止 7/14 日统计完成的连接器,以及对应的 Contributor,欢迎大家试用,看看性能如何,发现 bug 也欢迎在我们社区提相关的 issues。


正在贡献中的连接器,已经有人认领开发了:


Roadmap 中的连接器,是我们近期想要支持的一些连接器,欢迎大家认领贡献。


其实飞书、钉钉、Facebook Marketing 连接器的实现都非常简单,因为里连接器不需要扛很大的数据,不像 Hive 等数据库需要考虑事务一致性或并发问题,只是很简单的 Source 或 Sink 实现。最后,也欢迎大家来做相关贡献,加入我们 Apache SeaTunnel 大家庭!

Apache SeaTunnel

//  保持联络 //

微信号 : Seatunnel

来,和社区一同成长!

Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台。

仓库地址: 
https://github.com/apache/incubator-seatunnel

网址:
https://seatunnel.apache.org/

Proposal:
https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal

Apache SeaTunnel(Incubating) 2.1.0 下载地址:
https://seatunnel.apache.org/download
 
衷心欢迎更多人加入!

能够进入 Apache 孵化器,SeaTunnel(原 Waterdrop) 新的路程才刚刚开始,但社区的发展壮大需要更多人的加入。我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/incubator-seatunnel/issues

贡献代码:
https://github.com/apache/incubator-seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

往期推荐






点击阅读原文,报名成为讲师

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论