第四章 結(jié)構(gòu)化API概述
結(jié)構(gòu)化API主要指三種核心分布式集合類型API:Dataset、DataFrame、SQL表和視圖
DataFrame和Dataset類型
- DataFrame和Dataset是具有行和列的類似于數(shù)據(jù)表的集合類型
- Spark中的DataFrame和Dataset代表不可變的數(shù)據(jù)集合
Schema
定義了DataFrame的列名和類型
兩者比較
- 非類型化的DataFrame和類型化的Dataset,Spark負責(zé)維護DataFrame的類型(Schema),Dataset在編譯時就會檢查類型是否符合規(guī)范
- 在Scala版本Spark中,DataFrame就是Row類型的Dataset的集合
- Row類型是Spark用于支持內(nèi)存計算而優(yōu)化的數(shù)據(jù)格式,避免JVM類型的垃圾回收開銷和對象實例化開銷
結(jié)構(gòu)化API執(zhí)行概述
- 編寫代碼
- Spark生成邏輯計劃
- Spark將邏輯計劃轉(zhuǎn)換成物理計劃
- 在集群上執(zhí)行物理計劃

邏輯計劃

- 最開始是尚未被解析的邏輯計劃
- 加入了元數(shù)據(jù)Catalog,計劃變得可理解了
- 經(jīng)過邏輯優(yōu)化生成優(yōu)化后的邏輯計劃
物理計劃

- 獲取到很多個物理執(zhí)行計劃
- 代價模型會分析得到最優(yōu)的物理執(zhí)行計劃,比如鏈接操作、物理表的大小
- 交給集群執(zhí)行 運行在底層編程接口RDD上
第五章 基本的結(jié)構(gòu)化操作
主要是針對DataFrame的操作
模式
schema 文件的結(jié)構(gòu)可以通過schema指定,也可查看schema
val mySchema = StructType(Array(
StructFiled(名稱,類型,是否為空),
StructFiled(名稱,類型,是否為空),
))
val df = spark.read.format("json").schema(mySchema).load("xxx")
列和表達式
列
df.col("count")
col("xxx")
column("xxx")
表達式
- 表達式是對DataFrame中某一個或多個值的一組轉(zhuǎn)換操作
expr("somecol-5")
記錄和行
- DataFrame每一行都是一個記錄,而記錄是Row類型的對象
- Spark使用列表達式操作Row類型對象,Row對象內(nèi)部是字節(jié)數(shù)組
DataFrame轉(zhuǎn)換操作
- 創(chuàng)建DataFrame
df.createOrReplaceTempView
- 操作DataFrame的列 select selectExpr
df.select(col(xxx)) df.selectExpr(xxx)
- 添加列 withColumn
df.withColumn("列名", 表達式)
- 重命名列 withColumnRenames("原名","新名")
- 刪除列 drop
- 過濾行 where filter
df.filter(col("count") < 2)
- 去重 distinct()
df.select("xxx").distinct().count()
- 連接 union
df.union(otherdf)
- 行排序 sort orderBy
df.sort("xxx")
df.orderBy(expr("count desc"))
- 重新分區(qū) repartition
df.repartition(4)
第六章 處理不同類型的數(shù)據(jù)
布爾
val priceFilter = col("unitPrice") > 600
df.withColum("isExpensive",priceFilter) // 創(chuàng)建一個列判斷價格是否大于600
字符串
- 正則匹配
import org.apache.spark.sql.functions.regexp_replace
val simpleColors = Seq("black", "white", "red", "green", "blue")
val regexString = simpleColors.map(_.toUpperCase).mkString("|")
// the | signifies `OR` in regular expression syntax
df.select(
regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"),
col("Description")).show(2)
空值
df.na.drop()
df.na.fill("填的值", Seq("列名1", "列名2"))
用戶自定義函數(shù)
- 一個例子
val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)
- UDF如何運作
- 當(dāng)使用Scala Java的UDF時,JVM中運行,可能導(dǎo)致性能下降(GC)
- 使用Python寫的UDF,Spark在worker上啟動一個Python進程,將所有數(shù)據(jù)序列化為Python可解釋的格式,在Python進程中對該數(shù)據(jù)進行執(zhí)行函數(shù),最終將結(jié)果返回JVM和Spark。這會有兩個問題:計算安規(guī),進入Python進程后,Spark無法管理workjer內(nèi)存

