推薦系統(tǒng)之SparkML預測模型構建

1.Spark ML重要概念

1.Spark ML基于什么來處理的?

Spark-Core的核心開發(fā)是基于RDD,但是RDD并不是非常的靈活,如果做一個結構化數(shù)據(jù)處理,還需要轉換成DataFrame(在Python當中引出的概念)DataFrame其實就是行對象的RDD加上schema,類似于文本的數(shù)據(jù),對這些數(shù)據(jù)加入schema,做一些結構的轉換,可以把它簡單地理解為數(shù)據(jù)庫里的一張表,里面有數(shù)據(jù),有類型。RDD和DataFrame的關系可以理解為普通文本和表的對應關系。
所以DataFrame處理起來會更加靈活一些,ML就是基于DataFrame進行處理的,來用于學習的一個數(shù)據(jù)集,所以當我們創(chuàng)建一個DataFrame的時候可以指定一個schem,或者創(chuàng)建一個行對象RDD,可以從已有的RDD轉換而來,DF也可以轉換成RDD。
DataFrame在ML中有多種類型,除了常見的整型、字符串等,還可以支持圖像、特征工程轉換的向量Vector。所以DataFrame是Spark ML用來進行訓練的數(shù)據(jù)集。

2.進行機器學習的流程?

前面講到過,一般機器學習的流程是,先收集數(shù)據(jù)集,將數(shù)據(jù)集劃分為訓練集合測試集,再用訓練集訓練模型,然后用模型對測試集進行預測。最后通過混淆矩陣,借助于AUC、召回率或準確率來進行模型的評估。
這個流程是可以形成管道的,整體就是個DAG(有向無環(huán)圖,spark通過DAG來進行調度的)其實整個模型測試的過程就是一個管道,這些管道會有各種各樣的組件,每一個步驟便是一個組件,組件可以分成兩個類別,第一個是Transformers,用到的函數(shù)是transform。它的作用就是進行轉換,即DataFrame轉成另外一個DataFrame,例如,將原本的數(shù)據(jù)集拆分成訓練集和測試集。對測試集進行預測,預測的過程,也是將原本的測試集(DF)轉換成了預測結果集(另外一個DF)。
第二種類型是Estimators,評估器,用到的函數(shù)是fit。它的作用是應用在一個DF上生成一個轉換器的算法,我們通常的用訓練集來訓練一個模型(邏輯回歸),本質上就是用Estimators的fit方法去做。

2.Spark ML如何工作


上圖上部分就是整個管道的操作,下部分就是不同數(shù)據(jù)類型的變化



上圖就是對上一個模型的整個transformer操作了。

3.Spark ML參數(shù)和保存

1.參數(shù)是所有的轉換器和評估器共享一個公共的api,比如正則化參數(shù)都是可以共用的
2.參數(shù)名Param是一個參數(shù),可以單獨去設置某個參數(shù)的值
3.ParamMap是一個參數(shù)的集合 (parameter, value)
4.傳遞參數(shù)的兩種方式:
為實例設置參數(shù)
傳遞ParamMap給fit()或transform()方法
保存和加載管道:可以對模型和管道進行保存

4.Spark ML代碼實例

實例1:Estimator, Transformer, and Param

步驟:
準備帶標簽和特征的數(shù)據(jù)
創(chuàng)建邏輯回歸的評估器
使用setter方法設置參數(shù)
使用存儲在lr中的參數(shù)來訓練一個模型
使用paramMap選擇指定的參數(shù)
準備測試數(shù)據(jù)
預測結果
代碼如下:

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector,Vectors}
import org.apache.spark.sql.Row

val training = sqlContext.createDataFrame(Seq(
(1.0,Vectors.dense(0.0,1.1,0.1)),
(0.0,Vectors.dense(2.0,1.0,-1.1)),
(0.0,Vectors.dense(2.0,1.3,1.0)),
(1.0,Vectors.dense(0.0,1.2,-0.5))
)).toDF("label","features")

val lr = new LogisticRegression()
//查看構建模型的默認參數(shù)
print(lr.explainParams())

打印運行結果如下圖:



從上圖可以看到正則化參數(shù)默認為0.0,最大迭代次數(shù)默認為100.

//設置參數(shù)
lr.setMaxIter(10).setRegParam(0.01)
val model = lr.fit(training)
//查看model的參數(shù)
model.parent.extractParamMap

打印結果如下圖:



可以看到,模型的正則化參數(shù)值被修改為0.01,最大迭代次數(shù)被修改為10.

//重置參數(shù)
val paramMap = ParamMap(lr.maxIter -> 20).
put(lr.maxIter,10).
put(lr.regParam -> 0.1, lr.threshold -> 0.55)
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")
//參數(shù)組合
val paramMapCombined = paramMap ++ paramMap2
val model2 = lr.fit(training,paramMapCombined)
model2.parent.extractParamMap

打印結果如下:



從結果可以看出,被框起來的參數(shù)都被修改或者覆蓋了。

//測試數(shù)據(jù)集
val test = sqlContext.createDataFrame(Seq(
(1.0,Vectors.dense(-1.0,1.5,1.3)),
(0.0,Vectors.dense(3.0,2.0,-0.1)),
(1.0,Vectors.dense(0.0,2.2,-1.5))
)).toDF("label","features")
//測試
model.transform(test).collect
//美化打印
model2.transform(test).select("label","features","myProbability","prediction").collect().
foreach{case Row(label:Double,features:Vector,myProbability:Vector,prediction:Double) =>
println(s"($features,$label) -> myProbability=$myProbability,prediction=$prediction")}

模型測試的結果如下:


實例2:管道使用

