暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用过滤器下推优化 Spark 查询

ApacheHudi 2022-04-20
912

Apache Spark 是一个处理大数据的分布式计算框架。Spark Driver将整个查询拆分为Task任务,并将这些任务发送到集群不同节点上的Executor,为了提高查询性能,一种策略是减少从数据存储传输到这些Executor的数据量。防止加载不需要数据的一种方法是过滤器下推(有时也称为谓词下推),它允许在将数据源加载到Executor进程之前在数据源处执行某些过滤器,如果执行程序与数据不在同一台物理机器上,这将大大减少网络带宽。在许多情况下,Spark 会自动应用过滤器下推,无需用户显示指定。但在某些情况下,我们将在本文中看到,用户必须提供特定信息,甚至自己实现某些功能,尤其是在创建自定义数据源时针对不支持的数据库类型或不支持的文件类型。示例将使用 Scala 语言编写,因为使用的 Spark 中大量类都有Scala编写,但是也可以使用 Java 等其他语言实现此功能,示例项目的完整源代码如下:https://github.com/dynatrace-research/filter-pushdown-examples

1. 入门

为了使用过滤器下推和其他优化,我们使用 Spark SQL 模块,这个模块允许我们通过合并底层数据的模式信息来提高查询性能——使用 Spark DataFrames。Spark SQL 操作通过 SparkSession 访问,我们可以使用构建器创建它:

val session = SparkSession
 .builder
 .config(“spark.master”, “local[*]”)
 .getOrCreate

对于示例,我们将在本地模式下启动 Spark——从上面源代码中的配置可以看出。当使用设置 local[N] 时,Spark 创建 N 个本地执行器,而 local[*] 为运行程序的机器的每个Core创建一个Executor。

2. 示例数据集

本文示例将使用如下简单的人员表,包含四列:id,name,age,position

id,name,age,position
0,Alice,20,developer
1,Bob,35,tester
2,Homer,40,developer
3,Marge,23,developer
4,Bart,29,designer
5,Lisa,43,developer
6,Maggie,20,tester
7,Frodo,33,developer
8,Sam,50,team lead
9,Gandalf,50,db engineer

3. 如何获取过滤器

第一个示例将从 CSV 文件中读取上表并从列表中选择所有"测试人员"(tester)。当执行一个包含这样一个过滤器但没有过滤器下推的查询时,执行器将评估这个过滤器。查看上面的示例数据集可以看到表中的大部分行将被过滤器删除,并且十个人中只有两个将作为查询结果返回。

val dataFramePosition = session
  .read.option("header", value = true)
  .csv("Filter/src/main/resources/data.csv")
  .filter(col("position") === "tester")
dataFramePosition.show()

第二行和第三行从 CSV 文件中读取数据,并使用 CSV 文件第一行中的列名来创建数据集的Schema。在 filter 命令中我们定义postion列的值必须等于“tester”,最后一行的 show 方法将结果打印到控制台。

+---+------+---+--------+
| id|  name|age|position|
+---+------+---+--------+
|  1|   Bob| 35|  tester|
|  6|Maggie| 29|  tester|
+---+------+---+--------+

使用 DataFrame 时Spark允许此过滤器在数据源处执行——过滤器被下推到数据源,我们可以通过使用 explain 方法分析 DataFrame 的执行计划来确认过滤器下推:

dataFramePosition.explain()

上面的查询给出以下输出:

