spark dataframe添加新的列

使用spark的dataframe進行計算時有時需要添加新的列。本文介紹兩種添加新列的方法,比較常見的一種方法是調(diào)用dataframe的withColumn方法,但是該方法存在一定的限制,即新添加的列只能根據(jù)現(xiàn)有列轉(zhuǎn)換得到;另一種方法是利用UDF(user defined function)模塊。下面結(jié)合例子進行說明,現(xiàn)有預(yù)測得到的pm2.5數(shù)據(jù),需要添加其他污染項目的預(yù)測數(shù)據(jù)及預(yù)測時間。

1、withColumn

dataframe的withColumn方法可以用于添加新的列,但是新的列僅能根據(jù)現(xiàn)有列計算得到。

yHat = yHat.withColumn("pm25", yHat["pm25"]*(maxValue - minValue) + minValue)
yHat = yHat.withColumn("pm10", yHat["pm25"] + 10)
yHat = yHat.withColumn("CO", yHat["pm25"] + 20)
yHat = yHat.withColumn("NO2", yHat["pm25"] + 30)
yHat = yHat.withColumn("NO", yHat["pm25"] + 40)
yHat = yHat.withColumn("SO2", yHat["pm25"] + 50)
圖1

2、udf

除了withColumn方法,還可以利用spark的udf模塊添加新的列。在本例中,還需要添加相應(yīng)的時間列,此時withColumn方法并不適用,需要導(dǎo)入udf方法,該方法有兩個參數(shù),分別為自定義的函數(shù)名及返回值類型。

global idx
idx = 0
date = gettime()
def set_date(x):
    global idx  # 將idx設(shè)置為全局變量
    if x is not None:
        idx += 1
        return date[idx - 1]
index = udf(set_date, StringType())
yHat = yHat.withColumn("date", index(yHat["pm25"]))
圖2
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容