暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

窗口函数的N个使用技巧 (SQL & PySpark)

JW的随笔 2021-08-06
3103

    作为一名数据打工人,窗口函数是我们的老朋友了。在日常工作中,学会和窗口函数好好相处,能够让我们的工作事半功倍。


    在本文中,我们将先简单复习一下窗口函数,然后通过案例来了解不同窗口函数的应用场景和使用技巧。文中将会附上对应的 Spark SQL 和 PySpark代码实现。


一、 窗口函数介绍

1.1 厘清概念

     什么叫窗口?—— 窗口可以理解为“指定记录的集合”,每一条记录都有其对应的窗口。
     什么是窗口函数?—— 窗口函数就是在窗口(满足某种条件的记录集合)上执行的特殊函数。
     窗口函数和普通聚合函数的有什么区别?

     a. 聚合函数是将多条记录聚合运算为一条,是数据聚合过程;窗口函数是就每一条记录对应的窗口进行运算,返回一个对应值,不改变记录的条数。

     b. 窗口函数的执行顺序是在 FROM、JOIN、WHERE、GROUP BY、HAVING之后,在ORDER BY、LIMIT、SELECT、DISTINCT之前。它执行时 GROUP BY的聚合过程已经完成了,所以不会再产生数据聚合。

     c. 当窗口函数和聚合函数一起使用时,窗口函数是基于聚合后的数据执行的。


    以数据TB为例,在使用groupby和窗口函数时采用avg()的区别:

1.2 窗口函数的使用语法

    窗口函数使用时由“窗口函数”和over从句组成;其中,over从句分为三部分:分组(partition by)、排序(order by)、frame选取(rangeBetween 和 rowsBetween)。


    SQL和PySpark的写法分别如下所示:


