暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

数据湖Hudi实践案例:hudi-kafka-connect设计原理及部署详解

大数据从业者 2023-03-02
1957

背景  

Hudi可以用于构建实时数据湖,除了使用Spark/Flink计算引擎实时消费Kafka数据写入到Hudi表,也可以使用Hudi提供的基于kafka connect框架实现的hudi-sink-connector,轻松将Kafka数据导入到Hudi表。kafka connect是apache kafka的免费开源组件,它标准化了kafka与其他数据系统的集成,提供将数据从外部系统写入kafka的source connector和将数据从kafka写入到外部系统的sink connector。Kafka connect具备高扩展性、高可靠性、负载均衡、易用性(REST API)等特性。


hudi-sink-connector具备如下关键特性:
    保证exactly-once语义,无数据丢失或数据重复;
    支持多任务管理,允许connector按需扩缩容;
    复制
    原理解析 

    由于kafka sink connector在分布式环境运行并发执行多个tasks处理多个partitions数据。所以,hudi-sink-connector需要提供一个支持多tasks同时写入Hudi表的并发模型。

    如图所示,hudi-sink-connector实现以两阶段提交协议(two-phase commit protocol)为基础,包括一个Coordinator和多个Participants。Coordinator仅仅处理partition-0的数据(如上图Worker1.Task1),这样可以避免实现leader选举机制。Coordinator负责所有participants事务写及其他表服务。

    每个partition对应一个Participant,Coordinator与Participant之间通过特定Topic(oodie.kafka.control.topic)进行通信。当Coordinator开始一个新事务时,所有participants开始从自己负责的partition消费数据写入到Hudi表不相同的file groups。file ID包含kafka partition序号,即使两个Participants同时写入一个Hudi partition也不会失败。当Coordinator结束一个事务时,所有participants停止写数据并返回写入状态。如果所有participants写入状态成功,coordinator提交所有写入数据。
    一旦kafka connect集群存在Works或者Tasks失败的情况,kafka partition会被重新分配到运行中的tasks。为了保持coordinator或participants协议简洁明了,整个事务在有Tasks失败时都不提交,之前写入的数据后续会被Hudi cleaner服务删除。Hudi commit文件中有每个kafka partition offset,方便恢复时从最新offset消费。

    实践案例  

    如图所示为hudi-sink-connector端到端部署案例。各种应用程序或者kafka source connector将数据写入到Kafka,可以选择将数据schema注册到Schema Registry。hudi-sink-connector消费Kafka数据写入到Hudi表中,可以选择从Schema Registry获取最新的schema信息。如果配置了Hive集成,hudi-sink-connector将会持续同步Hudi表metadata信息到Hive Metastore。Hudi表能够被多种查询引擎查询,如Presto、Trino等等。

    Hudi源码提供有特别详细的实践流程,见github地址:
      https://github.com/apache/hudi/tree/master/hudi-kafka-connect
      复制
      关键步骤如下(我的集群未部署Schema Registry,跳过相关内容):
      1.创建Topic(默认为hudi-control-topic,kafka自动创建开启时可跳过)
        cd $KAFKA_HOME
        ./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
        复制
        2.启动Hudi connector集群
          cd $KAFKA_HOME
          ./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
          复制
          connect-distributed.properties配置文件内容参考如下:
            https://github.com/apache/hudi/blob/master/hudi-kafka-connect/demo/connect-distributed.properties
            复制

            注意:plugin.path为hudi-kafka-connect-0.13.0-SNAPSHOT.jar实际路径
            3.使用REST API启动hudi-sink-connector
              curl -X DELETE http://localhost:8083/connectors/hudi-sink(第一次启动的话,可以跳过)
              curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
              复制
              4.Hudi表服务

              当使用MOR表时,默认开启异步压缩,可以通过配置hoodie.kafka.compaction.async.enable修改;默认关闭异步clustering,可以通过配置hoodie.clustering.async.enabled修改。

              大数据量场景,出于性能考虑,不建议hudi-sink-connector执行compaction和clustering,推荐使用单独的Flink/Spark/Hudi CLI作业执行compaction和clustering。


              5.完整实践用例配置详见如下地址
                https://github.com/apache/hudi/tree/master/hudi-kafka-connect/demo
                复制

                总结  

                实时数据入湖场景,除了利用计算引擎Flink/Spark以外,hudi-sink-connector提供一种可选项。hudi-sink-connector这种架构简单明了,易于维护。可以直接复用Kafka集群资源,不依赖第三方计算引擎,快速构建实时数据湖。用户可以根据实际场景需求,作出选择。

                文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论