分布式矩陣(Distributed Matrix)

矩陣RowMatrix是最基礎(chǔ)的分布式矩陣類型。每行是一個本地向量,行索引無實際意義(即無法直接使用)。數(shù)據(jù)存儲在一個由行組成的RDD中,其中每一行都使用一個本地向量來進行存儲。由于行是通過本地向量來實現(xiàn)的,故列數(shù)(即行的維度)被限制在普通整型(integer)的范圍內(nèi)。在實際使用時,由于單機處理本地向量的存儲和通信代價,行維度更是需要被控制在一個更小的范圍之內(nèi)。RowMatrix可通過一個RDD[Vector]的實例來創(chuàng)建,如下代碼所示:

scala> import org.apache.spark.rdd.RDD

import org.apache.spark.rdd.RDD

scala> import org.apache.spark.mllib.linalg.{Vector,Vectors}

import org.apache.spark.mllib.linalg.{Vector,Vectors}

scala> import org.apache.spark.mllib.linalg.distributed.RowMatrix

import org.apache.spark.mllib.linalg.distributed.RowMatrix

// 創(chuàng)建兩個本地向量dv1 dv2

scala> val dv1 : Vector = Vectors.dense(1.0,2.0,3.0)

dv1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]

scala> val dv2 : Vector = Vectors.dense(2.0,3.0,4.0)

dv2: org.apache.spark.mllib.linalg.Vector = [2.0,3.0,4.0]

// 使用兩個本地向量創(chuàng)建一個RDD[Vector]

scala> val rows : RDD[Vector] = sc.parallelize(Array(dv1,dv2))

rows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = ParallelCollectionRDD[13] at parallelize at :38

// 通過RDD[Vector]創(chuàng)建一個行矩陣

scala> val mat : RowMatrix = new RowMatrix(rows)

mat: org.apache.spark.mllib.linalg.distributed.RowMatrix = org.apache.spark.mllib.linalg.distributed.RowMatrix@76fc0fa

//可以使用numRows()和numCols()方法得到行數(shù)和列數(shù)

scala> mat.numRows()

res0: Long = 2

scala> mat.numCols()

res1: Long = 3

scala> mat.rows.foreach(println)

[1.0,2.0,3.0]

[2.0,3.0,4.0]

在獲得RowMatrix的實例后,我們可以通過其自帶的computeColumnSummaryStatistics()方法獲取該矩陣的一些統(tǒng)計摘要信息,并可以對其進行QR分解,SVD分解PCA分解,這一部分內(nèi)容將在特征降維的章節(jié)詳細解說,這里不再敘述。

統(tǒng)計摘要信息的獲取如下代碼段所示(接上代碼段):

// 通過computeColumnSummaryStatistics()方法獲取統(tǒng)計摘要

scala> val summary = mat.computeColumnSummaryStatistics()

// 可以通過summary實例來獲取矩陣的相關(guān)統(tǒng)計信息,例如行數(shù)

scala> summary.count

res2: Long = 2

// 最大向量

scala> summary.max

res3: org.apache.spark.mllib.linalg.Vector = [2.0,3.0,4.0]

// 方差向量

scala> summary.variance

res4: org.apache.spark.mllib.linalg.Vector = [0.5,0.5,0.5]

// 平均向量

scala> summary.mean

res5: org.apache.spark.mllib.linalg.Vector = [1.5,2.5,3.5]

// L1范數(shù)向量

scala> summary.normL1

res6: org.apache.spark.mllib.linalg.Vector = [3.0,5.0,7.0]

(二)索引行矩陣(IndexedRowMatrix)

索引行矩陣IndexedRowMatrix與RowMatrix相似,但它的每一行都帶有一個有意義的行索引值,這個索引值可以被用來識別不同行,或是進行諸如join之類的操作。其數(shù)據(jù)存儲在一個由IndexedRow組成的RDD里,即每一行都是一個帶長整型索引的本地向量。

與RowMatrix類似,IndexedRowMatrix的實例可以通過RDD[IndexedRow]實例來創(chuàng)建。如下代碼段所示(接上例):

