使用spark的dataframe進行計算時有時需要添加新的列。本文介紹兩種添加新列的方法,比較常見的一種方法是調(diào)用dataframe的withColumn方法,但是該方法存在一定的限制,即新添加的列只能根據(jù)現(xiàn)有列轉(zhuǎn)換得到;另一種方法是利用UDF(user defined function)模塊。下面結(jié)合例子進行說明,現(xiàn)有預(yù)測得到的pm2.5數(shù)據(jù),需要添加其他污染項目的預(yù)測數(shù)據(jù)及預(yù)測時間。
1、withColumn
dataframe的withColumn方法可以用于添加新的列,但是新的列僅能根據(jù)現(xiàn)有列計算得到。
yHat = yHat.withColumn("pm25", yHat["pm25"]*(maxValue - minValue) + minValue)
yHat = yHat.withColumn("pm10", yHat["pm25"] + 10)
yHat = yHat.withColumn("CO", yHat["pm25"] + 20)
yHat = yHat.withColumn("NO2", yHat["pm25"] + 30)
yHat = yHat.withColumn("NO", yHat["pm25"] + 40)
yHat = yHat.withColumn("SO2", yHat["pm25"] + 50)
圖1
2、udf
除了withColumn方法,還可以利用spark的udf模塊添加新的列。在本例中,還需要添加相應(yīng)的時間列,此時withColumn方法并不適用,需要導(dǎo)入udf方法,該方法有兩個參數(shù),分別為自定義的函數(shù)名及返回值類型。
global idx
idx = 0
date = gettime()
def set_date(x):
global idx # 將idx設(shè)置為全局變量
if x is not None:
idx += 1
return date[idx - 1]
index = udf(set_date, StringType())
yHat = yHat.withColumn("date", index(yHat["pm25"]))
圖2