暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

最新发布版本Kyuubi整合Flink实战案例与源码改造

大数据从业者 2023-04-23
3109
Kyuubi概述   
Kyuubi是一个分布式、多租户的网关,支持主流引擎(Spark、Flink、Hive、Trino、Doris等),能够提供基于数据仓库和数据湖的serverless SQL 能力。      
通常来说,Kyuubi完整生态系统架构层次如上图所示,每层架构松散耦合、互不绑定。比如,用户可以使用Kyuubi、Spark、Hudi构建纯SQL的数据湖,用于处理ETL或OLAP业务;当然,也可以使用Kyuubi、Flink、Hudi构建纯SQL的实时数据入仓入湖架构。
总之,所有业务处理都可以基于一个平台、一份数据、纯SQL实现完成,与RDBMS一样的用户体验。除此之外,Kyuubi支持LDAP和Kerberos安全认证,支持多租户资源隔离,提供多种编程接口和访问形式:
    Hive Thrift Protocol:兼容HiveServer2语法,提供JDBC方式提交SQL的能力。
    RESTful APIs:提供管理engines、sessions、operations功能和REST方式提交SQL的能力。


    源码编译  

    版本信息:Flink1.16、Kyuubi1.7
      git clone -b v1.7.0 https://github.com/apache/kyuubi.git
      cd kyuubi && ./build/dist --tgz --spark-provided --flink-provided --hive-provided
      如果想并发编译,可以在build/dist增加-T 1C:
      编译成功,得到可解压安装的tar整包,结果如图所示:
      源码调试时,也可以设置只编译指定模块(可选):
        build/mvn clean package -pl kyuubi-common -DskipTests
        或者,也可以设置不编译指定模块(可选):
          mvn clean install -pl '!dev/kyuubi-codecov,!kyuubi-assembly' -DskipTests


          Kyuubi1.7整合Flink1.16  

          Hadoop集群搭建和Flink on yarn适配这里不再赘述(参见之前的文章)。
          当前版本,Kyuubi仅支持Flink yarn-session集群。
          启动yarn-session集群:
            ./bin/yarn-session.sh –d
             
            Kybuubi安装路径conf文件夹下新建两个文件:
              kyuubi-env.shkyuubi-defaults.conf
              kyuubi-env.sh文件内容:
                export HADOOP_CONF_DIR=/etc/hadoop/conf
                export FLINK_HOME=/home/myHadoopCluster/flink-1.16.0
                export FLINK_HADOOP_CLASSPATH=/usr/hdp/3.1.5.0-152/hadoop-hdfs/*:/usr/hdp/3.1.5.0-152/hadoop-hdfs/lib/*:/usr/hdp/3.1.5.0-152/hadoop-yarn/*:/usr/hdp/3.1.5.0-152/hadoop-yarn/lib/*:/usr/hdp/3.1.5.0-152/hadoop/*:/usr/hdp/3.1.5.0-152/hadoop-mapreduce/*
                kyuubi-defaults.conf文件内容:
                  kyuubi.ha.zookeeper.quorum  felixzh:2181
                  kyuubi.ha.zookeeper.namespace kyuubi
                  kyuubi.engine.type FLINK_SQL
                  flink.execution.target yarn-session
                  kyuubi.session.engine.flink.max.rows 5
                  启动kyuubi server服务:
                    ./kyuubi start
                    Kyuubi beeline连接Flink yarn-session集群
                      ./beeline -u 'jdbc:hive2://felixzh:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi;#flink.yarn.application.id=application_1679471682027_0006'
                      执行测试SQL语句:
                        SELECT
                        name,
                        COUNT(*) AS cnt
                        FROM
                        (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
                        GROUP BY name;
                        至此,Kyuubi整合Flink配置与验证完成!

                        源码改造之select  

                        Kyuubi1.7查询Flink表时候,返回结果不准确,存在重复数据。
                        以Kafka表为例演示,建表语句如下:
                          CREATE TABLE KafkaTable (
                          `id` BIGINT,
                          `str` STRING
                          ) WITH (
                          'connector' = 'kafka',
                          'topic' = 'json_topic',
                          'properties.bootstrap.servers' = 'felixzh:6667',
                          'properties.group.id' = 'testGroup',
                          'scan.startup.mode' = 'earliest-offset',
                          'format' = 'json'
                          );
                          KafkaTable表对应Topic:(json_topic)只有一条数据:
                          使用Kyuubi查询该表:
                            select * from KafkaTable;
                            如上图可以看出:返回5条重复数据,问题复现。
                            该问题已经提了issues:
                              https://github.com/apache/kyuubi/issues/4083
                              我也尝试提交修复方法的PR:
                                https://github.com/apache/kyuubi/pull/4701
                                其中,附带了单元测试,测试结果ok:
                                  mvn test -Dsuites=org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite


                                  源码改造之insert  

                                  使用Kyuubi写入数据到上述KafkaTable表:
                                    insert into KafkaTable values(2, 'ss');
                                    如图可以看到,insert写入成功,返回结果为-1。
                                    但是,如果提交insert select语句,beeline会一直卡住,ctrl+c都停不掉,现象如下:
                                    社区有相关issues讨论:
                                      https://github.com/apache/kyuubi/issues/4446
                                      原因就是kyuubi适配flink的实现方式就是等待sql语句执行完成。该issues讨论区有种改造方式,个人感觉更合理一些,测试效果ok。
                                      具体效果就是:insert语句直接返回JobId。


                                      源码改造之日志  

                                      Kyuubi1.7使用beeline不使用-n参数指定username时候,会提示warn异常,如下:
                                        org.apache.hadoop.security.ShellBasedUnixGroupsMapping$PartialGroupNameException: The user name 'anonymous' is not found. id: anonymous: no such user
                                        id: anonymous: no such user
                                        关于这个问题,我也尝试提交相应修复方法:
                                          https://github.com/felixzh2020/kyuubi/commit/5f02d2cbc3d8792486c9e8eace2b3696b536d5cf

                                          总结  

                                          鉴于Kyuubi版本迭代很快,本文仅适用于最新发布的Kyuubi1.7.0。据我所知,master分支已经支持Flink1.17以及application模式集群;与Flink适配方式也从原来基于sql-client模块改为sql-gateway模块,后续实践再继续发文。

                                          文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                          评论