UDF
為了滿足用戶的不同的分析需求,Spark允許使用者自己定義函數(shù),供用戶在Spark SQL中使用。例如數(shù)據(jù)科學(xué)家可以將一個(gè)機(jī)器學(xué)習(xí)模型封裝在一個(gè)函數(shù)內(nèi),提供給數(shù)據(jù)分析師在無需知道模型內(nèi)部復(fù)雜的知識(shí)下,直接使用。
例子:創(chuàng)建一個(gè)返回立方的函數(shù)
# in Python
from pyspark.sql.types import LongType
# create function
def cubed(num):
return num ** 3
# register UDF
spark.udf.register('cubed', cubed, LongType())
# generate a temp view
spark.range(1,9).createOrReplaceTempView('udf_test')
# query
spark.sql('SELECT id, cubed(id) AS id_cubed FROM udf_test').show()
+---+--------+
| id|id_cubed|
+---+--------+
| 1| 1|
| 2| 8|
| 3| 27|
| 4| 64|
| 5| 125|
| 6| 216|
| 7| 343|
| 8| 512|
+---+--------+
Pandas-UDF
為了提升UDF的計(jì)算效率,可以使用Python中的Pandas包來創(chuàng)建Pandas UDF(或者叫向量化(Vectorized)UDF)。
關(guān)于向量化函數(shù),在Pandas包以及R中的dply族函數(shù),都是很好的例子。
# In Python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
def cubed(a: pd.Series) -> pd.Series:
return a ** 3
cubed_udf = pandas_udf(cubed, returnType = LongType())
spark.range(1,9).createOrReplaceTempView('udf_test')
spark.sql('SELECT id, cubed(id) AS id_cubed FROM udf_test').show()
+---+--------+
| id|id_cubed|
+---+--------+
| 1| 1|
| 2| 8|
| 3| 27|
| 4| 64|
| 5| 125|
| 6| 216|
| 7| 343|
| 8| 512|
+---+--------+