矩陣RowMatrix是最基礎(chǔ)的分布式矩陣類型。每行是一個本地向量,行索引無實際意義(即無法直接使用)。數(shù)據(jù)存儲在一個由行組成的RDD中,其中每一行都使用一個本地向量來進行存儲。由于行是通過本地向量來實現(xiàn)的,故列數(shù)(即行的維度)被限制在普通整型(integer)的范圍內(nèi)。在實際使用時,由于單機處理本地向量的存儲和通信代價,行維度更是需要被控制在一個更小的范圍之內(nèi)。RowMatrix可通過一個RDD[Vector]的實例來創(chuàng)建,如下代碼所示:
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> import org.apache.spark.mllib.linalg.{Vector,Vectors}
import org.apache.spark.mllib.linalg.{Vector,Vectors}
scala> import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
// 創(chuàng)建兩個本地向量dv1 dv2
scala> val dv1 : Vector = Vectors.dense(1.0,2.0,3.0)
dv1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]
scala> val dv2 : Vector = Vectors.dense(2.0,3.0,4.0)
dv2: org.apache.spark.mllib.linalg.Vector = [2.0,3.0,4.0]
// 使用兩個本地向量創(chuàng)建一個RDD[Vector]
scala> val rows : RDD[Vector] = sc.parallelize(Array(dv1,dv2))
rows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = ParallelCollectionRDD[13] at parallelize at :38
// 通過RDD[Vector]創(chuàng)建一個行矩陣
scala> val mat : RowMatrix = new RowMatrix(rows)
mat: org.apache.spark.mllib.linalg.distributed.RowMatrix = org.apache.spark.mllib.linalg.distributed.RowMatrix@76fc0fa
//可以使用numRows()和numCols()方法得到行數(shù)和列數(shù)
scala> mat.numRows()
res0: Long = 2
scala> mat.numCols()
res1: Long = 3
scala> mat.rows.foreach(println)
[1.0,2.0,3.0]
[2.0,3.0,4.0]
在獲得RowMatrix的實例后,我們可以通過其自帶的computeColumnSummaryStatistics()方法獲取該矩陣的一些統(tǒng)計摘要信息,并可以對其進行QR分解,SVD分解和PCA分解,這一部分內(nèi)容將在特征降維的章節(jié)詳細解說,這里不再敘述。
統(tǒng)計摘要信息的獲取如下代碼段所示(接上代碼段):
// 通過computeColumnSummaryStatistics()方法獲取統(tǒng)計摘要
scala> val summary = mat.computeColumnSummaryStatistics()
// 可以通過summary實例來獲取矩陣的相關(guān)統(tǒng)計信息,例如行數(shù)
scala> summary.count
res2: Long = 2
// 最大向量
scala> summary.max
res3: org.apache.spark.mllib.linalg.Vector = [2.0,3.0,4.0]
// 方差向量
scala> summary.variance
res4: org.apache.spark.mllib.linalg.Vector = [0.5,0.5,0.5]
// 平均向量
scala> summary.mean
res5: org.apache.spark.mllib.linalg.Vector = [1.5,2.5,3.5]
// L1范數(shù)向量
scala> summary.normL1
res6: org.apache.spark.mllib.linalg.Vector = [3.0,5.0,7.0]
(二)索引行矩陣(IndexedRowMatrix)
索引行矩陣IndexedRowMatrix與RowMatrix相似,但它的每一行都帶有一個有意義的行索引值,這個索引值可以被用來識別不同行,或是進行諸如join之類的操作。其數(shù)據(jù)存儲在一個由IndexedRow組成的RDD里,即每一行都是一個帶長整型索引的本地向量。
與RowMatrix類似,IndexedRowMatrix的實例可以通過RDD[IndexedRow]實例來創(chuàng)建。如下代碼段所示(接上例):
scala>import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
// 通過本地向量dv1 dv2來創(chuàng)建對應(yīng)的IndexedRow
// 在創(chuàng)建時可以給定行的索引值,如這里給dv1的向量賦索引值1,dv2賦索引值2
scala> val idxr1 = IndexedRow(1,dv1)
idxr1: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(1,[1.0,2.0,3.0])
scala> val idxr2 = IndexedRow(2,dv2)
idxr2: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(2,[2.0,3.0,4.0])
// 通過IndexedRow創(chuàng)建RDD[IndexedRow]
scala> val idxrows = sc.parallelize(Array(idxr1,idxr2))
idxrows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.IndexedRow] = ParallelCollectionRDD[14] at parallelize at :45
// 通過RDD[IndexedRow]創(chuàng)建一個索引行矩陣
scala> val idxmat: IndexedRowMatrix = new IndexedRowMatrix(idxrows)
idxmat: org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@532887bc
//打印
scala> idxmat.rows.foreach(println)
IndexedRow(1,[1.0,2.0,3.0])
IndexedRow(2,[2.0,3.0,4.0])
(三)坐標(biāo)矩陣(Coordinate Matrix)
坐標(biāo)矩陣CoordinateMatrix是一個基于矩陣項構(gòu)成的RDD的分布式矩陣。每一個矩陣項MatrixEntry都是一個三元組:(i: Long, j: Long, value: Double),其中i是行索引,j是列索引,value是該位置的值。坐標(biāo)矩陣一般在矩陣的兩個維度都很大,且矩陣非常稀疏的時候使用。
CoordinateMatrix實例可通過RDD[MatrixEntry]實例來創(chuàng)建,其中每一個矩陣項都是一個(rowIndex, colIndex, elem)的三元組:
scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
// 創(chuàng)建兩個矩陣項ent1和ent2,每一個矩陣項都是由索引和值構(gòu)成的三元組
scala> val ent1 = new MatrixEntry(0,1,0.5)
ent1: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,1,0.5)
scala> val ent2 = new MatrixEntry(2,2,1.8)
ent2: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,2,1.8)
// 創(chuàng)建RDD[MatrixEntry]
scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2))
entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[15] at parallelize at :42
// 通過RDD[MatrixEntry]創(chuàng)建一個坐標(biāo)矩陣
scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@25b2d465
//打印
scala> coordMat.entries.foreach(println)
MatrixEntry(0,1,0.5)
MatrixEntry(2,2,1.8)
坐標(biāo)矩陣可以通過transpose()方法對矩陣進行轉(zhuǎn)置操作,并可以通過自帶的toIndexedRowMatrix()方法轉(zhuǎn)換成索引行矩陣IndexedRowMatrix。但目前暫不支持CoordinateMatrix的其他計算操作。
// 將coordMat進行轉(zhuǎn)置
scala> val transMat: CoordinateMatrix = coordMat.transpose()
transMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@c1ee50
scala> transMat.entries.foreach(println)
MatrixEntry(1,0,0.5)
MatrixEntry(2,2,1.8)
// 將坐標(biāo)矩陣轉(zhuǎn)換成一個索引行矩陣
scala> val indexedRowMatrix = transMat.toIndexedRowMatrix()
indexedRowMatrix: org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@7ee7e1bb
scala> indexedRowMatrix.rows.foreach(println)
IndexedRow(1,(3,[0],[0.5]))
IndexedRow(2,(3,[2],[1.8]))
####(四)分塊矩陣(Block Matrix)
分塊矩陣是基于矩陣塊MatrixBlock構(gòu)成的RDD的分布式矩陣,其中每一個矩陣塊MatrixBlock都是一個元組((Int, Int), Matrix),其中(Int, Int)是塊的索引,而Matrix則是在對應(yīng)位置的子矩陣(sub-matrix),其尺寸由rowsPerBlock和colsPerBlock決定,默認值均為1024。分塊矩陣支持和另一個分塊矩陣進行加法操作和乘法操作,并提供了一個支持方法validate()來確認分塊矩陣是否創(chuàng)建成功。
分塊矩陣可由索引行矩陣IndexedRowMatrix或坐標(biāo)矩陣CoordinateMatrix調(diào)用toBlockMatrix()方法來進行轉(zhuǎn)換,該方法將矩陣劃分成尺寸默認為1024x1024的分塊,可以在調(diào)用toBlockMatrix(rowsPerBlock, colsPerBlock)方法時傳入?yún)?shù)來調(diào)整分塊的尺寸。
下面以矩陣A(如圖)為例,先利用矩陣項MatrixEntry將其構(gòu)造成坐標(biāo)矩陣,再轉(zhuǎn)化成如圖所示的4個分塊矩陣,最后對矩陣A與其轉(zhuǎn)置進行乘法運算:

scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
scala> import org.apache.spark.mllib.linalg.distributed.BlockMatrix
import org.apache.spark.mllib.linalg.distributed.BlockMatrix
// 創(chuàng)建8個矩陣項,每一個矩陣項都是由索引和值構(gòu)成的三元組
scala> val ent1 = new MatrixEntry(0,0,1)
...
scala> val ent2 = new MatrixEntry(1,1,1)
...
scala> val ent3 = new MatrixEntry(2,0,-1)
...
scala> val ent4 = new MatrixEntry(2,1,2)
...
scala> val ent5 = new MatrixEntry(2,2,1)
...
scala> val ent6 = new MatrixEntry(3,0,1)
...
scala> val ent7 = new MatrixEntry(3,1,1)
...
scala> val ent8 = new MatrixEntry(3,3,1)
...
// 創(chuàng)建RDD[MatrixEntry]
scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2,ent3,ent4,ent5,ent6,ent7,ent8))
entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[21] at parallelize at :57
// 通過RDD[MatrixEntry]創(chuàng)建一個坐標(biāo)矩陣
scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@31c5fb43
// 將坐標(biāo)矩陣轉(zhuǎn)換成2x2的分塊矩陣并存儲,尺寸通過參數(shù)傳入
val matA: BlockMatrix = coordMat.toBlockMatrix(2,2).cache()
matA: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@26b1df2c
// 可以用validate()方法判斷是否分塊成功
matA.validate()
構(gòu)建成功后,可通過toLocalMatrix轉(zhuǎn)換成本地矩陣,并查看其分塊情況:
scala> matA.toLocalMatrix
res31: org.apache.spark.mllib.linalg.Matrix =
1.0? 0.0? 0.0? 0.0
0.0? 1.0? 0.0? 0.0
-1.0? 2.0? 1.0? 0.0
1.0? 1.0? 0.0? 1.0
// 查看其分塊情況
scala> matA.numColBlocks
res12: Int = 2
scala> matA.numRowBlocks
res13: Int = 2
// 計算矩陣A和其轉(zhuǎn)置矩陣的積矩陣
scala> val ata = matA.transpose.multiply(matA)
ata: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@3644e451
scala> ata.toLocalMatrix
res1: org.apache.spark.mllib.linalg.Matrix =
3.0? -1.0? -1.0? 1.0
-1.0? 6.0? 2.0? 1.0
-1.0? 2.0? 1.0? 0.0
1.0? 1.0? 0.0? 1.0
分塊矩陣BlockMatrix將矩陣分成一系列矩陣塊,底層由矩陣塊構(gòu)成的RDD來進行數(shù)據(jù)存儲。值得指出的是,用于生成分布式矩陣的底層RDD必須是已經(jīng)確定(Deterministic)的,因為矩陣的尺寸將被存儲下來,所以使用未確定的RDD將會導(dǎo)致錯誤。而且,不同類型的分布式矩陣之間的轉(zhuǎn)換需要進行一個全局的shuffle操作,非常耗費資源。所以,根據(jù)數(shù)據(jù)本身的性質(zhì)和應(yīng)用需求來選取恰當(dāng)?shù)姆植际骄仃嚧鎯︻愋褪欠浅V匾摹?/p>
參考路徑:http://dblab.xmu.edu.cn/blog/1175/