暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark SQL 部署和测试(下)

862

接上文《Spark SQL 部署和测试(上)》分享 Spark SQL 里测试 TPC-H 以及查看执行计划的方法。

Spark SQL 执行计划不及传统关系数据库那么强大但基本的都有,原理很容易理解。学习后对理解新兴的分布式数据库执行计划也有帮助。


TPC-H 测试

TPC-H(商业智能计算测试) 是国际交易处理效能委员会(TPC,Transaction Processing Performance Council) 组织制定的用来模拟决策支持类应用的一个测试方案。

 

数据文件初始化

TPC-H 从官方网站下载:
https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp

 

下载后解压缩即可直接用,里面包含了数据初始化程序 dbgen。由于要支持Hive表(Spark SQL兼容Hive SQL)语法,需要对源码文件做一些修改。

 

  • 修改 makefile

 

指定数据库类型为 SQLSERVER(TPC-H不支持HIVE,用SQLServer替代一下),以及指定编译工具为 gcc 。


    [root@sfx111188 dbgen]# cp makefile.suitemakefile
    [root@sfx111188 dbgen]# vim makefile


    100 ################
    101 ## CHANGE NAME OF ANSI COMPILER HERE
    102 ################
    103 CC = gcc
    104 # Current values for DATABASE are:INFORMIX, DB2, TDAT (Teradata)
    105 # SQLSERVER,SYBASE, ORACLE, VECTORWISE
    106 # Current values for MACHINE are: ATT, DOS, HP, IBM, ICL, MVS,
    107 # SGI, SUN,U2200, VMS, LINUX, WIN32
    108 # Current values for WORKLOAD are: TPCH
    109 DATABASE= SQLSERVER
    110 MACHINE = LINUX
    111 WORKLOAD = TPCH
    复制


    • 修改头文件tpcd.h

    修改对应的 SQLSERVER 类型的语法为 HIVE 语法。

     


       [root@sfx111188 dbgen]# vim tpcd.h 


      #ifdef SQLSERVER
      #define GEN_QUERY_PLAN "explain;"
      #define START_TRAN "starttransaction;\n"
      #define END_TRAN "commit;\n"
      #define SET_OUTPUT ""
      #define SET_ROWCOUNT "limit %d;\m"
      #define SET_DBASE "use%s;\n"
      #endif
      复制

       

      • 初始化数据

       

      在当前目录编译并运行,并生成数据。


        [hadoop@sfx111188 dbgen]$ make 
        [hadoop@sfx111188 dbgen]$ mkdir tpch_350
        [hadoop@sfx111188 dbgen]$ cp dbgen tpch_350/
        [hadoop@sfx111188 dbgen]$ cp qgen tpch_350/
        [hadoop@sfx111188 dbgen]$ cp dists.dss tpch_350/
        [hadoop@sfx111188 dbgen]$ cd tpch_350
        [hadoop@sfx111188 tpch_350]$ ./dbgen -s 350
        TPC-H Population Generator (Version 3.0.0)
        Copyright Transaction Processing Performance Council 1994 - 2010


        [hadoop@sfx111188 tpch_350]$ du -sh *.tbl
        8.1G customer.tbl
        263G lineitem.tbl
        4.0K nation.tbl
        59G orders.tbl
        41G partsupp.tbl
        8.1G part.tbl
        4.0K region.tbl
        480M supplier.tbl


        复制


         TPC-H 建表

         

        首先在 Spark SQL 里建外部表,导入TPC-H 数据。
        创建数据库 tpch。并登录。
          spark-sql> create database tpch;
          Time taken: 0.048 seconds
          spark-sql> use tpch;
          Time taken: 0.016 seconds
          spark-sql>
          复制
          提前创建这个 HDFS 目录。
            [hadoop@sfx111188 ~]$ hdfs dfs -mkdir -p /tpch/
            复制


            创建外部表,外部表存储路径是 HDFS 根路径 :/tpch/。
              use tpch;


              create external table lineitem (
              l_orderkey int,
              l_partkey int,
              l_suppkey int,
              l_linenumber int,
              l_quantity double,
              l_extendedprice double,
              l_discount double,
              l_tax double,
              l_returnflag string,
              l_linestatus string,
              l_shipdate string,
              l_commitdate string,
              l_receiptdate string,
              l_shipinstruct string,
              l_shipmode string,
              l_comment string)
              row format delimited
              fields terminated by '|'
              stored as textfile
              location'/tpch/lineitem';


              create external table nation (
              n_nationkey int,
              n_name string,
              n_regionkey int,
              n_comment string)
              row format delimited
              fields terminated by '|'
              stored as textfile
              location'/tpch/nation';


              create external table region (
              r_regionkey int,
              r_name string,
              r_comment string)
              row format delimited
              fields terminated by '|'
              stored as textfile
              location '/tpch/region';


              create external table part (
              p_partkey int,
              p_name string,
              p_mfgr string,
              p_brand string,
              p_type string,
              p_size int,
              p_container string,
              p_retailprice double,
              p_comment string)
              row format delimited
              fields terminated by '|'
              stored as textfile
              location '/tpch/part';


              create external table supplier (
              s_suppkey int,
              s_name string,
              s_address string,
              s_nationkey int,
              s_phone string,
              s_acctbal double,
              s_comment string)
              row format delimited
              fields terminated by '|'
              stored as textfile
              location '/tpch/supplier';


              create external table partsupp (
              ps_partkey int,
              ps_suppkey int,
              ps_availqty int,
              ps_supplycost double,
              ps_comment string)
              row format delimited
              fields terminated by '|'
              stored as textfile
              location '/tpch/partsupp';


              create external table customer (
              c_custkey int,
              c_name string,
              c_address string,
              c_nationkey int,
              c_phone string,
              c_acctbal double,
              c_mktsegment string,
              c_comment string)
              row format delimited
              fields terminated by '|'
              stored as textfile
              location '/tpch/customer';


              create external table orders (
              o_orderkey int,
              o_custkey int,
              o_orderstatus string,
              o_totalprice double,
              o_orderdate date,
              o_orderpriority string,
              o_clerk string,
              o_shippriority int,
              o_comment string)
              row format delimited
              fields terminated by '|'
              stored as textfile
              location '/tpch/orders';
              复制
              导入 TPC-H 数据文件到外部表。
                load data local
                inpath '/data/sfdv1n1/tpch350/region.tbl' into table region;
                load data local
                inpath '/data/sfdv1n1/tpch350/customer.tbl' into table customer;
                load data local
                inpath '/data/sfdv1n1/tpch350/lineitem.tbl' into table lineitem;
                load data local
                inpath '/data/sfdv1n1/tpch350/nation.tbl' into table nation;
                load data local
                inpath '/data/sfdv1n1/tpch350/orders.tbl' into table orders;
                load data local inpath '/data/sfdv1n1/tpch350/part.tbl' into table part;
                load data local
                inpath '/data/sfdv1n1/tpch350/partsupp.tbl' into table partsupp;
                load data local
                inpath '/data/sfdv1n1/tpch350/supplier.tbl' into table supplier;


                复制
                上面建的表是外部表,再新建一组 Spark 表,不开启压缩,使用 parquet 列存格式。然后将外部表数据导入到 Spark 表中。
                  create database tpch_uncompressed;
                  set spark.sql.parquet.compression.codec=uncompressed;


                  Create table tpch_uncompressed.lineitem (L_ORDERKEY INT, L_PARTKEY INT, L_SUPPKEY INT,
                  L_LINENUMBER INT, L_QUANTITY DOUBLE, L_EXTENDEDPRICE DOUBLE, L_DISCOUNT DOUBLE,
                  L_TAX DOUBLE, L_RETURNFLAG STRING, L_LINESTATUS STRING, L_SHIPDATE STRING,
                  L_COMMITDATE STRING, L_RECEIPTDATE STRING, L_SHIPINSTRUCT STRING, L_SHIPMODE
                  STRING, L_COMMENT STRING) stored as parquet;
                  create table tpch_uncompressed.part (P_PARTKEY INT, P_NAME STRING, P_MFGR STRING,
                  P_BRAND STRING, P_TYPE STRING, P_SIZE INT, P_CONTAINER STRING, P_RETAILPRICE
                  DOUBLE, P_COMMENT STRING) stored as parquet;
                  create table tpch_uncompressed.supplier (S_SUPPKEY INT, S_NAME STRING, S_ADDRESS
                  STRING, S_NATIONKEY INT, S_PHONE STRING, S_ACCTBAL DOUBLE, S_COMMENT STRING)
                  stored as parquet;
                  create table tpch_uncompressed.partsupp (PS_PARTKEY INT, PS_SUPPKEY INT, PS_AVAILQTY
                  INT, PS_SUPPLYCOST DOUBLE, PS_COMMENT STRING) stored as parquet;
                  create table tpch_uncompressed.nation (N_NATIONKEY INT, N_NAME STRING, N_REGIONKEY
                  INT, N_COMMENT STRING) stored as parquet;
                  create
                  table tpch_uncompressed.region (R_REGIONKEY INT, R_NAME STRING, R_COMMENT
                  STRING) stored as parquet;
                  create table tpch_uncompressed.orders (O_ORDERKEY INT, O_CUSTKEY INT, O_ORDERSTATUS
                  STRING, O_TOTALPRICE DOUBLE, O_ORDERDATE STRING, O_ORDERPRIORITY STRING,
                  O_CLERK STRING, O_SHIPPRIORITY INT, O_COMMENT STRING) stored as parquet;
                  create table tpch_uncompressed.customer (C_CUSTKEY INT, C_NAME STRING, C_ADDRESS
                  STRING, C_NATIONKEY INT, C_PHONE STRING, C_ACCTBAL DOUBLE, C_MKTSEGMENT STRING,
                  C_COMMENT STRING) stored as parquet;


                  use tpch_uncompressed;
                  insert into customer select * from tpch.customer;
                  insert into lineitem select * from tpch.lineitem;
                  insert into part select * from tpch.part;
                  insert into supplier select * from tpch.supplier;
                  insert into partsupp select * from tpch.partsupp;
                  insert into nation select * from tpch.nation;
                  insert into region select * from tpch.region;
                  insert into orders select * from tpch.orders;
                  复制

                  parquet 简介

                  parquet 是面相分析型业务的列式存储格式。相比行存格式有以下优势:
                  • 谓词下推,可以跳过不符合条件的数据,只读取需要的数据,降低 IO 数据量。

                  • 使用更高效的存储编码,进一步压缩存储空间。

                  • 映射下推,只需读取需要的列,支持向量运算,能获得更好的扫描性能。



                  HDFS 文件分析
                  hdfs 查看方式

                  HDFS 的目录可以通过 WEB 页面查看,监听端口默认 9870 。


                  也可以通过命令 hdfs 查看。
                    [hadoop@sfx111188bcache]$ hdfs dfs -du -s -h tpch/*
                    8.1 G 8.1 G tpch/customer
                    263.0 G 263.0 G tpch/lineitem
                    2.2 K 2.2 K tpch/nation
                    58.6 G 58.6 G tpch/orders
                    8.0 G 8.0 G tpch/part
                    48.1 G 48.1 G tpch/partsupp
                    389 389 tpch/region
                    479.6 M 479.6 M tpch/supplier


                    [hadoop@sfx111188 ~]$ hdfs dfs -du -h user/hadoop/warehouse/tpch_uncompressed.db/
                    7.8 G 7.8 G user/hadoop/warehouse/tpch_uncompressed.db/customer
                    112.4 G 112.4 G user/hadoop/warehouse/tpch_uncompressed.db/lineitem
                    3.8 K 3.8 K user/hadoop/warehouse/tpch_uncompressed.db/nation
                    43.9 G 43.9 G user/hadoop/warehouse/tpch_uncompressed.db/orders
                    4.2 G 4.2 G user/hadoop/warehouse/tpch_uncompressed.db/part
                    37.7 G 37.7 G user/hadoop/warehouse/tpch_uncompressed.db/partsupp
                    1.5 K 1.5 K user/hadoop/warehouse/tpch_uncompressed.db/region
                    498.0 M 498.0 M user/hadoop/warehouse/tpch_uncompressed.db/supplier
                    [hadoop@sfx111188 ~]$


                    复制
                    从上面可以看出,每个表的原始数据导入到 parquet 格式表后,数据感觉都被压缩了。



                    文件块大小是 128MB,实际块大小可能会比块小。如 lineitem 表的块平均为 56MB 左右。


                    CSD 数据压缩比


                    当 HDFS 文件放在 CSD 2000上时,可以看数据压缩比。

                    查看 lineitem 原始文件的一个块信息(找到 blockid)
                    查看该Block ID 对应的文件的 CSD 压缩比。文件有2个,数据文件部分压缩比在 2.15 左右。
                      [hadoop@sfx111188subdir0]$ find . |grep 1073743944 |sudo xargs sfx-filesize -h
                      Logical Alocated Physical Ratio File
                      128M 128M 59.50M 2.15 ./subdir8/blk_1073743944

                      Logical Alocated Physical Ratio File
                      1.00M 1.00M 1.00M 1.00 ./subdir8/blk_1073743944_3121.meta
                      复制


                      再查看该表 lineitem(parquet格式)的一个数据块信息。

                      同样查看该数据块在 CSD 2000 上压缩比,约为 1.64 。
                        [hadoop@sfx111188 dfs]$ find . |grep 1073745231 |sudo xargs sfx-filesize -h |grep .
                        Logical Alocated Physical Ratio File
                        442.50K 444K 442.77K 1.00 ./data/current/BP-294683394-127.0.0.1-1651761708423/current/finalized/subdir0/subdir13/blk_1073745231_4407.meta
                        Logical Alocated Physical Ratio File
                        55.31M 55.31M 33.64M 1.64 ./data/current/BP-294683394-127.0.0.1-1651761708423/current/finalized/subdir0/subdir13/blk_1073745231

                        复制


                        TPC-H 查询

                             Spark SQL 能满足 TPC-H 22个查询需求。下面给两个例子。

                        • q1_pricing_summary_report.sql

                          SELECT
                          L_RETURNFLAG, L_LINESTATUS, SUM(L_QUANTITY), SUM(L_EXTENDEDPRICE), SUM(L_EXTENDEDPRICE*(1-L_DISCOUNT)), SUM(L_EXTENDEDPRICE*(1-L_DISCOUNT)*(1+L_TAX)), AVG(L_QUANTITY), AVG(L_EXTENDEDPRICE), AVG(L_DISCOUNT), COUNT(1)
                          FROM
                          lineitem
                          WHERE
                          L_SHIPDATE<='1998-09-02'
                          GROUP BY L_RETURNFLAG, L_LINESTATUS
                          ORDER BY L_RETURNFLAG, L_LINESTATUS;
                          复制
                          • q2_minimum_cost_supplier.sql

                            select
                            s_acctbal,
                            s_name,
                            n_name,
                            p_partkey,
                            p_mfgr,
                            s_address,
                            s_phone,
                            s_comment
                            from
                            part,
                            supplier,
                            partsupp,
                            nation,
                            region
                            where
                            p_partkey = ps_partkey
                            and s_suppkey = ps_suppkey
                            and p_size = 15
                            and p_type like '%BRASS'
                            and s_nationkey = n_nationkey
                            and n_regionkey = r_regionkey
                            and r_name = 'EUROPE'
                            and ps_supplycost = (
                            select
                            min(ps_supplycost)
                            from
                            partsupp,
                            supplier,
                            nation,
                            region
                            where
                            p_partkey = ps_partkey
                            and s_suppkey = ps_suppkey
                            and s_nationkey = n_nationkey
                            and n_regionkey = r_regionkey
                            and r_name = 'EUROPE'
                            )
                            order by
                            s_acctbal desc,
                            n_name,
                            s_name,
                            p_partkey
                            limit 100;
                            复制

                            查询执行方式:

                              [hadoop@sfx111188 ~]$ spark-sql --database tpch_uncompressed -f tpch_queries/q1_pricing_summary_report.sql
                              Setting default log level to "WARN".
                              To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                              2022-05-23 14:54:52,272 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                              Spark master: local[*], Application Id: local-1653288893920
                              A F 1.3214595889E10 1.981526484204869E13 1.8824474573410145E13 1.9577452155215535E13 25.500070938291696 38237.31451022952 0.05000213280898179 518218005
                              N F 3.44841059E8 5.171030920476401E11 4.91243409840054E11 5.108955072236212E11 25.49886259627669 38236.57406246567 0.050010318859026945 13523782
                              N O 2.6023136781E10 3.902133887866225E13 3.707025733533616E13 3.855306479917973E13 25.50024030295549 38237.26273749808 0.049999221336791196 1020505551
                              R F 1.3214518093E10 1.9815185947287605E13 1.882442544492562E13 1.9577409002481715E13 25.50045772433984 38237.96736230632 0.05000015503457316 518207094
                              Time taken: 52.726 seconds, Fetched 4 row(s)
                              [hadoop@sfx111188 ~]$
                              复制


                              Spark UI

                                   spark-sql 运行时,会给出一个链接地址用于跟踪该作业的信息,默认端口通常是4040。如果被占用,会自动加1。通过web浏览器可以访问该地址。如果spark-sql 会话退出该监听会失效。


                              每次 spark-sql 运行的时候会有唯一的id表示。spark-sql 会话重新启动后就看不到上次会话的作业信息。这个时候就要去 Spark Job History 服务中找。默认监听端口 18080.


                              点击左边链接,进入该应用的详细跟踪页面。会分为 Jobs,Stages,Storages,Environments,Executors and SQL 等页面。


                              简单介绍一下。

                              • Jobs 会展示这个应用运行的作业,有些作业会处于 skiped 状态。

                              • Stages 是更细粒度的划分。

                              • Storage 可以看到 Spark SQL 缓存的数据(只能看到明确调用 cache 或 persist 方法保存的数据)

                              • Environments 可以看到这次运行时的Spark 环境参数。有些是参数文件指定的,有些可以在 spark-sql 会话里修改或者在命令行参数里指定。

                              • Executors 可以看到该节点 Executor 运行需要的资源数。


                              • SQL 看到该应用运行过的 SQL。

                              点开这个 SQL 链接,可以看到更详细的执行计划信息(图和文字版)。


                              Spark SQL 支持的 Join

                              Spark SQL 支持常用的表连接算法(INNER JOIN,OUTER JOIN,SEMI JOIN, ANTI JOIN 等)。通常大表连接的时候,是大表的各个分区进行 N对N的连接,这个叫 Shuffle Join ,这种 IO 和网络流量会比较大。

                              当其中一个表数据量小于 Worker 节点的工作内存,Spark SQL 可能会优化为将小表数据广播到所有 Worker 节点,实现某些连接在节点内部完成。这样做可以减少网络流量,但会增加一点 Worker 节点本地 CPU 消耗。



                              通过 Spark Web UI,在 TPC-H 查询的执行计划里,可以看到很多这样的广播(broadcast)操作。

                              更多关于排序、去重、分组统计原理可以看文末参考。



                              更多阅读

                              文章转载自数据库技术闲谈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                              评论