來源:https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/

PySpark UDF(又名用戶定義函數(shù))是Spark SQL & DataFrame中最有用的特性,用于擴展PySpark構(gòu)建的功能。在本文中,我將解釋什么是UDF?為什么我們需要它,以及如何創(chuàng)建和使用它在DataFrame select(), withColumn()和SQL使用PySpark (Spark with Python)示例。
注意:UDF是最昂貴的操作,因此只有在必要時才使用它們。在本文后面的部分中,我將詳細解釋為什么使用UDF是一個昂貴的操作。
1. PySpark UDF的介紹
1.1什么是UDF?
UDF的別名是用戶定義函數(shù),如果您來自SQL背景,那么UDF對您來說并不新鮮,因為大多數(shù)傳統(tǒng)RDBMS數(shù)據(jù)庫都支持用戶定義函數(shù),這些函數(shù)需要在數(shù)據(jù)庫庫中注冊,并在SQL中將它們作為常規(guī)函數(shù)使用。
PySpark的UDF與傳統(tǒng)數(shù)據(jù)庫上的UDF類似。在PySpark中,您可以在Python語法中創(chuàng)建一個函數(shù),并用PySpark SQL udf()包裝它,或者將它注冊為udf,并分別在DataFrame和SQL上使用它。
1.2為什么我們需要一個UDF?
這些函數(shù)用于擴展框架的函數(shù),并在多個DataFrame上重用這些函數(shù)。例如,您希望將名稱字符串中單詞的每個首字母都轉(zhuǎn)換為大寫字母;PySpark的內(nèi)置特性沒有這個函數(shù),因此您可以為它創(chuàng)建一個UDF,并根據(jù)需要在許多DataFrame上重用它。UDF一旦創(chuàng)建,就可以在多個數(shù)據(jù)框架和SQL表達式上重用。
在創(chuàng)建任何UDF之前,先研究一下Spark SQL函數(shù)中是否已經(jīng)有您想要的類似函數(shù)。PySpark SQL提供了一些預定義的通用函數(shù),并且在每個版本中都添加了更多的新函數(shù)。因此,在你重新發(fā)明輪子之前最好檢查一下。
當你創(chuàng)建UDF時,你需要非常仔細地設計它們,否則你會遇到優(yōu)化和性能問題。
2. 創(chuàng)建PySpark UDF
2.1創(chuàng)建數(shù)據(jù)幀
在我們開始創(chuàng)建UDF之前,首先讓我們創(chuàng)建一個PySpark DataFrame。
spark=SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
columns=["Seqno","Name"]data=[("1","john jones"),("2","tracey smith"),("3","amy sanders")]
df=spark.createDataFrame(data=data,schema=columns)
df.show(truncate=False)
產(chǎn)生以下輸出。
2.2創(chuàng)建一個Python函數(shù)
創(chuàng)建UDF的第一步是創(chuàng)建Python函數(shù)。下面的代碼片段創(chuàng)建了一個函數(shù)convertCase(),它接受一個字符串參數(shù),并將每個單詞的第一個字母轉(zhuǎn)換為大寫字母。UDF接受您選擇的參數(shù)并返回一個值。
2.3將Python函數(shù)轉(zhuǎn)換為PySpark UDF
現(xiàn)在,通過將函數(shù)傳遞給PySpark SQL UDF(),將這個函數(shù)convertCase()轉(zhuǎn)換為UDF,這個函數(shù)可以在org.apache.spark.sql.functions.UDF中找到。請確保在使用這個包之前import它。
PySpark SQL udf()函數(shù)返回org.apache.spark.sql.expressions.UserDefinedFunction類對象。
注意:udf()的默認類型是StringType,因此,您也可以在不返回類型的情況下編寫上述語句。
3.在DataFrame中使用UDF
3.1利用PySpark DataFrame select()使用UDF
現(xiàn)在可以在DataFrame列上使用convertUDF()作為常規(guī)的內(nèi)置函數(shù)。
結(jié)果如下。
3.2在PySpark DataFrame withColumn()中使用UDF
您也可以在DataFrame withColumn()函數(shù)上使用udf,為了解釋這一點,我將創(chuàng)建另一個upperCase()函數(shù),它將輸入字符串轉(zhuǎn)換為大寫字母。
讓我們將upperCase() python函數(shù)轉(zhuǎn)換為UDF,然后將其與DataFrame withColumn()一起使用。下面的示例將“Name”列的值轉(zhuǎn)換為大寫,并創(chuàng)建一個新列“Curated Name”。
結(jié)果如下。
3.3注冊PySpark UDF &在SQL上使用它
為了在PySpark SQL上使用convertCase()函數(shù),您需要通過使用spark.udf.register()向PySpark注冊該函數(shù)。
產(chǎn)生與3.1相似的結(jié)果。
4. 使用注釋創(chuàng)建UDF
在前面的小節(jié)中,您已經(jīng)了解了創(chuàng)建UDF需要兩個步驟,首先,您需要創(chuàng)建一個Python函數(shù),然后使用SQL UDF()函數(shù)將函數(shù)轉(zhuǎn)換為UDF,但是,您可以避免這兩個步驟,通過使用注釋只需一個步驟就可以創(chuàng)建它。
其結(jié)果與3.2相似。
5. 特殊處理
5.1執(zhí)行順序
需要注意的一點是,在PySpark/Spark中并不保證子表達式求值的順序,這意味著表達式不能保證從左到右或以任何其他固定的順序求值。PySpark重新排序查詢優(yōu)化和計劃的執(zhí)行順序,因此,AND, OR, WHERE and HAVING 將產(chǎn)生副作用。
因此,在設計和使用UDF時,必須非常小心,尤其是null處理,因為這會導致運行時異常。
5.2處理null
如果不仔細設計,UDF很容易出錯。例如,當您有一個列在某些記錄上包含值null時。
注意,在上面的代碼片段中,帶有“Seqno 4”的記錄的“name”列的值為“None”。因為我們沒有使用UDF函數(shù)處理null,所以在DataFrame上使用這個函數(shù)返回錯誤。注意,在Python中None被認為是空的。
要記住以下幾點:
(1)最好的做法是在UDF函數(shù)內(nèi)部檢查是否為空,而不是在外部檢查是否為空。
(2)在任何情況下,如果您不能在UDF中執(zhí)行null檢查,那么至少使用if或case WHEN來檢查null并有條件地調(diào)用UDF。
當我們在注冊UDF時檢查null/none時,這將成功執(zhí)行,不會出現(xiàn)錯誤。
5.3使用UDF的性能問題
UDF對PySpark來說是一個黑盒,因此它不能應用優(yōu)化,你將失去所有PySpark在Dataframe/Dataset上做的優(yōu)化。如果可能的話,應該使用Spark SQL內(nèi)置函數(shù),因為這些函數(shù)提供了優(yōu)化。請考慮僅在現(xiàn)有的內(nèi)置SQL函數(shù)沒有UDF時創(chuàng)建UDF。
6 樣例代碼
https://github.com/spark-examples/pyspark-examples/blob/master/pyspark-udf.py
結(jié)論
在本文中,您已經(jīng)了解了以下內(nèi)容:
(1)PySpark UDF是一個用戶定義的函數(shù),用于在Spark中創(chuàng)建一個可重用的函數(shù)。
(2)一旦創(chuàng)建了UDF,就可以在多個DataFrames和SQL上重用它(注冊后)。
(3)udf()的默認類型是StringType。
(4)您需要顯式地處理空值,否則您將看到副作用。