暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

教你完美避坑,从头到尾开发一个 SeaTunnel Connector

SeaTunnel 2022-09-01
5053

   点亮 ⭐️ Star · 照亮开源之路

GitHub:https://github.com/apache/incubator-seatunnel



SeaTunnel  Connector 接入计划

在近日举行的 SeaTunnel  Connector 接入计划专题直播活动上,白鲸开源 高级工程师 王海林带来了《SeaTunnel Connector 接入计划与开发避坑指南》的分享,手把手教大家了解如何从一个新手,从头到尾地开发一个 connector,包括准备开发到测试、最终 PR 的全过程。

介绍


- 王海林 白鲸开源 高级工程师 -


开源爱好者,SkyWalking Committer、Dolphinscheduler & SeaTunnel Contributor,目前工作内容围绕性能监控、数据处理等方向,喜欢研究相关的技术实现以及参与社区的交流贡献

本次演讲主要分为 5 各部分:

  • 关于 connector  接入激励计划
  • 认领/开发 connector 前的准备
  • 开发中的一些小事
  • 编写 E2E 测试的注意事项
  • 提交 PR 前的准备

1 关于connector接入激励计划


首先我来介绍一下 SeaTunnel Connector 接入激励计划,以及如何从一个新手,从头到尾地开发一个 connector,包括准备开发到测试、最终 PR 的全过程。

SeaTunnel 社区不久前发布了新的 connector API,支持一次开发后在各种引擎上运行,比如 Flink、Spark 等,免去了老版本需要重复开发的问题。新 API 发布后,需要对老的 connector 进行迁移,或支持新的 connector。为了激励社区踊跃参与 SeaTunnel Connector 接入工作,把 SeaTunnel 打造成为更高效的数据集成平台,SeaTunnel 社区发起,并由白鲸开源赞助了这次活动。

活动对接入 connector 的任务设置了简单、中度和困难三种模式,门槛较低,活动 issue 列表上可以看到还有哪些任务需要认领,以及优先级、难度的划分,大家可以选择自己感兴趣或容易上手的任务开始贡献。


点击链接了解活动详情。

大家的加入才能让 SeaTunnel 的生态建设更加完善,越来越强大,欢迎大家踊跃参与。为了表示感谢,我们的活动设置了积分兑换实物奖品的环节,多领任务多积分,就可以获得更多奖品。

目前我们看到活动已经有不少小伙伴参与并提交了 connector,现在上车也不晚,因为离活动结束还有一段时间,根据 connector 的难度,deadline 也会有适当的宽松。


2 认领/开发connector前的准备


那么,该如何参与活动呢?首先了解下什么是 connector?

01

什么是 Connector?



Connector 由 source + sink 组成,从上图中可以看到,它们在上层和下层分别对接各种各样的数据源,source 负责从外部数据源读取数据,sink 负责把数据写到外部的数据源。source 和 sink 之间有一个抽象层,通过抽象层可以把各类数据源的数据类型统一转化成 SeaTunnelRow 的数据格式,从而让用户可以任意地组装各类 source 和 sink,做到在异构数据源、多种数据源之间的数据同步搬运工作。

02

认领 Connector


了解了基本概念后,接着就是认领 connector 了。

在 GitHub 链接中 https://github.com/apache/incubator-seatunnel/issues/1946 可以看到我们接入 connector 的规划,大家有需要补充的也可以随时提出。

首先找到还未被认领的 connector,然后在整个 issue 中搜索一下是否有人曾经提交过 PR,以免发生冲突。在 issue 下留言认领,管理员会回复填写认领信息。

认领完后建议大家可以创建一个对应的 feature 的 issue,同步你在开发中遇到的问题,以及讨论方案设计,或者是有问题需要求助,都可以在 issue 中描述,社区小伙伴看到了可以共同参与讨论帮助解决问题,同时这也是功能实现过程的记录,方便以后维护修改的时候参考。

03

编译工程

 
认领后,现在需要准备开发环境了。

首先把 SeaTunnel 工程 fork 到本地开发环境中来,进行编译。

编译参考文档:https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/setup.md  

编译成功后运行文档中的 testcase 。但在第一次接触编译过程中可能会遇到一些问题,比如下面的编译错误。



解决上述异常的办法:

  • rm {your_maven_dir}/repository/org/apache/seatunnel
  • ./mvnw clean
  • 重新编译

04

了解 Connector 相关代码结构


工程编译过了之后,代表开发环境准备好了,接下来里了解一下 connector 的工程代码结构以及 API 接口结构。


  • 工程代码结构

工程编译出来之后,connector 相关的有三部分,第一部分是新 connector 模块代码实现以及依赖管理。

  • seatunnel-connectors-v2 存放 connector 子模块
  • seatunnel-connectors-v2-dist 管理 connectors-v2 maven 依赖

