Spark實(shí)戰(zhàn)第二版(涵蓋Spark3.0)-第15章. 聚合數(shù)據(jù)

15.3 使用UDAFs構(gòu)建自定義聚合

在前面的小節(jié)中,您快速回顧了聚合數(shù)據(jù),在簡(jiǎn)單數(shù)據(jù)集上執(zhí)行了聚合操作,并最終處理了真實(shí)的數(shù)據(jù)。在這些操作中,使用了包括max()、avg()和min()在內(nèi)的標(biāo)準(zhǔn)聚合操作。Spark并沒(méi)有實(shí)現(xiàn)所有可能的數(shù)據(jù)聚合。

在本節(jié)中,您將通過(guò)構(gòu)建自己的聚合函數(shù)來(lái)擴(kuò)展Spark。用戶(hù)定義的聚合函數(shù)(UDAFs),可以執(zhí)行自定義聚合。

想象一下下面的用例:您是一個(gè)在線零售商,想要給客戶(hù)忠誠(chéng)度積分。每位顧客每訂購(gòu)一件商品可得一分,但每次訂購(gòu)最多可得三分。

解決這個(gè)問(wèn)題的一種方法是在您的order dataframe中添加一個(gè)point列并匹配點(diǎn)歸屬規(guī)則,但是您將使用一個(gè)聚合函數(shù)來(lái)解決這個(gè)問(wèn)題(您可以自己輕松地使用point列來(lái)解決這個(gè)問(wèn)題)。

圖15.6顯示了將要使用的數(shù)據(jù)集。它類(lèi)似于本章第一節(jié)15.1節(jié)中所使用的方法。

圖15.6 應(yīng)用自定義UDAF來(lái)計(jì)算每個(gè)客戶(hù)每個(gè)訂單獲得多少忠誠(chéng)點(diǎn)

操作的結(jié)果是一個(gè)客戶(hù)及其關(guān)聯(lián)點(diǎn)的列表,如下面的清單所示。

#清單15.23客戶(hù)及其關(guān)聯(lián)積分

+------------+--------+-----+-------------+-----+

| firstName|lastName|state|sum(quantity)|point|

+------------+--------+-----+-------------+-----+|?

Ginni? ? ? | Rometty| NY? | 7? ? ? ? ? | 3? |

|Jean-Georges| Perrin | NC? | 3? ? ? ? ? | 3? |

| Holden? ? | Karau? | CA? | 10? ? ? ? ? | 6? |

|Jean-Georges| Perrin | CA? | 4? ? ? ? ? | 3? |?

+------------+--------+-----+-------------+-----+

實(shí)驗(yàn)

這個(gè)實(shí)驗(yàn)的代碼在net.jgp.books.spark.ch13.lab400_udaf包。該應(yīng)用程序稱(chēng)為PointsPerOrderApp.java,UDAF代碼在PointAttributionUdaf.java中

調(diào)用UDAF并不比調(diào)用任何聚合函數(shù)復(fù)雜。有幾個(gè)步驟:

在Spark會(huì)話(huà)中使用udf().register()方法注冊(cè)這個(gè)函數(shù)。

使用callUDF()函數(shù)調(diào)用該函數(shù)。

下面的清單顯示了調(diào)用UDAF的過(guò)程。

//清單15.24注冊(cè)和調(diào)用UDAF

package net.jgp.books.spark.ch13.lab400_udaf;

import static org.apache.spark.sql.functions.callUDF;

import static org.apache.spark.sql.functions.col;

import static org.apache.spark.sql.functions.sum;

import static org.apache.spark.sql.functions.when;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

public class PointsPerOrderApp {

? ? public static void main(String[] args ) {

? ? ? PointsPerOrderApp app = new PointsPerOrderApp();

? ? ? app .start();

? ? }

? ? private void start() {

? ? ? SparkSession spark = SparkSession.builder()

? ? ? ? ? ? .appName( "Orders loyalty point" )

? ? ? ? ? ? .master( "local[*]" )

? ? ? ? ? ? .getOrCreate();

? ? ? spark

.udf().register( "pointAttribution" , new PointAttributionUdaf());

? ? ? Dataset<Row> df = spark .read().format( "csv" )

? ? ? ? ? ? .option( "header" , true )

? ? ? ? ? ? .option( "inferSchema" , true )

? ? ? ? ? ? .load( "data/orders/orders.csv" );

? ? ? Dataset<Row> pointDf = df

col( "firstName" ), col( "lastName" ), col( "state" ))

? ? ? ? ? ? .agg(

sum( "quantity" ),

callUDF( "pointAttribution" , col( "quantity" )).as( "point" ));

? ? ? pointDf .show(20);

? ? }

}

調(diào)用UDAF就像這樣簡(jiǎn)單:

callUDF(?"pointAttribution"?,?col(?"quantity"?))

在這種情況下,您的UDAF只接受一個(gè)參數(shù),但如果有必要,函數(shù)可以接受多個(gè)參數(shù)。如果UDAF需要更多的參數(shù),只需添加這些參數(shù):將它們添加到您的調(diào)用和輸入模式中(參見(jiàn)清單15.24)。

