暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

超详细教程 | 通过 SeaTunnel 集成平台将数据写入 OSS-HDFS 服务

SeaTunnel 2023-03-22
3673

点击蓝字 关注我们

    

SeaTunnel是一个开源、易用的超高性能分布式数据集成平台,支持海量数据的实时同步。本文介绍如何通过SeaTunnel集成平台将数据写入OSS-HDFS服务。




背景信息


SeaTunnel可稳定高效地同步百亿级数据,专注于数据集成和数据同步,主要解决数据集成领域的以下问题。
  • 数据源多样
    常用的数据源有数百种,版本不兼容。随着新技术的出现,可能出现更多的数据源。用户很难找到能够全面快速支持这些数据源的工具。
  • 复杂同步场景
    数据同步需要支持离线-全量同步、离线-增量同步、CDC、实时同步、全库同步等多种同步场景。
  • 资源需求高
    现有的数据集成和数据同步工具往往需要大量的计算资源或JDBC连接资源来完成海量小文件的实时同步,这在一定程度上加重了企业的负担。
  • 缺乏数据监控
    数据集成和同步过程经常会丢失或重复数据。同步过程缺乏监控,无法直观了解任务过程中数据的真实情况。
  • 技术栈复杂
    企业使用的技术组件各不相同,您需要针对不同的组件开发相应的同步程序来完成数据集成。
  • 管理和维护困难
    受限于不同的底层技术组件(Flink或者Spark),通常单独开发和管理离线同步和实时同步,增加了管理和维护的难度。

