Spark函數(shù)擴(kuò)展功能介紹

問題導(dǎo)讀

1.UDF對spark sql的作用是什么?

2.用Scala編寫的UDF與普通的Scala函數(shù)唯一的區(qū)別在什么地方?

3.如何在spark中使用UDF?

在數(shù)據(jù)分析領(lǐng)域中,沒有人能預(yù)見所有的數(shù)據(jù)運(yùn)算,以至于將它們都內(nèi)置好,一切準(zhǔn)備完好,用戶只需要考慮用,萬事大吉。擴(kuò)展性是一個平臺的生存之本,一個封閉的平臺如何能夠擁抱變化?在對數(shù)據(jù)進(jìn)行分析時,無論是算法也好,分析邏輯也罷,最好的重用單位自然還是:函數(shù)。

故而,對于一個大數(shù)據(jù)處理平臺而言,倘若不能支持函數(shù)的擴(kuò)展,確乎是不可想象的。Spark首先是一個開源框架,當(dāng)我們發(fā)現(xiàn)一些函數(shù)具有通用的性質(zhì),自然可以考慮contribute給社區(qū),直接加入到Spark的源代碼中。我們欣喜地看到隨著Spark版本的演化,確實(shí)涌現(xiàn)了越來越多對于數(shù)據(jù)分析師而言稱得上是一柄柄利器的強(qiáng)大函數(shù),例如博客文章《Spark 1.5 DataFrame API Highlights: Date/Time/String Handling, Time Intervals, and UDAFs》介紹了在1.5中為DataFrame提供了豐富的處理日期、時間和字符串的函數(shù);以及在Spark SQL 1.4中就引入的Window Function。

然而,針對特定領(lǐng)域進(jìn)行數(shù)據(jù)分析的函數(shù)擴(kuò)展,Spark提供了更好地置放之處,那就是所謂的“UDF(User Defined Function)”。

UDF的引入極大地豐富了Spark SQL的表現(xiàn)力。一方面,它讓我們享受了利用Scala(當(dāng)然,也包括Java或Python)更為自然地編寫代碼實(shí)現(xiàn)函數(shù)的福利,另一方面,又能精簡SQL(或者DataFrame的API),更加寫意自如地完成復(fù)雜的數(shù)據(jù)分析。尤其采用SQL語句去執(zhí)行數(shù)據(jù)分析時,UDF幫助我們在SQL函數(shù)與Scala函數(shù)之間左右逢源,還可以在一定程度上化解不同數(shù)據(jù)源具有歧異函數(shù)的尷尬。想想不同關(guān)系數(shù)據(jù)庫處理日期或時間的函數(shù)名稱吧!

用Scala編寫的UDF與普通的Scala函數(shù)沒有任何區(qū)別,唯一需要多執(zhí)行的一個步驟是要讓SQLContext注冊它。例如:

deflen(bookTitle:String):Int=bookTitle.length

sqlContext.udf.register("len", len_)

