《Spark: The Definitive Guide 》第7章:Aggregations 聚合 中文學(xué)習(xí)筆記

注意:
如需執(zhí)行如下代碼,請(qǐng)從官方github下載數(shù)據(jù)包 , 安裝所需spark環(huán)境
執(zhí)行如下創(chuàng)建Dataframe數(shù)據(jù)集代碼創(chuàng)建好所需的Dataframe才能用接下來(lái)的代碼對(duì)數(shù)據(jù)進(jìn)行操作。
為了美觀下面的例如.option() .load()為換行展示,真正輸入代碼時(shí)要在一行輸入。

所需創(chuàng)建的DataFrame數(shù)據(jù)集(第四行數(shù)據(jù)集路徑根據(jù)自己下載到本地的數(shù)據(jù)集地址進(jìn)行修改):

// Scala
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/all/*.csv")
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")

7.3 Window Functions 窗口函數(shù)

在Spark里面每一行數(shù)據(jù)就是一個(gè)row,窗口函數(shù)是指定所需行(row)的數(shù)據(jù)組成一個(gè)數(shù)據(jù)集進(jìn)行計(jì)算,窗口可以進(jìn)行排名,分析,聚合操作。


窗口示例

下面對(duì)數(shù)據(jù)集進(jìn)行處理,組成新的dfWithDate, 每行添加日期以便更好的直觀的展現(xiàn)操作。

// Scala
import org.apache.spark.sql.functions.{col, to_date}
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
原始DF和添加日期DF比較

第一步

就是利用窗口函數(shù)創(chuàng)建一個(gè)窗口,partitionBy是根據(jù)客戶的id和購(gòu)買日期進(jìn)行分組,rowsBetween代表了這個(gè)窗口里面包含了哪幾行(實(shí)例中是從"前面所有行"->"當(dāng)前行")

// Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val windowSpec = Window
.partitionBy("CustomerId", "date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

第二步

下面根據(jù)這個(gè)窗口計(jì)算并返回所需列

  • 求出每個(gè)用戶購(gòu)買最多的股票Stock數(shù)目
import org.apache.spark.sql.functions.max
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
  • 求出該顧客購(gòu)買不同股票Stock數(shù)量的排名,dense rank表明按照排名往下排序不和數(shù)量綁定排名例如有1個(gè)第一名和2個(gè)第二名那么接下來(lái)的第三名則是從正常的第三名開(kāi)始計(jì)數(shù),而rank 則是從第四名開(kāi)始計(jì)數(shù)。
import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)

第三步

這時(shí)候maxPurchaseQuantity, purchaseDenseRank, purchaseRank將會(huì)返回三個(gè)列,下面我們將這三個(gè)列和指定的dataframe的列一同返回得出所需結(jié)論:

// Scala
import org.apache.spark.sql.functions.col

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

結(jié)果如下:


image.png

7.4 Grouping Sets 分組集合

group-by可以使我們對(duì)組內(nèi)進(jìn)行聚合操作,但是有些時(shí)候我們需要跨組進(jìn)行操作,這個(gè)時(shí)候我們就需要用到分組集合。

首先我們對(duì)上面已經(jīng)處理好的dfWithDate 這個(gè)Dataframe進(jìn)行去空.drop()操作:

// in Scala
val dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

注意: Grouping Sets對(duì)空值有效,所以需要去除空值保證結(jié)果不受影響。

下面通過(guò)SQL來(lái)得出所有的股票代碼和持有該股票的用戶以及每個(gè)用戶持有該股票的總數(shù)目:

-- SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC

如果我們想求整個(gè)股票的數(shù)量而不是根據(jù)客戶和股票分組則通過(guò)下面這個(gè)SQL語(yǔ)句:

-- SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC

然而分組集合僅僅在SQL里面提供,spark則使用 rullup 和 cube.

Rollups

目前討論的都是顯式數(shù)據(jù)集,及提供確定的數(shù)據(jù)集給Spark進(jìn)行操作,下面將根據(jù)日期+不同國(guó)家股票創(chuàng)建rollups求出每個(gè)日期下面不同國(guó)家股票的購(gòu)買總數(shù):
Rollup可以返回的結(jié)果:

  • 單個(gè)日期不同國(guó)家股票購(gòu)買總數(shù)
  • 單個(gè)日期所有國(guó)家股票購(gòu)買總數(shù)
  • 所有日期所有國(guó)家股票購(gòu)買總數(shù)
val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))
  .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
  .orderBy("Date")
rolledUpDF.show()

結(jié)果如下:


image.png

可以看到上圖結(jié)果兩個(gè)都為空的那個(gè)row為股票總數(shù),可通過(guò)如下語(yǔ)句取出:

rolledUpDF.where("Country IS NULL").show()
rolledUpDF.where("Date IS NULL").show()

Cube

Cube相比Rollups更為深層次的返回結(jié)果:

  • 可以返回所有不論國(guó)家和日期的股票總額
  • 可以返回單個(gè)日期的所有國(guó)家的股票總額
  • 可以返回單個(gè)國(guó)家的每個(gè)日期的股票總額
  • 可以返回單個(gè)國(guó)家的不同日期的股票總額

和Rollup調(diào)用的方法差不多:

// Scala
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))
  .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()

結(jié)果如圖:

image.png

image.png

Grouping Metadata

這個(gè)的意思就是希望顯示的分組級(jí)別及從最細(xì)的到最粗的分組,而不是像cube那樣一股腦的全部顯示:

  • 級(jí)別0 可以返回所有股票總額
  • 級(jí)別1 可以返回單個(gè)客戶所有購(gòu)買的股票總額
  • 級(jí)別2 可以返回單個(gè)股票所有用戶購(gòu)買的總額
  • 級(jí)別3 可以返回單個(gè)客戶單個(gè)股票編碼的股票總額(最高級(jí)別)

由grouping_id來(lái)控制

// Scala
import org.apache.spark.sql.functions.{grouping_id, sum, expr}

dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))
.orderBy(expr("grouping_id()").desc)
.show()

結(jié)果和上面的cube的結(jié)果差不多,只不過(guò)根據(jù)分組級(jí)別進(jìn)行了分組.

Pivot

可以將行轉(zhuǎn)換為列進(jìn)行操作,如下示例我們可以將給定的國(guó)家按照日期來(lái)計(jì)算股票的數(shù)額:

// Scala
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

由于數(shù)目龐大僅展示美國(guó)的日期大于2011-12-05的數(shù)額.

// Scala
pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()
image.png

7.5 User-Defined Aggregation Functions 用戶自定義聚合函數(shù)(UDAF)

注意:
目前UDAF只能在Spark2.3以上版本及Java和Scala上實(shí)現(xiàn).

可以根據(jù)業(yè)務(wù)邏輯自己制定分組聚合,然而必須繼承基類:

// Scala
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
class BoolAnd extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", BooleanType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("result", BooleanType) :: Nil
  )
  def dataType: DataType = BooleanType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = true
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

現(xiàn)在我們只需注冊(cè)進(jìn)udf中便可使用.

// Scala
val ba = new BoolAnd
spark.udf.register("booland", ba)
import org.apache.spark.sql.functions._
spark.range(1)
  .selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
  .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
  .select(ba(col("t")), expr("booland(f)"))
  .show()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容