Spark權(quán)威指南讀書筆記(二):結(jié)構(gòu)化API

第四章 結(jié)構(gòu)化API概述

結(jié)構(gòu)化API主要指三種核心分布式集合類型API:Dataset、DataFrame、SQL表和視圖

DataFrame和Dataset類型

  • DataFrame和Dataset是具有行和列的類似于數(shù)據(jù)表的集合類型
  • Spark中的DataFrame和Dataset代表不可變的數(shù)據(jù)集合
Schema

定義了DataFrame的列名和類型

兩者比較
  • 非類型化的DataFrame和類型化的Dataset,Spark負責(zé)維護DataFrame的類型(Schema),Dataset在編譯時就會檢查類型是否符合規(guī)范
  • 在Scala版本Spark中,DataFrame就是Row類型的Dataset的集合
  • Row類型是Spark用于支持內(nèi)存計算而優(yōu)化的數(shù)據(jù)格式,避免JVM類型的垃圾回收開銷和對象實例化開銷

結(jié)構(gòu)化API執(zhí)行概述

  • 編寫代碼
  • Spark生成邏輯計劃
  • Spark將邏輯計劃轉(zhuǎn)換成物理計劃
  • 在集群上執(zhí)行物理計劃
Catalyst優(yōu)化器
邏輯計劃
邏輯計劃
  • 最開始是尚未被解析的邏輯計劃
  • 加入了元數(shù)據(jù)Catalog,計劃變得可理解了
  • 經(jīng)過邏輯優(yōu)化生成優(yōu)化后的邏輯計劃
物理計劃
物理計劃
  • 獲取到很多個物理執(zhí)行計劃
  • 代價模型會分析得到最優(yōu)的物理執(zhí)行計劃,比如鏈接操作、物理表的大小
  • 交給集群執(zhí)行 運行在底層編程接口RDD上

第五章 基本的結(jié)構(gòu)化操作

主要是針對DataFrame的操作

模式

schema 文件的結(jié)構(gòu)可以通過schema指定,也可查看schema

val mySchema = StructType(Array(
StructFiled(名稱,類型,是否為空),
StructFiled(名稱,類型,是否為空),
))
 val df = spark.read.format("json").schema(mySchema).load("xxx")

列和表達式

df.col("count")
col("xxx")
column("xxx")

表達式
  • 表達式是對DataFrame中某一個或多個值的一組轉(zhuǎn)換操作
    expr("somecol-5")

記錄和行

  • DataFrame每一行都是一個記錄,而記錄是Row類型的對象
  • Spark使用列表達式操作Row類型對象,Row對象內(nèi)部是字節(jié)數(shù)組

DataFrame轉(zhuǎn)換操作

  • 創(chuàng)建DataFrame

df.createOrReplaceTempView

  • 操作DataFrame的列 select selectExpr

df.select(col(xxx)) df.selectExpr(xxx)

  • 添加列 withColumn

df.withColumn("列名", 表達式)

  • 重命名列 withColumnRenames("原名","新名")
  • 刪除列 drop
  • 過濾行 where filter

df.filter(col("count") < 2)

  • 去重 distinct()

df.select("xxx").distinct().count()

  • 連接 union

df.union(otherdf)

  • 行排序 sort orderBy

df.sort("xxx")
df.orderBy(expr("count desc"))

  • 重新分區(qū) repartition

df.repartition(4)

第六章 處理不同類型的數(shù)據(jù)

布爾

val priceFilter = col("unitPrice") > 600
df.withColum("isExpensive",priceFilter) // 創(chuàng)建一個列判斷價格是否大于600

字符串

  • 正則匹配
import org.apache.spark.sql.functions.regexp_replace
val simpleColors = Seq("black", "white", "red", "green", "blue")
val regexString = simpleColors.map(_.toUpperCase).mkString("|")
// the | signifies `OR` in regular expression syntax
df.select(
  regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"),
  col("Description")).show(2)

空值

df.na.drop()
df.na.fill("填的值", Seq("列名1", "列名2"))

用戶自定義函數(shù)

  • 一個例子
val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)
  • UDF如何運作
  1. 當(dāng)使用Scala Java的UDF時,JVM中運行,可能導(dǎo)致性能下降(GC)
  2. 使用Python寫的UDF,Spark在worker上啟動一個Python進程,將所有數(shù)據(jù)序列化為Python可解釋的格式,在Python進程中對該數(shù)據(jù)進行執(zhí)行函數(shù),最終將結(jié)果返回JVM和Spark。這會有兩個問題:計算安規(guī),進入Python進程后,Spark無法管理workjer內(nèi)存
UDF

第七章 聚合操作

每個分組操作都會返回RelationGroupedDataset,基于它來進行聚合操作

聚合操作

  • count

df.select(count("xxx"))

  • countDistinct

df.select(countDistinct("xxx"))

  • approx_count_distinct

df.select(approx_count_distinct("xxx",0.1))

  • first last

df.select(first("xxx"))

  • min max sum sumDistinct avg都是一樣的操作

分組

  • groupBy

df.groupBy("xxx")

  • 使用表達式分組

df.groupBy("xxxx").agg(count("xxx"))

  • 使用map分組

df.groupBy("xxx").agg("colname"->"avg","colname2"->"聚合操作")

window函數(shù)

窗口函數(shù)原理
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
// 每個用戶id是一個分區(qū),內(nèi)部的quantity降序排序
val windowSpec = Window
  .partitionBy("CustomerId", "date")
  .orderBy(col("Quantity").desc)
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

自定義聚合函數(shù)UDAF

