機(jī)器學(xué)習(xí)數(shù)據(jù)管道(Machine Learning Pipeline)
Pipeline的概念,在很多機(jī)器學(xué)習(xí)的模型中都存在,是一個(gè)種整理以及操控?cái)?shù)據(jù)的方法。在MLlib中,Pipeline API提供了一個(gè)在dataframe之上,管理機(jī)器學(xué)習(xí)工作流的接口。
MLlib術(shù)語(yǔ)
- Transformer
輸入一個(gè)Dataframe,并輸出一個(gè)包含了一些新列的Dataframe。Transformer不會(huì)從數(shù)據(jù)中學(xué)習(xí)到參數(shù),且只會(huì)引用一些rule-based規(guī)則去轉(zhuǎn)換數(shù)據(jù)。函數(shù)為transform()。
Transformer主要用于清洗數(shù)據(jù),以便于數(shù)據(jù)可以用于模型之中。
- Estimator
Estimator會(huì)從數(shù)據(jù)中學(xué)習(xí)到參數(shù),根據(jù)函數(shù)fit()返回一個(gè)Model
該Model便可以被認(rèn)定為一個(gè)Transformer
- Pipeline
將一個(gè)系列的Transformer和Estimator整理為一個(gè)模型。Pipeline自身同樣為Estimator,函數(shù)為pipeline.fit(),并返回一個(gè)PipelineModel。
PipelineModel同樣可以被認(rèn)定為Transformer
例子
數(shù)據(jù)集為San Francisco housing dataset from Inside Airbnb。該數(shù)據(jù)集包括了Airbnb的租房的信息,例如臥室間數(shù)、位置、評(píng)價(jià)等等。我們的目標(biāo)為預(yù)測(cè)一間夜的出租價(jià)格。
在起初拿到數(shù)據(jù)后,我們需要對(duì)數(shù)據(jù)的異常值或者離群值進(jìn)行去除(例如:小于$0的出租價(jià)),并進(jìn)行數(shù)據(jù)清洗(例如轉(zhuǎn)換數(shù)據(jù)類型),等等,再次不過多贅述。
# In Python
from pyspark.sql import SparkSession
# create new spark instance
spark = (SparkSession
.builder
.appName('SparkSQLExampleApp')
# .master("local[*]")
# .config("spark.sql.catalogImplementation","hive")
# .enableHiveSupport()
.getOrCreate())
path = '.../sf-airbnb-clean.parquet'
airbnb = spark.read.parquet(path)
len(airbnb.columns)
34
該數(shù)據(jù)集有34個(gè)列,我們選擇幾列來(lái)?yè)碛幸恍┐蟾诺臄?shù)據(jù)概覽。
airbnbDF.select('neighbourhood_cleansed', 'room_type', 'bedrooms', 'bathrooms',
'number_of_reviews', 'price').show(5)
+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed| room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
| Western Addition|Entire home/apt| 1.0| 1.0| 180.0|170.0|
| Bernal Heights|Entire home/apt| 2.0| 1.0| 111.0|235.0|
| Haight Ashbury| Private room| 1.0| 4.0| 17.0| 65.0|
| Haight Ashbury| Private room| 1.0| 4.0| 8.0| 65.0|
| Western Addition|Entire home/apt| 2.0| 1.5| 27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows
創(chuàng)建訓(xùn)練集與測(cè)試集

