暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

技术前沿|探秘 Spark 3.3.0 新特性 Runtime Filter

Kyligence 2022-11-16
794

本文作者耿嘉安,畅销书《深入理解 Spark》和《Spark 内核设计的艺术》作者,Apache Spark Contributor,开源项目 Gluten 核心研发,15 年 IT 经验的 Kyligence 高级性能工程师。


Runtime Filter 可以说是 Spark 3.3.0 在性能方面最重要的优化之一在本文中,作者将对 Spark 3.3.0 新特性 Runtime Filter 进行深入分析,并分享 Kyligence 开源团队如何致力于研究和使用这一特性,进一步优化产品性能,改进用户使用体验,欢迎大家在评论区分享你的看法。


Spark Runtime Filter 在查询执行期间可以显著缩减中间数据量,并进而减少计算带来的成本。当查询执行中存在 Shuffle 阶段,由于 Executor 首先需要将 Shuffle 数据写到本地磁盘,然后由其他 Executor 通过网络拉取从磁盘读取的 Shuffle 数据,所以其成本格外高昂!


因此,能够缩减 Shuffle 的数据量一直以来是 Spark 性能优化的主要工作动向。Spark runtime filter 正是通过将 Join 一端具有高选择性的 Filter 推送到 Join 另一端来缩减 Shuffle 数据量的。



#01

Bloom Filter

Spark runtime filter 为了提升性能,引入了大名鼎鼎的 Bloom Filter。Bloom Filter 是由 Bloom 在1970年提出的一种多哈希函数映射的快速查找算法。是一种基于概率数据结构来实现的算法。


本文不是专门介绍 Bloom Filter 的学术文档,旨在用最简单的方式告诉大家 Bloom Filter 是什么?Bloom Filter 有哪些优点?Bloom Filter 有哪些应用场景?


为了便于理解,假设我们有一套身份证注册系统。其中很重要的功能就是要保证每个身份证不要重复注册。我们可能会采用以下三种方案:
  1. 设计一张数据库表存储身份证号码,通过 SQL 语句来判断是否已经存在。

  2. 将身份证号码存储在 Set 集合中。

  3. 将身份证号存储到 BitSet 中。

这些方法在数据量不大的情况下都很有效。但是随着数据量的不断增加,那么问题就暴露出来了。


方案一:SQL查询的代价本身较为高昂,而且随着时间的推移,当数据库系统的数据量越来越大时,查询的性能将导致系统响应缓慢甚至不可用。


方案二:Set 要想提高性能,肯定放置在内存中。随着数据量增加,占用的内存越来越多。我国14亿公民,每个人的身份证号码如果占用30个字节,那么一共需要大约 40GB 内存。如果以 Java 语言为例,其 HashSet 占用的内存要远远大于 40GB。这个大对象放在内存中,将持续考验 Java 的 GC 性能。


方案三:BitSet 将每一个身份号码映射到一位,相比于 Set 的确减少了很多内存的浪费。为了降低冲突的概率,BitSet 必须要足够稀疏。对于14亿身份号来说,如果要保证冲突的概率不高于 1%,那么 BitSet 的长度必须要有1400亿,因而依然很浪费内存。


Bloom Filter 也采用了类似 BitSet 的方式,但是有更优秀的办法降低冲突的概率。BitSet 采用一个哈希函数,而 Bloom Filter 采用多个。任何一个哈希函数都有一定概率产生冲突,假设 Bloom Filter 采用了 n 个不同的哈希函数,这些哈希函数发生冲突的概率分别为:P0、P1、P2、...、Pn。由于这些哈希函数是彼此独立的,因此所有的哈希函数发生冲突的概率是 P0*P1*P2*...*Pn,这个概率将会非常小。


Add

当要存储一个身份证号码时,由各个哈希函数分别计算它的哈希值,再将每个哈希值在 BitSet 中对应的位置设置为1。这就是 Bloom Filter 的添加操作了。


Check

对身份证号码进行判重的过程与添加操作十分类似。首先使用各个哈希函数分别计算它的哈希值,再判断每个哈希值在 BitSet 中对应的位置是否是 1。如果有某个位置仍然是 0,则可以判定此身份证号一定没有被记录。如果每个位置都是1,则可以“基本”认为身份证号已经存在。从前面的概率模型中,我们知道——当所有的哈希值对应的位全是1,实际上并不能100%确定身份证号已经存在。


Quota

为了进一步降低 Bloom Filter 的冲突概率或误判的概率,即提升判重的准确性。有几个指标需要考虑:

  • m:BitSet 的位数

  • n:数据集的大小(例如:我国的身份证号有多少?)

  • k:哈希函数的个数

