一.本地向量
有如下幾個類: Vector(基類),DenseVector,SparseVector,Vectors(工廠方法,推薦用)
工廠模式是:定義一個用于創(chuàng)建對象的接口,讓子類決定實(shí)例化哪一個類,使一個類的實(shí)例化延遲到子類
import org.apache.spark.mllib.linalg.{Vectors,Vector} ? ?# linalg is short for linear algebra
val v1=Vectors.dense(1.0,2.0,3.0) ?#定義1
val v2 =Vectors.sparse(3,(1,2),(10,100)) #長度為3,第1,2個位置的值為10和100
val v3=Vectors.sparse(3,Seq((1,10),(2,100))) #結(jié)果同上
二.帶有標(biāo)簽的向量
主要應(yīng)用在有監(jiān)督學(xué)習(xí)中,二分類(0,1),多分類(0,1,2,3,....)
import org.apache.spark.mllib.Regression.LabeledPoint;
val vl1=LabeledPoint(1,Vectors.dense(1,2,3,4))
val vl2=LabeledPoint(0,Vectors.sparse(3,(1,2),(10,100)))
三.讀取LIBSVM格式的數(shù)據(jù)
: : ...
其中 是訓(xùn)練數(shù)據(jù)集的目標(biāo)值,對于分類,它是標(biāo)識某類的整數(shù)(支持多個類);對于回歸,是任意實(shí)數(shù)。 是以1開始的整數(shù),可以是不連續(xù)的;;為實(shí)數(shù),也就是我們常說的自變量。檢驗(yàn)數(shù)據(jù)文件中的label只用于計(jì)算準(zhǔn)確度或誤差,如果它是未知的,只需用一個數(shù)填寫這一欄,也可以空著不填.
例如:
0 1:10 3:19
1 1:18 3:20 4:178
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val svmfile=MLUtils.loadLibSVMFile(sc,"svmdata2")
四.創(chuàng)建本地矩陣
本地矩陣是行列號索引,值為double類型的數(shù)據(jù),存儲在單獨(dú)的機(jī)器上.支持稠密矩陣和稀疏矩陣。
與Vector和Vectors的關(guān)系類似,Matrix有對應(yīng)的Matrices
對于稀疏矩陣的壓縮方法,具體可以參考http://www.tuicool.com/articles/A3emmqi,spark默認(rèn)的為CSC格式的壓縮
import org.apache.spark.mllib.linalg.{Matrix,Matrices}
val m1=Matrices.dense(3,2,Array(1,2,3,4,5,6))
val m2=Matrices.sparse(3,2,Array(0,1,3),Array(0,2,1),Array(9,6,8))
參考csc壓縮方法,m2 手工算的結(jié)果,應(yīng)該是
(0,0)9
(2,0)6
(1,1)8
與spark計(jì)算的有出入。
五.分布式矩陣
選擇一個正確的形式去存儲大的分布式矩陣非常重要,?將分布式矩陣轉(zhuǎn)化為不同的格式需要全局的shuffle,代價(jià)很大。目前有三種類型的分布式矩陣,RowMatrix,IndexedRowMatrix,CoordinateMatrix.
什么是shuffle呢?參考http://dongxicheng.org/framework-on-yarn/apache-spark-shuffle-details/
通常shuffle分為兩部分,map階段的數(shù)據(jù)準(zhǔn)備以及Reduce階段的數(shù)據(jù)拷貝,Map階段需要根據(jù)Reduce階段的Task數(shù)量決定每個Map Task輸出的數(shù)據(jù)分片數(shù)目
RowMatrix是沒有行索引,例如一些特征向量,沒一行是一個本地向量。
IndexedRowMatrix,有行索引,可以用于識別行和執(zhí)行鏈接操作
CoordinateMatrix存成COO形式
構(gòu)造RowMatrix
import org.apache.spark.mllib.linalg.{Vector,Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data=sc.parallelize(1 to 9,3) #RDD形式
val rows=data.map(x=>Vectors.dense(x))
val m1=new RowMatrix(rows,3,3)
m1.numRows
m1.numCols
構(gòu)造IndexedRowMatrix
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val data1=sc.parallelize(1 to 12,2)
val rows1=data1.map(x=>IndexedRow(2,Vectors.dense(x)))
val mat=new IndexedRowMatrix(rows1,3,4)
mat.numRows()
mat.numCols()
構(gòu)造COO #對于稀疏矩陣比較有用,指定非空元素的行列以及value即可
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val data2=sc.parallelize(1 to 20 ,4)
val rows2=data1.map(x=>MatrixEntry(1,1,3))
val m2=new CoordinateMatrix(rows2,4,5)
#
val data3=sc.textFile("coo").map(_.split(' ')).map(_.map(_.toDouble)).map(m=>(m(0).toLong,m(1).toLong,m(2))).map(x=>new MatrixEntry(x._1,x._2,x._3))
val m3=new CoordinateMatrix(data3,3,4)
#構(gòu)造BlockMatrix
val m4=m3.toBlockMatrix()