Spark版本:2.4.0
語言:Scala
任務(wù):分類
這里對(duì)數(shù)據(jù)的處理步驟如下:
- 載入數(shù)據(jù)
- 歸一化
- PCA降維
- 劃分訓(xùn)練/測(cè)試集
- 線性SVM分類
- 驗(yàn)證精度
- 輸出cvs格式的結(jié)果
前言
從Spark 2.0開始,Spark機(jī)器學(xué)習(xí)API是基于DataFrame的spark.ml。而之前的基于RDD的API spark.mllib已進(jìn)入維護(hù)模式。
也就是說,Spark ML是Spark MLlib的一種新的API,它主要有以下幾個(gè)優(yōu)點(diǎn):
- 面向DataFrame,在RDD基礎(chǔ)上進(jìn)一步封裝,提供更強(qiáng)大更方便的API
- Pipeline功能,便于實(shí)現(xiàn)復(fù)雜的機(jī)器學(xué)習(xí)模型
- 性能提升
基于Pipeline的Spark ML中的幾個(gè)概念:
- DataFrame:從Spark SQL 的引用的概念,表示一個(gè)數(shù)據(jù)集,它可以容納多種數(shù)據(jù)類型。例如可以存儲(chǔ)文本,特征向量,標(biāo)簽和預(yù)測(cè)值等
- Transformer:是可以將一個(gè)DataFrame變換成另一個(gè)DataFrame的算法。例如,一個(gè)訓(xùn)練好的模型是一個(gè)Transformer,通過transform方法,將原始DataFrame轉(zhuǎn)化為一個(gè)包含預(yù)測(cè)值的DataFrame
- Estimator:是一個(gè)算法,接受一個(gè)DataFrame,產(chǎn)生一個(gè)Transformer。例如,一個(gè)學(xué)習(xí)算法(如PCA,SVM)是一個(gè)Estimator,通過fit方法,訓(xùn)練DataFrame并產(chǎn)生模型Transformer
- Pipeline: Pipeline將多個(gè)Transformers和Estimators連接起來組合成一個(gè)機(jī)器學(xué)習(xí)工作流程
- Parameter:用于對(duì)Transformers和Estimators指定參數(shù)的統(tǒng)一接口
本次實(shí)驗(yàn)使用的是Spark ML的API
首先要?jiǎng)?chuàng)建SparkSession
// 創(chuàng)建SparkSession
val spark = SparkSession
.builder
.appName("LinearSVCExample")
.master("local")
.getOrCreate()
數(shù)據(jù)處理步驟
1 載入數(shù)據(jù)
數(shù)據(jù)載入的方式有多種,這里使用libsvm格式的數(shù)據(jù)作為數(shù)據(jù)源,libsvm格式常被用來存儲(chǔ)稀疏的矩陣數(shù)據(jù),它每一行的格式如下:
label index1:value1 index2:value2 ...
第一個(gè)值是標(biāo)簽,后面是由“列號(hào):值”組成鍵值對(duì),只需要記錄非0項(xiàng)即可。
數(shù)據(jù)加載使用load方法完成:
// 加載訓(xùn)練數(shù)據(jù),生成DataFrame
val data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")
2 歸一化
作為數(shù)據(jù)預(yù)處理的第一步,需要對(duì)原始數(shù)據(jù)做歸一化處理,即把原始數(shù)據(jù)的每一維減去其平均值,再除以其標(biāo)準(zhǔn)差,使得數(shù)據(jù)總體分布為以0為中心,且標(biāo)準(zhǔn)差為1。
// 歸一化
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithMean(true)
.setWithStd(true)
.fit(data)
val scaleddata = scaler.transform(data).select("label", "scaledFeatures").toDF("label","features")
3 PCA降維
有時(shí)數(shù)據(jù)的維數(shù)可能很大,直接進(jìn)行分類不僅計(jì)算量很大,而且對(duì)數(shù)據(jù)量的要求也很高,常常會(huì)出現(xiàn)過擬合。因此需要進(jìn)行降維,常用的是主成分分析(PCA)算法。
// 創(chuàng)建PCA模型,生成Transformer
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(5)
.fit(scaleddata)
// transform數(shù)據(jù),生成主成分特征
val pcaResult = pca.transform(scaleddata).select("label","pcaFeatures").toDF("label","features")
4 劃分訓(xùn)練/測(cè)試集
經(jīng)過降維的數(shù)據(jù)就可以拿來訓(xùn)練分類器了,但是在此之前要將數(shù)據(jù)劃分為訓(xùn)練集和測(cè)試集,分類器只能在訓(xùn)練集上進(jìn)行訓(xùn)練,在測(cè)試集上驗(yàn)證其分類精度。Spark提供了很方便的接口,按給定的比例隨機(jī)劃分訓(xùn)練/測(cè)試集。
// 將經(jīng)過主成分分析的數(shù)據(jù),按比例劃分為訓(xùn)練數(shù)據(jù)和測(cè)試數(shù)據(jù)
val Array(trainingData, testData) = pcaResult.randomSplit(Array(0.7, 0.3), seed = 20)
5 線性SVM分類
這一步構(gòu)建線性SVM模型,設(shè)置最大迭代次數(shù)和正則化項(xiàng)的系數(shù),使用訓(xùn)練集進(jìn)行訓(xùn)練。
// 創(chuàng)建SVC分類器(Estimator)
val lsvc = new LinearSVC()
.setMaxIter(10)
.setRegParam(0.1)
// 訓(xùn)練分類器,生成模型(Transformer)
val lsvcModel = lsvc.fit(trainingData)
6 驗(yàn)證精度
將訓(xùn)練好的分類器作用于測(cè)試集上,獲得分類結(jié)果。
分類結(jié)果的好壞有很多種衡量的方法,如查準(zhǔn)率、查全率等,這里我們使用最簡(jiǎn)單的一種衡量標(biāo)準(zhǔn)——精度,即正確分類的樣本數(shù)占總樣本數(shù)的比值。
// 用訓(xùn)練好的模型,驗(yàn)證測(cè)試數(shù)據(jù)
val res = lsvcModel.transform(testData).select("prediction","label")
// 計(jì)算精度
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(res)
println(s"Accuracy = ${accuracy}")
7 輸出cvs格式的結(jié)果
Spark的DataFrame類型支持導(dǎo)出多種格式,這里以常用的csv格式為例。
這里輸出的目的是為了使用Python進(jìn)行可視化,在降維后進(jìn)行,可以直觀的看出降維后的數(shù)據(jù)是否明顯可分。
使用VectorAssembler,將標(biāo)簽與特征合并為一列,再進(jìn)行輸出。
(這里是將合并后的列轉(zhuǎn)換為String再輸出的,因此輸出的csv文件是帶有引號(hào)和括號(hào)的,至于為什么要這樣輸出,請(qǐng)看第二部分)
// 將標(biāo)簽與主成分合成為一列
val assembler = new VectorAssembler()
.setInputCols(Array("label","features"))
.setOutputCol("assemble")
val output = assembler.transform(pcaResult)
// 輸出csv格式的標(biāo)簽和主成分,便于可視化
val ass = output.select(output("assemble").cast("string"))
ass.write.mode("overwrite").csv("output.csv")
當(dāng)然也可以用同樣的方法輸出訓(xùn)練/預(yù)測(cè)的結(jié)果,這里就不再詳細(xì)介紹。
遇到的問題
完成這個(gè)簡(jiǎn)單的分類實(shí)驗(yàn),花了我兩天多的時(shí)間,從配置環(huán)境到熟悉API,再到遇見各種奇怪的問題……這里我都把他們記錄下來,供以后參考。
1 配置環(huán)境
起初,我想通過在本機(jī)編寫代碼,然后訪問安裝在虛擬機(jī)中的Spark節(jié)點(diǎn)(單節(jié)點(diǎn))這種方式進(jìn)行實(shí)驗(yàn)的(不是提交jar包然后執(zhí)行spark-submit),也就在是創(chuàng)建SparkSession時(shí),指定虛擬機(jī)中的Spark:
val spark = SparkSession
.builder
.appName("LinearSVCExample")
.master("spark://192.168.1.128:7077") // 虛擬機(jī)IP
.getOrCreate()
然而,這樣并沒有成功。遇到的問題有:
- 拒絕連接
- Spark的worker里可以查看到提交的任務(wù),但是一直處于等待狀態(tài),沒有響應(yīng)。并且提示:
WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources(實(shí)際上,內(nèi)存和CPU是夠的) - 報(bào)錯(cuò)
RuntimeException: java.io.EOFException......
在嘗試過各種方案都沒有解決問題之后,我放棄了,最后還是在本機(jī)中安裝Spark,在local模式下運(yùn)行。(如果有同學(xué)成功實(shí)現(xiàn)上面的訪問方法,歡迎留言告訴我~)
至于如何在本機(jī)(Windows)安裝Spark,百度搜索即可
2 導(dǎo)出CSV格式的數(shù)據(jù)
將DataFrame導(dǎo)出為cvs格式的時(shí)候,遇到了這個(gè)問題:
java.lang.UnsupportedOperationException: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type.
而我要導(dǎo)出的DataFrame只是一個(gè)多行數(shù)組而已?。?/p>

