这些通用选项/配置仅在使用基于文件的数据源(parquet
,orc
,avro
,json
,csv
,text
)时才有效
注意:本文的示例中使用的目录结构层次是:
dir1/
├── dir2/
│ └── file2.parquet (schema: <file: string>, content: "file2.parquet")
└── file1.parquet (schema: <file, string>, content: "file1.parquet")
└── file3.json (schema: <file, string>, content: "{'file':'corrupt.json'}")复制
通用文件选项
忽略损坏文件
Spark允许用户在读取文件的时候使用spark.sql.files.ignoreCorruptFiles
去忽略损坏的文件。当设置为true的时候,Spark作业在遇到损坏文件的时候会继续运行,并将已经读取的内容返回
我们可以使用下面的例子演示这个过程:
// enable ignore corrupt files
scala> spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json is corrupt from parquet's view
scala> val testCorruptDF = spark.read.parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
scala> testCorruptDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+复制
忽略遗失文件
Spark允许用户在读取文件的时候使用spark.sql.files.ignoewMissingFiles
参数去忽略遗失文件。在这里,遗失文件是在创建完DataFrame之后,用户将目录下的文件删除。当设置为真的时候,Spark作业会在遇到遗失文件的时候继续运行,并且将已经读取的数据返回。
Path Global Filter
如果我们想要通过匹配一个模式串得到想要的文件,可以使用pathGlobFilter
选项。这不会改变partition discovery
的行为。
要加载与给定的glob pattern
匹配的文件时,并保持partition discovery
的行为,可以像下面这样用:
scala> val testGlobFilterDF = spark.read.format("parquet")
.option("pathGlobFilter", "*.parquet") // json file should be filtered out
.load("examples/src/main/resources/dir1")
scala> testGlobFilterDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+复制
递归文件查找
recursiveFileLookup
被用来递归加载文件,它会禁止自动分区推断。默认为flase。如果recursiveFileLookup
为真,且数据源显示指定partitionSpec
的时候会抛出异常
“自动分区推断:Spark SQL的Parquet数据源可以支持自动根据目录名推断出分区信息。
”
val recursiveLoadedDF = spark.read.format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+复制
Modification Time Path Filters
为了让Spark批处理查询能够实现更大的文件加载粒度,可以使用modifiedBefore
和modifiedAfter
选项,它们可以同时使用,也可以单独使用。
modifiedBefore
:仅筛选出修改时间在指定时间之前的文件,提供的时间戳要用以下格式YYYY-MM-DDTHH:mm:ss
modifiedAfter
:筛选修改时间在指定时间之后的文件,提供的时间戳要用下面的格式YYYY-MM-DDTHH:mm:ss
当没有提供timezone
选项的时候,时间戳由spark.sql.session.timeZone
进行解释。
scala> val beforeFilterDF = spark.read.format("parquet")
// Files modified before 07/01/2020 at 05:30 are allowed
.option("modifiedBefore", "2020-07-01T05:30:00")
.load("examples/src/main/resources/dir1");
scala> beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
scala> val afterFilterDF = spark.read.format("parquet")
// Files modified after 06/01/2020 at 05:30 are allowed
.option("modifiedAfter", "2020-06-01T05:30:00")
.load("examples/src/main/resources/dir1");
scala> afterFilterDF.show();
// +-------------+
// | file|
// +-------------+
// +-------------+复制