使用Spark ML進(jìn)行數(shù)據(jù)分析

Spark版本:2.4.0
語言:Scala
任務(wù):分類

這里對(duì)數(shù)據(jù)的處理步驟如下:

  1. 載入數(shù)據(jù)
  2. 歸一化
  3. PCA降維
  4. 劃分訓(xùn)練/測(cè)試集
  5. 線性SVM分類
  6. 驗(yàn)證精度
  7. 輸出cvs格式的結(jié)果

前言

從Spark 2.0開始,Spark機(jī)器學(xué)習(xí)API是基于DataFrame的spark.ml。而之前的基于RDD的API spark.mllib已進(jìn)入維護(hù)模式。
也就是說,Spark ML是Spark MLlib的一種新的API,它主要有以下幾個(gè)優(yōu)點(diǎn):

  • 面向DataFrame,在RDD基礎(chǔ)上進(jìn)一步封裝,提供更強(qiáng)大更方便的API
  • Pipeline功能,便于實(shí)現(xiàn)復(fù)雜的機(jī)器學(xué)習(xí)模型
  • 性能提升

基于Pipeline的Spark ML中的幾個(gè)概念:

  • DataFrame:從Spark SQL 的引用的概念,表示一個(gè)數(shù)據(jù)集,它可以容納多種數(shù)據(jù)類型。例如可以存儲(chǔ)文本,特征向量,標(biāo)簽和預(yù)測(cè)值等
  • Transformer:是可以將一個(gè)DataFrame變換成另一個(gè)DataFrame的算法。例如,一個(gè)訓(xùn)練好的模型是一個(gè)Transformer,通過transform方法,將原始DataFrame轉(zhuǎn)化為一個(gè)包含預(yù)測(cè)值的DataFrame
  • Estimator:是一個(gè)算法,接受一個(gè)DataFrame,產(chǎn)生一個(gè)Transformer。例如,一個(gè)學(xué)習(xí)算法(如PCA,SVM)是一個(gè)Estimator,通過fit方法,訓(xùn)練DataFrame并產(chǎn)生模型Transformer
  • Pipeline: Pipeline將多個(gè)Transformers和Estimators連接起來組合成一個(gè)機(jī)器學(xué)習(xí)工作流程
  • Parameter:用于對(duì)Transformers和Estimators指定參數(shù)的統(tǒng)一接口

本次實(shí)驗(yàn)使用的是Spark ML的API

首先要?jiǎng)?chuàng)建SparkSession

// 創(chuàng)建SparkSession
val spark = SparkSession
  .builder
  .appName("LinearSVCExample")
  .master("local")
  .getOrCreate()

數(shù)據(jù)處理步驟

1 載入數(shù)據(jù)

數(shù)據(jù)載入的方式有多種,這里使用libsvm格式的數(shù)據(jù)作為數(shù)據(jù)源,libsvm格式常被用來存儲(chǔ)稀疏的矩陣數(shù)據(jù),它每一行的格式如下:

label index1:value1 index2:value2 ...

第一個(gè)值是標(biāo)簽,后面是由“列號(hào):值”組成鍵值對(duì),只需要記錄非0項(xiàng)即可。

數(shù)據(jù)加載使用load方法完成:

// 加載訓(xùn)練數(shù)據(jù),生成DataFrame
val data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

2 歸一化

作為數(shù)據(jù)預(yù)處理的第一步,需要對(duì)原始數(shù)據(jù)做歸一化處理,即把原始數(shù)據(jù)的每一維減去其平均值,再除以其標(biāo)準(zhǔn)差,使得數(shù)據(jù)總體分布為以0為中心,且標(biāo)準(zhǔn)差為1。

// 歸一化
val scaler = new StandardScaler()
   .setInputCol("features")
   .setOutputCol("scaledFeatures")
   .setWithMean(true)
   .setWithStd(true)
   .fit(data)

val scaleddata = scaler.transform(data).select("label", "scaledFeatures").toDF("label","features")

3 PCA降維

有時(shí)數(shù)據(jù)的維數(shù)可能很大,直接進(jìn)行分類不僅計(jì)算量很大,而且對(duì)數(shù)據(jù)量的要求也很高,常常會(huì)出現(xiàn)過擬合。因此需要進(jìn)行降維,常用的是主成分分析(PCA)算法。

// 創(chuàng)建PCA模型,生成Transformer
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(5)
  .fit(scaleddata)

//  transform數(shù)據(jù),生成主成分特征
val pcaResult = pca.transform(scaleddata).select("label","pcaFeatures").toDF("label","features")

4 劃分訓(xùn)練/測(cè)試集

經(jīng)過降維的數(shù)據(jù)就可以拿來訓(xùn)練分類器了,但是在此之前要將數(shù)據(jù)劃分為訓(xùn)練集和測(cè)試集,分類器只能在訓(xùn)練集上進(jìn)行訓(xùn)練,在測(cè)試集上驗(yàn)證其分類精度。Spark提供了很方便的接口,按給定的比例隨機(jī)劃分訓(xùn)練/測(cè)試集。

