0301 Getting Started

轉(zhuǎn)載請(qǐng)注明出處,謝謝合作~

該篇中的示例暫時(shí)只有 Scala 版本~

上手 Spark SQL

入口:SparkSession

Spark 應(yīng)用程序的編程入口是 SparkSession 類,可以通過 SparkSession.builder() 創(chuàng)建一個(gè)基礎(chǔ)的 SparkSession

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

Spark 2.0 中的 SparkSession 內(nèi)置了對(duì) Hive 的支持,包括使用 HiveSQL 編寫查詢語句,使用 Hive UDF,以及從 Hive 表中讀取數(shù)據(jù)。這些功能需要首先安裝好 Hive。

創(chuàng)建 DataFrame

應(yīng)用程序可以使用 SparkSession 通過一個(gè)現(xiàn)有的 RDD(existing RDD),通過 Hive 表,或者通過 Spark 數(shù)據(jù)源(Spark data sources)創(chuàng)建 DataFrame。

下面的示例通過一個(gè) JSON 文件創(chuàng)建 DataFrame:

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

類型無關(guān)的 DataFrame 算子

DataFrame 針對(duì)操作結(jié)構(gòu)化的數(shù)據(jù)提供了特定的算子(Scala, Java, PythonR)。

上文提到,對(duì)于 Spark 2.0 中的 Scala 和 Java API, DataFrame 只是 Row 類型的 Dataset。相較于強(qiáng)類型相關(guān)的 Dataset,這些算子是類型無關(guān)的。

這里我們展示一些使用 Dataset 進(jìn)行 結(jié)構(gòu)化數(shù)據(jù)處理的基礎(chǔ)樣例:

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

此類操作 Dataset 的算子的完整列表詳見 API Documentation。

除了簡(jiǎn)單的列引用和表達(dá)式,Dataset 還有一個(gè)強(qiáng)大的函數(shù)庫(kù),包括操作字符串,日期計(jì)算,常見的數(shù)據(jù)計(jì)算等等。完整的函數(shù)列表參見 DataFrame Function Reference

編程中使用 SQL 查詢

SparkSessionsql 方法可以讓應(yīng)用程序通過編程使用 SQL 查詢,返回值一個(gè) DataFrame。

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

全局臨時(shí)視圖

Spark SQL 中的臨時(shí)視圖在當(dāng)前 SparkSession 存在范圍內(nèi)有效,一旦 SparkSession 結(jié)束,臨時(shí)視圖就消失了。如果需要在不同的應(yīng)用程序之間共享臨時(shí)視圖,即使 SparkSession 結(jié)束依舊存在,可以使用全局臨時(shí)視圖。全局臨時(shí)視圖與一個(gè)系統(tǒng)保留數(shù)據(jù)庫(kù) global_temp 綁定,必須使用全限定名稱來使用,比如 SELECT * FROM global_temp.view1

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

創(chuàng)建 Dataset

Dataset 跟 RDD 類似,但是不像 RDD 那樣使用 Java 或者 Kryo 序列化器,在計(jì)算以及網(wǎng)絡(luò)傳輸過程中 Dataset 使用一個(gè)特定的 Encoder 來序列化對(duì)象。盡管 encoder 和標(biāo)準(zhǔn)的序列化器都是用來將一個(gè)對(duì)象轉(zhuǎn)換為字節(jié),encoder 采用的是動(dòng)態(tài)代碼生成的,并且采用了一種特殊的格式,Spark 可以對(duì)這種格式進(jìn)行像過濾、排序和哈希運(yùn)行而不用將其反序列化為對(duì)象。

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

與 RDD 交互

Spark SQL 支持兩種不同的方式把一個(gè)現(xiàn)有的 RDD 轉(zhuǎn)換為 Dataset。第一種方式是通過反射推斷一個(gè)定義了類型的 RDD 的表結(jié)構(gòu)。這種基于反射的方式可以使代碼簡(jiǎn)潔,在已知表結(jié)構(gòu)的場(chǎng)景下工作良好。

第二種方式是通過編程的方式構(gòu)建一個(gè)表結(jié)構(gòu)對(duì)象,并把它賦予一個(gè)現(xiàn)有的 RDD。盡管這種方式相對(duì)復(fù)雜,但是能夠在無法得知運(yùn)行時(shí)類型的情況下創(chuàng)建 Dataset。

通過反射推斷表結(jié)構(gòu)

Spark SQL 的 Scala 接口自動(dòng)支持將一個(gè)樣本類類型的 RDD 轉(zhuǎn)換為一個(gè) DataFrame。樣本類定義了表結(jié)構(gòu),通過反射獲取類中的字段名稱并將其應(yīng)用為列名。樣本類可以嵌套,還可以包含像 SeqArray 這樣的復(fù)雜類型。該 RDD 會(huì)被隱式轉(zhuǎn)換為一個(gè) DataFrame,之后可以注冊(cè)成一張表,該表可以通過 SQL 進(jìn)行查詢。

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」

編程指定表結(jié)構(gòu)

如果樣本類無法被實(shí)現(xiàn)創(chuàng)建(例如,一行數(shù)據(jù)以字符串的格式編碼,或者是需要被解析的文本類型的數(shù)據(jù)集,以及對(duì)于不同用戶來說需要抽取的字段不同),可以分三步編程創(chuàng)建一個(gè) DataFrame 。

  1. 通過一個(gè) RDD 創(chuàng)建一個(gè) Row 類型的 RDD;
  2. 創(chuàng)建一個(gè) StructType 類型的表結(jié)構(gòu)對(duì)象,需要與第 1 步 Row 中的數(shù)據(jù)相對(duì)應(yīng);
  3. 通過 SparkSession 提供的 createDataFrame 方法將第 1 步生成的 RDD 和第 2 步生成的表結(jié)構(gòu)結(jié)合起來。

例如:

import org.apache.spark.sql.Row

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

標(biāo)量函數(shù)

標(biāo)量函數(shù)通過一行數(shù)據(jù)只返回一個(gè)單值,而不是像聚合函數(shù)那樣接收多行數(shù)據(jù)返回一個(gè)單值。Spark SQL 支持許多內(nèi)置標(biāo)量函數(shù)(Built-in Scalar Functions),同事也支持自定義標(biāo)量函數(shù)(User Defined Scalar Functions)。

聚合函數(shù)

聚合函數(shù)接收多行數(shù)據(jù)返回一個(gè)單值。內(nèi)置的聚合函數(shù)(Built-in Aggregation Functions)提供了常見的聚合函數(shù),比如 count(), countDistinct(), avg(), max(), min() 等等。用戶不用受預(yù)定義聚合函數(shù)的限制,可以定義自己的聚合函數(shù),詳情參見 User Defined Aggregate Functions

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容