本文主要介绍Spark数据源的加载与保存有关的函数。通过一些库及其选项可以让数据源与DataFrame之间进行灵活的转换
通用加载/保存函数
如果用户不另外配置spark.sql.sources.default
,所有的操作将会使用默认数据源。
scala> val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
scala> usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")复制
手动指定选项
用户可以手动设置选项来使用自己想要的数据源。数据源可以通过全限定名(i.e., org.apache.spark.sql.parquet
),也能通过使用简单名称的形式(json
,parquet
,jdbc
,orc
等等)。可以使用这种语法,将任意类型的数据源加载的DataFrame
转换成其他类型。
“详情请参阅相关语言的API文档来获取可用选项。例如
”org.apache.spark.sql.DataFrameReader
和org.spark.sql.DataFrameWriter
。这些选项在非ScalaAPI中也可以可用的(例如:PySpark)。对于其他格式,阅读API文档可以了解
例如,我们可以使用下面的形式来加载JSON文件
scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
scala> peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")复制
还可以通过下面的形式加载CSV文件:
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")复制
写操作也能使用选项。例如,我们能控制ORC数据源的bloom过滤器和字典编码。下面的例子就是对favorite_color
创建一个bloom过滤器并使用字典编码。对于Parquet
来说,也存在parquet.enable.dictionary
。查看Apache ORC/Parquet
可以查看ORC/Parquet
的细节
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")复制
保存模式
Save操作可以选择SaveMode
来指定如何处理现有数据。这些保存模式不使用任何锁定,也不是原子的。除此之外,当数据被Overwrite
的时候,旧数据会在写入新数据之前被删除。
Scala/Java | Any Language | Meaning |
---|---|---|
SavaMode. ErrorIfExists (default) | “error” or “errorifexist" | 当保存一个DataFrame到数据源中时,如果数据已经存在,会抛出异常 |
SaveMode.Append | “append" | 当保存DataFrame到数据源时,如果数据已经存在,那么DataFrame中的内容会追加到现有数据的后面 |
SaveMode.Overwrite | “overwrite” | 如果数据已经存在,那么旧数据会被新数据覆盖 |
SaveMode.Ignore | “ignore” | 如果数据已经存在,sava 操作将不会保存DataFrame中的任何数据,也不会改变现有的数据 |
保存到持久表
通过saveAsTabel
命令,DataFrame
能以持久表的方式保存到Hive中。使用这个功能不需要现有的Hive。Spark会创建本地Hive存储。与CreateOrReplaceTempView
不通过,saveAsTable
会物化DataFrame并创建一个指向Hive存储数据的指针。持久表在Spark重启之后也会存在。也可以使用SparkSession.table()
方法将表转成DataFrame
对于基于文件的数据源来说(如text
,parquet
,json
等等)。通过path选项来指定表路径df.write.option("path", "/{path}).saveAsTable("t")"
。当表被销毁,表路径不会被移出而表中的数据依旧存在。如果没有指定表路径,Spark将会向仓库目录下的默认表路径中写数据。当这里的表被销毁时,默认表路径也会被删除。
从Spark 2.1开始,持久数据源的表Hive存储单元中存储了每个分区的元数据,这带来了下面几个好处
元数据能够只返回查询的必要分区 Datasource API可以使用Hive DDL(像 ALTER TABLE PARTITION ... SET LOCATION
)来创建表了
注意:在创建外部数据源表(使用了path
选项)的时候,默认情况下不会收集分区信息。
为了同步存储单元的分区信息,用户可以调用MSCK REPAIR TABLE
Bucketing,Sorting and Partitioning
对于基于文件的数据源,可以对输出进行bucket
、sort
和partition
。Bucketing和Sorting可以被持久表使用
scala> peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
复制
使用DataSet API的时候,partitioning
能与save
和saveAsTable
一起使用。
scala> usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
复制
对于单个表来说,也可以同时使用partitioning
和bucketing
scala> usersDF.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("users_partitioned_bucketed")
复制
partitionBy
创建了上面那种的目录结构。因此,它对具有高基数(重复值比较少)的列的适用性有限,相反,bucketBy
可以将数据分发到固定数量的bucket中,能在唯一键的数量没有约束的情况下使用。