// 將經(jīng)過主成分分析的數(shù)據(jù),按比例劃分為訓(xùn)練數(shù)據(jù)和測(cè)試數(shù)據(jù)
val Array(trainingData, testData) = pcaResult.randomSplit(Array(0.7, 0.3), seed = 20)

5 線性SVM分類

這一步構(gòu)建線性SVM模型,設(shè)置最大迭代次數(shù)和正則化項(xiàng)的系數(shù),使用訓(xùn)練集進(jìn)行訓(xùn)練。

// 創(chuàng)建SVC分類器(Estimator)
val lsvc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1)

// 訓(xùn)練分類器,生成模型(Transformer)
val lsvcModel = lsvc.fit(trainingData)

6 驗(yàn)證精度

將訓(xùn)練好的分類器作用于測(cè)試集上,獲得分類結(jié)果。

分類結(jié)果的好壞有很多種衡量的方法,如查準(zhǔn)率、查全率等,這里我們使用最簡(jiǎn)單的一種衡量標(biāo)準(zhǔn)——精度,即正確分類的樣本數(shù)占總樣本數(shù)的比值。

// 用訓(xùn)練好的模型,驗(yàn)證測(cè)試數(shù)據(jù)
val res = lsvcModel.transform(testData).select("prediction","label")

// 計(jì)算精度
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(res)

println(s"Accuracy = ${accuracy}")

7 輸出cvs格式的結(jié)果

Spark的DataFrame類型支持導(dǎo)出多種格式,這里以常用的csv格式為例。

這里輸出的目的是為了使用Python進(jìn)行可視化,在降維后進(jìn)行,可以直觀的看出降維后的數(shù)據(jù)是否明顯可分。

使用VectorAssembler,將標(biāo)簽與特征合并為一列,再進(jìn)行輸出。

(這里是將合并后的列轉(zhuǎn)換為String再輸出的,因此輸出的csv文件是帶有引號(hào)和括號(hào)的,至于為什么要這樣輸出,請(qǐng)看第二部分)

// 將標(biāo)簽與主成分合成為一列
val assembler = new VectorAssembler()
  .setInputCols(Array("label","features"))
  .setOutputCol("assemble")
val output = assembler.transform(pcaResult)

// 輸出csv格式的標(biāo)簽和主成分,便于可視化
val ass = output.select(output("assemble").cast("string"))
ass.write.mode("overwrite").csv("output.csv")

當(dāng)然也可以用同樣的方法輸出訓(xùn)練/預(yù)測(cè)的結(jié)果,這里就不再詳細(xì)介紹。

遇到的問題

完成這個(gè)簡(jiǎn)單的分類實(shí)驗(yàn),花了我兩天多的時(shí)間,從配置環(huán)境到熟悉API,再到遇見各種奇怪的問題……這里我都把他們記錄下來,供以后參考。

1 配置環(huán)境

起初,我想通過在本機(jī)編寫代碼,然后訪問安裝在虛擬機(jī)中的Spark節(jié)點(diǎn)(單節(jié)點(diǎn))這種方式進(jìn)行實(shí)驗(yàn)的(不是提交jar包然后執(zhí)行spark-submit),也就在是創(chuàng)建SparkSession時(shí),指定虛擬機(jī)中的Spark:

val spark = SparkSession
  .builder
  .appName("LinearSVCExample")
  .master("spark://192.168.1.128:7077") // 虛擬機(jī)IP
  .getOrCreate()

然而,這樣并沒有成功。遇到的問題有:

  • 拒絕連接
  • Spark的worker里可以查看到提交的任務(wù),但是一直處于等待狀態(tài),沒有響應(yīng)。并且提示:
    WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources(實(shí)際上,內(nèi)存和CPU是夠的)
  • 報(bào)錯(cuò)RuntimeException: java.io.EOFException......