对于可以估算的 n 来说,m 越大、k 越多,准确性越高。


介绍到这里,我们知道 Bloom Filter 可以极大的提高判重的效率,通常应用在一些需要快速判断某个元素是否属于集合,但是并不严格要求 100% 正确的场合。我相信读者对于理解和应用 Bloom Filter,已经没有什么大的障碍了。如果你对 Bloom Filter 的数学模型和论文感兴趣,可以进一步研究。


Complexity

Space complexity(空间复杂度):从上面的介绍,我们知道 Bloom Filter 的空间复杂度是 O(m),即与 BitSet 的位数相关。

Time complexity(时间复杂度):从上面的介绍,我们知道查询中,Bloom Filter 的时间复杂度是 O(k),即与哈希函数的个数相关。



#02

Design

由于 Spark runtime filter 本身是针对 Spark 的物理计划所做的性能优化,因此注入 Runtime Filter 发生在物理计划优化阶段。Spark runtime filter 通过将 Join 一端具有高选择性的 Filter 推送到 Join 另一端来缩减 Shuffle 数据量,进而优化执行性能。我们先来介绍 Spark runtime filter 引入的概念,然后介绍它支持优化的场景以及它的具体实现模式。最后,还会专门介绍它的缺点以及需要的改进。


Concept

为便于理解,Spark runtime filter 引入了以下概念:

  • Filter Creation Side(Runtime Filter的构造端)Join 的两端中具有高选择性 Filter 的一端。Spark 将创建一个子查询对 Runtime Filter 构造端的 Join key 进行聚合,此子查询将会在应用端作为 Runtime Filter。

  • Filter Application Side(Runtime Filter的应用端)Join 的两端中运用 Runtime Filter 的一端。在构造端构造的子查询作为 Runtime Filter 将拼接到应用端,作为一个新的 Filter。


Scenarios

Spark runtime filter 目前为以下两种场景提供了支持。

1. 当 Join 本身是 Shuffle Join 时;



2. 当 Join 本身是 Broadcast Join 并且 join 的某一端下含有 Shuffle Join 时。


Mode

