Map Reduce Triplets Transition Guide (Legacy)

GraphX 是 Spark 中用于圖形和圖形并行計(jì)算的新組件。在高層次上, GraphX 通過(guò)引入一個(gè)新的圖形抽象來(lái)擴(kuò)展 SparkRDD:一種具有附加到每個(gè)頂點(diǎn)和邊緣的屬性的定向多重圖形。為了支持圖形計(jì)算,GraphX 公開了一組基本運(yùn)算符(例如:subgraph,joinVertices和aggregateMessages)以及PregelAPI 的優(yōu)化變體。此外,GraphX 還包括越來(lái)越多的圖形算法和構(gòu)建器,以簡(jiǎn)化圖形分析任務(wù)。
首先需要將 Spark 和 GraphX 導(dǎo)入到項(xiàng)目中,如下所示:
importorg.apache.spark._importorg.apache.spark.graphx._// To make some of the examples work we will also need RDDimportorg.apache.spark.rdd.RDD
如果您不使用 Spark 外殼,您還需要一個(gè)SparkContext。要了解有關(guān) Spark 入門的更多信息,請(qǐng)參考Spark快速入門指南。
屬性 Graph是一個(gè)定向多重圖形,用戶定義的對(duì)象附加到每個(gè)頂點(diǎn)和邊緣。定向多圖是具有共享相同源和目標(biāo)頂點(diǎn)的潛在多個(gè)平行邊緣的有向圖。支持平行邊緣的能力簡(jiǎn)化了在相同頂點(diǎn)之間可以有多個(gè)關(guān)系(例如: 同事和朋友)的建模場(chǎng)景。每個(gè)頂點(diǎn)都由唯一的64位長(zhǎng)標(biāo)識(shí)符(VertexId)鍵入。 GraphX 不對(duì)頂點(diǎn)標(biāo)識(shí)符施加任何排序約束。類似地,邊緣具有對(duì)應(yīng)的源和目標(biāo)頂點(diǎn)標(biāo)識(shí)符。
屬性圖是通過(guò) vertex (VD)和 edge (ED) 類型進(jìn)行參數(shù)化的。這些是分別與每個(gè)頂點(diǎn)和邊緣相關(guān)聯(lián)的對(duì)象的類型。
當(dāng)它們是原始數(shù)據(jù)類型(例如: int ,double 等等)時(shí),GraphX 優(yōu)化頂點(diǎn)和邊緣類型的表示,通過(guò)將其存儲(chǔ)在專門的數(shù)組中來(lái)減少內(nèi)存占用。
在某些情況下,可能希望在同一個(gè)圖形中具有不同屬性類型的頂點(diǎn)。這可以通過(guò)繼承來(lái)實(shí)現(xiàn)。例如,將用戶和產(chǎn)品建模為二分圖,我們可能會(huì)執(zhí)行以下操作:
classVertexProperty()caseclassUserProperty(valname:String)extendsVertexPropertycaseclassProductProperty(valname:String,valprice:Double)extendsVertexProperty// The graph might then have the type:vargraph:Graph[VertexProperty,String]=null
像 RDD 一樣,屬性圖是不可變的,分布式的和容錯(cuò)的。通過(guò)生成具有所需更改的新圖形來(lái)完成對(duì)圖表的值或結(jié)構(gòu)的更改。請(qǐng)注意,原始圖形的大部分(即,未受影響的結(jié)構(gòu),屬性和索引)在新圖表中重復(fù)使用,可降低此內(nèi)在功能數(shù)據(jù)結(jié)構(gòu)的成本。使用一系列頂點(diǎn)分割啟發(fā)式方法,在執(zhí)行器之間劃分圖形。與 RDD 一樣,在發(fā)生故障的情況下,可以在不同的機(jī)器上重新創(chuàng)建圖形的每個(gè)分區(qū)。
邏輯上,屬性圖對(duì)應(yīng)于一對(duì)編碼每個(gè)頂點(diǎn)和邊緣的屬性的類型集合( RDD )。因此,圖類包含訪問(wèn)圖形頂點(diǎn)和邊的成員:
classGraph[VD,ED]{valvertices:VertexRDD[VD]valedges:EdgeRDD[ED]}
VertexRDD[VD]和EdgeRDD[ED]分別擴(kuò)展了RDD[(VertexId, VD)]和RDD[Edge[ED]]的優(yōu)化版本。VertexRDD[VD]和EdgeRDD[ED]都提供了圍繞圖形計(jì)算和利用內(nèi)部?jī)?yōu)化的附加功能。 我們?cè)?a target="_blank" rel="nofollow">頂點(diǎn)和邊緣 RDD部分更詳細(xì)地討論了VertexRDD和EdgeRDDAPI,但現(xiàn)在它們可以被認(rèn)為是RDD[(VertexId, VD)]和RDD[Edge[ED]]的簡(jiǎn)單 RDD。
假設(shè)我們要構(gòu)建一個(gè)由 GraphX 項(xiàng)目中的各種協(xié)作者組成的屬性圖。頂點(diǎn)屬性可能包含用戶名和職業(yè)。我們可以用描述協(xié)作者之間關(guān)系的字符串來(lái)注釋邊:

生成的圖形將具有類型簽名:
valuserGraph:Graph[(String,String),String]
從原始文件, RDD 甚至合成生成器構(gòu)建屬性圖有許多方法,這些在圖形構(gòu)建器的一節(jié)中有更詳細(xì)的討論 。最普遍的方法是使用Graph 對(duì)象。例如,以下代碼從 RDD 集合中構(gòu)建一個(gè)圖:
// Assume the SparkContext has already been constructedvalsc:SparkContext// Create an RDD for the verticesvalusers:RDD[(VertexId,(String,String))]=sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","prof")),(2L,("istoica","prof"))))// Create an RDD for edgesvalrelationships:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"pi")))// Define a default user in case there are relationship with missing uservaldefaultUser=("John Doe","Missing")// Build the initial Graphvalgraph=Graph(users,relationships,defaultUser)
在上面的例子中,我們使用了Edgecase 類。邊緣具有srcId和dstId對(duì)應(yīng)于源和目標(biāo)頂點(diǎn)標(biāo)識(shí)符。此外,Edge該類有一個(gè)attr存儲(chǔ)邊緣屬性的成員。
我們可以分別使用graph.vertices和graph.edges成員將圖形解構(gòu)成相應(yīng)的頂點(diǎn)和邊緣視圖。
valgraph:Graph[(String,String),String]// Constructed from above// Count all users which are postdocsgraph.vertices.filter{case(id,(name,pos))=>pos=="postdoc"}.count// Count all the edges where src > dstgraph.edges.filter(e=>e.srcId>e.dstId).count
注意,graph.vertices返回一個(gè)VertexRDD[(String, String)]擴(kuò)展RDD[(VertexId, (String, String))],所以我們使用 scalacase表達(dá)式來(lái)解構(gòu)元組。另一方面,graph.edges返回一個(gè)EdgeRDD包含Edge[String]對(duì)象。我們也可以使用 case 類型構(gòu)造函數(shù),如下所示:
graph.edges.filter{caseEdge(src,dst,prop)=>src>dst}.count
除了屬性圖的頂點(diǎn)和邊緣視圖之外, GraphX 還暴露了三元組視圖。三元組視圖邏輯上連接頂點(diǎn)和邊緣屬性,生成RDD[EdgeTriplet[VD, ED]]包含EdgeTriplet該類的實(shí)例。此 連接可以用以下SQL表達(dá)式表示:
SELECTsrc.id,dst.id,src.attr,e.attr,dst.attrFROMedgesASeLEFTJOINverticesASsrc,verticesASdstONe.srcId=src.IdANDe.dstId=dst.Id
或圖形為:

EdgeTriplet類通過(guò)分別添加包含源和目標(biāo)屬性的srcAttr和dstAttr成員來(lái)擴(kuò)展Edge類。 我們可以使用圖形的三元組視圖來(lái)渲染描述用戶之間關(guān)系的字符串集合。
valgraph:Graph[(String,String),String]// Constructed from above// Use the triplets view to create an RDD of facts.valfacts:RDD[String]=graph.triplets.map(triplet=>triplet.srcAttr._1+" is the "+triplet.attr+" of "+triplet.dstAttr._1)facts.collect.foreach(println(_))
正如 RDDs 有這樣的基本操作map,filter, 以及reduceByKey,性能圖表也有采取用戶定義的函數(shù)基本運(yùn)算符的集合,產(chǎn)生具有轉(zhuǎn)化特性和結(jié)構(gòu)的新圖。定義了優(yōu)化實(shí)現(xiàn)的核心運(yùn)算符,并定義了Graph表示為核心運(yùn)算符組合的方便運(yùn)算符GraphOps。不過(guò),由于 Scala 的含義,操作員GraphOps可自動(dòng)作為成員使用Graph。例如,我們可以通過(guò)以下方法計(jì)算每個(gè)頂點(diǎn)的入度(定義GraphOps):
valgraph:Graph[(String,String),String]// Use the implicit GraphOps.inDegrees operatorvalinDegrees:VertexRDD[Int]=graph.inDegrees
區(qū)分核心圖形操作的原因GraphOps是能夠在將來(lái)支持不同的圖形表示。每個(gè)圖形表示必須提供核心操作的實(shí)現(xiàn),并重用許多有用的操作GraphOps。
以下是兩個(gè)定義的功能的簡(jiǎn)要摘要,但為簡(jiǎn)單起見(jiàn)Graph,GraphOps它作為 Graph 的成員呈現(xiàn)。請(qǐng)注意,已經(jīng)簡(jiǎn)化了一些功能簽名(例如,刪除了默認(rèn)參數(shù)和類型約束),并且已經(jīng)刪除了一些更高級(jí)的功能,因此請(qǐng)參閱 API 文檔以獲取正式的操作列表。
/** Summary of the functionality in the property graph */classGraph[VD,ED]{// Information about the Graph ===================================================================valnumEdges:LongvalnumVertices:LongvalinDegrees:VertexRDD[Int]valoutDegrees:VertexRDD[Int]valdegrees:VertexRDD[Int]// Views of the graph as collections =============================================================valvertices:VertexRDD[VD]valedges:EdgeRDD[ED]valtriplets:RDD[EdgeTriplet[VD,ED]]// Functions for caching graphs ==================================================================defpersist(newLevel:StorageLevel=StorageLevel.MEMORY_ONLY):Graph[VD,ED]defcache():Graph[VD,ED]defunpersistVertices(blocking:Boolean=true):Graph[VD,ED]// Change the partitioning heuristic? ============================================================defpartitionBy(partitionStrategy:PartitionStrategy):Graph[VD,ED]// Transform vertex and edge attributes ==========================================================defmapVertices[VD2](map:(VertexId,VD)=>VD2):Graph[VD2,ED]defmapEdges[ED2](map:Edge[ED]=>ED2):Graph[VD,ED2]defmapEdges[ED2](map:(PartitionID,Iterator[Edge[ED]])=>Iterator[ED2]):Graph[VD,ED2]defmapTriplets[ED2](map:EdgeTriplet[VD,ED]=>ED2):Graph[VD,ED2]defmapTriplets[ED2](map:(PartitionID,Iterator[EdgeTriplet[VD,ED]])=>Iterator[ED2]):Graph[VD,ED2]// Modify the graph structure ====================================================================defreverse:Graph[VD,ED]defsubgraph(epred:EdgeTriplet[VD,ED]=>Boolean=(x=>true),vpred:(VertexId,VD)=>Boolean=((v,d)=>true)):Graph[VD,ED]defmask[VD2,ED2](other:Graph[VD2,ED2]):Graph[VD,ED]defgroupEdges(merge:(ED,ED)=>ED):Graph[VD,ED]// Join RDDs with the graph ======================================================================defjoinVertices[U](table:RDD[(VertexId,U)])(mapFunc:(VertexId,VD,U)=>VD):Graph[VD,ED]defouterJoinVertices[U,VD2](other:RDD[(VertexId,U)])(mapFunc:(VertexId,VD,Option[U])=>VD2):Graph[VD2,ED]// Aggregate information about adjacent triplets =================================================defcollectNeighborIds(edgeDirection:EdgeDirection):VertexRDD[Array[VertexId]]defcollectNeighbors(edgeDirection:EdgeDirection):VertexRDD[Array[(VertexId,VD)]]defaggregateMessages[Msg:ClassTag](sendMsg:EdgeContext[VD,ED,Msg]=>Unit,mergeMsg:(Msg,Msg)=>Msg,tripletFields:TripletFields=TripletFields.All):VertexRDD[A]// Iterative graph-parallel computation ==========================================================defpregel[A](initialMsg:A,maxIterations:Int,activeDirection:EdgeDirection)(vprog:(VertexId,VD,A)=>VD,sendMsg:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],mergeMsg:(A,A)=>A):Graph[VD,ED]// Basic graph algorithms ========================================================================defpageRank(tol:Double,resetProb:Double=0.15):Graph[Double,Double]defconnectedComponents():Graph[VertexId,ED]deftriangleCount():Graph[Int,ED]defstronglyConnectedComponents(numIter:Int):Graph[VertexId,ED]}
與 RDDmap運(yùn)算符一樣,屬性圖包含以下內(nèi)容:
classGraph[VD,ED]{defmapVertices[VD2](map:(VertexId,VD)=>VD2):Graph[VD2,ED]defmapEdges[ED2](map:Edge[ED]=>ED2):Graph[VD,ED2]defmapTriplets[ED2](map:EdgeTriplet[VD,ED]=>ED2):Graph[VD,ED2]}
這些運(yùn)算符中的每一個(gè)產(chǎn)生一個(gè)新的圖形,其中頂點(diǎn)或邊緣屬性被用戶定義的map函數(shù)修改。
請(qǐng)注意,在每種情況下,圖形結(jié)構(gòu)都不受影響。這是這些運(yùn)算符的一個(gè)關(guān)鍵特征,它允許生成的圖形重用原始圖形的結(jié)構(gòu)索引。以下代碼段在邏輯上是等效的,但是第一個(gè)代碼片段不保留結(jié)構(gòu)索引,并且不會(huì)從GraphX系統(tǒng)優(yōu)化中受益:
valnewVertices=graph.vertices.map{case(id,attr)=>(id,mapUdf(id,attr))}valnewGraph=Graph(newVertices,graph.edges)
而是mapVertices用來(lái)保存索引:
valnewGraph=graph.mapVertices((id,attr)=>mapUdf(id,attr))
這些運(yùn)算符通常用于初始化特定計(jì)算或項(xiàng)目的圖形以避免不必要的屬性。例如,給出一個(gè)以度為頂點(diǎn)屬性的圖(我們稍后將描述如何構(gòu)建這樣一個(gè)圖),我們?yōu)镻ageRank初始化它:
// Given a graph where the vertex property is the out degreevalinputGraph:Graph[Int,String]=graph.outerJoinVertices(graph.outDegrees)((vid,_,degOpt)=>degOpt.getOrElse(0))// Construct a graph where each edge contains the weight// and each vertex is the initial PageRankvaloutputGraph:Graph[Double,Double]=inputGraph.mapTriplets(triplet=>1.0/triplet.srcAttr).mapVertices((id,_)=>1.0)
目前GraphX只支持一套簡(jiǎn)單的常用結(jié)構(gòu)運(yùn)算符,我們預(yù)計(jì)將來(lái)會(huì)增加更多。以下是基本結(jié)構(gòu)運(yùn)算符的列表。
classGraph[VD,ED]{defreverse:Graph[VD,ED]defsubgraph(epred:EdgeTriplet[VD,ED]=>Boolean,vpred:(VertexId,VD)=>Boolean):Graph[VD,ED]defmask[VD2,ED2](other:Graph[VD2,ED2]):Graph[VD,ED]defgroupEdges(merge:(ED,ED)=>ED):Graph[VD,ED]}
該reverse運(yùn)算符將返回逆轉(zhuǎn)的所有邊緣方向上的新圖。這在例如嘗試計(jì)算逆PageRank時(shí)是有用的。由于反向操作不會(huì)修改頂點(diǎn)或邊緣屬性或更改邊緣數(shù)量,因此可以在沒(méi)有數(shù)據(jù)移動(dòng)或重復(fù)的情況下高效地實(shí)現(xiàn)。
在subgraph操作者需要的頂點(diǎn)和邊緣的謂詞,并返回包含只有滿足謂詞頂點(diǎn)的頂點(diǎn)的曲線圖(評(píng)估為真),并且滿足謂詞邊緣邊緣并連接滿足頂點(diǎn)謂詞頂點(diǎn)。所述subgraph操作員可在情況編號(hào)被用來(lái)限制圖形以頂點(diǎn)和感興趣的邊緣或消除斷開的鏈接。例如,在以下代碼中,我們刪除了斷開的鏈接:
// Create an RDD for the verticesvalusers:RDD[(VertexId,(String,String))]=sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","prof")),(2L,("istoica","prof")),(4L,("peter","student"))))// Create an RDD for edgesvalrelationships:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"pi"),Edge(4L,0L,"student"),Edge(5L,0L,"colleague")))// Define a default user in case there are relationship with missing uservaldefaultUser=("John Doe","Missing")// Build the initial Graphvalgraph=Graph(users,relationships,defaultUser)// Notice that there is a user 0 (for which we have no information) connected to users// 4 (peter) and 5 (franklin).graph.triplets.map(triplet=>triplet.srcAttr._1+" is the "+triplet.attr+" of "+triplet.dstAttr._1).collect.foreach(println(_))// Remove missing vertices as well as the edges to connected to themvalvalidGraph=graph.subgraph(vpred=(id,attr)=>attr._2!="Missing")// The valid subgraph will disconnect users 4 and 5 by removing user 0validGraph.vertices.collect.foreach(println(_))validGraph.triplets.map(triplet=>triplet.srcAttr._1+" is the "+triplet.attr+" of "+triplet.dstAttr._1).collect.foreach(println(_))
注意在上面的例子中只提供了頂點(diǎn)謂詞。 如果未提供頂點(diǎn)或邊緣謂詞,則subgraph運(yùn)算符默認(rèn)為true。
在mask操作者通過(guò)返回包含該頂點(diǎn)和邊,它們也在輸入圖形中發(fā)現(xiàn)的曲線構(gòu)造一個(gè)子圖。這可以與subgraph運(yùn)算符一起使用, 以便根據(jù)另一個(gè)相關(guān)圖中的屬性限制圖形。例如,我們可以使用缺少頂點(diǎn)的圖運(yùn)行連接的組件,然后將答案限制為有效的子圖。
// Run Connected ComponentsvalccGraph=graph.connectedComponents()// No longer contains missing field// Remove missing vertices as well as the edges to connected to themvalvalidGraph=graph.subgraph(vpred=(id,attr)=>attr._2!="Missing")// Restrict the answer to the valid subgraphvalvalidCCGraph=ccGraph.mask(validGraph)
groupEdges操作符將多邊形中的平行邊(即,頂點(diǎn)對(duì)之間的重復(fù)邊)合并。 在許多數(shù)值應(yīng)用中,可以將平行邊緣(它們的權(quán)重組合)合并成單個(gè)邊緣,從而減小圖形的大小。
在許多情況下,有必要使用圖形連接來(lái)自外部收集( RDD )的數(shù)據(jù)。例如,我們可能有額外的用戶屬性,我們要與現(xiàn)有的圖形合并,或者我們可能希望將頂點(diǎn)屬性從一個(gè)圖形拉到另一個(gè)。這些任務(wù)可以使用join運(yùn)算符完成。下面我們列出關(guān)鍵 join 運(yùn)算符:
classGraph[VD,ED]{defjoinVertices[U](table:RDD[(VertexId,U)])(map:(VertexId,VD,U)=>VD):Graph[VD,ED]defouterJoinVertices[U,VD2](table:RDD[(VertexId,U)])(map:(VertexId,VD,Option[U])=>VD2):Graph[VD2,ED]}
joinVertices操作符將頂點(diǎn)與輸入 RDD 相連,并返回一個(gè)新的圖形,其中通過(guò)將用戶定義的map函數(shù)應(yīng)用于已連接頂點(diǎn)的結(jié)果而獲得的頂點(diǎn)屬性。 RDD 中沒(méi)有匹配值的頂點(diǎn)保留其原始值。
請(qǐng)注意,如果 RDD 包含給定頂點(diǎn)的多個(gè)值,則只能使用一個(gè)值。因此,建議使用以下命令使輸入 RDD 變得獨(dú)一無(wú)二,這也將對(duì)結(jié)果值進(jìn)行pre-index,以顯著加速后續(xù)連接。
valnonUniqueCosts:RDD[(VertexId,Double)]valuniqueCosts:VertexRDD[Double]=graph.vertices.aggregateUsingIndex(nonUnique,(a,b)=>a+b)valjoinedGraph=graph.joinVertices(uniqueCosts)((id,oldCost,extraCost)=>oldCost+extraCost)
除了將用戶定義的map函數(shù)應(yīng)用于所有頂點(diǎn)并且可以更改頂點(diǎn)屬性類型之外,更一般的outerJoinVertices的行為類似于joinVertices。 因?yàn)椴皇撬械捻旤c(diǎn)都可能在輸入 RDD 中具有匹配的值,所以map函數(shù)采用Option類型。 例如,我們可以通過(guò)使用outDegree初始化頂點(diǎn)屬性來(lái)為 PageRank 設(shè)置一個(gè)圖。
valoutDegrees:VertexRDD[Int]=graph.outDegreesvaldegreeGraph=graph.outerJoinVertices(outDegrees){(id,oldAttr,outDegOpt)=>outDegOptmatch{caseSome(outDeg)=>outDegcaseNone=>0// No outDegree means zero outDegree}}
您可能已經(jīng)注意到上述示例中使用的多個(gè)參數(shù)列表(例如:f(a)(b)curried 函數(shù)模式。 雖然我們可以將f(a)(b)同樣地寫成f(a,b),這意味著b上的類型推斷不依賴于a。 因此,用戶需要為用戶定義的函數(shù)提供類型注釋:
valjoinedGraph=graph.joinVertices(uniqueCosts,(id:VertexId,oldCost:Double,extraCost:Double)=>oldCost+extraCost)
許多圖形分析任務(wù)的關(guān)鍵步驟是聚合關(guān)于每個(gè)頂點(diǎn)鄰域的信息。 例如,我們可能想知道每個(gè)用戶擁有的關(guān)注者數(shù)量或每個(gè)用戶的追隨者的平均年齡。 許多迭代圖表算法(例如:網(wǎng)頁(yè)級(jí)別,最短路徑,以及連接成分)相鄰頂點(diǎn)(例如:電流值的 PageRank ,最短到源路徑,和最小可達(dá)頂點(diǎn) ID )的重復(fù)聚合性質(zhì)。
為了提高性能,主聚合操作員graph.mapReduceTriplets從新的更改graph.AggregateMessages。雖然 API 的變化相對(duì)較小,但我們?cè)谙旅嫣峁┝艘粋€(gè)轉(zhuǎn)換指南。
GraphX 中的核心聚合操作是aggregateMessages。該運(yùn)算符將用戶定義的sendMsg函數(shù)應(yīng)用于圖中的每個(gè)邊緣三元組,然后使用該mergeMsg函數(shù)在其目標(biāo)頂點(diǎn)聚合這些消息。
classGraph[VD,ED]{defaggregateMessages[Msg:ClassTag](sendMsg:EdgeContext[VD,ED,Msg]=>Unit,mergeMsg:(Msg,Msg)=>Msg,tripletFields:TripletFields=TripletFields.All):VertexRDD[Msg]}
用戶定義的sendMsg函數(shù)接受一個(gè)EdgeContext,它將源和目標(biāo)屬性以及 edge 屬性和函數(shù) (sendToSrc, 和sendToDst) 一起發(fā)送到源和目標(biāo)屬性。 在 map-reduce 中,將sendMsg作為map函數(shù)。 用戶定義的mergeMsg函數(shù)需要兩個(gè)發(fā)往同一頂點(diǎn)的消息,并產(chǎn)生一條消息。 想想mergeMsg是 map-reduce 中的reduce函數(shù)。aggregateMessages運(yùn)算符返回一個(gè)VertexRDD[Msg],其中包含去往每個(gè)頂點(diǎn)的聚合消息(Msg類型)。 沒(méi)有收到消息的頂點(diǎn)不包括在返回的VertexRDDVertexRDD中。
另外,aggregateMessages采用一個(gè)可選的tripletsFields,它們指示在EdgeContext中訪問(wèn)哪些數(shù)據(jù)(即源頂點(diǎn)屬性,而不是目標(biāo)頂點(diǎn)屬性)。tripletsFields定義的可能選項(xiàng),TripletFields默認(rèn)值是TripletFields.All指示用戶定義的sendMsg函數(shù)可以訪問(wèn)的任何字段EdgeContext。該tripletFields參數(shù)可用于通知 GraphX ,只有部分EdgeContext需要允許 GraphX 選擇優(yōu)化的連接策略。例如,如果我們計(jì)算每個(gè)用戶的追隨者的平均年齡,我們只需要源字段,因此我們將用于TripletFields.Src表示我們只需要源字段。
在早期版本的 GraphX 中,我們使用字節(jié)碼檢測(cè)來(lái)推斷,TripletFields但是我們發(fā)現(xiàn)字節(jié)碼檢查稍微不可靠,而是選擇了更明確的用戶控制。
在下面的例子中,我們使用aggregateMessages運(yùn)算符來(lái)計(jì)算每個(gè)用戶的資深追蹤者的平均年齡。
importorg.apache.spark.graphx.{Graph,VertexRDD}importorg.apache.spark.graphx.util.GraphGenerators// Create a graph with "age" as the vertex property.// Here we use a random graph for simplicity.valgraph:Graph[Double,Int]=GraphGenerators.logNormalGraph(sc,numVertices=100).mapVertices((id,_)=>id.toDouble)// Compute the number of older followers and their total agevalolderFollowers:VertexRDD[(Int,Double)]=graph.aggregateMessages[(Int,Double)](triplet=>{// Map Functionif(triplet.srcAttr>triplet.dstAttr){// Send message to destination vertex containing counter and agetriplet.sendToDst(1,triplet.srcAttr)}},// Add counter and age(a,b)=>(a._1+b._1,a._2+b._2)// Reduce Function)// Divide total age by number of older followers to get average age of older followersvalavgAgeOfOlderFollowers:VertexRDD[Double]=olderFollowers.mapValues((id,value)=>valuematch{case(count,totalAge)=>totalAge/count})// Display the resultsavgAgeOfOlderFollowers.collect.foreach(println(_))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala" in the Spark repo.
aggregateMessages當(dāng)消息(和消息的總和)是恒定大?。ɡ纾焊?dòng)和加法而不是列表和級(jí)聯(lián))時(shí),該操作最佳地執(zhí)行。
Map Reduce Triplets Transition Guide (Legacy)
在早期版本的 GraphX 中,鄰域聚合是使用mapReduceTriplets運(yùn)算符完成的 :
classGraph[VD,ED]{defmapReduceTriplets[Msg](map:EdgeTriplet[VD,ED]=>Iterator[(VertexId,Msg)],reduce:(Msg,Msg)=>Msg):VertexRDD[Msg]}
mapReduceTriplets操作符接受用戶定義的映射函數(shù),該函數(shù)應(yīng)用于每個(gè)三元組,并且可以使用用戶定義的縮減函數(shù)來(lái)生成聚合的消息。 然而,我們發(fā)現(xiàn)返回的迭代器的用戶是昂貴的,并且它阻止了我們應(yīng)用其他優(yōu)化(例如:局部頂點(diǎn)重新編號(hào))的能力。 在aggregateMessages中,我們引入了 EdgeContext ,它暴露了三元組字段,并且還顯示了向源和目標(biāo)頂點(diǎn)發(fā)送消息的功能。 此外,我們刪除了字節(jié)碼檢查,而是要求用戶指出三元組中實(shí)際需要哪些字段。
以下代碼塊使用mapReduceTriplets:
valgraph:Graph[Int,Float]=...defmsgFun(triplet:Triplet[Int,Float]):Iterator[(Int,String)]={Iterator((triplet.dstId,"Hi"))}defreduceFun(a:String,b:String):String=a+" "+bvalresult=graph.mapReduceTriplets[String](msgFun,reduceFun)
可以使用aggregateMessages:?
valgraph:Graph[Int,Float]=...defmsgFun(triplet:EdgeContext[Int,Float,String]){triplet.sendToDst("Hi")}defreduceFun(a:String,b:String):String=a+" "+bvalresult=graph.aggregateMessages[String](msgFun,reduceFun)
常見(jiàn)的聚合任務(wù)是計(jì)算每個(gè)頂點(diǎn)的程度:與每個(gè)頂點(diǎn)相鄰的邊數(shù)。在有向圖的上下文中,通常需要知道每個(gè)頂點(diǎn)的度數(shù),外部程度和總程度。本GraphOps類包含運(yùn)營(yíng)商計(jì)算度數(shù)每個(gè)頂點(diǎn)的集合。例如,在下面我們將計(jì)算最大值,最大和最大級(jí)別:
// Define a reduce operation to compute the highest degree vertexdefmax(a:(VertexId,Int),b:(VertexId,Int)):(VertexId,Int)={if(a._2>b._2)aelseb}// Compute the max degreesvalmaxInDegree:(VertexId,Int)=graph.inDegrees.reduce(max)valmaxOutDegree:(VertexId,Int)=graph.outDegrees.reduce(max)valmaxDegrees:(VertexId,Int)=graph.degrees.reduce(max)
在某些情況下,通過(guò)在每個(gè)頂點(diǎn)處收集相鄰頂點(diǎn)及其屬性可以更容易地表達(dá)計(jì)算。這可以使用collectNeighborIds和collectNeighbors運(yùn)算符輕松實(shí)現(xiàn) 。
classGraphOps[VD,ED]{defcollectNeighborIds(edgeDirection:EdgeDirection):VertexRDD[Array[VertexId]]defcollectNeighbors(edgeDirection:EdgeDirection):VertexRDD[Array[(VertexId,VD)]]}
這些操作可能相當(dāng)昂貴,因?yàn)樗鼈冎貜?fù)信息并需要大量通信。 如果可能,請(qǐng)直接使用aggregateMessages操作來(lái)表達(dá)相同的計(jì)算。
在 Spark 中,默認(rèn)情況下,RDD 不會(huì)保留在內(nèi)存中。為了避免重新計(jì)算,在多次使用它們時(shí),必須明確緩存它們(參見(jiàn)Spark Programming Guide)。GraphX 中的圖形表現(xiàn)方式相同。當(dāng)多次使用圖表時(shí),請(qǐng)務(wù)必先調(diào)用Graph.cache()。
在迭代計(jì)算中,uncaching也可能是最佳性能所必需的。默認(rèn)情況下,緩存的 RDD 和圖形將保留在內(nèi)存中,直到內(nèi)存壓力迫使它們以 LRU 順序逐出。對(duì)于迭代計(jì)算,來(lái)自先前迭代的中間結(jié)果將填滿緩存。雖然它們最終被驅(qū)逐出來(lái),但存儲(chǔ)在內(nèi)存中的不必要的數(shù)據(jù)會(huì)減慢垃圾收集速度。一旦不再需要中間結(jié)果,就會(huì)更有效率。這涉及每次迭代實(shí)現(xiàn)(緩存和強(qiáng)制)圖形或 RDD ,取消所有其他數(shù)據(jù)集,并且僅在將來(lái)的迭代中使用實(shí)例化數(shù)據(jù)集。然而,由于圖形由多個(gè) RDD 組成,所以很難將它們正確地分開。對(duì)于迭代計(jì)算,我們建議使用Pregel API,它可以正確地解析中間結(jié)果。
圖形是固有的遞歸數(shù)據(jù)結(jié)構(gòu),因?yàn)轫旤c(diǎn)的屬性取決于其鄰居的屬性,而鄰居的屬性又依賴于其鄰居的屬性。因此,許多重要的圖算法迭代地重新計(jì)算每個(gè)頂點(diǎn)的屬性,直到達(dá)到一個(gè)固定點(diǎn)條件。已經(jīng)提出了一系列圖并行抽象來(lái)表達(dá)這些迭代算法。GraphX 公開了 Pregel API 的變體。
在高層次上,GraphX 中的 Pregel 運(yùn)算符是限制到圖形拓?fù)涞呐客讲⑿邢⒊橄蟆?Pregel 操作符在一系列超級(jí)步驟中執(zhí)行,其中頂點(diǎn)接收來(lái)自先前超級(jí)步驟的入站消息的總和,計(jì)算頂點(diǎn)屬性的新值,然后在下一個(gè)超級(jí)步驟中將消息發(fā)送到相鄰頂點(diǎn)。與 Pregel 不同,消息作為邊緣三元組的函數(shù)并行計(jì)算,消息計(jì)算可以訪問(wèn)源和目標(biāo)頂點(diǎn)屬性。在超級(jí)步驟中跳過(guò)不接收消息的頂點(diǎn)。 Pregel 運(yùn)算符終止迭代,并在沒(méi)有剩余的消息時(shí)返回最終的圖。
注意,與更多的標(biāo)準(zhǔn) Pregel 實(shí)現(xiàn)不同,GraphX 中的頂點(diǎn)只能將消息發(fā)送到相鄰頂點(diǎn),并且使用用戶定義的消息傳遞功能并行完成消息構(gòu)造。這些約束允許在 GraphX 中進(jìn)行額外優(yōu)化。
以下是Pregel 運(yùn)算符的類型簽名以及 其實(shí)現(xiàn)的草圖(注意:為了避免由于長(zhǎng)譜系鏈引起的 stackOverflowError , pregel 支持周期性檢查點(diǎn)圖和消息,將 “spark.graphx.pregel.checkpointInterval” 設(shè)置為正數(shù),說(shuō)10。并使用 SparkContext.setCheckpointDir(directory: String)) 設(shè)置 checkpoint 目錄):
classGraphOps[VD,ED]{defpregel[A](initialMsg:A,maxIter:Int=Int.MaxValue,activeDir:EdgeDirection=EdgeDirection.Out)(vprog:(VertexId,VD,A)=>VD,sendMsg:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],mergeMsg:(A,A)=>A):Graph[VD,ED]={// Receive the initial message at each vertexvarg=mapVertices((vid,vdata)=>vprog(vid,vdata,initialMsg)).cache()// compute the messagesvarmessages=g.mapReduceTriplets(sendMsg,mergeMsg)varactiveMessages=messages.count()// Loop until no messages remain or maxIterations is achievedvari=0while(activeMessages>0&&i
請(qǐng)注意,Pregel 需要兩個(gè)參數(shù)列表(即:graph.pregel(list1)(list2)。第一個(gè)參數(shù)列表包含配置參數(shù),包括初始消息,最大迭代次數(shù)以及發(fā)送消息的邊緣方向(默認(rèn)情況下為邊緣)。第二個(gè)參數(shù)列表包含用于接收消息(頂點(diǎn)程序vprog),計(jì)算消息(sendMsg)和組合消息的用戶定義函數(shù)mergeMsg。
在以下示例中,我們可以使用 Pregel 運(yùn)算符來(lái)表達(dá)單源最短路徑的計(jì)算。
importorg.apache.spark.graphx.{Graph,VertexId}importorg.apache.spark.graphx.util.GraphGenerators// A graph with edge attributes containing distancesvalgraph:Graph[Long,Double]=GraphGenerators.logNormalGraph(sc,numVertices=100).mapEdges(e=>e.attr.toDouble)valsourceId:VertexId=42// The ultimate source// Initialize the graph such that all vertices except the root have distance infinity.valinitialGraph=graph.mapVertices((id,_)=>if(id==sourceId)0.0elseDouble.PositiveInfinity)valsssp=initialGraph.pregel(Double.PositiveInfinity)((id,dist,newDist)=>math.min(dist,newDist),// Vertex Programtriplet=>{// Send Messageif(triplet.srcAttr+triplet.attrmath.min(a,b)// Merge Message)println(sssp.vertices.collect.mkString("\n"))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala" in the Spark repo.
GraphX 提供了從 RDD 或磁盤上的頂點(diǎn)和邊的集合構(gòu)建圖形的幾種方法。默認(rèn)情況下,圖形構(gòu)建器都不會(huì)重新分配圖形邊; 相反,邊緣保留在其默認(rèn)分區(qū)(例如 HDFS 中的原始?jí)K)中。Graph.groupEdges需要重新分區(qū)圖,因?yàn)樗俣ㄏ嗤倪厡⒈还餐ㄎ辉谕环謪^(qū)上,因此您必須在調(diào)用Graph.partitionBy之前調(diào)用groupEdges。
objectGraphLoader{defedgeListFile(sc:SparkContext,path:String,canonicalOrientation:Boolean=false,minEdgePartitions:Int=1):Graph[Int,Int]}
GraphLoader.edgeListFile提供了從磁盤邊緣列表中加載圖形的方法。它解析以下形式的(源頂點(diǎn) ID ,目標(biāo)頂點(diǎn) ID )對(duì)的鄰接列表,跳過(guò)以下開始的注釋行#:
# This is a comment
2 1
4 1
1 2
它Graph從指定的邊緣創(chuàng)建一個(gè),自動(dòng)創(chuàng)建邊緣提到的任何頂點(diǎn)。所有頂點(diǎn)和邊緣屬性默認(rèn)為1.canonicalOrientation參數(shù)允許在正方向 (srcId < dstId) 重新定向邊,這是連接的組件算法所要求的。該minEdgePartitions參數(shù)指定要生成的邊緣分區(qū)的最小數(shù)量; 如果例如 HDFS 文件具有更多塊,則可能存在比指定更多的邊緣分區(qū)。
objectGraph{defapply[VD,ED](vertices:RDD[(VertexId,VD)],edges:RDD[Edge[ED]],defaultVertexAttr:VD=null):Graph[VD,ED]deffromEdges[VD,ED](edges:RDD[Edge[ED]],defaultValue:VD):Graph[VD,ED]deffromEdgeTuples[VD](rawEdges:RDD[(VertexId,VertexId)],defaultValue:VD,uniqueEdges:Option[PartitionStrategy]=None):Graph[VD,Int]}
Graph.apply允許從頂點(diǎn)和邊緣的 RDD 創(chuàng)建圖形。重復(fù)的頂點(diǎn)被任意挑選,并且邊緣 RDD 中找到的頂點(diǎn),而不是頂點(diǎn) RDD 被分配了默認(rèn)屬性。
Graph.fromEdges允許僅從 RDD 的邊緣創(chuàng)建圖形,自動(dòng)創(chuàng)建邊緣提到的任何頂點(diǎn)并將其分配給默認(rèn)值。
Graph.fromEdgeTuples允許僅從邊緣元組的 RDD 創(chuàng)建圖形,將邊緣分配為值1,并自動(dòng)創(chuàng)建邊緣提到的任何頂點(diǎn)并將其分配給默認(rèn)值。 它還支持重復(fù)數(shù)據(jù)刪除邊緣; 重復(fù)數(shù)據(jù)刪除,將某些PartitionStrategy作為uniqueEdges參數(shù)傳遞(例如:uniqueEdges = Some(PartitionStrategy.RandomVertexCut))。 分區(qū)策略是必須的,以便在相同的分區(qū)上共同使用相同的邊,以便可以進(jìn)行重復(fù)數(shù)據(jù)刪除。
GraphX 公開RDD了圖中存儲(chǔ)的頂點(diǎn)和邊的視圖。然而,由于 GraphX 在優(yōu)化的數(shù)據(jù)結(jié)構(gòu)中維護(hù)頂點(diǎn)和邊,并且這些數(shù)據(jù)結(jié)構(gòu)提供了附加功能,所以頂點(diǎn)和邊分別作為VertexRDD和EdgeRDD返回 。在本節(jié)中,我們將回顧一些這些類型中的其他有用功能。請(qǐng)注意,這只是一個(gè)不完整的列表,請(qǐng)參閱API文檔中的正式操作列表。
該VertexRDD[A]擴(kuò)展RDD[(VertexId, A)]并增加了額外的限制,每個(gè)VertexId只發(fā)生一次。此外,VertexRDD[A]表示一組頂點(diǎn),每個(gè)頂點(diǎn)的屬性類型A。在內(nèi)部,這是通過(guò)將頂點(diǎn)屬性存儲(chǔ)在可重用的散列圖數(shù)據(jù)結(jié)構(gòu)中來(lái)實(shí)現(xiàn)的。因此,如果兩個(gè)VertexRDD派生自相同的基礎(chǔ)VertexRDD(例如:filter或mapValues),則可以在不使用散列評(píng)估的情況下連續(xù)連接。為了利用這個(gè)索引的數(shù)據(jù)結(jié)構(gòu),VertexRDD公開了以下附加功能:
classVertexRDD[VD]extendsRDD[(VertexId,VD)]{// Filter the vertex set but preserves the internal indexdeffilter(pred:Tuple2[VertexId,VD]=>Boolean):VertexRDD[VD]// Transform the values without changing the ids (preserves the internal index)defmapValues[VD2](map:VD=>VD2):VertexRDD[VD2]defmapValues[VD2](map:(VertexId,VD)=>VD2):VertexRDD[VD2]// Show only vertices unique to this set based on their VertexId'sdefminus(other:RDD[(VertexId,VD)])// Remove vertices from this set that appear in the other setdefdiff(other:VertexRDD[VD]):VertexRDD[VD]// Join operators that take advantage of the internal indexing to accelerate joins (substantially)defleftJoin[VD2,VD3](other:RDD[(VertexId,VD2)])(f:(VertexId,VD,Option[VD2])=>VD3):VertexRDD[VD3]definnerJoin[U,VD2](other:RDD[(VertexId,U)])(f:(VertexId,VD,U)=>VD2):VertexRDD[VD2]// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.defaggregateUsingIndex[VD2](other:RDD[(VertexId,VD2)],reduceFunc:(VD2,VD2)=>VD2):VertexRDD[VD2]}
請(qǐng)注意,例如,filter運(yùn)算符 如何返回VertexRDD。過(guò)濾器實(shí)際上是通過(guò)BitSet使用索引重新實(shí)現(xiàn)的,并保留與其他VertexRDD進(jìn)行快速連接的能力。同樣,mapValues運(yùn)算符不允許map功能改變,VertexId從而使相同的HashMap數(shù)據(jù)結(jié)構(gòu)能夠被重用。無(wú)論是leftJoin和innerJoin能夠連接兩個(gè)時(shí)識(shí)別VertexRDD來(lái)自同一來(lái)源的小號(hào)HashMap和落實(shí)線性掃描,而不是昂貴的點(diǎn)查找的加入。
aggregateUsingIndex運(yùn)算符對(duì)于從RDD[(VertexId, A)]有效構(gòu)建新的VertexRDD非常有用。 在概念上,如果我在一組頂點(diǎn)上構(gòu)造了一個(gè)VertexRDD[B],這是一些RDD[(VertexId, A)]中的頂點(diǎn)的超集,那么我可以重用索引來(lái)聚合然后再索引RDD[(VertexId, A)]。 例如:
valsetA:VertexRDD[Int]=VertexRDD(sc.parallelize(0Luntil100L).map(id=>(id,1)))valrddB:RDD[(VertexId,Double)]=sc.parallelize(0Luntil100L).flatMap(id=>List((id,1.0),(id,2.0)))// There should be 200 entries in rddBrddB.countvalsetB:VertexRDD[Double]=setA.aggregateUsingIndex(rddB,_+_)// There should be 100 entries in setBsetB.count// Joining A and B should now be fast!valsetC:VertexRDD[Double]=setA.innerJoin(setB)((id,a,b)=>a+b)
該EdgeRDD[ED]擴(kuò)展RDD[Edge[ED]]使用其中定義的各種分區(qū)策略之一來(lái)組織塊中的邊PartitionStrategy。在每個(gè)分區(qū)中,邊緣屬性和鄰接結(jié)構(gòu)分別存儲(chǔ),可以在更改屬性值時(shí)進(jìn)行最大限度的重用。
EdgeRDDEdgeRDD公開的三個(gè)附加功能是:
// Transform the edge attributes while preserving the structuredefmapValues[ED2](f:Edge[ED]=>ED2):EdgeRDD[ED2]// Reverse the edges reusing both attributes and structuredefreverse:EdgeRDD[ED]// Join two `EdgeRDD`s partitioned using the same partitioning strategy.definnerJoin[ED2,ED3](other:EdgeRDD[ED2])(f:(VertexId,VertexId,ED,ED2)=>ED3):EdgeRDD[ED3]
在大多數(shù)應(yīng)用中,我們發(fā)現(xiàn)在EdgeRDDEdgeRDD上的操作是通過(guò)圖形運(yùn)算符完成的,或者依賴基RDD類中定義的操作。
雖然在分布式圖形的 GraphX 表示中使用的優(yōu)化的詳細(xì)描述超出了本指南的范圍,但一些高級(jí)理解可能有助于可擴(kuò)展算法的設(shè)計(jì)以及 API 的最佳使用。GraphX 采用頂點(diǎn)切分方式進(jìn)行分布式圖分割:

GraphX 不是沿著邊沿分割圖形,而是沿著頂點(diǎn)分割圖形,這可以減少通信和存儲(chǔ)開銷。在邏輯上,這對(duì)應(yīng)于將邊緣分配給機(jī)器并允許頂點(diǎn)跨越多臺(tái)機(jī)器。分配邊緣的確切方法取決于PartitionStrategy各種啟發(fā)式的幾種折衷。用戶可以通過(guò)與Graph.partitionBy運(yùn)算符重新分區(qū)圖來(lái)選擇不同的策略。默認(rèn)分區(qū)策略是使用圖形構(gòu)建中提供的邊的初始分區(qū)。然而,用戶可以輕松切換到 GraphX 中包含的 2D 劃分或其他啟發(fā)式算法。

一旦邊緣被劃分,高效的圖形并行計(jì)算的關(guān)鍵挑戰(zhàn)就是有效地將頂點(diǎn)屬性與邊緣連接起來(lái)。因?yàn)檎鎸?shí)世界的圖形通常具有比頂點(diǎn)更多的邊緣,所以我們將頂點(diǎn)屬性移動(dòng)到邊緣。因?yàn)椴皇撬械姆謪^(qū)都將包含鄰近的所有頂點(diǎn)的邊緣,我們內(nèi)部維護(hù)標(biāo)識(shí)在哪里執(zhí)行所需的連接像操作時(shí),廣播頂點(diǎn)的路由表triplets和aggregateMessages。
GraphX 包括一組簡(jiǎn)化分析任務(wù)的圖算法。該算法被包含在org.apache.spark.graphx.lib包可直接作為方法來(lái)訪問(wèn)Graph通過(guò)GraphOps。本節(jié)介紹算法及其使用方法。
PageRank 測(cè)量在圖中每個(gè)頂點(diǎn)的重要性,假設(shè)從邊緣u到v表示的認(rèn)可v通過(guò)的重要性u(píng)。例如,如果 Twitter 用戶遵循許多其他用戶,則用戶將被高度排名。
GraphX 附帶了 PageRank 的靜態(tài)和動(dòng)態(tài)實(shí)現(xiàn)方法作PageRank 對(duì)象上的方法。靜態(tài) PageRank 運(yùn)行固定次數(shù)的迭代,而動(dòng)態(tài) PageRank 運(yùn)行直到排列收斂(即,停止改變超過(guò)指定的公差)。GraphOps允許直接調(diào)用這些算法作為方法Graph。
GraphX還包括一個(gè)可以運(yùn)行 PageRank 的社交網(wǎng)絡(luò)數(shù)據(jù)集示例。給出了一組用戶data/graphx/users.txt,并給出了一組用戶之間的關(guān)系data/graphx/followers.txt。我們計(jì)算每個(gè)用戶的 PageRank 如下:
importorg.apache.spark.graphx.GraphLoader// Load the edges as a graphvalgraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt")// Run PageRankvalranks=graph.pageRank(0.0001).vertices// Join the ranks with the usernamesvalusers=sc.textFile("data/graphx/users.txt").map{line=>valfields=line.split(",")(fields(0).toLong,fields(1))}valranksByUsername=users.join(ranks).map{case(id,(username,rank))=>(username,rank)}// Print the resultprintln(ranksByUsername.collect().mkString("\n"))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala" in the Spark repo.
連接的組件算法將圖中每個(gè)連接的組件與其最低編號(hào)頂點(diǎn)的ID進(jìn)行標(biāo)記。例如,在社交網(wǎng)絡(luò)中,連接的組件可以近似群集。GraphX包含ConnectedComponents object中算法的實(shí)現(xiàn),我們從PageRank 部分計(jì)算示例社交網(wǎng)絡(luò)數(shù)據(jù)集的連接組件如下:
importorg.apache.spark.graphx.GraphLoader// Load the graph as in the PageRank examplevalgraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt")// Find the connected componentsvalcc=graph.connectedComponents().vertices// Join the connected components with the usernamesvalusers=sc.textFile("data/graphx/users.txt").map{line=>valfields=line.split(",")(fields(0).toLong,fields(1))}valccByUsername=users.join(cc).map{case(id,(username,cc))=>(username,cc)}// Print the resultprintln(ccByUsername.collect().mkString("\n"))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala" in the Spark repo.
頂點(diǎn)是三角形的一部分,當(dāng)它有兩個(gè)相鄰的頂點(diǎn)之間有一個(gè)邊。GraphX 在TriangleCount 對(duì)象中實(shí)現(xiàn)一個(gè)三角計(jì)數(shù)算法,用于確定通過(guò)每個(gè)頂點(diǎn)的三角形數(shù)量,提供聚類度量。我們從PageRank 部分計(jì)算社交網(wǎng)絡(luò)數(shù)據(jù)集的三角形數(shù)。需要注意的是TriangleCount邊緣要處于規(guī)范方向 (srcId < dstId),而圖形要使用Graph.partitionBy。
importorg.apache.spark.graphx.{GraphLoader,PartitionStrategy}// Load the edges in canonical order and partition the graph for triangle countvalgraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt",true).partitionBy(PartitionStrategy.RandomVertexCut)// Find the triangle count for each vertexvaltriCounts=graph.triangleCount().vertices// Join the triangle counts with the usernamesvalusers=sc.textFile("data/graphx/users.txt").map{line=>valfields=line.split(",")(fields(0).toLong,fields(1))}valtriCountByUsername=users.join(triCounts).map{case(id,(username,tc))=>(username,tc)}// Print the resultprintln(triCountByUsername.collect().mkString("\n"))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala" in the Spark repo.
假設(shè)我想從一些文本文件中構(gòu)建圖形,將圖形限制為重要的關(guān)系和用戶,在 sub-graph 上運(yùn)行 page-rank ,然后返回與頂級(jí)用戶關(guān)聯(lián)的屬性。我可以用 GraphX 在幾行內(nèi)完成所有這些:
importorg.apache.spark.graphx.GraphLoader// Load my user data and parse into tuples of user id and attribute listvalusers=(sc.textFile("data/graphx/users.txt").map(line=>line.split(",")).map(parts=>(parts.head.toLong,parts.tail)))// Parse the edge data which is already in userId -> userId formatvalfollowerGraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt")// Attach the user attributesvalgraph=followerGraph.outerJoinVertices(users){case(uid,deg,Some(attrList))=>attrList// Some users may not have attributes so we set them as emptycase(uid,deg,None)=>Array.empty[String]}// Restrict the graph to users with usernames and namesvalsubgraph=graph.subgraph(vpred=(vid,attr)=>attr.size==2)// Compute the PageRankvalpagerankGraph=subgraph.pageRank(0.001)// Get the attributes of the top pagerank usersvaluserInfoWithPageRank=subgraph.outerJoinVertices(pagerankGraph.vertices){case(uid,attrList,Some(pr))=>(pr,attrList.toList)case(uid,attrList,None)=>(0.0,attrList.toList)}println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala" in the Spark repo.

原文地址: http://spark.apachecn.org/docs/cn/2.2.0/graphx-programming-guide.html
網(wǎng)頁(yè)地址: http://spark.apachecn.org/
github: https://github.com/apachecn/spark-doc-zh(覺(jué)得不錯(cuò)麻煩給個(gè) Star,謝謝!~)