1.3 窗口函数类型
    现有专用窗口函数可以分为5种类型:
    分布函数:percent_rank()、cume_dist()
    序号函数:row_number()、rank()、dense_rank()
    前后函数:lag()、lead()
    头尾函数:first()、last()
    其他函数:如分桶函数 nth_value()
        聚合函数:原有聚合函数都可以作为窗口函数,如 sum()、avg()、max()、min(),但此时order by中的字段值不会影响最终输出结果。


    1.4 over从句中的frame子句

    1.4.1 讲讲你可能不知道的frame子句语法默认规则

    1.4.2 frame子句格式规范


    二、窗口函数应用案例(SQL & Pyspark)

    2.1 数据准备

        在本节中,我们创建并使用南华西地区2010~2019年销售量数据进行讲解,数据和创建代码如下:

      # 我们先来创建一个SparkSession实例,并配置好相关参数
      import os
      import sys
      import numpy as np
      import pandas as pd
      import pyspark.sql.functions as f
      from pyspark.sql import Window
      from pyspark.sql import SparkSession


      os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.6"
      os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6"
      os.environ["SPARK_HOME"] = "/opt/app/spark-2.2.0"


      # 获取一个SparkSession的实例
      spark = (
      SparkSession.builder
      .master("yarn")
      .appName("Window_JW")
      .config("spark.sql.execution.arrow.enabled", "true")
      .config("spark.debug.maxToStringFields", "9999")
      .config('spark.executor.memory', '8g')
      .config('spark.driver.memory', '48g')
      .enableHiveSupport()
      .getOrCreate()
      )
      spark.conf.set("spark.sql.execution.arrow.enabled", "true")


      test_sdf = spark.createDataFrame(
      [
      ('华南', 2010, 60),
      ('华南', 2011, 70),
      ('华南', 2012, None),
      ('华南', 2013, 80),
      ('华南', 2014, 80),
      ('华南', 2015, None),
      ('华南', 2016, 90),
      ('华南', 2017, 90),
      ('华南', 2018, 100),
      ('华南', 2019, 120),
      ('华西', 2010, None),
      ('华西', 2011, 90),
      ('华西', 2012, 90),
      ('华西', 2013, 90),
      ('华西', 2014, None),
      ('华西', 2015, 100),
      ('华西', 2016, 70),
      ('华西', 2017, None),
      ('华西', 2018, 120),
      ('华西', 2019, 100)
      ]
      ).toDF("district", "year", "quantity")


      # 可以使用 createOrReplaceTempView() 将 spark_dataframe 注册成一张临时的hive表:
      test_sdf.createOrReplaceTempView("temp_table_sales_data")


      # 数据预览如下
      test_sdf.show()



      2.2  分布函数 —— 巧用 <colname> is not null 进行分组

          Question:分布函数会把null值并入百分比排序计算中,如何得到剔除null值后分布排序/累计值?
      • SQL:adding <colname> is not null
        sql_percent_rank = spark.sql(
        """
        select district
        , year
        , quantity
        , round(percent_rank() over(partition by district order by quantity asc), 2) as percent_rk
        , round(percent_rank() over(partition by quantity is not null, district order by quantity asc), 2) as percent_rk_ignore_null
        , case when quantity is not null
        then round(percent_rank() over(partition by quantity is not null, district order by quantity asc), 2)
        else null end as percent_rk_ignore_null2
        from temp_table_sales_data
        order by district, quantity desc
        """
        )
        sql_percent_rank.show()


        • Pyspark:adding isnull("colname")
          pyspark_percent_rank = (
          test_sdf.withColumn(
          "percent_rk",
          f.round(f.percent_rank().over(Window.partitionBy("district").orderBy(f.asc("quantity"))), 2)
          )
          .withColumn(
          "percent_rk_ignore_null",
          f.round(f.percent_rank().over(Window.partitionBy(f.isnull("quantity"), "district").orderBy(f.asc("quantity"))), 2)
          )
          .withColumn(
          "percent_rk_ignore_null2",
          f.when(
          ~f.isnull("quantity"),
          f.round(f.percent_rank().over(Window.partitionBy(f.isnull("quantity"), "district").orderBy(f.asc("quantity"))), 2)
          )
          )
          )
          pyspark_percent_rank.orderBy("district", f.desc("quantity")).show()


              结果如下:


          2.3  序号函数 —— 使用 `NULLs last` 将 NULL值 放置最后;或者增加一个排序列 <colname> is null 达到同样效果

              Question:升序排序时,序号函数会将NULL值排在首位,是否可以将NULL值排在最后?

          • SQL:adding nulls last
            sql_row_number_rank = spark.sql(
            """
            select district
            , year
            , quantity
            , row_number() over(partition by district order by quantity asc) as row_number_rk
            , row_number() over(partition by district order by quantity asc nulls last) as row_number_rk_nulls_last1
            , row_number() over(partition by district order by quantity is null, quantity asc) as row_number_rk_nulls_last2
            from temp_table_sales_data
            order by district, row_number_rk asc
            """
            )
            sql_row_number_rank.show()


            • Pyspark:using `asc_nulls_first`
              # 由于目前pyspark版本无法使用asc_nulls_first()函数,仅演示增加一列排序列isnull("quantity")的效果。
              pyspark_row_number_rank = (
              test_sdf.withColumn(
              "row_number_rk",
              f.row_number().over(Window.partitionBy("district").orderBy(f.asc("quantity")))
              )
              .withColumn(
              "row_number_rk_nulls_last2",
              f.row_number().over(Window.partitionBy("district").orderBy(f.isnull("quantity"), f.asc("quantity")))
              )
              .orderBy("district", "row_number_rk")
              )
              pyspark_row_number_rank.show()


                  结果如下:


              2.4 前后函数 ——常用于求解“曾经连续 N 年出现某种行为”



                  Question:类似“曾经连续 N 年销售量都未曾增长的地区” 这类问题如何求解?


                  以“求曾经连续3年销售量都未曾增长的地区” 为例:

              • SQL


                # Step1:找到之后第三次出现同销售量时对应的年份 later_thrid_year
                # Step2. 只有 later_thrid_year 和 当前年份 相差2年时,才是连续3年未曾增长
                sql_quantity_maintain_step2 = spark.sql(
                """
                select tb.district as district
                from
                (
                select district
                , year
                , lead(year, 2) over(partition by district, quantity order by year asc) as later_thrid_year
                from temp_table_sales_data
                where quantity is not null
                ) tb
                where tb.later_thrid_year is not null
                and tb.later_thrid_year - tb.year = 2
                group by tb.district
                """
                )
                sql_quantity_maintain_step2.show()


                • Pyspark

                  pyspark_quantity_maintain = (
                  test_sdf.where("quantity is not null")
                  .withColumn(
                  "later_thrid_year",
                  f.lead("year", 2).over(Window.partitionBy("district", "quantity").orderBy(f.asc("year")))
                  )
                  .where("later_thrid_year is not null and later_thrid_year - year = 2")
                  .select("district")
                  .drop_duplicates()
                  )
                  pyspark_quantity_maintain.show()


                      在这份数据中,只有华西地区符合题意。



                  2.5 头尾函数 —— 常用于缺失值填补



                      Question:对各个区域中销售量缺失值,使用按年份往前推第一个非缺失数据填补,如何操作?

                  • SQL:spark sql 不支持ignore nulls
                     语法,没有找到简洁的写法!
                    # 使用传统解法:借助 row_number() 窗口函数求解
                    sql_filled_nulls_quantity = spark.sql(
                    """
                    select tb.district as district
                    , tb.year as year
                    , tb.quantity as quantity
                    , tb.filled_nulls_quantity as filled_nulls_quantity
                    from
                    (
                    select a.district as district
                    , a.year as year
                    , a.quantity as quantity
                    , row_number() over(partition by a.district, a.year order by b.year desc) as rk
                    , b.quantity as filled_nulls_quantity
                    from temp_table_sales_data a
                    left join
                    (
                    select district, year, quantity
                    from temp_table_sales_data
                    where quantity is not null
                    ) b
                    on a.district = b.district and a.year >= b.year
                    ) tb
                    where tb.rk = 1
                    order by tb.district, tb.year
                    """
                    )
                    sql_filled_nulls_quantity.show()


                    • Pyspark

                      pyspark_quantity_maintain = (
                      test_sdf.withColumn(
                      "filled_nulls_quantity",
                      f.last("quantity", ignorenulls=True).over(Window.partitionBy("district").orderBy(f.asc("year")))
                      )
                      .orderBy("district", "year")
                      )
                      pyspark_quantity_maintain.show()


                          结果如下:


                      三、over从句相关应用案例

                      3.1 frame子句 —— 不得不提的动态窗口

                          frame子句主要有两种用法——row子句和range子句,它们的区别如下:
                      • range是逻辑窗口,通过指定列(列数不固定)行值的范围来定义窗口框架。即根据order by 子句中指定列,只要行值在当前行对应行值范围内,就在当前行的对应窗口内。

                      • rows是物理窗口,通过当前行中指定物理偏移来定义窗口框架。即根据order by 子句排序后,取当前行的前N行及后N行的数据计算(与当前行的值无关,只与排序后的行号相关)。


                          Question:对各个区域中销售量缺失值,使用当前区域前一年和后一年的均销售量值填充,如何操作?

                         错误解法:大家往往会惯性选择rows从句,当年份缺失时,计算结果就不对了。

                         正确解法:使用range从句。


                      • SQL

                        sql_dynamic_windows = spark.sql(
                        """
                        select district
                        , year
                        , quantity
                        , round(avg(quantity) over(partition by district order by year asc rows between 1 preceding and 1 following), 1) as dynamic_avg_quantity_rows
                        , round(avg(quantity) over(partition by district order by year asc range between 1 preceding and 1 following), 1) as dynamic_avg_quantity_range
                        , coalesce(quantity, round(avg(quantity) over(partition by district order by year asc range between 1 preceding and 1 following), 1)) as filled_quantity
                        from temp_table_sales_data
                        where district = '华西' or (district = '华南' and year <> 2013)
                        order by district, year
                        """
                        )
                        sql_dynamic_windows.show(truncate = False)


                        • Pyspark

                          pyspark_dynamic_windows = (
                          test_sdf.withColumn(
                          "dynamic_avg_quantity_rows",
                          f.round(
                          f.avg("quantity").over(
                          Window.partitionBy("district").orderBy(f.asc("year")).rowsBetween(Window.currentRow - 1, Window.currentRow + 1)
                          ), 1
                          )
                          )
                          .withColumn(
                          "dynamic_avg_quantity_range",
                          f.round(
                          f.avg("quantity").over(
                          Window.partitionBy("district").orderBy(f.asc("year")).rangeBetween(Window.currentRow - 1, Window.currentRow + 1)
                          ), 1
                          )
                          )
                          .withColumn(
                          "filled_quantity",
                          f.coalesce(f.col("quantity"), f.col("dynamic_avg_quantity_range"))
                          )
                          .orderBy("district", "year")
                          )
                          pyspark_dynamic_windows.show(truncate = False)


                              结果如下:


                          四、其他小技巧

                          4.1 如何使用窗口函数实现distinct操作

                              Question:窗口函数不支持直接使用distinct操作,如果不想进行groupby操作,可以如何实现distinct操作呢?

                              Answer:先使用 collect_set 进行聚合操作,生成去重后的list,然后使用 size() 函数对 list 进行计算操作。


                          • SQL

                            sql_quantity_maintain_step1 = spark.sql(
                            """
                            select district
                            , year
                            , collect_set(quantity) over(partition by district) as quantity_sets
                            , size(collect_set(quantity) over(partition by district)) as quantity_kinds
                            from temp_table_sales_data
                            order by district, year
                            """
                            )
                            sql_quantity_maintain_step1.show(truncate = False)


                            • Pyspark

                              pyspark_quantity_maintain = (
                              test_sdf.withColumn(
                              "quantity_sets",
                              f.collect_set("quantity").over(Window.partitionBy("district"))
                              )
                              .withColumn("quantity_kinds", f.size("quantity_sets"))
                              .orderBy("district", "year")
                              )
                              pyspark_quantity_maintain.show(truncate = False)


                                  结果如下:





                              - 版权声明 - 

                              文章版权属于本文作者

                              若有侵权,请联系本公众号删除或修改~

                              如有问题,欢迎留言~

                              文章转载自JW的随笔,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                              评论