暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark(7) Spark数据源——parquet文件

别动我的月亮啊 2021-04-27
1193

Parquet是Hadoop生态圈中一种新型列式存储格式,其被多种查询引擎及文件系统支持。

相比于行存储,列存储能带来下面这些优化:

  • 查询的时候不需要扫描全部的数据,而只需要读取每次查询涉及的列,这样可以将IO降低,也可以保存每一列的统计信息,实现部分的谓词下推
  • 由于每一列的成员都是同构的,可以针对不同的数据类型使用更高效的数据压缩算法
  • 由于每一列的成员都是同构的,可以使用更加适合CPU pipeline的编码方式,减小CPU的缓存失效。

Spark SQL支持Parquet文件的读写。当读取Parquet文件的时候,所有的列都会因为兼容性的原因转换为nullable

Parquet文件

Loading Data Programming

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

复制

Partition Discovery

表分区是在Hive这样的系统中常见的优化方法。在分区表中,数据通常被存储在不同的目录下,每个分区的目录路径由分区列值编码形成。

Spark所有的内置数据源都是可以发现和推断分区信息的。例如,我们可以将所有之前是通过的人口数据存储到下面的目录结构中,使用两个列——gender
country
作为分区列。

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

复制

path/to/table
路径下面的内容传递给SparkSession.read.parquet
或者SparkSession.read.load
,Spark可以从路径推断出分区信息。现在DataFrame如下所示:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

复制

注意,分区的列的数据类型是Spark自己推测的。目前支持数字数据类型、日期、时间戳和字符串类型。有时,用户不想要Spark自己推断数据类型,对于这种情况,可以通过spark.sql.sources.partitionColumnTypeInference.enabled
(默认为true)设置。如果这个变量被设置为false,所有的分区列都会使用String类型。

从Spark 1.6.0开始,partition discovery
(分区发现)默认仅在给定路径下查找分区。对于上面的例子,如果用户将path/to/table/gender=male
传递给SparkSession.read.parquet
或者SparkSession.read.load
gender
不会被看作是一个分区列。如果用户指定partition discovery
base path
,可以使用basePath
选项。像上面的例子,设置basePath为path/to/table
后,就算只看部分路径,也是能将gender
作为分区列。

Schema Merging

类似于Protocol Buffer, Avro, 和Thrift。Parquet也支持schema evolution
(schema演化)。用户能能从一个简单的schema开始,逐渐添加更多的列到schema中。最后,用户可以得到多种不同的复杂的Parquet文件,但是它们有着互相兼容的架构。Parquet可以检测到这种情况,并合并这些schema。

schema是一种昂贵的操作,在大多数情况下不会使用,因此从Spark 1.5.0开始就默认关闭它了。我们可以通过下面两种方式开启它:

  • 当读取Parquet文件的时候,设置数据源选项mergeSchema
    为true
  • 设置全局SQL选项spark.sql.parquet.mergeSchema
    为true
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value""square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value""cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema""true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)

复制

Hive metastore Parquet表转换

当从Hive metastore parquet表中读取数据或者写入未分区的表时,Spark会尝试使用自己的Parquet支持而不是Hive SerDe来获取更好的性能。这种行为由spark.sql.hive.convertMetastoreParquet
配置控制(默认开启)

Hive/Parquet Schema协调

从table schema处理的角度上面来说,Hive和Parquet有两个关键的不同点

  1. Hive不区分大小写
  2. Hive认为所有列都可以为空,而nullability特性在Parquet是非常重要的。

由于上面的原因,在将一个Hive metastore Parquet table
转换成一个Spark SQL Parquet table
的时候我们必须对Hive metastore schema
Parquet schema
进行协调。协调规则有:

  1. 无论是不是为nullability
    ,两个schema中具有相同名称的字段必须具有相同的数据类型。协调字段应该具有Parquet的数据类型,以便遵循nullabitity
  2. 协调后的schema应该完全包含在Hive Metastore schema中定义的字段
  • 仅仅出现在Parquet schema的字段会在协调schema中销毁
  • 只出现在hive metastore schema的字段会以nullable
    字段的形式添加到schema中。

metadata Refreshing(元数据刷新)

为了获取更好的性能,Spark SQL缓存Parquet的元数据。当Hive metastore Parquet table是能被转换的时候,被转换的表的元数据也会被缓存。如果那些表被Hive或者其他工具更新的时候,我们就需要自己去刷新它们,来保证数据的一致性。

// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")

复制

Configuration

Parquet的配置能使用SparkSession的setConf
方法,或者使用SQL运行SET key=value
指令。

来自官方文档:Parquet Files - Spark 3.1.1 Documentation (apache.org)

Property NameDefaultMeaningSince Version
spark.sql.parquet.binaryAsString
falseSome other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.1.1.1
spark.sql.parquet.int96AsTimestamp
trueSome Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.1.3.0
spark.sql.parquet.compression.codec
snappySets the compression codec used when writing Parquet files. If either compression
or parquet.compression
is specified in the table-specific options/properties, the precedence would be compression
, parquet.compression
, spark.sql.parquet.compression.codec
. Acceptable values include: none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd. Note that zstd
requires ZStandardCodec
to be installed before Hadoop 2.9.0, brotli
requires BrotliCodec
to be installed.
1.1.1
spark.sql.parquet.filterPushdown
trueEnables Parquet filter push-down optimization when set to true.1.2.0
spark.sql.hive.convertMetastoreParquet
trueWhen set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in support.1.1.1
spark.sql.parquet.mergeSchema
falseWhen true, the Parquet data source merges schemas collected from all data files, otherwise the schema is picked from the summary file or a random data file if no summary file is available.1.5.0
spark.sql.parquet.writeLegacyFormat
falseIf true, data will be written in a way of Spark 1.4 and earlier. For example, decimal values will be written in Apache Parquet's fixed-length byte array format, which other systems such as Apache Hive and Apache Impala use. If false, the newer format in Parquet will be used. For example, decimals will be written in int-based format. If Parquet output is intended for use with systems that do not support this newer format, set to true.1.6.0


文章转载自别动我的月亮啊,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论