步驟:
準備訓練文檔
配置ML管道,包含三個stage:
Tokenizer,HashingTF和lr
安裝管道到數(shù)據(jù)上
保存管道到磁盤
包括安裝好的和未安裝好的
加載管道
準備測試文檔
預測結果

import org.apache.spark.ml.feature.{Tokenizer,HashingTF}
import org.apache.spark.ml.{Pipeline,PipelineModel}

val training = sqlContext.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id","text","label")

val tokenizer = new Tokenizer().
setInputCol("text").
setOutputCol("words")

val hashingTF = new HashingTF().
setNumFeatures(1000).
setInputCol(tokenizer.getOutputCol).
setOutputCol("features")

val lr = new LogisticRegression().
setMaxIter(10).setRegParam(0.01)

val pipeline = new Pipeline().
setStages(Array(tokenizer,hashingTF,lr))

val model = pipeline.fit(training)

pipeline.save("./sparkML-LRpipeline")
model.save("./sparkML-LRmodel")

val model2 = PipelineModel.load("./sparkML-LRmodel")

val test = sqlContext.createDataFrame(Seq(
(4L, "spark h d e"),
(5L, "a f c"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")
)).toDF("id","text")

model2.transform(test).select("id","text","probability","prediction").collect().
foreach{case Row(id:Long,text:String,probability:Vector,prediction:Double) =>
println(s"($id,$text) -> probability=$probability,prediction=$prediction")}

執(zhí)行以上兩行save代碼后,會在當前目錄下生成兩個文件,這就是保存的模型文件,如下圖:


最后模型對測試集的預測打印結果如下:


實例3:通過交叉驗證來模型調優(yōu)

在模型訓練過程中,會有一些參數(shù)可以選擇,比如正則化參數(shù),迭代次數(shù)等,但我們自己設定的參數(shù)值是不是最優(yōu)的呢,肯定不是的,那如何選擇最優(yōu)的參數(shù),只能憑借經(jīng)驗來選擇可信賴的比較簡單的模型,但也不一定是最優(yōu)參數(shù)。所以就需要對模型進行調優(yōu)了。我們可以給出一些可選的參數(shù),比如迭代次數(shù),給10/100/1000,正則化給0.1/0.01/0.001等,讓模型自己去組合選擇出最優(yōu)的參數(shù)搭配。
完成參數(shù)的選擇,我們用到的類為pipeline,基于整體的管道進行調優(yōu),而不是給lr模型或者分詞單獨進行調優(yōu)。實踐如下:
步驟:
準備訓練數(shù)據(jù)
配置ML管道,包含三個stage:
Tokenizer,HashingTF和lr
使用ParamGridBuilder 構造一個參數(shù)網(wǎng)格
使用CrossValidator來選擇模型和參數(shù)
CrossValidator需要一個Estimator(pipeline),一個評估器參數(shù)集合,和一個Evaluator(lr就選擇二分類評估器)
運行交叉校驗,選擇最好的參數(shù)集
準備測試數(shù)據(jù)
預測結果

實例4:通過訓練校驗分類來調優(yōu)模型

上一個實例的交叉驗證是把數(shù)據(jù)分成多份,每一份把參數(shù)組合,對模型計算評分一次。這種方式只需要把每一組參數(shù)計算一次就可以了。
而校驗分類是自動把數(shù)據(jù)分成訓練集和校驗集,例如80%的數(shù)據(jù)作為訓練集,20%的數(shù)據(jù)作為每一組參數(shù)的校驗集,每一組參數(shù)計算一次。這種方式的使用必須依賴大量數(shù)據(jù)進行訓練,如果訓練數(shù)據(jù)不夠,那么所生成的結果是不可信的。這種情況就更適合前面一種交叉驗證方式了,對于交叉驗證來說,如果數(shù)據(jù)少一些,沒關系,每一組參數(shù)會進行多次校驗,如果fold=3,會校驗3次。所以即使數(shù)據(jù)量比較少,但是因為計算了多次,3次的結果足夠進行評估,選出最優(yōu)的參數(shù)。
在這個實例中,我們使用線性回歸進行處理,線性回歸主要是用來進行預測的回歸算法,這一次用的數(shù)據(jù)集相對來說大一點。
步驟:
準備訓練和測試數(shù)據(jù)
使用ParamGridBuilder 構造一個參數(shù)網(wǎng)格
使用TrainValidationSplit來選擇模型和參數(shù)
CrossValidator需要一個Estimator,一個評估器參數(shù)集合,和一個Evaluator
運行訓練校驗分離,選擇最好的參數(shù)
在測試數(shù)據(jù)上做預測,模型是參數(shù)組合中執(zhí)行最好的一個
數(shù)據(jù)集如下圖:


進入spark-shell前先將數(shù)據(jù)預處理一下,替換掉所有的"+"符號
sed -i 's/+//g' sample_linear_regression_data.txt

代碼如下:

//加載文件
val data = sqlContext.read.format("libsvm").load("./sample_linear_regression_data.txt")
//劃分數(shù)據(jù)集
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed=12345)
//創(chuàng)建模型
val lr = new LinearRegression()
//使用ParamGridBuilder 構造一個參數(shù)網(wǎng)格
val paramGrid = new ParamGridBuilder().
addGrid(lr.elasticNetParam, Array(0.0,0.5,1.0)).
addGrid(lr.fitIntercept).
addGrid(lr.regParam, Array(0.1,0.01)).
build()
//使用TrainValidationSplit來選擇模型和參數(shù)
val trainValidationSplit = new TrainValidationSplit().
setEstimator(lr).
setEstimatorParamMaps(paramGrid).
setEvaluator(new RegressionEvaluator).
setTrainRatio(0.8)
//運行交叉校驗,選擇最好的參數(shù)集
  
//預測結果
model.transform(test).select("features","label","prediction").show()

預測結果如下圖:


最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容