更多信息,请参见SeaTunnel(https://seatunnel.apache.org/docs/2.3.0/about?spm=a2c4g.11186623.0.0.66b68cc4Djgwmb)。



前提条件


已开通OSS-HDFS服务。具体步骤,请参见开通并授权访问OSS-HDFS服务(https://help.aliyun.com/document_detail/419505.htm?spm=a2c4g.11186623.0.0.66b68cc4Djgwmb。



使用限制


仅允许通过专有网络VPC的方式访问OSS-HDFS服务。创建专有网络VPC时,需确保创建的VPC与待开启OSS-HDFS服务的Bucket位于相同的地域。



步骤一:部署SeaTunnel

1

本地部署


重要:执行以下步骤前,您需要确保已安装Java 8或者Java 11并设置JAVA_HOME。

  1. 下载SeaTunnel。
  • 在Apache SeaTunnel页面,下载最新版本的seatunnel--bin.tar.gz。
  • 在终端通过以下命令下载最新版本的SeaTunnel。
    export version="2.3.0" wget "https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz" tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"
    复制

    2. 安装connector。
    自2.2.0-beta版本开始,二进制包默认不提供connector依赖。以2.3.0版本为例,您在首次使用SeaTunnel时,需要执行以下命令安装connector。
      sh bin/install_plugin.sh 2.3.0
      复制
      说明 :${SEATUNNEL_HOME}/config/plugin.properties配置文件默认下载全部connector插件,您可以结合业务需求适当增减connector。

      connector插件列表如下:
           --connectors-v2--
        connector-amazondynamodb
        connector-assert
        connector-cassandra
        connector-cdc-mysql
        connector-cdc-sqlserver
        connector-clickhouse
        connector-datahub
        connector-dingtalk
        connector-doris
        connector-elasticsearch
        connector-email
        connector-file-ftp
        connector-file-hadoop
        connector-file-local
        connector-file-oss
        connector-file-oss-jindo
        connector-file-s3
        connector-file-sftp
        connector-google-sheets
        connector-hive
        connector-http-base
        connector-http-feishu
        connector-http-gitlab
        connector-http-github
        connector-http-jira
        connector-http-klaviyo
        connector-http-lemlist
        connector-http-myhours
        connector-http-notion
        connector-http-onesignal
        connector-http-wechat
        connector-hudi
        connector-iceberg
        connector-influxdb
        connector-iotdb
        connector-jdbc
        connector-kafka
        connector-kudu
        connector-maxcompute
        connector-mongodb
        connector-neo4j
        connector-openmldb
        connector-pulsar
        connector-rabbitmq
        connector-redis
        connector-s3-redshift
        connector-sentry
        connector-slack
        connector-socket
        connector-starrocks
        connector-tablestore
        connector-selectdb-cloud
        connector-hbase
        --end--
        复制

        2

        Kubernetes(Beta)部署

        重要:
        • 通过Kubernetes(Beta)部署SeaTunnel目前处于试运行阶段。以下以Flink引擎为例,不推荐在生产环境中使用。
        • 确保已在本地安装Docker,Kubernetes以及Helm。

        1. 启动集群。
        以Kubernetes 1.23.3版本为例,您可以使用以下命令启动集群。

          minikube start --kubernetes-version=v1.23.3
          复制

          2. 使用SeaTunnel运行镜像。

            ENV SEATUNNEL_VERSION="2.3.0-beta"
            ENV SEATUNNEL_HOME = "/opt/seatunnel"


            RUN mkdir -p $SEATUNNEL_HOME


            RUN wget https://archive.apache.org/dist/incubator/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-incubating-${SEATUNNEL_VERSION}-bin.tar.gz
            RUN tar -xzvf apache-seatunnel-incubating-${SEATUNNEL_VERSION}-bin.tar.gz


            RUN cp -r apache-seatunnel-incubating-${SEATUNNEL_VERSION}/* $SEATUNNEL_HOME/
            RUN rm -rf apache-seatunnel-incubating-${SEATUNNEL_VERSION}*
            RUN rm -rf $SEATUNNEL_HOME/connectors/seatunnel
            复制

            3. 构建镜像。

              docker build -t seatunnel:2.3.0-beta-flink-1.13 -f Dockerfile .
              复制

              4. 将图像加载至minikube。

                minikube image load seatunnel:2.3.0-beta-flink-1.13
                复制

                5. 在Kubernetes集群上安装证书管理器。

                在Kubernetes集群上安装证书管理器以启用Webhook组件,每个Kubernetes集群只需要安装一次证书管理器。

                  kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
                  复制

                  6. 使用Helm图表部署最新的Flink Kubernetes Operator版本。

                  a. 下载Flink Kubernetes Operator。

                    helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-0.1.0/
                    复制

                    b. 部署Flink Kubernetes Operator。

                      helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
                      复制

                      7. 验证kubectl是否已成功安装。

                        kubectl get pods
                        复制
                        返回以下结果说明已成功安装kubectl。

                          NAME                                                   READY   STATUS    RESTARTS      AGEflink-kubernetes-operator-5f466b8549-mgchb             1/1     Running   3 (23h ago)   1
                          复制



                          步骤二:设置配置文件


                          通过添加配置文件,确定SeaTunnel启动后数据输入、处理和输出的方式和逻辑。配置示例如下:
                            env {                                                                                                                                                                          
                            # You can set SeaTunnel environment configuration here
                            execution.parallelism = 10
                            job.mode = "BATCH"
                            checkpoint.interval = 10000
                            #execution.checkpoint.interval = 10000
                            #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
                            }




                            source {
                            LocalFile {
                            path = "/data/seatunnel-2.3.0/testfile/source"
                            type = "csv"
                            delimiter = "#"
                            schema {
                            fields {
                            name = string
                            age = int
                            gender = string
                            }
                            }
                            }
                            }



                            transform {

                            }
                            # In this case we don't need this function. If you would like to get more information about how to configure Seatunnel and see full list of sink plugins,
                            # please go to https://seatunnel.apache.org/docs/category/sink-v2



                            sink {
                            OssJindoFile {
                            path="/seatunnel/oss03"
                            bucket = "oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com"
                            access_key = "LTAI5t7h6SgiLSganP2m****"
                            access_secret = "KZo149BD9GLPNiDIEmdQ7d****"
                            endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"
                            }


                            # If you would like to get more information about how to configure Seatunnel and see full list of sink plugins,
                            # please go to https://seatunnel.apache.org/docs/category/sink-v2
                            }
                            复制
                            以上配置示例包含四个模块,详细说明如下。

                            模块
                            是否必选说明
                            env

                            用于配置引擎的环境变量。

                            关于env的更多信息,请参见env(https://seatunnel.apache.org/docs/connector-v2/JobEnvConfig)。

                            source

                            用于定义SeaTunnel需要获取的数据源,并将获取的数据用于下一个模块transform,支持同时定义多个数据源。

                            关于支持的数据源列表,请参见source(https://seatunnel.apache.org/docs/2.3.0/category/source-v2/)。

                            transform

                            用于定于数据处理模块。当定义了数据源后,可能还需要对数据做进一步的处理。如果您不需要做数据处理,可以直接忽略transform模块,数据将直接从source写入sink。

                            关于transform的更多信息,请参见transform(https://seatunnel.apache.org/docs/2.3.0/category/transform)。

                            sink

                            用于定义SeaTunnel将数据写入的目标端,本教程以写入OSS-HDFS服务为例。

                            • 关于支持的数据写入目标端列表,请参见sink(https://seatunnel.apache.org/docs/2.3.0/category/sink-v2/)。

                            • 关于sink的配置示例,请参见OSS-HDFS配置说明(https://help.aliyun.com/document_detail/607447.html?spm=a2c4g.11186623.0.0.23da231cSI2Drp#bcc4f240b706l)。




                            步骤三:运行SeaTunnel


                            通过以下命令运行SeaTunnel。
                              cd "apache-seatunnel-incubating-${version}" ./bin/seatunnel.sh --config ./config/seatunnel.streaming.conf.template -e local
                              复制
                              数据同步结束后,您可以通过SeaTunnel控制台查看输出结果,示例如下。
                                ***********************************************                                                                                                                                
                                Job Statistic Information
                                ***********************************************
                                Start Time : 2023-02-22 17:12:19
                                End Time : 2023-02-22 17:12:37
                                Total Time(s) : 18
                                Total Read Count : 10000000
                                Total Write Count : 10000000
                                Total Failed Count : 0
                                ***********************************************
                                复制



                                OSS-HDFS配置说明


                                使用OssJindoFile将数据输出到OSS-HDFS文件系统。

                                配置参数


                                配置示例

                                以下是text格式的配置示例。如果您需要配置为其他格式的配置文件,仅需相应替换以下示例中file_format的值,例如file_format = "csv"

                                  OssJindoFile {
                                  path="/seatunnel/sink"
                                  bucket = "oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com"
                                  access_key = "LTAI5t7h6SgiLSganP2m****"
                                  access_secret = "KZo149BD9GLPNiDIEmdQ7d****"
                                  endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"
                                  file_format = "text"
                                  }
                                  复制



                                  相关文档


                                  相关文档
                                  • OSS-HDFS服务概述 https://help.aliyun.com/document_detail/405089.htm?spm=a2c4g.11186623.0.0.66b6935bSzRMW0
                                  • OSS-HDFS服务使用前须知 https://help.aliyun.com/document_detail/428212.htm?spm=a2c4g.11186623.0.0.66b6935bSzRMW0
                                  • 开通并授权访问OSS-HDFS服务 https://help.aliyun.com/document_detail/419505.htm?spm=a2c4g.11186623.0.0.66b6935bSzRMW0


                                  Apache SeaTunnel


                                  Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

                                  仓库地址: 
                                  https://github.com/apache/incubator-seatunnel

                                  网址:
                                  https://seatunnel.apache.org/

                                  Proposal:
                                  https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelPro

                                  Apache SeaTunnel(Incubating)  下载地址:
                                  https://seatunnel.apache.org/download
                                   
                                  衷心欢迎更多人加入!

                                  我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

                                  我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

                                  提交问题和建议:
                                  https://github.com/apache/incubator-seatunnel/issues

                                  贡献代码:
                                  https://github.com/apache/incubator-seatunnel/pulls

                                  订阅社区开发邮件列表 : 
                                  dev-subscribe@seatunnel.apache.org

                                  开发邮件列表:
                                  dev@seatunnel.apache.org

                                  加入 Slack:
                                  https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

                                  关注 Twitter: 
                                  https://twitter.com/ASFSeaTunnel

                                  往期推荐




                                  SeaTunnel 让 Apache Flink 和 Spark SQL作业更简单!




                                  从 ETL 到 EtLT 架构的演进历程




                                  最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!



                                  分享、点赞、在看,给个3连击呗!

                                  文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                  评论