暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

[翻译]Scalable Partition Handling ... in Apache Spark 2.1

偷功 2016-12-23
496

[翻译]Scalable Partition Handling for Cloud-NativeArchitecture in Apache Spark 2.1


https://databricks.com/blog/2016/12/15/scalable-partition-handling-for-cloud-native-architecture-in-apache-spark-2-1.html

by Eric LiangMichael Allman and Wenchen Fan Posted in ENGINEERING BLOGDecember 15,2016

 

大致内容翻译,Not逐字逐句……

本博客讨论了即将发布的最重要的功能之一:可扩展分区处理.


Spark SQL让我们能过在一个job中查询T级别的数据。但通常情况下,用户只希望访问其中的一小部分数据,比如,扫描San Francisco而不是全世界的用户活动。这是通过用常用的过滤字段,如日期或地点,将表的数据文件进行分区来实现的。

之后Spark SQL就可以使用这些分区信息进行“裁剪”或忽略与用户查询无关的文件。然而,之前的Spark 版本中,首次读取表可能会很慢,因为Spark必须先发现哪些分区是存在的(分区发现)。

Spark 2.1 中,我们大大改进了只查询一小部分表分区的初始延迟。某些情况下,在一个新集群上需要花费数十分钟的查询,现在可以以秒为单位来执行。我们的改进降低了表的内存开销,使SQL从表元数据完全缓存在内存中的“热”集群转换为冷集群。

Spark 2.1也统一了数据源(DataSource)和Hive表的分区管理功能。这意味着这两种表现在支持相同的分区DDL操作,比如,增加,删除和重定位特定分区。


一、Spark表管理

为了更好的理解为什么会有延迟,先介绍下Spark之前版中是如何管理表的。在这些版本中,Sparkcatalog支持两种类型的表:

1. DataSource tables Spark中创建表时首选的格式。这种类型的表可以直接通过将一DataFrame保存到文件系统来定义,比如:

df.write.partitionBy("date").saveAsTable("my_metrics")
,或通过一个表创建语句(CREATETABLE statement),如CREATE TABLE my_metricsUSING parquet PARTITIONED BY date
.

在之前的版本,Spark从文件系统和内存缓存中发现DataSource的表元数据。该元数据包含分区列表以及每个分区的文件统计。一旦被缓存,后续的查询就可以直接在内存中对表分区进行裁剪。

 

2. 对Apache Hive转来的用户,Spark SQL也可以读取由Hive serdes定义的catalog 表。Spark可以透明地将Hive表转换为DataSource格式,以便利用在Spark SQL中在IO性能上的改进。在内部,Spark会从Hive metastore中读取表和分区元数据信息并将其缓存到内存中。

这种策略下,在表元数据被缓存到内存时可以优化性能,但同时也存在两个缺点:首先,表的初始查询会被阻塞,直到Spark加载了所有的表分区元数据。对分区很多的表,初始查询时,递归地扫描文件系统来发现文件元数据会消耗不少分钟,尤其是存储在云存储,如S3上的数据。其次,表的所有元数据都需要存储在driver进程的内存中,这会增加内存压力。

我们已经从我们的客户和其他大规模Spark的用户处看到了这个问题。虽然有时候可以通过使用其他Spark APIs直接读取文件来避免初始查询的延迟,但却不能让表的性能可扩展。

Spark 2.1 DatabricksVideoAmp一起合作来消除此开销,以及统一DataSourceHive格式的表管理。

VideoAmp简介…….(略)

 

Michael Allman (VideoAmp) 描述了项目中他们参与的部分:

  Spark 2.1之前,获取最大的表的元数据需要几分钟时间,并且每次增加一个新的分区就需要重新获取一次。Spark 2.0发布不久,我们开始构建一个基于延迟加载分区元数据的方法原型。同时,在社区阐明我们的想法。我们为其中一个原型向Spark源仓库提交一个PR(pullrequest),并开始为实现生产级别的可靠性和性能而一起协作。


二、性能基准

在一头扎进技术细节之前,我们先展示下我们内部一个度量表的查询延迟改进,该表有50,000个以上的分区(在Databrickswe believe in eating our own dog food)。表按日期和度量类型进行分区,大致如下:

CREATE TABLE metricData (

     value BIGINT,

     dimensions struct<...>,

     date STRING,

     metric STRING)

USING parquet

OPTIONS (path  'dbfs:/mnt/path/to/data-root')

PARTITIONED BY (date, metric)

 

我们使用一个小的Databricks集群,包含8workers32cores250G内存。并基于一天,一周和一个月的数据运行一个简单的聚合查询,并评估新启动的Spark 集群上,首次获取结果的时间开销:

SELECT metric, avg(value)

FROM metricData

WHERE date >= "2016-11-01"  AND date <= "2016-11-07"   // ex. 1 week

