暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark(5) Spark数据源——通用加载/保存函数

别动我的月亮啊 2021-04-27
885

本文主要介绍Spark数据源的加载与保存有关的函数。通过一些库及其选项可以让数据源与DataFrame之间进行灵活的转换

通用加载/保存函数

如果用户不另外配置spark.sql.sources.default
,所有的操作将会使用默认数据源。

scala> val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
scala> usersDF.select("name""favorite_color").write.save("namesAndFavColors.parquet")

复制

手动指定选项

用户可以手动设置选项来使用自己想要的数据源。数据源可以通过全限定名(i.e., org.apache.spark.sql.parquet
),也能通过使用简单名称的形式(json
parquet
jdbc
orc
等等)。可以使用这种语法,将任意类型的数据源加载的DataFrame
转换成其他类型。

详情请参阅相关语言的API文档来获取可用选项。例如org.apache.spark.sql.DataFrameReader
org.spark.sql.DataFrameWriter
。这些选项在非ScalaAPI中也可以可用的(例如:PySpark)。对于其他格式,阅读API文档可以了解

例如,我们可以使用下面的形式来加载JSON文件

scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
scala> peopleDF.select("name""age").write.format("parquet").save("namesAndAges.parquet")

复制

还可以通过下面的形式加载CSV文件:

val peopleDFCsv = spark.read.format("csv")
  .option("sep"";")
  .option("inferSchema""true")
  .option("header""true")
  .load("examples/src/main/resources/people.csv")

复制

写操作也能使用选项。例如,我们能控制ORC数据源的bloom过滤器和字典编码。下面的例子就是对favorite_color
创建一个bloom过滤器并使用字典编码。对于Parquet
来说,也存在parquet.enable.dictionary
。查看Apache ORC/Parquet
可以查看ORC/Parquet
的细节

usersDF.write.format("orc")
  .option("orc.bloom.filter.columns""favorite_color")
  .option("orc.dictionary.key.threshold""1.0")
  .option("orc.column.encoding.direct""name")
  .save("users_with_options.orc")

复制

保存模式

Save操作可以选择SaveMode
来指定如何处理现有数据。这些保存模式不使用任何锁定,也不是原子的。除此之外,当数据被Overwrite
的时候,旧数据会在写入新数据之前被删除。

Scala/JavaAny LanguageMeaning
SavaMode. ErrorIfExists
(default)
“error” or “errorifexist"
当保存一个DataFrame到数据源中时,如果数据已经存在,会抛出异常
SaveMode.Append
“append"
当保存DataFrame到数据源时,如果数据已经存在,那么DataFrame中的内容会追加到现有数据的后面
SaveMode.Overwrite
“overwrite”
如果数据已经存在,那么旧数据会被新数据覆盖
SaveMode.Ignore
“ignore”
如果数据已经存在,sava
操作将不会保存DataFrame中的任何数据,也不会改变现有的数据

保存到持久表

通过saveAsTabel
命令,DataFrame
能以持久表的方式保存到Hive中。使用这个功能不需要现有的Hive。Spark会创建本地Hive存储。与CreateOrReplaceTempView
不通过,saveAsTable
会物化DataFrame并创建一个指向Hive存储数据的指针。持久表在Spark重启之后也会存在。也可以使用SparkSession.table()
方法将表转成DataFrame

对于基于文件的数据源来说(如text
parquet
json
等等)。通过path选项来指定表路径df.write.option("path", "/{path}).saveAsTable("t")"
。当表被销毁,表路径不会被移出而表中的数据依旧存在。如果没有指定表路径,Spark将会向仓库目录下的默认表路径中写数据。当这里的表被销毁时,默认表路径也会被删除。

从Spark 2.1开始,持久数据源的表Hive存储单元中存储了每个分区的元数据,这带来了下面几个好处

  • 元数据能够只返回查询的必要分区
  • Datasource API可以使用Hive DDL(像ALTER TABLE PARTITION ... SET LOCATION
    )来创建表了

注意:在创建外部数据源表(使用了path
选项)的时候,默认情况下不会收集分区信息。

为了同步存储单元的分区信息,用户可以调用MSCK REPAIR TABLE

Bucketing,Sorting and Partitioning

对于基于文件的数据源,可以对输出进行bucket
sort
partition
。Bucketing和Sorting可以被持久表使用

scala> peopleDF.write.bucketBy(42"name").sortBy("age").saveAsTable("people_bucketed")

复制

使用DataSet API的时候,partitioning
能与save
saveAsTable
一起使用。

scala> usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

复制
image-20210426203225756

对于单个表来说,也可以同时使用partitioning
bucketing

scala> usersDF.write.partitionBy("favorite_color").bucketBy(42"name").saveAsTable("users_partitioned_bucketed")

复制

partitionBy
创建了上面那种的目录结构。因此,它对具有高基数(重复值比较少)的列的适用性有限,相反,bucketBy
可以将数据分发到固定数量的bucket中,能在唯一键的数量没有约束的情况下使用。


文章转载自别动我的月亮啊,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论