類似于hive的udaf

  • inputSchema指定輸入?yún)?shù)
  • bufferSchema指定UDAF中間結(jié)果
  • dataType用于指定返回結(jié)果
  • deterministic 指定UDAF對于某個輸入是否返回相同結(jié)果
  • initialize初始化聚合緩沖區(qū)
  • update進行緩沖更新
  • merge合并兩個緩沖區(qū)
  • evaluate生成聚合最終結(jié)果
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// 返回是不是全是ture
class BoolAnd extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", BooleanType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("result", BooleanType) :: Nil
  )
  def dataType: DataType = BooleanType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = true
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

第八章 連接操作

基本的鏈接操作

內(nèi)、外、左外、右外、左半(left semi)、左反(left anti)、笛卡爾

  • 定義連接的表達式

val joinExpression = person.col("xxx") = otherdf.col("xxx")

  • 選一個連接類型

joinType = "outer|left_outer|right_outer|left_sermi|left_anti|cross"

  • 執(zhí)行join操作

person.join(otherdf, joinExpression, joinType).show()

常見問題

復(fù)雜類型連接

可以自定返回Boolean的joinExpression來完成復(fù)雜類型連接

import org.apache.spark.sql.functions.expr
person.withColumnRenamed("id", "personId")
  .join(sparkStatus, expr("array_contains(spark_status, id)")).show()
重復(fù)列名
  • 用不同的連接表達式
  • 連接后刪除某一列
  • 連接前就重命名

Spark如何進行連接

spark以兩種不同的方式處理集群通信問題,要么執(zhí)行all-to-all通信的shuffle join,要么執(zhí)行廣播join

shuffle join

每個節(jié)點都與其他所有節(jié)點進行通信,對網(wǎng)絡(luò)傳輸有一定要求


shuffle join
broadcast join

當(dāng)表的大小足夠下能夠放入當(dāng)個節(jié)點內(nèi)存還有空余的時候,可以用。
在開始之前會有一次大規(guī)模通信,分發(fā)這個表,通信結(jié)束之后節(jié)點間不再有通信。
需要注意這個會先給driver,driver的空間也有夠,還要注意第一次大規(guī)模通信的timeout


broadcast join

第九章 數(shù)據(jù)源

數(shù)據(jù)源API結(jié)構(gòu)

Read API
spark.read.format("格式")
.option("配置key", "配置value")
.schema(xxx)
.load()
Write API
spark.writer.format("格式").
.option("配置key", "配置value")
//.partuitionBy()
//.sortBy()
//.bucketBy()
.save()

Parquet & ORC

都是壓縮格式
但書里提到Parquet針對Spark進行優(yōu)化 ORC針對hive進行優(yōu)化

寫入SQL數(shù)據(jù)庫

val driver =  "org.sqlite.JDBC"
val path = "/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:/${path}"
val tablename = "flight_info"
val dbDataFrame = spark.read.format("jdbc").option("url", url)
  .option("dbtable", tablename).option("driver",  driver).load()

高級IO概念

  • 可分割的文件類型和壓縮
    由于分區(qū)讀的都是單個文件,這個文件分不開就影響效率
  • 并行讀
    可以并行讀取不同的文件
  • 并行寫
    可以往同一個分區(qū)不同的文件下寫入
  • 分桶
    具有相同桶ID的數(shù)據(jù)放在一個物理分區(qū),可以避免在讀取數(shù)據(jù)的時候shuffle
val numberBuckets = 10
val columnToBucketBy = "count"

csvFile.write.format("parquet").mode("overwrite")
  .bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
  • 管理文件大小
    由于每個task都是處理單個文件,文件太小起的task就太多
    maxRecordPerFile來制定每個文件的最大記錄數(shù)

第十章 SparkSQL

Spark和Hive的關(guān)系

Spark SQL可以與Hive metastores鏈接。 Hive metastore維護了Hive跨回話數(shù)據(jù)表的信息

如何運行Spark SQL查詢

Spark可編程SQL接口

spark.sql("select 1+1").show()
這會返回一個DataFrame

Catalog

Spark SQL中最高級別的抽象是Catalog,用于存儲用戶數(shù)據(jù)中的元數(shù)據(jù)以及數(shù)據(jù)庫、數(shù)據(jù)表、函數(shù)、視圖等有用的東西

托管表和非托管表
  • 非托管表:定義磁盤上若干文件作為數(shù)據(jù)表
  • 托管表:DataFrame上使用saceAsTable創(chuàng)建一個數(shù)據(jù)表,這個函數(shù)會把表寫入一個新的位置

第十一章 Dataset

  • Dataset是結(jié)構(gòu)化API的基本類型
  • Dataset具有嚴格的Java虛擬機語言特性,僅與Scala和Java一起使用
  • 可以在Scala指定case類,Java創(chuàng)建JavaBean然后通過Spark以分布式的方式操作此非Row類型的對象

何時使用Dataset

  • 無法使用DataFrame操作表示
  • 需要類型安全,并愿意犧牲一定性能

創(chuàng)建Dataset

case class Flight(DEST_COUNTRY_NAME: String,
                  ORIGIN_COUNTRY_NAME: String, count: BigInt)

val flightsDF = spark.read.parquet("F:/spark-3.0.1-bin-hadoop2.7/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
  • 動作、轉(zhuǎn)化操作都和DataFrame上類似,可以自定義一些函數(shù)進行轉(zhuǎn)化操作
  • 連接提供了joinWith方法,類似co-group(可以對多個RDD同key的進行連結(jié))
  • 分組和聚合最終返回的是DataFrame,會丟失類型信息。如果groupByKey可以返回Dataset,這個方法接收的是一個函數(shù)而非特定列名
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容