GROUP BY metric

但在Spark 2.1 中启动了可扩展分区管理,读取一天的数据仅10秒多一点。该时间与查询所涉及的分区数成线性关系。为了理解时间花在什么地方,我们将查询时间分成查询计划期间的时间开销和Spark job执行的时间开销:


可以看到,在可扩展分区管理启动后,查询计划时间的开销(红条)是随着执行时间(蓝条)是成比例增加的,因为扫描了更多的数据。如果再次查询,则由于元数据缓存,查询计划时间可以忽略不计(太小而无法图形化)。

而对应的,如果没有启动,可以看到首次查询表时,不管是哪个查询,这都有一个很大的近200秒的常数因子。

我们预测这些改进在原生云上的Spark部署中会非常明显,因为在云上文件系统元数据的处理性能是相当慢的,并且由于短期Spark集群的使用,会频繁地要求重新缓存元数据。即使是HDFS用户也可以看到这个改进对大分区表的好处。

(一)   VideoAmp 生产基准

我们也表明了这些改进极大地影响了VideoAmp工作负载中的生产查询。它们运行复杂的多阶段(multi-stage)的查询,包含数十个列,在带有成千上万个分区的定期更新的表上多次聚合和联合。

VideoAmp测量了一些日常查询中在计划生成上花的时间比例,比较了Spark 2.02.1之间的性能。他们发现了重大的(有时是戏剧性的)改进:


三、 实现

这些好处是SparkSQL内部的两个重大改变来实现的。

  1. Hive DataSource 表, Spark 现在在system catalog (和Hive metastore 一样)中存储Hive DataSource 表的分区元数据。通过新的PruneFileSourcePartitions  rule,在从文件系统读取元数据信息前,Catalyst优化器使用cataloglogical planning期间进行分区裁剪。这避免了在未被使用的分区中查找文件。

  2. 文件统计信息现在可以在planning期间部分地、增量地缓存,而不是全部预缓存。Spark 需要知道文件的大小,以便在物理计划中将它们拆分到各个读取tasks中。而不是急于在内存中缓存所有表文件统计信息,表现在共享一个固定250M大小(可配置)的缓存,可以不冒内存错误的风险,加快重复性查询速度。

总的来说,这些修改意味着在一个冷启动的Spark中查询速度更快。由于增量文件统计缓存,和旧的分区管理策略相比,重复查询几乎没有性能损失。


四、 新支持的分区 DDLs

这些修改的另一个好处是DataSource表支持之前只在Hive表上可用的几个DDL命令。这些DDLs允许为分区指定文件位置,而不是默认布局,比如,partition (date='2016-11-01', metric='m1')
可以放置到任意的文件系统位置,而不仅仅是
/date=2016-11-01/metric=m1

ALTER TABLE table_name ADD [IF NOT  EXISTS]

     (PARTITION part_spec [LOCATION path], ...)

ALTER TABLE table_name DROP [IF EXISTS]  (PARTITION part_spec, ...)

ALTER TABLE table_name PARTITION  part_spec SET LOCATION path

SHOW PARTITIONS [db_name.]table_name  [PARTITION part_spec]

当然,你仍然可以使用原生的DataFrameAPIs,如df.write.insertInto

df.write.saveAsTable
来添加到分区的表中。关于
Databricks支持的
DDLs
的更多信息,参考
language manual


五、 迁移提示

虽然在Spark 2.1创建新的DataSource表时,会默认使用新的可扩展的分区管理策略,但为了后向兼容,已有的表并不会这样。为了让已经存在的DataSource表也能利用这些改进,你可以使用MSCK命令将已经存在的一个表,从使用旧的分区管理策略转换为使用新的:

MSCK REPAIR TABLE table_name;

在现有文件上创建新表时,你也需要使用MSCK REPAIR TABLE

注意,这可能是一个向后不兼容的更改,因为直接写入表的底层文件将不再反映在表中,直到catalog也被更新。这个同步由Spark 2.1自动完成,但是从较旧的Spark版本,外部系统或Spark表的API之外的写入将需要再次调用MSCK REPAIR TABLE

如何获知一个表的catalog分区管理是否启用? 运行一个DESCRIBEFORMATTED table_name 命令,然后检查输出信息中是否包含PartitionProvider Catalog 

scala> sql("describe formatted test_table")
复制
.filter("col_name like '%Partition Provider%'").show
复制
+-------------------+---------+-------+
复制
|           col_name|data_type|comment|
复制
+-------------------+---------+-------+
复制
|Partition Provider:|  Catalog|       |
复制
+-------------------+---------+-------+
复制

六、   结论

本博客描述内容包含在Apache Spark 2.1版本中,对应参考JIRASPARK-17861即可。

 




文章转载自偷功,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论