scala>import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

// 通過本地向量dv1 dv2來創(chuàng)建對應(yīng)的IndexedRow

// 在創(chuàng)建時可以給定行的索引值,如這里給dv1的向量賦索引值1,dv2賦索引值2

scala> val idxr1 = IndexedRow(1,dv1)

idxr1: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(1,[1.0,2.0,3.0])

scala> val idxr2 = IndexedRow(2,dv2)

idxr2: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(2,[2.0,3.0,4.0])

// 通過IndexedRow創(chuàng)建RDD[IndexedRow]

scala> val idxrows = sc.parallelize(Array(idxr1,idxr2))

idxrows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.IndexedRow] = ParallelCollectionRDD[14] at parallelize at :45

// 通過RDD[IndexedRow]創(chuàng)建一個索引行矩陣

scala> val idxmat: IndexedRowMatrix = new IndexedRowMatrix(idxrows)

idxmat: org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@532887bc

//打印

scala> idxmat.rows.foreach(println)

IndexedRow(1,[1.0,2.0,3.0])

IndexedRow(2,[2.0,3.0,4.0])

(三)坐標(biāo)矩陣(Coordinate Matrix)

坐標(biāo)矩陣CoordinateMatrix是一個基于矩陣項構(gòu)成的RDD的分布式矩陣。每一個矩陣項MatrixEntry都是一個三元組:(i: Long, j: Long, value: Double),其中i是行索引,j是列索引,value是該位置的值。坐標(biāo)矩陣一般在矩陣的兩個維度都很大,且矩陣非常稀疏的時候使用。

CoordinateMatrix實例可通過RDD[MatrixEntry]實例來創(chuàng)建,其中每一個矩陣項都是一個(rowIndex, colIndex, elem)的三元組:

scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// 創(chuàng)建兩個矩陣項ent1和ent2,每一個矩陣項都是由索引和值構(gòu)成的三元組

scala> val ent1 = new MatrixEntry(0,1,0.5)

ent1: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,1,0.5)

scala> val ent2 = new MatrixEntry(2,2,1.8)

ent2: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,2,1.8)

// 創(chuàng)建RDD[MatrixEntry]

scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2))

entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[15] at parallelize at :42

// 通過RDD[MatrixEntry]創(chuàng)建一個坐標(biāo)矩陣

scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)

coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@25b2d465

//打印

scala> coordMat.entries.foreach(println)

MatrixEntry(0,1,0.5)

MatrixEntry(2,2,1.8)

坐標(biāo)矩陣可以通過transpose()方法對矩陣進行轉(zhuǎn)置操作,并可以通過自帶的toIndexedRowMatrix()方法轉(zhuǎn)換成索引行矩陣IndexedRowMatrix。但目前暫不支持CoordinateMatrix的其他計算操作。

// 將coordMat進行轉(zhuǎn)置

scala> val transMat: CoordinateMatrix = coordMat.transpose()

transMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@c1ee50

scala> transMat.entries.foreach(println)

MatrixEntry(1,0,0.5)

MatrixEntry(2,2,1.8)

// 將坐標(biāo)矩陣轉(zhuǎn)換成一個索引行矩陣

scala> val indexedRowMatrix = transMat.toIndexedRowMatrix()

indexedRowMatrix: org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@7ee7e1bb

scala> indexedRowMatrix.rows.foreach(println)

IndexedRow(1,(3,[0],[0.5]))

IndexedRow(2,(3,[2],[1.8]))

####(四)分塊矩陣(Block Matrix)

分塊矩陣是基于矩陣塊MatrixBlock構(gòu)成的RDD的分布式矩陣,其中每一個矩陣塊MatrixBlock都是一個元組((Int, Int), Matrix),其中(Int, Int)是塊的索引,而Matrix則是在對應(yīng)位置的子矩陣(sub-matrix),其尺寸由rowsPerBlock和colsPerBlock決定,默認值均為1024。分塊矩陣支持和另一個分塊矩陣進行加法操作和乘法操作,并提供了一個支持方法validate()來確認分塊矩陣是否創(chuàng)建成功。