Spark runtime filter 有两种具体的实现可供选择:IN FilterBloom Filter。为了便于读者理解本节内容,以如下 SQL 为例:

    SELECT *
    FROM R
    JOIN S
    ON R.r_sk = S.s_sk
    WHERE S.x = 5


    假设上面 SQL 中所示的 R 和 S 分别是 JOIN 的两个输入端:R 的输入量大,S 的输入量小。下面将以此实例来详细介绍 IN FilterBloom Filter


    1. IN Filter

    首先,我们从 SQL 语法的角度来看看当选择 IN Filter 时,在构造端创建的子查询。可以将它理解为如下的形式:


      SELECT key`
      FROM [table|subquery]
      GROUP BY key`

      这里的 key` 指构造端的 Join key


      结合本节的例子,那就是:

        SELECT S.s_sk
        FROM S
        GROUP BY S.s_sk
        WHERE S.x = 5


        IN Filter 实际还会使用 Spark 支持的函数 Murmur3Hash 计算获得 Join key 的哈希值,用此哈希值来代替 Join key。因此,用下面的 SQL 来表示实际的情况更为准确:


          SELECT Murmur3Hash(S.s_sk)
          FROM S
          GROUP BY Murmur3Hash(S.s_sk)
          WHERE S.x = 5


          上面只是构造了 IN Filter 所需的子查询,实际需要在谓词中使用 IN 从句。从这个形式我们可以看到为什么以 IN Filter 来命名。

            ...
            WHERE key IN
               (SELECT key`
               FROM [table|subquery]
               GROUP BY  key` )

            这里的 key 和 key` 分别指应用端和构造端的 Join key。


            结合本节的例子,就是下面这样。

              SELECT *
              FROM R
              JOIN S
                 ON R.r_sk = S.s_sk
              WHERE Murmur3Hash(R.r_sk) IN
                 (SELECT Murmur3Hash(S.s_sk)
                 FROM S
                 GROUP BY  Murmur3Hash(S.s_sk)
                 WHERE S.x = 5)

              由此可见,IN Filter 只是针对 Join key 构建子查询,因此只需要读取 Join key 这一列的数据。如果表本身支持按列读取,则可以减少 I/O,提升性能。


              2. Bloom Filter

              为了支持 Bloom Filter,Spark runtime filter 引入了一个特殊的聚合函数——BloomFilterAggregate。BloomFilterAggregate 具有两个非常重要的构造参数:

              • estimatedNumItemsExpression:此表达式表示估算得到的元素数量,例如:14亿身份证号码。这对应了 Bloom Filter 的 n 指标。

              • numBitsExpression:此表达式表示要使用的位的数量。这对应了 Bloom Filter 的 m 指标。BloomFilterAggregate 对于使用的哈希函数的数量则通过 (m n) * log(2) 公式计算得出。


              1)BloomFilterAggregate

              Data Structure:BloomFilterAggregate 使用 BitArray 作为数据存储的数据结构。

                final class BitArray {
                private final long[] data;
                private long bitCount;
                ...
                }


                Add:BloomFilterAggregate 没有直接采用多个不同的哈希函数,而是只采用了两个哈希函数,但是通过代码模拟出了多个哈希函数的效果。

                  public boolean putLong(long item) {
                   // Here we first hash the input long element into 2 int hash values, h1 and h2, then produce n
                   // hash values by `h1 + i * h2` with 1 <= i <= numHashFunctions.
                   // Note that `CountMinSketch` use a different strategy, it hash the input long element with
                   // every i to produce n hash values.
                   // TODO: the strategy of `CountMinSketch` looks more advanced, should we follow it here?
                   int h1 = Murmur3_x86_32.hashLong(item, 0);
                   int h2 = Murmur3_x86_32.hashLong(item, h1);


                   long bitSize = bits.bitSize();
                   boolean bitsChanged = false;
                   for (int i = 1; i <= numHashFunctions; i++) {
                     int combinedHash = h1 + (i * h2);
                     // Flip all the bits if it's negative (guaranteed positive number)
                     if (combinedHash < 0) {
                       combinedHash = ~combinedHash;
                     }
                     bitsChanged |= bits.set(combinedHash % bitSize);
                   }
                   return bitsChanged;
                  }


                  CheckBloomFilterAggregate的判重和添加操作类似,依然采用两个哈希函数,并通过代码模拟多个哈希函数的效果。


                    public boolean mightContainLong(long item) {
                     int h1 = Murmur3_x86_32.hashLong(item, 0);
                     int h2 = Murmur3_x86_32.hashLong(item, h1);


                     long bitSize = bits.bitSize();
                     for (int i = 1; i <= numHashFunctions; i++) {
                       int combinedHash = h1 + (i * h2);
                       // Flip all the bits if it's negative (guaranteed positive number)
                       if (combinedHash < 0) {
                         combinedHash = ~combinedHash;
                       }
                       if (!bits.get(combinedHash % bitSize)) {
                         return false;
                       }
                     }
                     return true;
                    }


                    2)BloomFilterMightContain

                    为了在 Predicate 中支持 Bloom Filter,Spark runtime filter 还引入了一个新的 Predicate 实现——BloomFilterMightContain。BloomFilterMightContain 的作用就是调用 MightContainLong 方法来判重

                      override def eval(input: InternalRow): Any = {
                       if (bloomFilter == null) {
                         null
                       } else {
                         val value = valueExpression.eval(input)
                         if (value == null) null else bloomFilter.mightContainLong(value.asInstanceOf[Long])
                       }
                      }


                      3)Example

                      现在已经介绍了 Bloom Filter 在 Spark 中的实现,我们从 SQL 语法的角度,可以将 Bloom Filter 构造的子查询理解为:


                        SELECT BloomFilterAggregate(XxHash64(key`), n_items, n_bits)
                        FROM [table|subquery]


                        上面 SQL 中的 n_items 是估算元素规模的表达式, n_bits 是要使用的位数。可以看到,这里也使用哈希函数提前计算了 Join key 的哈希值。Spark 实现的 Bloom Filter 并不是直接作用在原始数据上的。

                        结合本节的例子,SQL 如下:

                          SELECT BloomFilterAggregate(XxHash64(S.s_sk), n_items, n_bits)
                          FROM S
                          WHERE S.x = 5


                          上面只是构造了 Bloom Filter 所需的子查询,实际需要在谓词中使用 BloomFilterMightContain。


                            SELECT *
                            FROM R
                            JOIN S
                            ON R.r_sk = S.s_sk
                            WHERE BloomFilterMightContain(
                            (SELECT BloomFilterAggregate(n_bits,
                            XxHash64(S.s_sk)) bloom_filter
                            FROM S
                            WHERE S.x = 5), XxHash64(R.r_sk))


                            我们看到 Bloom Filter 也只是针对 Join key 构建子查询,因此只需要读取 Join key 这一列的数据。如果表本身支持按列读取,则可以减少 I/O,提升性能。


                            3. Redundant computation

                            目前,Spark runtime filter 的实现存在冗余的计算——即构建端的物理计划会执行两次,一次用于原来的 Join,另一次则用于新构建的 Runtime Filter 所需要的子查询。社区这么做的原因有三点:

                            1. Spark 仅仅为构建端是小输入时创建 Runtime Filter;

                            2. 可以在优化器中消除相同的子树来优化。这个工作,社区还没有做。

                            3. 设计文档提到的场景——当 Join 本身是 Broadcast Join 并且 join 的某一端下含有 Shuffle Join 时,并没有支持。这是因为实际的代码实现中,发现小端可以Broadcast,将放弃注入Runtime Filter。



                            #03

                            Optimization

                            Support Second Scenario

                            针对 Spark runtime filter 的实现没有支持场景二的问题,我们提交了https://github.com/apache/spark/pull/38388  来支持,并且能够让 Runtime Filter 的子查询复用 Broadcast 的小端数据。


                            Contrast

                            Spark runtime filter 同时采用了互斥的两种实现:In Filter 和 Bloom Filter。这说明这两者各有自己的特点。


                            1. In Filter

                            In Filter 针对 Join key 构建了非常简单的聚合,在应用端也只是用 IN 从句(或转换为 Semi Join)来起到缩减数据量的作用。因此整体实现都很简单。当 In Filter 的子查询返回的数据量很小时,这种方式的执行效率也很高。与此相反,当应用端很大并且 In Filter 的子查询的数据量比较大时,性能会变差。In Filter 的应用端由于只是简单的聚合,因此执行很快。而整个计算的复杂度主要在应用端。


                            目前,In Filter 没有复用 Exchange/Subquery(单独起 Job 更可靠),依赖于存在 Shuffle Join,单独起 Job 执行 Scan + Filter + Aggregate(Join keys, Join keys);构建端输出要求满足 Broadcast 的大小,即默认小于等于 10MB。


                            2. Bloom Filter

                            Bloom Filter 在构建端不只是简单的聚合,其中包含了有状态的 BloomFilterAggregate。BloomFilterAggregate 封装了 Bloom Filter 的算法实现,其空间复杂度是 O(m),时间复杂度是 O(n * k)。当在应用端应用 Bloom Filter 时,其查询的时间复杂度是 O(k)。因此,即使 Bloom Filter 的子查询返回的数据量比较大时,在应用端的执行仍然很高效。而且由于其线性的空间复杂度,Bloom Filter 在通过网络传输到应用端时也会有很高的效率。


                            目前,Bloom Filter 没有复用 Exchange/Subquery(单独起 Job 更可靠),依赖于存在 Shuffle Join,单独起 Job 执行 Scan + Filter + BloomFilterAggregate;应用端数据量默认要求大于 10GB,构建端输出要求小于等于 10MB。


                            参考链接:

                            1.http://en.wikipedia.org/wiki/File:Bloom_filter_fp_probability.svg

                            2. https://issues.apache.org/jira/browse/SPARK-11150

                            3. https://issues.apache.org/jira/browse/SPARK-32268

                            4.https://docs.google.com/document/d/16IEuyLeQlubQkH8YuVuXWKo2-grVIoDJqQpHZrE7q04/edit#


                            关于 Kyligence

                            上海跬智信息技术有限公司 (Kyligence) 由 Apache Kylin 创始团队于 2016 年创办,致力于打造下一代企业级智能多维数据库,为企业简化数据湖上的多维数据分析(OLAP)。通过 AI 增强的高性能分析引擎、统一 SQL 服务接口、业务语义层等功能,Kyligence 提供成本最优的多维数据分析能力,支撑企业商务智能(BI)分析、灵活查询和互联网级数据服务等多类应用场景,助力企业构建更可靠的指标体系,释放业务自助分析潜力。

                            Kyligence 已服务中国、美国、欧洲及亚太的多个银行、证券、保险、制造、零售等行业客户,包括建设银行、浦发银行、招商银行、平安银行、宁波银行、太平洋保险、中国银联、上汽、Costa、UBS、MetLife 等全球知名企业,并和微软、亚马逊、华为、Tableau 等技术领导者达成全球合作伙伴关系。目前公司已经在上海、北京、深圳、厦门、武汉及美国的硅谷、纽约、西雅图等开设分公司或办事机构。



                            👇点击「阅读原文」了解 Gluten

                            文章转载自Kyligence,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                            评论