# Split in Train and Test
trainDF, testDF = airbnb.randomSplit([.8, .2], seed = 42) # seed is for set the randomness
print(f'train size: {trainDF.count()}, test size: {testDF.count()}')
train size: 5780, test size: 1366
單元線性回歸
在Spark中建立線性回歸模型,我們需要先將所有的自變量合并到一個(gè)向量里。我們先只選擇一個(gè)自變量:臥室間數(shù)。我們可以使用vectorAssembler Transofrmer來(lái)進(jìn)行這個(gè)操作。
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "price").show(10)
+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
| 1.0| [1.0]|200.0|
| 1.0| [1.0]|130.0|
| 1.0| [1.0]| 95.0|
| 1.0| [1.0]|250.0|
| 3.0| [3.0]|250.0|
| 1.0| [1.0]|115.0|
| 1.0| [1.0]|105.0|
| 1.0| [1.0]| 86.0|
| 1.0| [1.0]|100.0|
| 2.0| [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows
在擁有的自變量(臥室間數(shù))和因變量(價(jià)格)后,我們就可以進(jìn)行下一步,構(gòu)建模型。
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol = 'price')
lrModel = lr.fit(vecTrainDF)
這里的lrModel就是一個(gè)estimator,它包含了使用訓(xùn)練集訓(xùn)練出來(lái)的參數(shù),也是就是線性方程y=ax+b中的a。實(shí)際的結(jié)果如下
a = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)
print(f'The formula for the linear regression is price = {a} * num_of_bedrooms + ')
The formula for the linear regression is price = 123.68 * num_of_bedrooms + 47.51
建立Pipeline(管道)
建立pipeline可以更好管理代碼,我們就省去了上述冗長(zhǎng)的代碼,從而更優(yōu)雅的訓(xùn)練模型或者使用測(cè)試集來(lái)測(cè)試模型的性能。
from pyspark.ml import Pipeline
# transformer config
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
# model config
lr = LinearRegression(featuresCol = 'features', labelCol = 'price')
# model training
pipelineModel = Pipeline(stages = [vecAssembler, lr]).fit(trainDF)
# At this time, pipelineModel已經(jīng)自動(dòng)可以以transformer來(lái)調(diào)用,所以便可以很便利的使用測(cè)試集來(lái)測(cè)試模型性能。
predDF = pipelineModel.transform(testDF)
predDF.select('bedrooms', 'features', 'price', 'prediction').show(10)
+--------+--------+------+------------------+
|bedrooms|features| price| prediction|
+--------+--------+------+------------------+
| 1.0| [1.0]| 85.0|171.18598011578285|
| 1.0| [1.0]| 45.0|171.18598011578285|
| 1.0| [1.0]| 70.0|171.18598011578285|
| 1.0| [1.0]| 128.0|171.18598011578285|
| 1.0| [1.0]| 159.0|171.18598011578285|
| 2.0| [2.0]| 250.0|294.86172649777757|
| 1.0| [1.0]| 99.0|171.18598011578285|
| 1.0| [1.0]| 95.0|171.18598011578285|
| 1.0| [1.0]| 100.0|171.18598011578285|
| 1.0| [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows
多元線性回歸&虛擬變量
為了解決自變量中的離散值,我們可以講該變量轉(zhuǎn)換為Dummy Variable(虛擬變量)。
在Spark中,函數(shù)OneHotEncoder()可以實(shí)現(xiàn)這一轉(zhuǎn)換。
from pyspark.ml.feature import OneHotEncoder, StringIndexer
# dummy variable
categoricalCols = [field for (field, dataTypes) in trainDF.dtypes if dataTypes == 'string']
indexOutputCols = [x + 'Index' for x in categoricalCols]
oheOutputCols = [x + 'OHE' for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols,
outputCols=indexOutputCols,
handleInvalid = 'skip')
oheEncoder = OneHotEncoder(inputCols = indexOutputCols,
outputCols = oheOutputCols)
numericCols = [field for (field, dataTypes) in trainDF.dtypes if ((dataTypes == 'double') & (field != 'price'))]
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols = assemblerInputs,
outputCol = 'features')
lr = LinearRegression(labelCol = 'price', featuresCol = 'features')
pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select('features', 'price', 'prediction').show(10)
+--------------------+------------------+------------------+
| features| price| prediction|
+--------------------+------------------+------------------+
|(98,[0,3,6,22,43,...| 4.442651256490317| 4.644425529745689|
|(98,[0,3,6,22,43,...|3.8066624897703196| 4.223594858687562|
|(98,[0,3,6,22,43,...| 4.248495242049359| 4.248280556674246|
|(98,[0,3,6,12,42,...| 4.852030263919617|3.8921581128135756|
|(98,[0,3,6,12,43,...|5.0689042022202315| 4.608476041020452|
|(98,[0,3,6,12,43,...| 5.521460917862246| 5.365868119786427|
|(98,[0,3,6,11,42,...| 4.59511985013459| 5.084838593929874|
|(98,[0,3,6,31,42,...| 4.553876891600541| 5.008339179369244|
|(98,[0,3,6,28,42,...| 4.605170185988092| 4.154386449584621|
|(98,[0,3,6,28,43,...| 7.605890001053122| 5.434322576497891|
+--------------------+------------------+------------------+
only showing top 10 rows
測(cè)試模型性能
常用模型性能的測(cè)試指標(biāo)有:RMSE(root mean-square error),和R^2
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
predictionCol = 'prediction',
labelCol = 'price',
metricName = 'rmse')
rmse = regressionEvaluator.evaluate(predDF)
print(f'RMSE is {round(rmse,2)}')
RMSE is 220.56
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {round(r2, 2)}")
R2 is 0.16
R2過小,我們嘗試對(duì)因變量進(jìn)行對(duì)數(shù)轉(zhuǎn)換。
# log-transform y
from pyspark.sql.functions import log, col
logTrainDF = trainDF.withColumn('price', log(col('price')))
logTestDF = testDF.withColumn('price', log(col('price')))
log_pipelineModel = pipeline.fit(logTrainDF)
predDF = log_pipelineModel.transform(logTestDF)
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {round(r2, 2)}")
R2 is 0.57
predDF.select('features', 'price', 'prediction').show(10)
+--------------------+------------------+------------------+
| features| price| prediction|
+--------------------+------------------+------------------+
|(98,[0,3,6,22,43,...| 4.442651256490317| 4.644425529745689|
|(98,[0,3,6,22,43,...|3.8066624897703196| 4.223594858687562|
|(98,[0,3,6,22,43,...| 4.248495242049359| 4.248280556674246|
|(98,[0,3,6,12,42,...| 4.852030263919617|3.8921581128135756|
|(98,[0,3,6,12,43,...|5.0689042022202315| 4.608476041020452|
|(98,[0,3,6,12,43,...| 5.521460917862246| 5.365868119786427|
|(98,[0,3,6,11,42,...| 4.59511985013459| 5.084838593929874|
|(98,[0,3,6,31,42,...| 4.553876891600541| 5.008339179369244|
|(98,[0,3,6,28,42,...| 4.605170185988092| 4.154386449584621|
|(98,[0,3,6,28,43,...| 7.605890001053122| 5.434322576497891|
+--------------------+------------------+------------------+
only showing top 10 rows
Reference
Learning Spark 2nd - Lightning Fast Big Data Analysis by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee