Learning Spark [8] - The MLlib Library - Linear Regression

Machine Learning Pipelines

The concept of a pipeline appears in many machine learning frameworks; it is a way of organizing and manipulating data. In MLlib, the Pipeline API provides an interface, built on top of DataFrames, for managing machine learning workflows.

MLlib Terminology

  • Transformer
    Takes a DataFrame as input and outputs a DataFrame with one or more new columns appended. A Transformer does not learn any parameters from the data; it only applies rule-based transformations. Its method is transform().

Transformers are mainly used to clean and prepare the data so that it can be fed into a model.

  • Estimator
    An Estimator learns parameters from the data; its fit() method returns a Model.

The resulting Model can itself be used as a Transformer.

  • Pipeline
    Organizes a series of Transformers and Estimators into a single model. A Pipeline is itself an Estimator: calling pipeline.fit() returns a PipelineModel.

A PipelineModel can likewise be used as a Transformer.
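To make the relationship between these three concepts concrete, here is a framework-free sketch in plain Python. The class names are hypothetical and the "DataFrame" is just a list of dicts; only the fit()/transform() pattern mirrors MLlib:

```python
# Illustrative sketch of MLlib's Estimator -> Model (Transformer) pattern.

class MeanImputer:
    """An 'Estimator': learns a parameter (the column mean) from the data."""
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        vals = [r[self.col] for r in rows if r[self.col] is not None]
        mean = sum(vals) / len(vals)
        return MeanImputerModel(self.col, mean)  # fit() returns a Model

class MeanImputerModel:
    """The returned Model acts as a 'Transformer': it applies what was learned."""
    def __init__(self, col, mean):
        self.col, self.mean = col, mean

    def transform(self, rows):  # fills missing values; learns nothing new
        return [{**r, self.col: r[self.col] if r[self.col] is not None else self.mean}
                for r in rows]

rows = [{"bedrooms": 1.0}, {"bedrooms": 3.0}, {"bedrooms": None}]
model = MeanImputer("bedrooms").fit(rows)   # Estimator -> Model
print(model.transform(rows))                # Model used as a Transformer
```

A Pipeline, in this picture, is just a list of such stages where fit() is called on each Estimator in turn and transform() on everything else.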

Example

The dataset is the San Francisco housing dataset from Inside Airbnb. It contains information about Airbnb rentals, such as the number of bedrooms, location, and reviews. Our goal is to predict the price per night of a listing.
After first obtaining the data, we need to remove anomalies and outliers (for example, listings priced below $0) and clean the data (for example, by casting column types), and so on; we won't dwell on those details here.
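As an illustration of the kind of cleaning meant here (this is a toy sketch on plain Python records, not the book's actual cleaning code), dropping invalid prices and casting types might look like:

```python
# Hypothetical cleaning sketch: drop non-positive prices, cast string columns.
raw = [
    {"price": "170.0", "bedrooms": "1"},
    {"price": "-5.0",  "bedrooms": "2"},   # anomaly: negative price
    {"price": "235.0", "bedrooms": "2"},
]

clean = [
    {"price": float(r["price"]), "bedrooms": int(r["bedrooms"])}
    for r in raw
    if float(r["price"]) > 0               # remove outliers/anomalies
]
print(clean)  # only the two valid listings remain
```

In Spark the same ideas are expressed with DataFrame filters and casts instead of list comprehensions.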

# In Python
from pyspark.sql import SparkSession

# create a new Spark session
spark = (SparkSession
         .builder
         .appName('SparkSQLExampleApp')
         # .master("local[*]")
         # .config("spark.sql.catalogImplementation","hive")
         # .enableHiveSupport()
         .getOrCreate())

path = '.../sf-airbnb-clean.parquet'

airbnb = spark.read.parquet(path)
len(airbnb.columns)
34

The dataset has 34 columns; let's select a few of them to get a better overview of the data.

airbnb.select('neighbourhood_cleansed', 'room_type', 'bedrooms', 'bathrooms',
              'number_of_reviews', 'price').show(5)
+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows

Creating the Training and Test Sets

# Split in Train and Test
trainDF, testDF = airbnb.randomSplit([.8, .2], seed=42)  # a fixed seed makes the split reproducible
print(f'train size: {trainDF.count()}, test size: {testDF.count()}')
train size: 5780, test size: 1366
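randomSplit distributes rows between the two sets with roughly the given proportions, and fixing the seed makes the split reproducible. The idea, sketched in plain Python (note that Spark assigns rows probabilistically per partition, so its actual sizes are only approximately 80/20, as the 5780/1366 counts above show):

```python
import random

def random_split(rows, weights, seed):
    """Toy reproducible split: shuffle with a fixed seed, then cut."""
    rng = random.Random(seed)              # same seed -> same shuffle every run
    shuffled = rows[:]
    rng.shuffle(shuffled)
    cut = int(len(shuffled) * weights[0])  # e.g. 80% train
    return shuffled[:cut], shuffled[cut:]

rows = list(range(100))
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))  # 80 20

train2, _ = random_split(rows, [0.8, 0.2], seed=42)
print(train == train2)        # True: identical split with the same seed
```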

Univariate Linear Regression

To build a linear regression model in Spark, we first need to assemble all the independent variables into a single vector. For now, let's use just one feature: the number of bedrooms. We can use the VectorAssembler Transformer for this.

from pyspark.ml.feature import VectorAssembler 

vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features") 
vecTrainDF = vecAssembler.transform(trainDF) 
vecTrainDF.select("bedrooms", "features", "price").show(10)
+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
|     3.0|   [3.0]|250.0|
|     1.0|   [1.0]|115.0|
|     1.0|   [1.0]|105.0|
|     1.0|   [1.0]| 86.0|
|     1.0|   [1.0]|100.0|
|     2.0|   [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows

Now that we have our independent variable (number of bedrooms) and dependent variable (price), we can move on to the next step: building the model.

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol = 'price')
lrModel = lr.fit(vecTrainDF)

Here lr is the Estimator, and lrModel is the Model returned by fit(), which acts as a Transformer. It holds the parameters learned from the training set, i.e. the a and b in the linear equation y = ax + b. The actual result is as follows:

a = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)
print(f'The formula for the linear regression is price = {a} * num_of_bedrooms + {b}')
The formula for the linear regression is price = 123.68 * num_of_bedrooms + 47.51
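Under the hood, linear regression finds the a and b that minimize the squared error. With a single feature, ordinary least squares has a closed form, which we can sketch in plain Python on toy data (Spark's solver works differently internally, and these are not the Airbnb numbers):

```python
def fit_line(xs, ys):
    """Closed-form OLS for one feature: slope = cov(x, y) / var(x)."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    a = sum((x - mx) * (y - my) for x, y in zip(xs, ys)) / \
        sum((x - mx) ** 2 for x in xs)
    b = my - a * mx                        # intercept passes through the means
    return a, b

xs = [1.0, 1.0, 2.0, 3.0]                  # toy bedroom counts
ys = [100.0, 150.0, 250.0, 400.0]          # toy prices
a, b = fit_line(xs, ys)
print(round(a, 2), round(b, 2))            # 136.36 -13.64
```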

Building a Pipeline

Building a pipeline makes the code easier to manage: it spares us the verbose steps above, so we can train the model, and evaluate it on the test set, more elegantly.

from pyspark.ml import Pipeline

# transformer config
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features") 
# model config
lr = LinearRegression(featuresCol = 'features', labelCol = 'price')
# model training
pipelineModel = Pipeline(stages = [vecAssembler, lr]).fit(trainDF)

# At this point, pipelineModel can already be used as a Transformer,
# so we can conveniently evaluate the model on the test set.
predDF = pipelineModel.transform(testDF)
predDF.select('bedrooms', 'features', 'price', 'prediction').show(10)
+--------+--------+------+------------------+
|bedrooms|features| price|        prediction|
+--------+--------+------+------------------+
|     1.0|   [1.0]|  85.0|171.18598011578285|
|     1.0|   [1.0]|  45.0|171.18598011578285|
|     1.0|   [1.0]|  70.0|171.18598011578285|
|     1.0|   [1.0]| 128.0|171.18598011578285|
|     1.0|   [1.0]| 159.0|171.18598011578285|
|     2.0|   [2.0]| 250.0|294.86172649777757|
|     1.0|   [1.0]|  99.0|171.18598011578285|
|     1.0|   [1.0]|  95.0|171.18598011578285|
|     1.0|   [1.0]| 100.0|171.18598011578285|
|     1.0|   [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows

Multiple Linear Regression & Dummy Variables

To handle categorical (discrete) independent variables, we can convert them into dummy variables.
In Spark, the OneHotEncoder transformer performs this conversion.
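The idea behind the StringIndexer + OneHotEncoder pair: the indexer maps each category string to a numeric index (by default, most frequent category first), and the encoder turns that index into a binary vector. A plain-Python sketch of the concept, not MLlib's implementation:

```python
from collections import Counter

def string_index(values):
    """Map categories to indices, most frequent category first (like StringIndexer)."""
    order = [v for v, _ in Counter(values).most_common()]
    mapping = {v: i for i, v in enumerate(order)}
    return [mapping[v] for v in values], mapping

def one_hot(index, num_categories):
    """Binary vector with a single 1.0 at the category's index."""
    vec = [0.0] * num_categories
    vec[index] = 1.0
    return vec

room_types = ["Entire home/apt", "Private room", "Entire home/apt", "Shared room"]
indices, mapping = string_index(room_types)
print(mapping)                                   # {'Entire home/apt': 0, ...}
print([one_hot(i, len(mapping)) for i in indices])
```

Note that MLlib's OneHotEncoder drops the last category by default (dropLast=True), so its vectors have length n-1 rather than n; the sketch above keeps all n positions for clarity.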

from pyspark.ml.feature import OneHotEncoder, StringIndexer

# dummy variable 
categoricalCols = [field for (field, dataTypes) in trainDF.dtypes if dataTypes == 'string']

indexOutputCols = [x + 'Index' for x in categoricalCols]
oheOutputCols = [x + 'OHE' for x in categoricalCols]

stringIndexer = StringIndexer(inputCols=categoricalCols,                               
                              outputCols=indexOutputCols,                               
                              handleInvalid = 'skip')

oheEncoder = OneHotEncoder(inputCols = indexOutputCols,
                           outputCols = oheOutputCols)

numericCols = [field for (field, dataTypes) in trainDF.dtypes if dataTypes == 'double' and field != 'price']
assemblerInputs = oheOutputCols + numericCols

vecAssembler = VectorAssembler(inputCols = assemblerInputs,
                               outputCol = 'features')

lr = LinearRegression(labelCol = 'price', featuresCol = 'features')

pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr])

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select('features', 'price', 'prediction').show(10)

Evaluating Model Performance

Common metrics for evaluating model performance are RMSE (root-mean-square error) and R².
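For reference, the two metrics computed by RegressionEvaluator are defined as follows (a plain-Python sketch on toy numbers):

```python
import math

def rmse(y_true, y_pred):
    """Root of the mean squared residual."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))

def r2(y_true, y_pred):
    """1 - SS_res / SS_tot: the fraction of variance explained by the model."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot

y_true = [85.0, 45.0, 250.0, 99.0]       # toy actual prices
y_pred = [90.0, 50.0, 240.0, 100.0]      # toy predictions
print(round(rmse(y_true, y_pred), 2), round(r2(y_true, y_pred), 2))  # 6.14 0.99
```

RMSE is in the units of the label (dollars here), so it is only comparable across models trained on the same target scale; R² is unitless, which is why it is the easier metric to judge in isolation.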

from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
    predictionCol = 'prediction',
    labelCol = 'price',
    metricName = 'rmse')
rmse = regressionEvaluator.evaluate(predDF)
print(f'RMSE is {round(rmse,2)}')
RMSE is 220.56
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF) 
print(f"R2 is {round(r2, 2)}")
R2 is 0.16

Since R² is quite low, let's try log-transforming the dependent variable.

# log-transform y
from pyspark.sql.functions import log, col

logTrainDF = trainDF.withColumn('price', log(col('price')))
logTestDF = testDF.withColumn('price', log(col('price')))

log_pipelineModel = pipeline.fit(logTrainDF)
predDF = log_pipelineModel.transform(logTestDF)

r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF) 
print(f"R2 is {round(r2, 2)}")
R2 is 0.57
predDF.select('features', 'price', 'prediction').show(10)
+--------------------+------------------+------------------+
|            features|             price|        prediction|
+--------------------+------------------+------------------+
|(98,[0,3,6,22,43,...| 4.442651256490317| 4.644425529745689|
|(98,[0,3,6,22,43,...|3.8066624897703196| 4.223594858687562|
|(98,[0,3,6,22,43,...| 4.248495242049359| 4.248280556674246|
|(98,[0,3,6,12,42,...| 4.852030263919617|3.8921581128135756|
|(98,[0,3,6,12,43,...|5.0689042022202315| 4.608476041020452|
|(98,[0,3,6,12,43,...| 5.521460917862246| 5.365868119786427|
|(98,[0,3,6,11,42,...|  4.59511985013459| 5.084838593929874|
|(98,[0,3,6,31,42,...| 4.553876891600541| 5.008339179369244|
|(98,[0,3,6,28,42,...| 4.605170185988092| 4.154386449584621|
|(98,[0,3,6,28,43,...| 7.605890001053122| 5.434322576497891|
+--------------------+------------------+------------------+
only showing top 10 rows
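One caveat with the log transform: the model now predicts log(price), so the predictions in the table above (and any RMSE computed on them) are in log units. To report prices in dollars, you would exponentiate the prediction column (e.g. with pyspark.sql.functions.exp). The arithmetic, sketched in plain Python on the first row above:

```python
import math

log_price = 4.442651256490317          # 'price' column, first row of the table above
log_pred  = 4.644425529745689          # 'prediction' column, same row

print(round(math.exp(log_price), 2))   # back to dollars: 85.0
print(round(math.exp(log_pred), 2))    # predicted nightly price in dollars
```

(A subtlety worth knowing: exponentiating the prediction of a model fit on log(price) gives a slightly biased estimate of the mean price, since the mean of a log does not equal the log of a mean.)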

Reference
Learning Spark, 2nd Edition: Lightning-Fast Data Analytics, by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee (O'Reilly)