valbooksWithLongTitle=sqlContext.sql("select title, author from books where len(title) >10

編寫的UDF可以放到SQL語句的fields部分,也可以作為where、groupBy或者h(yuǎn)aving子句的一部分。

既然是UDF,它也得保持足夠的特殊性,否則就完全與Scala函數(shù)泯然眾人也。這一特殊性不在于函數(shù)的實(shí)現(xiàn),而是思考函數(shù)的角度,需要將UDF的參數(shù)視為數(shù)據(jù)表的某個列。例如上面len函數(shù)的參數(shù)bookTitle,雖然是一個普通的字符串,但當(dāng)其代入到Spark SQL的語句中,實(shí)參title實(shí)際上是表中的一個列(可以是列的別名)。

當(dāng)然,我們也可以在使用UDF時,傳入常量而非表的列名。讓我們稍稍修改一下剛才的函數(shù),讓長度10作為函數(shù)的參數(shù)傳入:


deflengthLongerThan(bookTitle:String, length:Int):Boolean=bookTitle.length > length

sqlContext.udf.register("longLength", lengthLongerThan_)

valbooksWithLongTitle=sqlContext.sql("select title, author from books where longLength(title,

若使用DataFrame的API,則可以以字符串的形式將UDF傳入:

valbooksWithLongTitle=dataFrame.filter("longLength(title, 10)")

DataFrame的API也可以接收Column對象,可以用$符號來包裹一個字符串表示一個Column。$是定義在SQLContext對象implicits中的一個隱式轉(zhuǎn)換。此時,UDF的定義也不相同,不能直接定義Scala函數(shù),而是要用定義在org.apache.spark.sql.functions中的udf方法來接收一個函數(shù)。這種方式無需register:

importorg.apache.spark.sql.functions._

vallongLength=udf((bookTitle:String, length:Int)=> bookTitle.length > length)

importsqlContext.implicits._

valbooksWithLongTitle=dataFrame.filter(longLength($"title", $"10"))

注意,代碼片段中的sqlContext是之前已經(jīng)實(shí)例化的SQLContext對象。

不幸,運(yùn)行這段代碼會拋出異常:

cannot resolve '10' given input columns id, title, author, price, publishedDate;

因?yàn)椴捎?來包裹一個常量,會讓Spark錯以為這是一個Column。這時,需要定義在org.apache.spark.sql.functions中的lit函數(shù)來幫助:


valbooksWithLongTitle=dataFrame.filter(longLength($"title", lit(10)))

普通的UDF卻也存在一個缺陷,就是無法在函數(shù)內(nèi)部支持對表數(shù)據(jù)的聚合運(yùn)算。例如,當(dāng)我要對銷量執(zhí)行年度同比計算,就需要對當(dāng)年和上一年的銷量分別求和,然后再利用同比公式進(jìn)行計算。此時,UDF就無能為力了。

該UDAF(User Defined Aggregate Function)粉墨登場的時候了。

Spark為所有的UDAF定義了一個父類UserDefinedAggregateFunction。要繼承這個類,需要實(shí)現(xiàn)父類的幾個抽象方法:

definputSchema:StructType

defbufferSchema:StructType

defdataType:DataType

defdeterministic:Boolean

definitialize(buffer:MutableAggregationBuffer):Unit

defupdate(buffer:MutableAggregationBuffer, input:Row):Unit

defmerge(buffer1:MutableAggregationBuffer, buffer2:Row):Unit

defevaluate(buffer:Row):Any

可以將inputSchema理解為UDAF與DataFrame列有關(guān)的輸入樣式。例如年同比函數(shù)需要對某個可以運(yùn)算的指標(biāo)與時間維度進(jìn)行處理,就需要在inputSchema中定義它們。

definputSchema:StructType={

StructType(StructField("metric", DoubleType)::StructField("timeCategory", DateType)::Nil)

}

代碼創(chuàng)建了擁有兩個StructField的StructType。StructField的名字并沒有特別要求,完全可以認(rèn)為是兩個內(nèi)部結(jié)構(gòu)的列名占位符。至于UDAF具體要操作DataFrame的哪個列,取決于調(diào)用者,但前提是數(shù)據(jù)類型必須符合事先的設(shè)置,如這里的DoubleType與DateType類型。這兩個類型被定義在org.apache.spark.sql.types中。

bufferSchema用于定義存儲聚合運(yùn)算時產(chǎn)生的中間數(shù)據(jù)結(jié)果的Schema,例如我們需要存儲當(dāng)年與上一年的銷量總和,就需要定義兩個StructField:

defbufferSchema:StructType={

StructType(StructField("sumOfCurrent", DoubleType)::StructField("sumOfPrevious", DoubleType)::Nil)

}

dataType標(biāo)明了UDAF函數(shù)的返回值類型,deterministic是一個布爾值,用以標(biāo)記針對給定的一組輸入,UDAF是否總是生成相同的結(jié)果。

顧名思義,initialize就是對聚合運(yùn)算中間結(jié)果的初始化,在我們這個例子中,兩個求和的中間值都被初始化為0d:


definitialize(buffer:MutableAggregationBuffer):Unit={

buffer.update(0,0.0)

buffer.update(1,0.0)

}

update函數(shù)的第一個參數(shù)為bufferSchema中兩個Field的索引,默認(rèn)以0開始,所以第一行就是針對“sumOfCurrent”的求和值進(jìn)行初始化。

UDAF的核心計算都發(fā)生在update函數(shù)中。在我們這個例子中,需要用戶設(shè)置計算同比的時間周期。這個時間周期值屬于外部輸入,但卻并非inputSchema的一部分,所以應(yīng)該從UDAF對應(yīng)類的構(gòu)造函數(shù)中傳入。我為時間周期定義了一個樣例類,且對于同比函數(shù),我們只要求輸入當(dāng)年的時間周期,上一年的時間周期可以通過對年份減1來完成:

caseclassDateRange(startDate:Timestamp, endDate:Timestamp) {

defin(targetDate:Date):Boolean={

targetDate.before(endDate) && targetDate.after(startDate)

}

}

classYearOnYearBasis(current:DateRange)extendsUserDefinedAggregateFunction {

defupdate(buffer:MutableAggregationBuffer, input:Row):Unit={

if(current.in(input.getAs[Date](1))) {

buffer(0)=buffer.getAs[Double](0) + input.getAs[Double](0)

}

valprevious=DateRange(subtractOneYear(current.startDate), subtractOneYear(current.endDate))

if(previous.in(input.getAs[Date](1))) {

buffer(1)=buffer.getAs[Double](0) + input.getAs[Double](0)

}

}

}

update函數(shù)的第二個參數(shù)input: Row對應(yīng)的并非DataFrame的行,而是被inputSchema投影了的行。以本例而言,每一個input就應(yīng)該只有兩個Field的值。倘若我們在調(diào)用這個UDAF函數(shù)時,分別傳入了銷量和銷售日期兩個列的話,則input(0)代表的就是銷量,input(1)代表的就是銷售日期。

merge函數(shù)負(fù)責(zé)合并兩個聚合運(yùn)算的buffer,再將其存儲到MutableAggregationBuffer中:

[Scala]純文本查看復(fù)制代碼

?

1

2

3

4defmerge(buffer1:MutableAggregationBuffer, buffer2:Row):Unit={

buffer1(0)=buffer1.getAs[Double](0) + buffer2.getAs[Double](0)

buffer1(1)=buffer1.getAs[Double](1) + buffer2.getAs[Double](1)

}

最后,由evaluate函數(shù)完成對聚合Buffer值的運(yùn)算,得到最后的結(jié)果:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容