分塊矩陣可由索引行矩陣IndexedRowMatrix或坐標(biāo)矩陣CoordinateMatrix調(diào)用toBlockMatrix()方法來進行轉(zhuǎn)換,該方法將矩陣劃分成尺寸默認為1024x1024的分塊,可以在調(diào)用toBlockMatrix(rowsPerBlock, colsPerBlock)方法時傳入?yún)?shù)來調(diào)整分塊的尺寸。

下面以矩陣A(如圖)為例,先利用矩陣項MatrixEntry將其構(gòu)造成坐標(biāo)矩陣,再轉(zhuǎn)化成如圖所示的4個分塊矩陣,最后對矩陣A與其轉(zhuǎn)置進行乘法運算:

scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

scala> import org.apache.spark.mllib.linalg.distributed.BlockMatrix

import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// 創(chuàng)建8個矩陣項,每一個矩陣項都是由索引和值構(gòu)成的三元組

scala> val ent1 = new MatrixEntry(0,0,1)

...

scala> val ent2 = new MatrixEntry(1,1,1)

...

scala> val ent3 = new MatrixEntry(2,0,-1)

...

scala> val ent4 = new MatrixEntry(2,1,2)

...

scala> val ent5 = new MatrixEntry(2,2,1)

...

scala> val ent6 = new MatrixEntry(3,0,1)

...

scala> val ent7 = new MatrixEntry(3,1,1)

...

scala> val ent8 = new MatrixEntry(3,3,1)

...

// 創(chuàng)建RDD[MatrixEntry]

scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2,ent3,ent4,ent5,ent6,ent7,ent8))

entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[21] at parallelize at :57

// 通過RDD[MatrixEntry]創(chuàng)建一個坐標(biāo)矩陣

scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)

coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@31c5fb43

// 將坐標(biāo)矩陣轉(zhuǎn)換成2x2的分塊矩陣并存儲,尺寸通過參數(shù)傳入

val matA: BlockMatrix = coordMat.toBlockMatrix(2,2).cache()

matA: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@26b1df2c

// 可以用validate()方法判斷是否分塊成功

matA.validate()

構(gòu)建成功后,可通過toLocalMatrix轉(zhuǎn)換成本地矩陣,并查看其分塊情況:

scala> matA.toLocalMatrix

res31: org.apache.spark.mllib.linalg.Matrix =

1.0? 0.0? 0.0? 0.0

0.0? 1.0? 0.0? 0.0

-1.0? 2.0? 1.0? 0.0

1.0? 1.0? 0.0? 1.0

// 查看其分塊情況

scala> matA.numColBlocks

res12: Int = 2

scala> matA.numRowBlocks

res13: Int = 2

// 計算矩陣A和其轉(zhuǎn)置矩陣的積矩陣

scala> val ata = matA.transpose.multiply(matA)

ata: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@3644e451

scala> ata.toLocalMatrix

res1: org.apache.spark.mllib.linalg.Matrix =

3.0? -1.0? -1.0? 1.0

-1.0? 6.0? 2.0? 1.0

-1.0? 2.0? 1.0? 0.0

1.0? 1.0? 0.0? 1.0

分塊矩陣BlockMatrix將矩陣分成一系列矩陣塊,底層由矩陣塊構(gòu)成的RDD來進行數(shù)據(jù)存儲。值得指出的是,用于生成分布式矩陣的底層RDD必須是已經(jīng)確定(Deterministic)的,因為矩陣的尺寸將被存儲下來,所以使用未確定的RDD將會導(dǎo)致錯誤。而且,不同類型的分布式矩陣之間的轉(zhuǎn)換需要進行一個全局的shuffle操作,非常耗費資源。所以,根據(jù)數(shù)據(jù)本身的性質(zhì)和應(yīng)用需求來選取恰當(dāng)?shù)姆植际骄仃嚧鎯︻愋褪欠浅V匾摹?/p>

參考路徑:http://dblab.xmu.edu.cn/blog/1175/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容