暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark(6) Spark数据源——通用文件选项

别动我的月亮啊 2021-04-27
1952

这些通用选项/配置仅在使用基于文件的数据源(parquet
orc
avro
json
csv
text
)时才有效


注意:本文的示例中使用的目录结构层次是:

dir1/
 ├── dir2/
 │    └── file2.parquet (schema: <file: string>, content: "file2.parquet")
 └── file1.parquet (schema: <file, string>, content: "file1.parquet")
 └── file3.json (schema: <file, string>, content: "{'file':'corrupt.json'}")

复制

通用文件选项

忽略损坏文件

Spark允许用户在读取文件的时候使用spark.sql.files.ignoreCorruptFiles
去忽略损坏的文件。当设置为true的时候,Spark作业在遇到损坏文件的时候会继续运行,并将已经读取的内容返回

我们可以使用下面的例子演示这个过程:

// enable ignore corrupt files
scala> spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json is corrupt from parquet's view
scala> val testCorruptDF = spark.read.parquet(
  "examples/src/main/resources/dir1/",
  "examples/src/main/resources/dir1/dir2/")
scala> testCorruptDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+

复制

忽略遗失文件

Spark允许用户在读取文件的时候使用spark.sql.files.ignoewMissingFiles
参数去忽略遗失文件。在这里,遗失文件是在创建完DataFrame之后,用户将目录下的文件删除。当设置为真的时候,Spark作业会在遇到遗失文件的时候继续运行,并且将已经读取的数据返回。

Path Global Filter

如果我们想要通过匹配一个模式串得到想要的文件,可以使用pathGlobFilter
选项。这不会改变partition discovery
的行为。

要加载与给定的glob pattern
匹配的文件时,并保持partition discovery
的行为,可以像下面这样用:

scala> val testGlobFilterDF = spark.read.format("parquet")
  .option("pathGlobFilter""*.parquet"// json file should be filtered out
  .load("examples/src/main/resources/dir1")
scala> testGlobFilterDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+

复制

递归文件查找

recursiveFileLookup
被用来递归加载文件,它会禁止自动分区推断。默认为flase。如果recursiveFileLookup
为真,且数据源显示指定partitionSpec
的时候会抛出异常

自动分区推断:Spark SQL的Parquet数据源可以支持自动根据目录名推断出分区信息。

val recursiveLoadedDF = spark.read.format("parquet")
  .option("recursiveFileLookup""true")
  .load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+

复制

Modification Time Path Filters

为了让Spark批处理查询能够实现更大的文件加载粒度,可以使用modifiedBefore
modifiedAfter
选项,它们可以同时使用,也可以单独使用。

  • modifiedBefore
    :仅筛选出修改时间在指定时间之前的文件,提供的时间戳要用以下格式YYYY-MM-DDTHH:mm:ss
  • modifiedAfter
    :筛选修改时间在指定时间之后的文件,提供的时间戳要用下面的格式YYYY-MM-DDTHH:mm:ss

当没有提供timezone
选项的时候,时间戳由spark.sql.session.timeZone
进行解释。

scala> val beforeFilterDF = spark.read.format("parquet")
  // Files modified before 07/01/2020 at 05:30 are allowed
  .option("modifiedBefore""2020-07-01T05:30:00")
  .load("examples/src/main/resources/dir1");
scala> beforeFilterDF.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+
scala> val afterFilterDF = spark.read.format("parquet")
   // Files modified after 06/01/2020 at 05:30 are allowed
  .option("modifiedAfter""2020-06-01T05:30:00")
  .load("examples/src/main/resources/dir1");
scala> afterFilterDF.show();
// +-------------+
// |         file|
// +-------------+
// +-------------+

复制


文章转载自别动我的月亮啊,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论