在深入代碼之前,讓我們先理解UDAF的架構(gòu)。每一行將被處理,結(jié)果可以存儲(chǔ)在一個(gè)聚合緩沖區(qū)中(在worker點(diǎn)上)。請(qǐng)注意,緩沖區(qū)不必反映傳入數(shù)據(jù)的結(jié)構(gòu):您將定義它的模式,它可以存儲(chǔ)其他元素。圖15.7說(shuō)明了聚合及其結(jié)果緩沖區(qū)的機(jī)制。

圖15.7 當(dāng)你的代碼正在分析數(shù)據(jù)集的每一行時(shí),中間結(jié)果可以保存在一個(gè)緩沖區(qū)中。

現(xiàn)在,讓我們看看如何實(shí)現(xiàn)UDAF本身。當(dāng)你的應(yīng)用程序調(diào)用這個(gè)函數(shù)時(shí),它就是一個(gè)函數(shù);然而,當(dāng)涉及到它的實(shí)現(xiàn)時(shí),它是一個(gè)完整的類(lèi)。這個(gè)類(lèi)必須擴(kuò)展UserDefinedAggregateFunction(在org.apache .spark.sql.expressions包)。

因此,實(shí)現(xiàn)UDAF的類(lèi)必須實(shí)現(xiàn)以下方法:

bufferSchema()——定義緩沖區(qū)的模式。

dataType()——表示來(lái)自聚合函數(shù)的數(shù)據(jù)類(lèi)型。

deterministic()——當(dāng)Spark通過(guò)分割數(shù)據(jù)來(lái)執(zhí)行時(shí),它會(huì)分別處理數(shù)據(jù)塊并將它們組合在一起。如果UDAF邏輯使結(jié)果獨(dú)立于處理和組合數(shù)據(jù)的順序,則UDAF是確定性的。

evaluate()——根據(jù)給定的聚合緩沖區(qū)計(jì)算該UDAF的最終結(jié)果。

initialize()——初始化給定的聚合緩沖區(qū)。

inputSchema()——描述發(fā)送到UDAF的輸入的模式。

merge()——合并兩個(gè)聚合緩沖區(qū)并存儲(chǔ)更新后的緩沖區(qū)值。當(dāng)我們將兩個(gè)部分聚合的數(shù)據(jù)元素合并在一起時(shí),將調(diào)用此方法。

update()——用新的輸入數(shù)據(jù)更新給定的聚合緩沖區(qū)。每個(gè)輸入行調(diào)用一次此方法。

現(xiàn)在,您擁有了構(gòu)建UDAF所需的所有元素,如清單15.25所示。注意,這個(gè)類(lèi)繼承了UserDefinedAggregateFunction,它實(shí)現(xiàn)了Serializable。您將需要定義一個(gè)serialVersionUID變量,但最重要的是,該類(lèi)的每個(gè)元素也需要是可序列化的。

//清單15.25關(guān)注于UDAF:

inputSchema()方法定義了發(fā)送給函數(shù)的數(shù)據(jù)的模式。在本例中,您接收到的是一個(gè)整數(shù),表示訂單中的原始項(xiàng)數(shù)。Spark中的一個(gè)模式,你已經(jīng)用過(guò)幾次了,是用StructType實(shí)現(xiàn)的:

@Override? ? public StructType inputSchema() {? ? ? List<StructField> inputFields = new ArrayList<>();? ? ? inputFields .add(? ? ? ? ? ? DataTypes.createStructField( "_c0" , DataTypes. IntegerType , true ));? ? ? return DataTypes.createStructType( inputFields );?????}

bufferSchema()方法定義聚合緩沖區(qū)的模式,用于存儲(chǔ)中間結(jié)果。在本例中,您只需要一列存儲(chǔ)整數(shù)。對(duì)于更復(fù)雜的聚合流程,可能需要更多的列。

@OverridepublicStructTypebufferSchema(){List bufferFields =newArrayList<>();? ? ? bufferFields .add(DataTypes.createStructField("sum", DataTypes. IntegerType ,true));returnDataTypes.createStructType( bufferFields );? ? }@OverridepublicDataTypedataType(){returnDataTypes. IntegerType ;? ? }@Overridepublicbooleandeterministic(){returntrue;? ? }

很好,initialize()方法初始化內(nèi)部緩沖區(qū)。在這種情況下,由于這是一個(gè)相當(dāng)簡(jiǎn)單的聚合,緩沖區(qū)將被設(shè)置為0。

然而,由類(lèi)履行的契約需要遵循這個(gè)基本規(guī)則。在兩個(gè)初始緩沖區(qū)上應(yīng)用merge()方法應(yīng)該返回初始緩沖區(qū)本身;例如,merge(initialBuffer, initialBuffer) = initialBuffer。

@Overridepublicvoidinitialize(MutableAggregationBuffer buffer ){? ? ? buffer .update(0,0);? ? }