在嘗試過各種方案都沒有解決問題之后,我放棄了,最后還是在本機(jī)中安裝Spark,在local模式下運(yùn)行。(如果有同學(xué)成功實(shí)現(xiàn)上面的訪問方法,歡迎留言告訴我~

至于如何在本機(jī)(Windows)安裝Spark,百度搜索即可

2 導(dǎo)出CSV格式的數(shù)據(jù)

將DataFrame導(dǎo)出為cvs格式的時(shí)候,遇到了這個(gè)問題:
java.lang.UnsupportedOperationException: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type.

而我要導(dǎo)出的DataFrame只是一個(gè)多行數(shù)組而已?。?/p>

image.png

根據(jù)StackOverflow上面的提問,Spark的csv導(dǎo)出不支持復(fù)雜結(jié)構(gòu),array<double>都不行。

然后有人給了一種辦法,把數(shù)組轉(zhuǎn)化為String,就可以導(dǎo)出了。

但是導(dǎo)出的結(jié)果是這樣的:

image.png

需要進(jìn)一步處理。

所以還不如手動(dòng)實(shí)現(xiàn)導(dǎo)出csv文件,或者你有更好的辦法,歡迎留言告訴我,非常感謝~

3 PCA維數(shù)限制

當(dāng)我想跑一個(gè)10萬維度的數(shù)據(jù)時(shí),程序運(yùn)行到PCA報(bào)錯(cuò):
java.lang.IllegalArgumentException: Argument with more than 65535 cols: 109600

原來,Spark ML的PCA不支持超過65535維的數(shù)據(jù)。參見源碼

4 SVM核

翻閱了Spark ML文檔,只找到Linear Support Vector Machine,即線性核的支持向量機(jī)。對(duì)于高斯核和其他非線性的核,Spark ML貌似還沒有實(shí)現(xiàn)。

image.png

5 withColumn操作

起初我認(rèn)為對(duì)數(shù)據(jù)進(jìn)行降維前,需要把DataFrame中的標(biāo)簽label與特征feature分開,然后對(duì)feature進(jìn)行降維,再使用withColumn方法,把label與降維后的feature組合成新的DataFrame。

發(fā)現(xiàn)這樣既不可行也沒有必要。

首先,withColumn只能添加當(dāng)前DataFrame的數(shù)據(jù)(對(duì)DataFrame某一列進(jìn)行一些操作,再添加到這個(gè)DataFrame本身),不能把來自于不同DataFrame的Column添加到當(dāng)前DataFrame中。

其次,PCA降維時(shí),只需指定InputCoulum作為特征列,指定OutputColumn作為輸出列,其他列的存在并不影響PCA的執(zhí)行,PCA也不會(huì)改變它們,在新生成的DataFrame中依然會(huì)保留原來所有Column,并且添加上降維后的數(shù)據(jù)Column,后面再使用select方法選擇出所需的Column即可。

完整代碼(Pipeline版)

import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature._
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.classification.LinearSVC
import org.apache.spark.sql.SparkSession

object Hello {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.8.3")
    //  屏蔽日志
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // 創(chuàng)建sparkSession
    val spark = SparkSession
      .builder
      .appName("LinearSVCExample")
      .master("local")
      .getOrCreate()

    // 加載訓(xùn)練數(shù)據(jù),生成DataFrame
    val data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

    println(data.count())

    // 歸一化
    val scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
      .setWithMean(true)
      .setWithStd(true)
      .fit(data)

    val scaleddata = scaler.transform(data).select("label", "scaledFeatures").toDF("label","features")

    // 創(chuàng)建PCA模型,生成Transformer
    val pca = new PCA()
      .setInputCol("features")
      .setOutputCol("pcaFeatures")
      .setK(5)
      .fit(scaleddata)

    //  transform 數(shù)據(jù),生成主成分特征
    val pcaResult = pca.transform(scaleddata).select("label","pcaFeatures").toDF("label","features")

    //  pcaResult.show(truncate=false)

    // 將標(biāo)簽與主成分合成為一列
    val assembler = new VectorAssembler()
      .setInputCols(Array("label","features"))
      .setOutputCol("assemble")
    val output = assembler.transform(pcaResult)

    // 輸出csv格式的標(biāo)簽和主成分,便于可視化
    val ass = output.select(output("assemble").cast("string"))
    ass.write.mode("overwrite").csv("output.csv")

    // 將經(jīng)過主成分分析的數(shù)據(jù),按比例劃分為訓(xùn)練數(shù)據(jù)和測(cè)試數(shù)據(jù)
    val Array(trainingData, testData) = pcaResult.randomSplit(Array(0.7, 0.3), seed = 20)

    // 創(chuàng)建SVC分類器(Estimator)
    val lsvc = new LinearSVC()
      .setMaxIter(10)
      .setRegParam(0.1)

    // 創(chuàng)建pipeline, 將上述步驟連接起來
    val pipeline = new Pipeline()
      .setStages(Array(scaler, pca, lsvc))
    
    // 使用串聯(lián)好的模型在訓(xùn)練集上訓(xùn)練
    val model = pipeline.fit(trainingData)
    
    // 在測(cè)試集上測(cè)試
    val predictions = model.transform(testData).select("prediction","label")

    // 計(jì)算精度
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)

    println(s"Accuracy = ${accuracy}")

    spark.stop()
  }
}

最后的精度為1.0,這里使用的測(cè)試數(shù)據(jù)比較好分,從PCA后對(duì)前兩維的可視化結(jié)果可以看出:

image.png

參考資料

Spark ML文檔
DataFrame API
PCA列數(shù)限制-源碼
導(dǎo)出cvs文件方法-stackoverflow
無法導(dǎo)出csv文件-stackoverflow
示例數(shù)據(jù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容