== Physical Plan ==
*(1) Project [id#16, name#17, age#18, position#19]
+- *(1) Filter (isnotnull(position#19) AND (position#19 = tester))
+- FileScan csv (…) PushedFilters: [IsNotNull(position), EqualTo(position,tester)], ReadSchema: struct<id:string,name:string,age:string,position:string>

可以看到过滤已经下推,还包括检查该字段是否为空的附加过滤器,为了防止所有行被加载到 Spark Executor中,数据源(本身需要有过滤能力)必须评估过滤器并排除相应的行,Spark CSV 读取器仅将那些满足过滤器的行加载到Executor,但并非所有数据源都如此。例如自定义数据源必须实现下推过滤器的处理。

4. 包含cast的过滤器

并非所有过滤器都被推送到数据源,一个值得注意的例子是所有需要转换字段内容的过滤器,为了证明这一点,可以将之前示例中的过滤器更改为仅选择 25 岁以下的人。

val dataFrameAge = session
  .read.option("header", value = true)
  .csv("Filter/src/main/resources/data.csv")
  .filter(col("age") <= 25)

show 和 explain 方法可以再次用于获取最终结果和查询执行计划的描述:

+---+------+---+---------+
| id|  name|age| position|
+---+------+---+---------+
|  0| Alice| 20|developer|
|  3| Marge| 23|developer|
|  6|Maggie| 20|   tester|
+---+------+---+---------+
(…) Filter (isnotnull(age#63) AND (cast(age#63 as int) <= 25))
(…) PushedFilters: [IsNotNull(age)], ReadSchema: struct<id:string,name:string,age:string,position:string>

查看输出可以看到返回了正确的行,但实际上并没有下推过滤器,数据集的默认Schema会阻止过滤器下推,因为它将所有列的类型设置为 StringType。因此过滤器需要强制转换为整数,这是一个不会下推的操作。有两种方法可以避免这个问题:第一种方法是使用 Spark 选项来推断 CSV 文件中数据的Schema。但是这需要加载文件两次——第一次从数据中推断Schema,第二次实际加载数据。第二种方法是显式定义打开的 CSV 文件的Schema。由于我们已经知道数据的结构,我们将定义Schema并将其添加到查询中,在这个新模式中我们将 age 列(以及另外的 id 列)定义为具有整数类型:

val schema = StructType(Array(
 StructField(“id”, IntegerType, nullable = true),
 StructField(“name”, StringType, nullable = true),
 StructField(“age”, IntegerType, nullable = true),
 StructField(“position”, StringType, nullable = true)))
 
 val dataFrameSchema = session
 .read.option(“header”, value = true)
 .schema(schema)
 .csv(“Filter/src/main/resources/data.csv”)
 .filter(col(“age”) <= 25)

此时 DataFrame 的 explain 方法的输出显示过滤器再次被下推,并且没有应用强制转换。

(…) PushedFilters: [IsNotNull(age), LessThanOrEqual(age,25)], ReadSchema: struct<id:int,name:string,age:int,position:string>

5. 自定义实现 DataFrame

如上所述,如果数据源不评估过滤器,则将过滤器下推到数据源不会带来任何好处。例如在创建自己的数据源以连接到不受支持的数据库或使用不受支持的文件类型时。创建新数据源的一种方法是扩展 BaseRelation 并从这个新类创建一个 DataFrame,在我们的例子中名为 MyBaseRelation,在扩展 BaseRelation 时,还应该实现以下 Scanner 特征/接口之一,它们都包含一个名为 buildScan 的方法(具有不同的参数)来加载实际的数据项:

  • • TableScan:最基本的特征,不允许在数据源中进行任何过滤或选择列,具有此特征的数据源的选择等效于 SQL 查询“SELECT * FROM tableA”。

  • • PrunedScan:第二个特征表明数据源可以选择列的子集并改变它们的顺序——相当于“SELECT fieldB, fieldA FROM tableA”

  • • PrunedFilteredScan:有了这个特性,既可以选择列,又可以对数据源的行应用特定的过滤器。由于我们想在 BaseRelation 中使用过滤器,因此将使用这个特性。

  • • CatalystScan:最终的 Scanner 特征还提供选择列和过滤行。但是它直接使用查询计划中的表达式,而不是从它们生成的过滤器。因此某些未转换为过滤器的过滤器表达式可用于 CatalystScan 特征,但不适用于 PrunedFilteredScan,例如 array_contains 或 size 之类的数组表达式,然而使用常见的表达方式非常不推荐,此特性在 Spark 3 中仍被标记为“实验性”且不稳定。

为了使这个简单的示例更加简洁,我们将 CSV 文件替换为存储在字段 DATA 中的硬编码行序列。此外我们必须在 schema 方法中定义这些行的结构,使用与上一个示例相同的Schema。过滤器下推并不意味着必须在加载数据项期间执行过滤器。默认情况下过滤器会在后续步骤中对结果集再次执行,因此仅部分执行或根本不执行的下推过滤器不会导致错误结果。相反如果所选特征支持,则必须在 BaseRelation 中执行所需列的选择(选择下推),因此必须为参数 requiredColumns 中给出的预期列实现基本映射。

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
 val seq = for (row <- DATA if passFilters(row, filters))
 yield Row.fromSeq(for (column <- requiredColumns)
 yield row.get(schema.fieldIndex(column)))
 println(“Number of returned rows: “ + seq.size)
 sparkSession.sparkContext.parallelize(seq)
}

如果原始行通过所有过滤器,则 buildScan 方法从 DATA 中存储的每一行创建一个新的 Row 对象,如果没有过滤掉该行,则只收集 requiredColumns 中请求的列,并将新的 Row 对象添加到集合 seq 中。然后该方法打印返回的行数。与过滤器下推类似,选择下推也旨在减少加载到Executor的数据,此技术可确保仅加载结果中存在的列或计算结果所需的列,例如过滤器中使用的列。在列选择代码中我们还添加了对过滤器处理的调用和返回行数的日志,从 dataFrame.explain() 的输出中我们知道到目前为止,查询已经下推了三种类型的过滤器:IsNotNull、EqualTo 和 LessThanOrEqual。需要 IsNotNull 过滤器,因为Schema中的字段指定它们可以为空(nullable = true)。在相关列的Schema中将 nullable 设置为 false 会删除这些过滤器,上面代码中调用的 passFilters 方法应用了我们示例中使用的过滤器:

def passFilters(row: Row, filters: Array[Filter]): Boolean = {
  for (filter <- filters) {
    filter match {
      case IsNotNull(x) =>
        if (row.get(schema.fieldIndex(x)) == nullreturn false
      case EqualTo(attribute, value) =>
        val index = schema.fieldIndex(attribute)
        if (index == -1 || !row.get(index).equals(value)) return false
      case LessThanOrEqual(attribute, value) =>
        if (!checkInequality(row, attribute, value, (a: Double, b: Double) => a <= b)) return false
      case LessThan(attribute, value) =>
        if (!checkInequality(row, attribute, value, (a: Double, b: Double) => a < b)) return false
      case GreaterThanOrEqual(attribute, value) =>
        if (!checkInequality(row, attribute, value, (a: Double, b: Double) => a >= b)) return false
      case GreaterThan(attribute, value) =>
        if (!checkInequality(row, attribute, value, (a: Double, b: Double) => a > b)) return false
      case _ => // ignore other filters
    }
  }
  true
}

对于不等式过滤器(<=、<、>= 和 >),我们只处理integer和double类型。如上所述所有下推到 MyBaseRelation 的过滤器都会在稍后再次执行,因此通过一些在后续步骤中删除的行也不是问题,由于显式 IsNotNull 过滤器,我们也不必对字段值执行空值检查。

def checkInequality(row: Row, attribute: String, value: Any, comparison: (Double, Double) => Boolean): Boolean = {
  val index = schema.fieldIndex(attribute)
  val doubleVal = schema.fields(index).dataType match {
    case _: IntegerType => row.get(index).asInstanceOf[Integer].doubleValue()
    case _: DoubleType => row.get(index).asInstanceOf[Double]
    case _ => return true
  }
  value match {
    case i: Integer => comparison(doubleVal, i.doubleValue())
    case d: Double => comparison(doubleVal, d)
    case _ => true
  }
}

在此示例中我们从 MyBaseRelation 的实例创建一个新的 DataFrame,并将前面示例中的两个过滤器应用于它。

var dataFrameBaseRelation = session
 .baseRelationToDataFrame(new MyBaseRelation(session))
 .filter(col(“position”) === “tester”)
 .filter(col(“age”) <= 25)
 dataFrameBaseRelation.show()
 dataFrameBaseRelation.explain()

输出结果可以看到过滤器被下推并正确执行,只有一名测试人员最多 25 岁,并且 buildScan 方法的日志语句确认只有一行没有被过滤器删除。

Number of returned rows: 1
+---+------+---+--------+
| id|  name|age|position|
+---+------+---+--------+
|  6|Maggie| 20|  tester|
+---+------+---+--------+

6. 结论

过滤器下推和选择下推是两种可以大大提高查询性能的技术,虽然许多标准技术(例如 CSV 文件或 parquet 文件的 Spark 读取器)已经实现了这些策略,但在实现自定义数据库连接器或特定数据结构时仍然需要自定义实现处理,另外并非每个过滤器操作都被推送到数据源,并且小的更改(例如数据集Schema的变更)可能会对性能产生巨大影响。


文章转载自ApacheHudi,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论