第二部分是 example,在本地测试时可以在 example 上建立相应 的 case 测试一下 connector。

  • seatunnel-flink-connector-v2-example 运行在 flink 上的 example
  • seatunnel-spark-connector-v2-example 运行在 spark 上的 example

第三部分是 e2e-testcase, 添加 Spark/Flink 各自运行引擎上针对性的测试用例,通过自动化测试验证 connector 功能逻辑。

  • seatunnel-flink-connector-v2-e2e 运行在 flink 上的 testcase
  • seatunnel-spark-connector-v2-e2e 运行在 spark 上的 testcase

  • 代码结构(接口、基类)

在开发时要用到的公共接口和基类,我们的 readme 上有完整描述,例如 API 功能使用场景等。

https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/README.zh.md

05

看看其他人怎么开发 Connector


了解完这些之后,先别着急动工,先看看别人怎么做的。

强烈推荐社区公众号上分享的 connector 新手开发教程:

【SeaTunnel 连接器极简开发流程】
新 API Connector 开发解析
Apache SeaTunnel(Incubating)与计算引擎的解耦之道,重构API我们做了些什么

另外可以参考已经合并的 Connector 代码,看看改动的范围、用到的公共接口和依赖、 测试用例。

https://github.com/apache/incubator-seatunnel/pulls?q=is:pr+is:merged+label:connectors-v2

3 开发中的一些小事

接下来正式进入 connector 开发过程。在开发的过程中可能会遇到哪些问题呢?Connector 分为 source 和 sink 两端,可以任选一个或者都实现。


01

开发 Source 相关


开发 source 需要注意的问题,首先是要确定 source 的读取模式,是流式还是批式?还是都要支持?

使用 Source#getBoundedness 接口,标记  source 支持的模式。比如 Kafka 天然支持流式读取,但也可以通过在 source 中获取 lastOffset 以支持批模式读取。

另一个需要注意的问题是,source 是否需要并发读取?如果是单并发,source 启动之后,会创建一个 reader 到数据源读取数据。如果要实现多并发,就要实现一个 enumerator 接口,通过它来分配数据块给 reader,reader 各自读取各自分到的数据块。

比如 Kafka source 用 partition 分片,jdbc source 用字段做范围查询分片。这里需要注意的是,如果是并发读取方式,要保证数据块分发规则的稳定性,因为目前 connector 在实际运行中每个分片上都有一个对应的 enumerator,要保证 enumerator 在各个分片内数据块分发规则的稳定性。

第三个问题,source 是否要支持断点续传/状态恢复?如果要支持,需要实现:

  • Source#restoreEnumerator 恢复状态
  • Enumerator#snapshotState 存储分片分配
  • Reader#snapshotState 存储读取位置

02

开发 Sink 相关


在 sink 如果是普通的 sink 实现,根据 source 的并发,用 Sink#createWriter 写出数据。

如果需要支持失败恢复,需要实现:

  • Sink#restoreWriter 恢复状态
  • Writer#snapshotState 快照状态

如果要支持两阶段提交,需要实现以下接口:

  • Sink#createCommitter
  • Writer#prepareCommit 预提交
  • Committer#commit、abort 二阶段提交


03

Connector 相关


接下来是一些通用的问题,尤其是第一次贡献时各个环境的风格不一样,经常会有各种问题,所以建议大家开发时从工程中导入 tools/checkstyle/checkStyle.xml,统一编码的格式。

无论是 source 还是 sink,都会涉及到定义数据格式的问题。社区正在推动大家使用统一的数据格式定义。

如需定义 Schema 请参考 PR https://github.com/apache/incubator-seatunnel/pull/2392;

如需定义 Format 请参考 PR https://github.com/apache/incubator-seatunnel/pull/2435

如果觉得编译速度慢,可以临时注释老版本的 connector 相关 module,加速开发调试。

04

如何寻求帮助


开发中遇到问题需要求助时,你可以:

  • 在你的 Issue 中描述问题并召唤活跃贡献者;
  • 在邮件列表、Slack 中讨论;
  • 通过微信群沟通(如果没有加入请关注 SeaTunnel 公众号入群/加小助手微信seatunnel1);
  • 对接第三方组件可能会有社区对接群(让你事半功倍)。

4 编写E2E测试的注意事项


E2E 测试非常重要,可以说是 connector 质量的守门员,因为如果你编写的 connector 不做测试的话,社区的小伙伴很难从静态的代码中判断实现是否有问题。

所以,E2E 测试不只是功能验证,还是一个检查数据逻辑的过程,可以减轻社区 review 代码的压力,保证基本的功能正确性。

在 E2E 测试中,可能会遇到的问题:

01

E2E 失败 - 测试用例网络地址冲突