第七章 聚合操作
每個分組操作都會返回RelationGroupedDataset,基于它來進行聚合操作
聚合操作
- count
df.select(count("xxx"))
- countDistinct
df.select(countDistinct("xxx"))
- approx_count_distinct
df.select(approx_count_distinct("xxx",0.1))
- first last
df.select(first("xxx"))
- min max sum sumDistinct avg都是一樣的操作
分組
- groupBy
df.groupBy("xxx")
- 使用表達式分組
df.groupBy("xxxx").agg(count("xxx"))
- 使用map分組
df.groupBy("xxx").agg("colname"->"avg","colname2"->"聚合操作")
window函數(shù)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
// 每個用戶id是一個分區(qū),內(nèi)部的quantity降序排序
val windowSpec = Window
.partitionBy("CustomerId", "date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
自定義聚合函數(shù)UDAF
類似于hive的udaf
- inputSchema指定輸入?yún)?shù)
- bufferSchema指定UDAF中間結(jié)果
- dataType用于指定返回結(jié)果
- deterministic 指定UDAF對于某個輸入是否返回相同結(jié)果
- initialize初始化聚合緩沖區(qū)
- update進行緩沖更新
- merge合并兩個緩沖區(qū)
- evaluate生成聚合最終結(jié)果
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// 返回是不是全是ture
class BoolAnd extends UserDefinedAggregateFunction {
def inputSchema: org.apache.spark.sql.types.StructType =
StructType(StructField("value", BooleanType) :: Nil)
def bufferSchema: StructType = StructType(
StructField("result", BooleanType) :: Nil
)
def dataType: DataType = BooleanType
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = true
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
}
def evaluate(buffer: Row): Any = {
buffer(0)
}
}
第八章 連接操作
基本的鏈接操作
內(nèi)、外、左外、右外、左半(left semi)、左反(left anti)、笛卡爾
- 定義連接的表達式
val joinExpression = person.col("xxx") = otherdf.col("xxx")
- 選一個連接類型
joinType = "outer|left_outer|right_outer|left_sermi|left_anti|cross"
- 執(zhí)行join操作
person.join(otherdf, joinExpression, joinType).show()
常見問題
復(fù)雜類型連接
可以自定返回Boolean的joinExpression來完成復(fù)雜類型連接
import org.apache.spark.sql.functions.expr
person.withColumnRenamed("id", "personId")
.join(sparkStatus, expr("array_contains(spark_status, id)")).show()
重復(fù)列名
- 用不同的連接表達式
- 連接后刪除某一列
- 連接前就重命名
Spark如何進行連接
spark以兩種不同的方式處理集群通信問題,要么執(zhí)行all-to-all通信的shuffle join,要么執(zhí)行廣播join
shuffle join
每個節(jié)點都與其他所有節(jié)點進行通信,對網(wǎng)絡(luò)傳輸有一定要求

broadcast join
當(dāng)表的大小足夠下能夠放入當(dāng)個節(jié)點內(nèi)存還有空余的時候,可以用。
在開始之前會有一次大規(guī)模通信,分發(fā)這個表,通信結(jié)束之后節(jié)點間不再有通信。
需要注意這個會先給driver,driver的空間也有夠,還要注意第一次大規(guī)模通信的timeout

第九章 數(shù)據(jù)源
數(shù)據(jù)源API結(jié)構(gòu)
Read API
spark.read.format("格式")
.option("配置key", "配置value")
.schema(xxx)
.load()
Write API
spark.writer.format("格式").
.option("配置key", "配置value")
//.partuitionBy()
//.sortBy()
//.bucketBy()
.save()
Parquet & ORC
都是壓縮格式
但書里提到Parquet針對Spark進行優(yōu)化 ORC針對hive進行優(yōu)化
寫入SQL數(shù)據(jù)庫
val driver = "org.sqlite.JDBC"
val path = "/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:/${path}"
val tablename = "flight_info"
val dbDataFrame = spark.read.format("jdbc").option("url", url)
.option("dbtable", tablename).option("driver", driver).load()
高級IO概念
- 可分割的文件類型和壓縮
由于分區(qū)讀的都是單個文件,這個文件分不開就影響效率 - 并行讀
可以并行讀取不同的文件 - 并行寫
可以往同一個分區(qū)不同的文件下寫入 - 分桶
具有相同桶ID的數(shù)據(jù)放在一個物理分區(qū),可以避免在讀取數(shù)據(jù)的時候shuffle
val numberBuckets = 10
val columnToBucketBy = "count"
csvFile.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
- 管理文件大小
由于每個task都是處理單個文件,文件太小起的task就太多
maxRecordPerFile來制定每個文件的最大記錄數(shù)
第十章 SparkSQL
Spark和Hive的關(guān)系
Spark SQL可以與Hive metastores鏈接。 Hive metastore維護了Hive跨回話數(shù)據(jù)表的信息
如何運行Spark SQL查詢
Spark可編程SQL接口
spark.sql("select 1+1").show()
這會返回一個DataFrame
Catalog
Spark SQL中最高級別的抽象是Catalog,用于存儲用戶數(shù)據(jù)中的元數(shù)據(jù)以及數(shù)據(jù)庫、數(shù)據(jù)表、函數(shù)、視圖等有用的東西
托管表和非托管表
- 非托管表:定義磁盤上若干文件作為數(shù)據(jù)表
- 托管表:DataFrame上使用saceAsTable創(chuàng)建一個數(shù)據(jù)表,這個函數(shù)會把表寫入一個新的位置
第十一章 Dataset
- Dataset是結(jié)構(gòu)化API的基本類型
- Dataset具有嚴格的Java虛擬機語言特性,僅與Scala和Java一起使用
- 可以在Scala指定case類,Java創(chuàng)建JavaBean然后通過Spark以分布式的方式操作此非Row類型的對象
何時使用Dataset
- 無法使用DataFrame操作表示
- 需要類型安全,并愿意犧牲一定性能
創(chuàng)建Dataset
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String, count: BigInt)
val flightsDF = spark.read.parquet("F:/spark-3.0.1-bin-hadoop2.7/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
- 動作、轉(zhuǎn)化操作都和DataFrame上類似,可以自定義一些函數(shù)進行轉(zhuǎn)化操作
- 連接提供了joinWith方法,類似co-group(可以對多個RDD同key的進行連結(jié))
- 分組和聚合最終返回的是DataFrame,會丟失類型信息。如果groupByKey可以返回Dataset,這個方法接收的是一個函數(shù)而非特定列名