該操作發(fā)生在update()方法中。您將在這里處理數(shù)據(jù)。你接收到的緩沖區(qū)可能包含數(shù)據(jù),也可能不包含數(shù)據(jù),所以不能忽略它:在第一次調(diào)用中,它將不包含初始化數(shù)據(jù)以外的數(shù)據(jù)。然而,在隨后的調(diào)用中,數(shù)據(jù)已經(jīng)在緩沖區(qū)中了,所以不應(yīng)該忽略它:

@Overridepublicvoidupdate(MutableAggregationBuffer buffer , Row input ){...intinitialValue = buffer .getInt(0);intinputValue = input .getInt(0);intoutputValue =0;if( inputValue < MAX_POINT_PER_ORDER ) {? ? ? ? outputValue = inputValue ;}else{? ? ? ? outputValue = MAX_POINT_PER_ORDER ;? ? ? }? ? ? outputValue += initialValue ;buffer .update(0, outputValue );? ? }

merge()方法合并兩個(gè)聚合緩沖區(qū),并將更新后的緩沖區(qū)值存儲(chǔ)回聚合緩沖區(qū)中。在這個(gè)場(chǎng)景中,當(dāng)有兩個(gè)包含忠誠(chéng)度點(diǎn)的緩沖區(qū)時(shí),只需相加它們:

@Override? ? public void merge(MutableAggregationBuffer buffer , Row row ) {buffer.update(0,buffer.getInt(0) +row.getInt(0));? ? }

最后,evaluate()方法根據(jù)給定的聚合緩沖區(qū)計(jì)算這個(gè)UDAF的最終結(jié)果:

@OverridepublicIntegerevaluate(Row row ){returnrow .getInt(0);? ? } }

在本節(jié)中,您已經(jīng)使用并構(gòu)建了自己的用戶(hù)定義聚合函數(shù),這有點(diǎn)棘手。您遵循的用例是一個(gè)簡(jiǎn)單的忠誠(chéng)度點(diǎn)歸屬,但您可以想象其他類(lèi)型的場(chǎng)景。

如果您有興趣了解更多關(guān)于聚合如何工作的信息,可以在Log4j.properties中激活跟蹤日志:

log4j.logger.net.jgp= DEBUG//修改為log4j.logger.net.jgp= TRACE

在下一次執(zhí)行時(shí),你將得到詳細(xì)的輸出:

...alize(PointAttributionUdaf.java:79):->initialize()-bufferas1row(s)...alize(PointAttributionUdaf.java:79):->initialize()-bufferas1row(s)...pdate(PointAttributionUdaf.java:92):->update(),inputrowhas1args...pdate(PointAttributionUdaf.java:97):->update(0, 1) ...

總結(jié)

聚合是一種對(duì)數(shù)據(jù)進(jìn)行分組的方法,這樣您就可以從更高或更宏觀的級(jí)別查看數(shù)據(jù)。

Apache Spark可以使用Spark SQL(通過(guò)創(chuàng)建一個(gè)視圖)或dataframe API對(duì)dataframe進(jìn)行聚合。

groupBy()方法等價(jià)于SQL GROUP BY語(yǔ)句。

在執(zhí)行聚合之前,需要準(zhǔn)備和清理數(shù)據(jù)。這些步驟可以通過(guò)轉(zhuǎn)換來(lái)完成(第12章)。

聚合可以通過(guò)groupBy()方法之后鏈接的方法執(zhí)行,也可以通過(guò)agg()方法內(nèi)部的靜態(tài)函數(shù)執(zhí)行。

Spark的聚合可以通過(guò)自定義的自定義聚合函數(shù)(UDAFs)進(jìn)行擴(kuò)展。

一個(gè)UDAF必須在你的Spark會(huì)話(huà)中通過(guò)名字注冊(cè)。

使用callUDF()方法和UDAF名稱(chēng)來(lái)調(diào)用UDAF。

一個(gè)UDAF作為一個(gè)類(lèi)實(shí)現(xiàn),它應(yīng)該實(shí)現(xiàn)幾個(gè)方法。

使用agg()方法一次對(duì)多個(gè)列執(zhí)行聚合。

您可以使用sum()方法和靜態(tài)函數(shù)來(lái)計(jì)算集合的和。

可以使用avg()方法和靜態(tài)函數(shù)來(lái)計(jì)算集合的平均值。

可以使用max()方法和靜態(tài)函數(shù)來(lái)提取集合的最大值。

可以使用min()方法和靜態(tài)函數(shù)來(lái)提取集合的最小值。

其他聚合函數(shù)包括許多統(tǒng)計(jì)方法,如:approx_count_distinct() , collect_list() , collect_set() , corr() , count() , countDistinct() , covar_pop() , covar_samp() , first() , grouping() , grouping _id() , kurtosis() , last() , mean() , skewness() , stddev() , stddev_pop() , stddev_samp() , sumDistinct() , var_pop() , var_samp() , 和variance()

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容