因为 E2E 网络部署结构具有以下特点:

  • spark、flink、e2e-testcase 依赖的外部组件(举例如 mysql),三者使用容器 networkAliases(host) 作为访问地址
  • spark、flink 两边的 e2e-testcase 可能在同一主机下并行运行
  • e2e-testcase 依赖的外部组件需要映射端口到主机供 e2e-testcase 访问

所以,E2E 需要注意:

  • e2e-testcase 依赖的外部组件,映射到外部的端口、networkAliases 在 spark、flink 两边的 testcase  中不能一样
  • e2e-testcase 使用 localhost、上述映射端口访问外部组件
  • e2e 的配置文件中使用 networkAliases(host) 、容器内端口访问依赖的外部组件

E2E Testcase 参考 PR https://github.com/apache/incubator-seatunnel/pull/2429

02

E2E 失败 - Spark jar 包冲突


Spark 默认使用 parent first 类加载器,可能与 connector 引用的包产生冲突。对此,可以在 Connector env 中配置使用 userClassPathFirst 类加载器。

但是,SeaTunnel 目前的打包结构会导致 userClassPathFirst 不能正常工作,所以我们创建了 issue  https://github.com/apache/incubator-seatunnel/pull/2474 来跟踪这个问题,欢迎大家贡献解决方案。

目前,只能通过文档说明替换 spark jars 目录中冲突的包来解决这个问题。

03

E2E 失败 - Connector jar 包冲突


新老版本 Connector 同时被 E2E 工程依赖导致冲突, PR https://github.com/apache/incubator-seatunnel/pull/2414 已解决这个问题。

Connector-v2 之间版本冲突

  • 主要在 E2E 时发生,因为 E2E 工程依赖了所有的 Connector
  • 未来可能规划为每个 Connector(或版本) 提供单独的测试工程

04

E2E 不充分 - Sink 逻辑验证


Connector-v2 版本的 FakeSource 目前只能生成固定的几个列的随机数据,社区小伙伴正在优化  https://github.com/apache/incubator-seatunnel/pull/2406

我们暂时可以通过 Transform#sql 模拟指定内容的数据来解决这个问题:


05

E2E 不充分 - Source 验证数据


Assert Sink 可以配置列规则,但不能做行级 value 检查。针对这个问题,可以暂时使用其他带有外部存储的 connector sink 用于查询验证数据。

06

E2E 稳定性提升


E2E 启动时,很多 case 中大家会使用 Thread.sleep 等待资源初始化,会出现 sleep 给少了初始化失败,给多了浪费时间的问题。另外,因为资源、网络等问题不稳定,可能会出现当前能跑过,之后跑不过的现象。



为了避免这个问题,可以使用 Awaitility 替换 Thread.sleep。

07

E2E 提速


目前,我看到大多数人是将 source 和 sink 分别跑 E2E 测试,如果想要加速 PR 流程,建议大家可以将 sink、source 合并为一个 E2E testcase 做验证,只跑一次 testcase

5 提交 PR 前的检查

做好了之前的工作,最后在提交 PR 之前,一定要做一下检查,包括以下几个方面:

  • 完整的再编译工程
    • codestyle 验证、依赖验证
    • 之前编译成功不代表现在还能编译成功
  • 本地运行 E2E 成功
    • flink、spark 两边均做验证
  • 补充或更改文档
  • 提交前自己再 Review 一次
    • 未被测试覆盖到的地方
    • 之前已经 review 过了也需要再次检查
    • review 要包括所有文件, 不只是代码

通过以上操作,可以大大节省 CI 资源,加快 PR Merged 速度,同时降低社区 Review 的成本。

Apache SeaTunnel

//  保持联络 //

微信号 : Seatunnel

来,和社区一同成长!

Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台。

仓库地址: 
https://github.com/apache/incubator-seatunnel

网址:
https://seatunnel.apache.org/

Proposal:
https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal

Apache SeaTunnel(Incubating) 2.1.0 下载地址:
https://seatunnel.apache.org/download
 
衷心欢迎更多人加入!

能够进入 Apache 孵化器,SeaTunnel(原 Waterdrop) 新的路程才刚刚开始,但社区的发展壮大需要更多人的加入。我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/incubator-seatunnel/issues

贡献代码:
https://github.com/apache/incubator-seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1f6uymuh1-KXbFKYoJ8yYy8_4R66yJNA

关注 Twitter: 
https://twitter.com/ASFSeaTunnel


往期推荐





SeaTunnel 2.1.2 封装 Flink 连接数据库的源码解析




SeaTunnel Connector 接入计划已有多个任务完成,Switch、华为 MatePad 等你带回家!




对话 Contributor | 我在 SeaTunnel 学到的个人“硬实力”



点击“阅读原文”,加入计划!

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论