根據(jù)StackOverflow上面的提問,Spark的csv導(dǎo)出不支持復(fù)雜結(jié)構(gòu),array<double>都不行。
然后有人給了一種辦法,把數(shù)組轉(zhuǎn)化為String,就可以導(dǎo)出了。
但是導(dǎo)出的結(jié)果是這樣的:

需要進(jìn)一步處理。
所以還不如手動(dòng)實(shí)現(xiàn)導(dǎo)出csv文件,或者你有更好的辦法,歡迎留言告訴我,非常感謝~
3 PCA維數(shù)限制
當(dāng)我想跑一個(gè)10萬維度的數(shù)據(jù)時(shí),程序運(yùn)行到PCA報(bào)錯(cuò):
java.lang.IllegalArgumentException: Argument with more than 65535 cols: 109600
原來,Spark ML的PCA不支持超過65535維的數(shù)據(jù)。參見源碼
4 SVM核
翻閱了Spark ML文檔,只找到Linear Support Vector Machine,即線性核的支持向量機(jī)。對(duì)于高斯核和其他非線性的核,Spark ML貌似還沒有實(shí)現(xiàn)。

5 withColumn操作
起初我認(rèn)為對(duì)數(shù)據(jù)進(jìn)行降維前,需要把DataFrame中的標(biāo)簽label與特征feature分開,然后對(duì)feature進(jìn)行降維,再使用withColumn方法,把label與降維后的feature組合成新的DataFrame。
發(fā)現(xiàn)這樣既不可行也沒有必要。
首先,withColumn只能添加當(dāng)前DataFrame的數(shù)據(jù)(對(duì)DataFrame某一列進(jìn)行一些操作,再添加到這個(gè)DataFrame本身),不能把來自于不同DataFrame的Column添加到當(dāng)前DataFrame中。
其次,PCA降維時(shí),只需指定InputCoulum作為特征列,指定OutputColumn作為輸出列,其他列的存在并不影響PCA的執(zhí)行,PCA也不會(huì)改變它們,在新生成的DataFrame中依然會(huì)保留原來所有Column,并且添加上降維后的數(shù)據(jù)Column,后面再使用select方法選擇出所需的Column即可。
完整代碼(Pipeline版)
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature._
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.classification.LinearSVC
import org.apache.spark.sql.SparkSession
object Hello {
def main(args: Array[String]) {
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.8.3")
// 屏蔽日志
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// 創(chuàng)建sparkSession
val spark = SparkSession
.builder
.appName("LinearSVCExample")
.master("local")
.getOrCreate()
// 加載訓(xùn)練數(shù)據(jù),生成DataFrame
val data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")
println(data.count())
// 歸一化
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithMean(true)
.setWithStd(true)
.fit(data)
val scaleddata = scaler.transform(data).select("label", "scaledFeatures").toDF("label","features")
// 創(chuàng)建PCA模型,生成Transformer
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(5)
.fit(scaleddata)
// transform 數(shù)據(jù),生成主成分特征
val pcaResult = pca.transform(scaleddata).select("label","pcaFeatures").toDF("label","features")
// pcaResult.show(truncate=false)
// 將標(biāo)簽與主成分合成為一列
val assembler = new VectorAssembler()
.setInputCols(Array("label","features"))
.setOutputCol("assemble")
val output = assembler.transform(pcaResult)
// 輸出csv格式的標(biāo)簽和主成分,便于可視化
val ass = output.select(output("assemble").cast("string"))
ass.write.mode("overwrite").csv("output.csv")
// 將經(jīng)過主成分分析的數(shù)據(jù),按比例劃分為訓(xùn)練數(shù)據(jù)和測(cè)試數(shù)據(jù)
val Array(trainingData, testData) = pcaResult.randomSplit(Array(0.7, 0.3), seed = 20)
// 創(chuàng)建SVC分類器(Estimator)
val lsvc = new LinearSVC()
.setMaxIter(10)
.setRegParam(0.1)
// 創(chuàng)建pipeline, 將上述步驟連接起來
val pipeline = new Pipeline()
.setStages(Array(scaler, pca, lsvc))
// 使用串聯(lián)好的模型在訓(xùn)練集上訓(xùn)練
val model = pipeline.fit(trainingData)
// 在測(cè)試集上測(cè)試
val predictions = model.transform(testData).select("prediction","label")
// 計(jì)算精度
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Accuracy = ${accuracy}")
spark.stop()
}
}
最后的精度為1.0,這里使用的測(cè)試數(shù)據(jù)比較好分,從PCA后對(duì)前兩維的可視化結(jié)果可以看出:

參考資料
Spark ML文檔
DataFrame API
PCA列數(shù)限制-源碼
導(dǎo)出cvs文件方法-stackoverflow
無法導(dǎo)出csv文件-stackoverflow
示例數(shù)據(jù)