暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

电力行业数据湖技术方案Flink、Hudi、Hive on Spark案例全攻略记录及Hive查询MOR rt表异常修复

大数据从业者 2024-09-09
322

前言  

本文主要记录电力行业客户的数据湖技术方案实践案例,方案概括为基于FlinkSQL+Hudi流式入湖、同步表元数据到Hive,基于Hive catalog统一元数据管理,然后基于Hive on Spark离线分析计算。该方案主要考虑与已有Hive数据仓库、数据解析、报表应用等结合。欢迎关注微信:大数据从业者   

组件版本信息

Hadoop

3.1.1

Hive

3.1.3

Spark

3.3.2

Flink

1.17.2

Hudi

0.14.1

Spark编译部署   

    wget https://github.com/apache/spark/archive/refs/tags/v3.3.2.tar.gz


    tar -xvf v3.3.2.tar.gz


    cd spark-3.3.2/

    修改pom.xml中maven.version为自己环境已部署的版本

      ./dev/make-distribution.sh -tgz -Phive -Phive-thriftserver -Pyarn

      涉及修改源码,可以只编译指定模块,如下:

        ./build/mvn -pl :spark-streaming_2.12 clean package


        ./build/mvn -Phive-thriftserver -DskipTests clean package

        之前文章已经记录Spark整合Hadoop3与Hive3,本文不再重复赘述!

        Hive编译部署  

          wget  https://github.com/apache/hive/archive/refs/tags/rel/release-3.1.3.tar.gz


          tar -xvf release-3.1.3.tar.gz


          cd hive-rel-release-3.1.3/


          mvn clean package -DskipTests -Pdist -Dmaven.test.skip=true -T 1C

               

            cp packaging/target/apache-hive-3.1.3-bin.tar.gz home/myHadoopCluster/


            cd /home/myHadoopCluster/


            tar -xvf apache-hive-3.1.3-bin.tar.gz


            cd apache-hive-3.1.3-bin/conf

            hive-env.sh内容

              vim hive-env.sh


              if [ "$SERVICE" = "metastore" ]; then


              export HADOOP_HEAPSIZE=8096 # Setting for HiveMetastore


              export HADOOP_OPTS="$HADOOP_OPTS -Xloggc:/var/log/hive313/hivemetastore-gc-%t.log -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCCause -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/hive313/hms_heapdump.hprof -Dhive.log.dir=/var/log/hive313 -Dhive.log.file=hivemetastore.log"


              fi
                     
              if [ "$SERVICE" = "hiveserver2" ]; then


              export HADOOP_HEAPSIZE=8096 # Setting for HiveServer2 and Client


                export HADOOP_OPTS="$HADOOP_OPTS -Xloggc:/var/log/hive313/hiveserver2-gc-%t.log -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCCause -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/hive313/hs2_heapdump.hprof -Dhive.log.dir=/var/log/hive313 -Dhive.log.file=hiveserver2.log"


              fi


              HADOOP_HOME=/usr/hdp/3.1.5.0-152/hadoop


              export HIVE_HOME=/home/myHadoopCluster/apache-hive-3.1.3-bin


              export HIVE_CONF_DIR=/home/myHadoopCluster/apache-hive-3.1.3-bin/conf


              export METASTORE_PORT=19083          

              hive-site.xml内容  

                vim hive-site.xml
                <?xml version="1.0" encoding="UTF-8" standalone="no"?>
                <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>


                <configuration>
                <!--zookeeper start-->
                <property>
                <name>hive.server2.support.dynamic.service.discovery</name>
                <value>true</value>
                </property>
                <property>
                <name>hive.zookeeper.quorum</name>
                <value>felixzh:2181</value>
                </property>
                <property>
                <name>hive.server2.zookeeper.namespace</name>
                <value>hiveserver313</value>
                </property>
                <!--zookeeper end-->


                <!--metastore start-->
                <property>
                <name>hive.metastore.uris</name>
                <value>thrift://felixzh:19083</value>
                </property>
                <property>
                <name>hive.metastore.warehouse.dir</name>
                <value>/user/hive313/warehouse</value>
                </property>
                <property>
                <name>hive.metastore.failure.retries</name>
                <value>10</value>
                </property>
                <property>
                <name>hive.metastore.connect.retries</name>
                <value>10</value>
                </property>
                <!--metastore end-->


                <!--hiveserver start-->
                <property>
                <name>hive.server2.thrift.bind.host</name>
                <value>felixzh</value>
                </property>
                <property>
                <name>hive.server2.thrift.port</name>
                <value>10010</value>
                </property>
                <property>
                <name>hive.server2.webui.port</name>
                <value>10012</value>
                </property>
                <!--hiveserver end-->


                <!--postgresql start-->
                <property>
                <name>javax.jdo.option.ConnectionDriverName</name>
                <value>org.postgresql.Driver</value>
                </property>
                <property>
                <name>javax.jdo.option.ConnectionURL</name>
                <value>jdbc:postgresql://felixzh:5432/hive313?createDatabaseIfNotExist=true</value>
                </property>
                <property>
                <name>javax.jdo.option.ConnectionUserName</name>
                <value>postgres</value>
                </property>
                <property>
                <name>javax.jdo.option.ConnectionPassword</name>
                <value>123456</value>
                </property>
                <!--postgresql end-->


                <!--spark start-->
                <property>
                <name>spark.home</name>
                <value>/home/myHadoopCluster/spark-3.3.2-bin-3.3.2</value>
                </property>
                <!--spark end-->
                </configuration>


                hive-log4内容

                  cp hive-log4j2.properties.template hive-log4j2.properties          

                  创建Hive元数据库    

                    psql -h felixzh -d postgres -U postgres -p 5432


                    postgres=# create database hive313;


                    CREATE DATABASE          

                    初始化Hive元数据库

                      ./schematool -dbType postgres –initSchema

                          

                      确保Hive与Hadoop使用相同版本guava

                        rm –rf home/myHadoopCluster/apache-hive-3.1.3-bin/lib/guava*jar


                        cp guava-28.0-jre.jar home/myHadoopCluster/apache-hive-3.1.3-bin/lib/

                        否则异常如下:

                          java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V             

                          确保Hive与Spark使用相同版本libthrift

                            rm –rf home/myHadoopCluster/apache-hive-3.1.3-bin/lib/libthrift*jar


                            cp libthrift-0.12.0.jar home/myHadoopCluster/apache-hive-3.1.3-bin/lib/

                            否则异常如下:

                              org.apache.thrift.transport.TTransportException: null


                              at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) ~[hive-exec-3.1.3.jar:3.1.3]          

                              启动服务

                                nohup ./hive --service metastore &


                                nohup ./hive --service hiveserver2 &

                                   

                                Beeline登录验证

                                方法1:HA

                                  ./beeline -u 'jdbc:hive2://felixzh:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver313' -n hive

                                  方法2:非HA

                                    ./beeline -u 'jdbc:hive2://felixzh:10010' -n hive    

                                    Hive on Spark功能验证

                                      set hive.execution.engine=spark;


                                      create table test(name string);


                                      insert into test values('felixzh');


                                      select * from test;

                                      Flink编译部署  

                                        wget https://github.com/apache/flink/archive/refs/tags/release-1.17.2.tar.gz


                                        tar -xvf release-1.17.2.tar.gz


                                        cd flink-release-1.17.2/


                                        mvn clean package -DskipTests -Dfast -Dhadoop.version=3.1.1 -Dhive.version=3.1.3 -Pscala-2.12 -T 1C

                                          cp -r flink-dist/target/flink-1.17.2-bin/flink-1.17.2/ home/myHadoopCluster/    


                                          cp flink-connectors/flink-sql-connector-hive-3.1.3/target/flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar home/myHadoopCluster/flink-1.17.2/lib/


                                          ln -s usr/hdp/3.1.5.0-152/hadoop-mapreduce/hadoop-mapreduce-client-core-3.1.1.3.1.5.0-152.jar home/myHadoopCluster/flink-1.17.2/lib/hadoop-mapreduce-client-core.jar          

                                          声明环境变量

                                            vi bin/config.sh


                                            export HADOOP_CLASSPATH=/usr/hdp/3.1.5.0-152/hadoop-hdfs/*:/usr/hdp/3.1.5.0-152/hadoop-hdfs/lib/*:/usr/hdp/3.1.5.0-152/hadoop-yarn/*:/usr/hdp/3.1.5.0-152/hadoop-yarn/lib/*:/usr/hdp/3.1.5.0-152/hadoop/*:/usr/hdp/3.1.5.0-152/hadoop-mapreduce/*


                                            export HADOOP_CONF_DIR=/etc/hadoop/conf/
                                              vi conf/flink-conf.yaml


                                              state.backend.type: rocksdb


                                              state.checkpoints.dir: hdfs:///flink/flink-ck


                                              state.backend.incremental: true              

                                              整合Hive Catalog

                                                vi  InitHiveCatalog


                                                CREATE CATALOG HiveCatalog


                                                WITH (


                                                'type' = 'hive',


                                                'hive-conf-dir' = '/home/myHadoopCluster/apache-hive-3.1.3-bin/conf/'


                                                );


                                                use catalog HiveCatalog   

                                                启动FlinkSQL Client

                                                  ./bin/sql-client.sh -i InitHiveCatalog


                                                  show tables

                                                  可以看到上文通过Hive beeline创建的测试表。        

                                                  Hudi编译部署  

                                                    wget https://github.com/apache/hudi/archive/refs/tags/release-0.14.1.tar.gz


                                                    tar -xvf release-0.14.1.tar.gz


                                                    cd hudi-release-0.14.1


                                                    mvn clean package -DskipTests -Dfast -Dspark3.3 -Dscaka-2.12 -Dflink1.17 -Pflink-bundle-shade-hive3 -Drat.skip=true -Dcheckstyle.skip         

                                                        

                                                      cp packaging/hudi-flink-bundle/target/hudi-flink1.17-bundle-0.14.1.jar /home/myHadoopCluster/flink-1.17.2/lib/


                                                      cp packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.14.1.jar home/myHadoopCluster/apache-hive-3.1.3-bin/lib/


                                                      cp packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.14.1.jar home/myHadoopCluster/apache-hive-3.1.3-bin/lib/          

                                                      Flink流写Hudi  

                                                      启动Flink yarn-session集群

                                                        ./bin/yarn-session.sh -jm 8G -tm 16G –d


                                                        ./bin/sql-client.sh -i InitHiveCatalog


                                                        set execution.target=yarn-session;


                                                        set yarn.application.id=<上述yarn-session applicationId>;


                                                        set execution.checkpointing.inerval=30000;          

                                                        数据源表

                                                          create table if not exists datagen1


                                                          (id int, data string, ts timestamp(3), partitionId int)


                                                          with(


                                                          'connector' = 'datagen',


                                                          'number-of-rows' = '5000000',


                                                          'rows-per-second' = '10000',


                                                          'fields.partitionId.min' = '1',


                                                          'fields.partitionId.max' = '2'


                                                          );        

                                                          数据目的表

                                                            create table if not exists mor1


                                                            (id int primary key not enforced, data varchar(20), ts timestamp(3), `partition` int) partitioned by(`partition`)


                                                            with(


                                                            'connector' = 'hudi',


                                                            'path' = 'hdfs:///flink/mor1',


                                                            'table.type' = 'MERGE_ON_READ',


                                                            'write.operation' = 'upsert',


                                                            'hive_sync.enable' = 'true',


                                                            'hive_sync.metastore.uris' = 'thrift://felixzh:19083',


                                                            'hive_sync.table' = 'mor1_hive',


                                                            'compaction.schedule.enabled' = 'true',


                                                            'compaction.async.enabled' = 'true'


                                                            );          

                                                            提交作业

                                                              insert into mor1 select id,data,ts,partitionId from datagen1;

                                                              Hive查询Hudi  

                                                                1.更新hudi jar到Hive/lib,需要重启Hive服务。


                                                                2.如果使用Hive on mr请自行将hudi相关jar(hudi-hadoop-mr-bundle-0.14.1.jar、hudi-hive-sync-bundle-0.14.1.jar)更新到mapred-site.xml配置参数mapreduce.application.framework.path所指定的hdfs目录,比如/hdp/apps/3.1.5.0-152/mapreduce/mapreduce.tar.gz


                                                                3.如果使用Hive n spark自行将hudi相关jar(hudi-spark3.3-bundle_2.12-0.14.1.jar)更新到spark/jars;如果指定了spark.yarn.archive或者spark.yarn.jars,需要同步更新。另外,需要将hive-cli-3.1.3.jar、hive-exec-3.1.3.jar需要放入spark/jars,删除原本的hive*jar          

                                                                Hive on MR相关参数如下:

                                                                  set hive.execution.engine=mr;


                                                                  set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;


                                                                  set hive.vectorized.execution.enabled=false;


                                                                  set mapreduce.map.memory.mb=8096;


                                                                  set mapreduce.map.java.opts=-Xmx8096m;


                                                                  set mapreduce.reduce.memory.mb=8096;


                                                                  set maprdeuce.reduce.java.opts=-Xmx8096m;          

                                                                  Hive on MR效果验证    

                                                                      

                                                                  Hive on Spark相关参数如下:

                                                                    set hive.execution.engine=spark;


                                                                    set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;


                                                                    set hive.vectorized.execution.enabled=false;


                                                                    set spark.task.maxFailures=1;


                                                                    set spark.executor.instances=10;


                                                                    set spark.executor.memory=2G;


                                                                    set spark.executor.cores=1;


                                                                    set spark.default.parallelism=3;              

                                                                    Hive on Spark效果验证

                                                                       

                                                                    遇到的问题

                                                                      java.lang.ClassCastException: org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit cannot be cast to org.apache.hadoop.hive.shims.HadoopShimsSecure$InputSplitShim

                                                                      这个属于原生Bug,我已经修复并提交到社区,目前已经合入主线。

                                                                        https://issues.apache.org/jira/browse/HUDI-8104    

                                                                          IllegalArgumentException: HoodieRealtimeRecordReader can only work on RealtimeSplit and not with hdfs://felixzh1:8020/flink/mor1/

                                                                           这个需要指定hive.input.format参数

                                                                            set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
                                                                              ClassCastException: org.apache.hadoop.io.ArrayWritable cannot be cast to org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch

                                                                              这个需要关闭向量化

                                                                                set hive.vectorized.execution.enabled=false;

                                                                                文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                                                评论