暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark(8) Spark数据源——其他常用数据源

别动我的月亮啊 2021-04-28
1232

除了Parquet以外,Spark还支持操作多种数据源,如ORC、JSON、JDBC等等。


ORC文件

从Spark 2.3开始,Spark SQL 支持矢量化读取ORC数据。并新添了几种配置。当spark.sql.orc.implis
设置为native且spark.sql.enableVectorizedReader
设置为true的时候,矢量化读取器可以用来读取本地ORC table。对于Hive ORC serde table,矢量化读取器可以在spark.sql.hive.convertMetastoreOrc
设置为true的时候使用。

Property NameDefaultMeaningSince Version
spark.sql.orc.impl
native
The name of ORC implementation. It can be one of native
and hive
. native
means the native ORC support. hive
means the ORC library in Hive.
2.3.0
spark.sql.orc.<br />enableVectorizedReader
true
Enables vectorized orc decoding in native
implementation. If false
, a new non-vectorized ORC reader is used in native
implementation. For hive
implementation, this is ignored.
2.3.0

JSON Files

Spark SQL可以自动推断JSON数据集的schema,并且将其加载为dataset[set]
。这种转化可以使用Dataset[String]或JSON文件上的SparkSession.read.json()

注意以json文件形式提供的文件不是典型的JSON文件。它每行必须要包含一个单独的、有效的JSON对象。想要获得更多信息,参阅 JSON Lines text format, also called newline-delimited JSON.

{"name":"Michael"}
{"name":"Andy""age":30}
{"name":"Justin""age":19}

复制

对于常规的多行JSON文件,请将multiLine
选项设置为true

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

复制

Hive Table

Spark SQL支持从Apache Hive中读取或者写入数据。不过,由于Hive有很多依赖项,这些依赖项并没有包含在Spark的发行版中。如果Hive能在类路径下找到依赖,Spark就能自动获取。要注意的是,在所有的worker节点中,这些依赖也必须存在,因为如果想将数据存入Hive,必须要使用Hive序列化和反序列化库(SerDes)。

hive-site.xml
core-site.xml
(安全配置)和hdfs-site.xml
(HDFS配置)放在conf/
下,可以完成Hive的配置。

使用Hive时,必须在Hive支持下实例化SparkSession,包括Hive metastore的持久化连接,Hive serdes的支持以及Hive用户定义函数。没有现有Hive部署的用户仍然可以启用Hive支持。当没有通过hive-site.xml设置时,上下文会在当前目录中自动创建metastore_db
,并创建一个由spark.sql.warehouse.dir
参数配置的目录——这个目录默认是spark-warehouse
,在Spark应用启动时所在的目录下。

注意hive-site.xml
文件中的hive.metastore.warehouse.dir
属性从Spark 2.0.0以后被删除。而是由spark.sql.warehouse.dir
代替。Spark 2.0.0以前的用户使用的是前一个参数

import java.io.File

import org.apache.spark.sql.{RowSaveModeSparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// |  0|
// |  1|
// |  2|
// ... Order may vary, as spark processes the partitions in parallel.

// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition""true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode""nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()

复制

指定Hive table的存储格式

当用户创建一个Hive表的时候,需要指定这个表读写数据使用的文件系统(input format
output format
),同时也需要确定这个表如何进行序列化和反序列化。下面几个选项就是用户指定这些格式(比如CREATE TABLE src(id int) USING hive OPTION(fileFormat 'parquet')
)。默认情况下,我们将会以普通文本的形式读取数据。

注意:创建表的时候,不支持配置Hive store hanler,用户可以使用storege handler
在Hive端创建表,并使用Spark SQL读取它

Property NameMeaning
fileFormat
fileFormat是一种存储格式规范,其中内容包括serde
input format
output format
。目前支持六种数据格式:sequencefile
rcfile
orc
parquet
textfile
avro
inputFormat, outputFormat
These 2 options specify the name of a corresponding InputFormat
and OutputFormat
class as a string literal, e.g. org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
. These 2 options must be appeared in a pair, and you can not specify them if you already specified the fileFormat
option.
serde
This option specifies the name of a serde class. When the fileFormat
option is specified, do not specify this option if the given fileFormat
already include the information of serde. Currently "sequencefile", "textfile" and "rcfile" don't include the serde information and you can use this option with these 3 fileFormats.
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim
These options can only be used with "textfile" fileFormat. They define how to read d

其他数据库的JDBC

SparkSQL可以使用JDBC来从其他的数据库中获取数据。此功能优先使用JdbcRDD(以DataFrame的形式),这是为了能够更加轻松地使用SparkSQL进行操作。

其他还有很多种数据源,操作也都大同小异,具体去看官方文档即可 。篇幅问题本文不再赘述


文章转载自别动我的月亮啊,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论