Map Reduce Triplets Transition Guide (Legacy)

GraphX 是 Spark 中用于圖形和圖形并行計算的新組件。在高層次上, GraphX 通過引入一個新的圖形抽象來擴展 SparkRDD:一種具有附加到每個頂點和邊緣的屬性的定向多重圖形。為了支持圖形計算,GraphX 公開了一組基本運算符(例如:subgraph,joinVertices和aggregateMessages)以及PregelAPI 的優(yōu)化變體。此外,GraphX 還包括越來越多的圖形算法和構(gòu)建器,以簡化圖形分析任務(wù)。
首先需要將 Spark 和 GraphX 導(dǎo)入到項目中,如下所示:
importorg.apache.spark._importorg.apache.spark.graphx._// To make some of the examples work we will also need RDDimportorg.apache.spark.rdd.RDD
如果您不使用 Spark 外殼,您還需要一個SparkContext。要了解有關(guān) Spark 入門的更多信息,請參考Spark快速入門指南。
屬性 Graph是一個定向多重圖形,用戶定義的對象附加到每個頂點和邊緣。定向多圖是具有共享相同源和目標(biāo)頂點的潛在多個平行邊緣的有向圖。支持平行邊緣的能力簡化了在相同頂點之間可以有多個關(guān)系(例如: 同事和朋友)的建模場景。每個頂點都由唯一的64位長標(biāo)識符(VertexId)鍵入。 GraphX 不對頂點標(biāo)識符施加任何排序約束。類似地,邊緣具有對應(yīng)的源和目標(biāo)頂點標(biāo)識符。
屬性圖是通過 vertex (VD)和 edge (ED) 類型進行參數(shù)化的。這些是分別與每個頂點和邊緣相關(guān)聯(lián)的對象的類型。
當(dāng)它們是原始數(shù)據(jù)類型(例如: int ,double 等等)時,GraphX 優(yōu)化頂點和邊緣類型的表示,通過將其存儲在專門的數(shù)組中來減少內(nèi)存占用。
在某些情況下,可能希望在同一個圖形中具有不同屬性類型的頂點。這可以通過繼承來實現(xiàn)。例如,將用戶和產(chǎn)品建模為二分圖,我們可能會執(zhí)行以下操作:
classVertexProperty()caseclassUserProperty(valname:String)extendsVertexPropertycaseclassProductProperty(valname:String,valprice:Double)extendsVertexProperty// The graph might then have the type:vargraph:Graph[VertexProperty,String]=null
像 RDD 一樣,屬性圖是不可變的,分布式的和容錯的。通過生成具有所需更改的新圖形來完成對圖表的值或結(jié)構(gòu)的更改。請注意,原始圖形的大部分(即,未受影響的結(jié)構(gòu),屬性和索引)在新圖表中重復(fù)使用,可降低此內(nèi)在功能數(shù)據(jù)結(jié)構(gòu)的成本。使用一系列頂點分割啟發(fā)式方法,在執(zhí)行器之間劃分圖形。與 RDD 一樣,在發(fā)生故障的情況下,可以在不同的機器上重新創(chuàng)建圖形的每個分區(qū)。
邏輯上,屬性圖對應(yīng)于一對編碼每個頂點和邊緣的屬性的類型集合( RDD )。因此,圖類包含訪問圖形頂點和邊的成員:
classGraph[VD,ED]{valvertices:VertexRDD[VD]valedges:EdgeRDD[ED]}
VertexRDD[VD]和EdgeRDD[ED]分別擴展了RDD[(VertexId, VD)]和RDD[Edge[ED]]的優(yōu)化版本。VertexRDD[VD]和EdgeRDD[ED]都提供了圍繞圖形計算和利用內(nèi)部優(yōu)化的附加功能。 我們在頂點和邊緣 RDD部分更詳細地討論了VertexRDD和EdgeRDDAPI,但現(xiàn)在它們可以被認為是RDD[(VertexId, VD)]和RDD[Edge[ED]]的簡單 RDD。
假設(shè)我們要構(gòu)建一個由 GraphX 項目中的各種協(xié)作者組成的屬性圖。頂點屬性可能包含用戶名和職業(yè)。我們可以用描述協(xié)作者之間關(guān)系的字符串來注釋邊:

生成的圖形將具有類型簽名:
valuserGraph:Graph[(String,String),String]
從原始文件, RDD 甚至合成生成器構(gòu)建屬性圖有許多方法,這些在圖形構(gòu)建器的一節(jié)中有更詳細的討論 。最普遍的方法是使用Graph 對象。例如,以下代碼從 RDD 集合中構(gòu)建一個圖:
// Assume the SparkContext has already been constructedvalsc:SparkContext// Create an RDD for the verticesvalusers:RDD[(VertexId,(String,String))]=sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","prof")),(2L,("istoica","prof"))))// Create an RDD for edgesvalrelationships:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"pi")))// Define a default user in case there are relationship with missing uservaldefaultUser=("John Doe","Missing")// Build the initial Graphvalgraph=Graph(users,relationships,defaultUser)
在上面的例子中,我們使用了Edgecase 類。邊緣具有srcId和dstId對應(yīng)于源和目標(biāo)頂點標(biāo)識符。此外,Edge該類有一個attr存儲邊緣屬性的成員。
我們可以分別使用graph.vertices和graph.edges成員將圖形解構(gòu)成相應(yīng)的頂點和邊緣視圖。
valgraph:Graph[(String,String),String]// Constructed from above// Count all users which are postdocsgraph.vertices.filter{case(id,(name,pos))=>pos=="postdoc"}.count// Count all the edges where src > dstgraph.edges.filter(e=>e.srcId>e.dstId).count
注意,graph.vertices返回一個VertexRDD[(String, String)]擴展RDD[(VertexId, (String, String))],所以我們使用 scalacase表達式來解構(gòu)元組。另一方面,graph.edges返回一個EdgeRDD包含Edge[String]對象。我們也可以使用 case 類型構(gòu)造函數(shù),如下所示:
graph.edges.filter{caseEdge(src,dst,prop)=>src>dst}.count
除了屬性圖的頂點和邊緣視圖之外, GraphX 還暴露了三元組視圖。三元組視圖邏輯上連接頂點和邊緣屬性,生成RDD[EdgeTriplet[VD, ED]]包含EdgeTriplet該類的實例。此 連接可以用以下SQL表達式表示:
SELECTsrc.id,dst.id,src.attr,e.attr,dst.attrFROMedgesASeLEFTJOINverticesASsrc,verticesASdstONe.srcId=src.IdANDe.dstId=dst.Id
或圖形為:

EdgeTriplet類通過分別添加包含源和目標(biāo)屬性的srcAttr和dstAttr成員來擴展Edge類。 我們可以使用圖形的三元組視圖來渲染描述用戶之間關(guān)系的字符串集合。
valgraph:Graph[(String,String),String]// Constructed from above// Use the triplets view to create an RDD of facts.valfacts:RDD[String]=graph.triplets.map(triplet=>triplet.srcAttr._1+" is the "+triplet.attr+" of "+triplet.dstAttr._1)facts.collect.foreach(println(_))
正如 RDDs 有這樣的基本操作map,filter, 以及reduceByKey,性能圖表也有采取用戶定義的函數(shù)基本運算符的集合,產(chǎn)生具有轉(zhuǎn)化特性和結(jié)構(gòu)的新圖。定義了優(yōu)化實現(xiàn)的核心運算符,并定義了Graph表示為核心運算符組合的方便運算符GraphOps。不過,由于 Scala 的含義,操作員GraphOps可自動作為成員使用Graph。例如,我們可以通過以下方法計算每個頂點的入度(定義GraphOps):
valgraph:Graph[(String,String),String]// Use the implicit GraphOps.inDegrees operatorvalinDegrees:VertexRDD[Int]=graph.inDegrees
區(qū)分核心圖形操作的原因GraphOps是能夠在將來支持不同的圖形表示。每個圖形表示必須提供核心操作的實現(xiàn),并重用許多有用的操作GraphOps。
以下是兩個定義的功能的簡要摘要,但為簡單起見Graph,GraphOps它作為 Graph 的成員呈現(xiàn)。請注意,已經(jīng)簡化了一些功能簽名(例如,刪除了默認參數(shù)和類型約束),并且已經(jīng)刪除了一些更高級的功能,因此請參閱 API 文檔以獲取正式的操作列表。
/** Summary of the functionality in the property graph */classGraph[VD,ED]{// Information about the Graph ===================================================================valnumEdges:LongvalnumVertices:LongvalinDegrees:VertexRDD[Int]valoutDegrees:VertexRDD[Int]valdegrees:VertexRDD[Int]// Views of the graph as collections =============================================================valvertices:VertexRDD[VD]valedges:EdgeRDD[ED]valtriplets:RDD[EdgeTriplet[VD,ED]]// Functions for caching graphs ==================================================================defpersist(newLevel:StorageLevel=StorageLevel.MEMORY_ONLY):Graph[VD,ED]defcache():Graph[VD,ED]defunpersistVertices(blocking:Boolean=true):Graph[VD,ED]// Change the partitioning heuristic? ============================================================defpartitionBy(partitionStrategy:PartitionStrategy):Graph[VD,ED]// Transform vertex and edge attributes ==========================================================defmapVertices[VD2](map:(VertexId,VD)=>VD2):Graph[VD2,ED]defmapEdges[ED2](map:Edge[ED]=>ED2):Graph[VD,ED2]defmapEdges[ED2](map:(PartitionID,Iterator[Edge[ED]])=>Iterator[ED2]):Graph[VD,ED2]defmapTriplets[ED2](map:EdgeTriplet[VD,ED]=>ED2):Graph[VD,ED2]defmapTriplets[ED2](map:(PartitionID,Iterator[EdgeTriplet[VD,ED]])=>Iterator[ED2]):Graph[VD,ED2]// Modify the graph structure ====================================================================defreverse:Graph[VD,ED]defsubgraph(epred:EdgeTriplet[VD,ED]=>Boolean=(x=>true),vpred:(VertexId,VD)=>Boolean=((v,d)=>true)):Graph[VD,ED]defmask[VD2,ED2](other:Graph[VD2,ED2]):Graph[VD,ED]defgroupEdges(merge:(ED,ED)=>ED):Graph[VD,ED]// Join RDDs with the graph ======================================================================defjoinVertices[U](table:RDD[(VertexId,U)])(mapFunc:(VertexId,VD,U)=>VD):Graph[VD,ED]defouterJoinVertices[U,VD2](other:RDD[(VertexId,U)])(mapFunc:(VertexId,VD,Option[U])=>VD2):Graph[VD2,ED]// Aggregate information about adjacent triplets =================================================defcollectNeighborIds(edgeDirection:EdgeDirection):VertexRDD[Array[VertexId]]defcollectNeighbors(edgeDirection:EdgeDirection):VertexRDD[Array[(VertexId,VD)]]defaggregateMessages[Msg:ClassTag](sendMsg:EdgeContext[VD,ED,Msg]=>Unit,mergeMsg:(Msg,Msg)=>Msg,tripletFields:TripletFields=TripletFields.All):VertexRDD[A]// Iterative graph-parallel computation ==========================================================defpregel[A](initialMsg:A,maxIterations:Int,activeDirection:EdgeDirection)(vprog:(VertexId,VD,A)=>VD,sendMsg:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],mergeMsg:(A,A)=>A):Graph[VD,ED]// Basic graph algorithms ========================================================================defpageRank(tol:Double,resetProb:Double=0.15):Graph[Double,Double]defconnectedComponents():Graph[VertexId,ED]deftriangleCount():Graph[Int,ED]defstronglyConnectedComponents(numIter:Int):Graph[VertexId,ED]}
與 RDDmap運算符一樣,屬性圖包含以下內(nèi)容:
classGraph[VD,ED]{defmapVertices[VD2](map:(VertexId,VD)=>VD2):Graph[VD2,ED]defmapEdges[ED2](map:Edge[ED]=>ED2):Graph[VD,ED2]defmapTriplets[ED2](map:EdgeTriplet[VD,ED]=>ED2):Graph[VD,ED2]}
這些運算符中的每一個產(chǎn)生一個新的圖形,其中頂點或邊緣屬性被用戶定義的map函數(shù)修改。
請注意,在每種情況下,圖形結(jié)構(gòu)都不受影響。這是這些運算符的一個關(guān)鍵特征,它允許生成的圖形重用原始圖形的結(jié)構(gòu)索引。以下代碼段在邏輯上是等效的,但是第一個代碼片段不保留結(jié)構(gòu)索引,并且不會從GraphX系統(tǒng)優(yōu)化中受益:
valnewVertices=graph.vertices.map{case(id,attr)=>(id,mapUdf(id,attr))}valnewGraph=Graph(newVertices,graph.edges)
而是mapVertices用來保存索引:
valnewGraph=graph.mapVertices((id,attr)=>mapUdf(id,attr))
這些運算符通常用于初始化特定計算或項目的圖形以避免不必要的屬性。例如,給出一個以度為頂點屬性的圖(我們稍后將描述如何構(gòu)建這樣一個圖),我們?yōu)镻ageRank初始化它:
// Given a graph where the vertex property is the out degreevalinputGraph:Graph[Int,String]=graph.outerJoinVertices(graph.outDegrees)((vid,_,degOpt)=>degOpt.getOrElse(0))// Construct a graph where each edge contains the weight// and each vertex is the initial PageRankvaloutputGraph:Graph[Double,Double]=inputGraph.mapTriplets(triplet=>1.0/triplet.srcAttr).mapVertices((id,_)=>1.0)
目前GraphX只支持一套簡單的常用結(jié)構(gòu)運算符,我們預(yù)計將來會增加更多。以下是基本結(jié)構(gòu)運算符的列表。
classGraph[VD,ED]{defreverse:Graph[VD,ED]defsubgraph(epred:EdgeTriplet[VD,ED]=>Boolean,vpred:(VertexId,VD)=>Boolean):Graph[VD,ED]defmask[VD2,ED2](other:Graph[VD2,ED2]):Graph[VD,ED]defgroupEdges(merge:(ED,ED)=>ED):Graph[VD,ED]}
該reverse運算符將返回逆轉(zhuǎn)的所有邊緣方向上的新圖。這在例如嘗試計算逆PageRank時是有用的。由于反向操作不會修改頂點或邊緣屬性或更改邊緣數(shù)量,因此可以在沒有數(shù)據(jù)移動或重復(fù)的情況下高效地實現(xiàn)。
在subgraph操作者需要的頂點和邊緣的謂詞,并返回包含只有滿足謂詞頂點的頂點的曲線圖(評估為真),并且滿足謂詞邊緣邊緣并連接滿足頂點謂詞頂點。所述subgraph操作員可在情況編號被用來限制圖形以頂點和感興趣的邊緣或消除斷開的鏈接。例如,在以下代碼中,我們刪除了斷開的鏈接:
// Create an RDD for the verticesvalusers:RDD[(VertexId,(String,String))]=sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","prof")),(2L,("istoica","prof")),(4L,("peter","student"))))// Create an RDD for edgesvalrelationships:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"pi"),Edge(4L,0L,"student"),Edge(5L,0L,"colleague")))// Define a default user in case there are relationship with missing uservaldefaultUser=("John Doe","Missing")// Build the initial Graphvalgraph=Graph(users,relationships,defaultUser)// Notice that there is a user 0 (for which we have no information) connected to users// 4 (peter) and 5 (franklin).graph.triplets.map(triplet=>triplet.srcAttr._1+" is the "+triplet.attr+" of "+triplet.dstAttr._1).collect.foreach(println(_))// Remove missing vertices as well as the edges to connected to themvalvalidGraph=graph.subgraph(vpred=(id,attr)=>attr._2!="Missing")// The valid subgraph will disconnect users 4 and 5 by removing user 0validGraph.vertices.collect.foreach(println(_))validGraph.triplets.map(triplet=>triplet.srcAttr._1+" is the "+triplet.attr+" of "+triplet.dstAttr._1).collect.foreach(println(_))
注意在上面的例子中只提供了頂點謂詞。 如果未提供頂點或邊緣謂詞,則subgraph運算符默認為true。
在mask操作者通過返回包含該頂點和邊,它們也在輸入圖形中發(fā)現(xiàn)的曲線構(gòu)造一個子圖。這可以與subgraph運算符一起使用, 以便根據(jù)另一個相關(guān)圖中的屬性限制圖形。例如,我們可以使用缺少頂點的圖運行連接的組件,然后將答案限制為有效的子圖。
// Run Connected ComponentsvalccGraph=graph.connectedComponents()// No longer contains missing field// Remove missing vertices as well as the edges to connected to themvalvalidGraph=graph.subgraph(vpred=(id,attr)=>attr._2!="Missing")// Restrict the answer to the valid subgraphvalvalidCCGraph=ccGraph.mask(validGraph)
groupEdges操作符將多邊形中的平行邊(即,頂點對之間的重復(fù)邊)合并。 在許多數(shù)值應(yīng)用中,可以將平行邊緣(它們的權(quán)重組合)合并成單個邊緣,從而減小圖形的大小。
在許多情況下,有必要使用圖形連接來自外部收集( RDD )的數(shù)據(jù)。例如,我們可能有額外的用戶屬性,我們要與現(xiàn)有的圖形合并,或者我們可能希望將頂點屬性從一個圖形拉到另一個。這些任務(wù)可以使用join運算符完成。下面我們列出關(guān)鍵 join 運算符:
classGraph[VD,ED]{defjoinVertices[U](table:RDD[(VertexId,U)])(map:(VertexId,VD,U)=>VD):Graph[VD,ED]defouterJoinVertices[U,VD2](table:RDD[(VertexId,U)])(map:(VertexId,VD,Option[U])=>VD2):Graph[VD2,ED]}
joinVertices操作符將頂點與輸入 RDD 相連,并返回一個新的圖形,其中通過將用戶定義的map函數(shù)應(yīng)用于已連接頂點的結(jié)果而獲得的頂點屬性。 RDD 中沒有匹配值的頂點保留其原始值。
請注意,如果 RDD 包含給定頂點的多個值,則只能使用一個值。因此,建議使用以下命令使輸入 RDD 變得獨一無二,這也將對結(jié)果值進行pre-index,以顯著加速后續(xù)連接。
valnonUniqueCosts:RDD[(VertexId,Double)]valuniqueCosts:VertexRDD[Double]=graph.vertices.aggregateUsingIndex(nonUnique,(a,b)=>a+b)valjoinedGraph=graph.joinVertices(uniqueCosts)((id,oldCost,extraCost)=>oldCost+extraCost)
除了將用戶定義的map函數(shù)應(yīng)用于所有頂點并且可以更改頂點屬性類型之外,更一般的outerJoinVertices的行為類似于joinVertices。 因為不是所有的頂點都可能在輸入 RDD 中具有匹配的值,所以map函數(shù)采用Option類型。 例如,我們可以通過使用outDegree初始化頂點屬性來為 PageRank 設(shè)置一個圖。
valoutDegrees:VertexRDD[Int]=graph.outDegreesvaldegreeGraph=graph.outerJoinVertices(outDegrees){(id,oldAttr,outDegOpt)=>outDegOptmatch{caseSome(outDeg)=>outDegcaseNone=>0// No outDegree means zero outDegree}}
您可能已經(jīng)注意到上述示例中使用的多個參數(shù)列表(例如:f(a)(b)curried 函數(shù)模式。 雖然我們可以將f(a)(b)同樣地寫成f(a,b),這意味著b上的類型推斷不依賴于a。 因此,用戶需要為用戶定義的函數(shù)提供類型注釋:
valjoinedGraph=graph.joinVertices(uniqueCosts,(id:VertexId,oldCost:Double,extraCost:Double)=>oldCost+extraCost)
許多圖形分析任務(wù)的關(guān)鍵步驟是聚合關(guān)于每個頂點鄰域的信息。 例如,我們可能想知道每個用戶擁有的關(guān)注者數(shù)量或每個用戶的追隨者的平均年齡。 許多迭代圖表算法(例如:網(wǎng)頁級別,最短路徑,以及連接成分)相鄰頂點(例如:電流值的 PageRank ,最短到源路徑,和最小可達頂點 ID )的重復(fù)聚合性質(zhì)。
為了提高性能,主聚合操作員graph.mapReduceTriplets從新的更改graph.AggregateMessages。雖然 API 的變化相對較小,但我們在下面提供了一個轉(zhuǎn)換指南。
GraphX 中的核心聚合操作是aggregateMessages。該運算符將用戶定義的sendMsg函數(shù)應(yīng)用于圖中的每個邊緣三元組,然后使用該mergeMsg函數(shù)在其目標(biāo)頂點聚合這些消息。
classGraph[VD,ED]{defaggregateMessages[Msg:ClassTag](sendMsg:EdgeContext[VD,ED,Msg]=>Unit,mergeMsg:(Msg,Msg)=>Msg,tripletFields:TripletFields=TripletFields.All):VertexRDD[Msg]}
用戶定義的sendMsg函數(shù)接受一個EdgeContext,它將源和目標(biāo)屬性以及 edge 屬性和函數(shù) (sendToSrc, 和sendToDst) 一起發(fā)送到源和目標(biāo)屬性。 在 map-reduce 中,將sendMsg作為map函數(shù)。 用戶定義的mergeMsg函數(shù)需要兩個發(fā)往同一頂點的消息,并產(chǎn)生一條消息。 想想mergeMsg是 map-reduce 中的reduce函數(shù)。aggregateMessages運算符返回一個VertexRDD[Msg],其中包含去往每個頂點的聚合消息(Msg類型)。 沒有收到消息的頂點不包括在返回的VertexRDDVertexRDD中。
另外,aggregateMessages采用一個可選的tripletsFields,它們指示在EdgeContext中訪問哪些數(shù)據(jù)(即源頂點屬性,而不是目標(biāo)頂點屬性)。tripletsFields定義的可能選項,TripletFields默認值是TripletFields.All指示用戶定義的sendMsg函數(shù)可以訪問的任何字段EdgeContext。該tripletFields參數(shù)可用于通知 GraphX ,只有部分EdgeContext需要允許 GraphX 選擇優(yōu)化的連接策略。例如,如果我們計算每個用戶的追隨者的平均年齡,我們只需要源字段,因此我們將用于TripletFields.Src表示我們只需要源字段。
在早期版本的 GraphX 中,我們使用字節(jié)碼檢測來推斷,TripletFields但是我們發(fā)現(xiàn)字節(jié)碼檢查稍微不可靠,而是選擇了更明確的用戶控制。
在下面的例子中,我們使用aggregateMessages運算符來計算每個用戶的資深追蹤者的平均年齡。
importorg.apache.spark.graphx.{Graph,VertexRDD}importorg.apache.spark.graphx.util.GraphGenerators// Create a graph with "age" as the vertex property.// Here we use a random graph for simplicity.valgraph:Graph[Double,Int]=GraphGenerators.logNormalGraph(sc,numVertices=100).mapVertices((id,_)=>id.toDouble)// Compute the number of older followers and their total agevalolderFollowers:VertexRDD[(Int,Double)]=graph.aggregateMessages[(Int,Double)](triplet=>{// Map Functionif(triplet.srcAttr>triplet.dstAttr){// Send message to destination vertex containing counter and agetriplet.sendToDst(1,triplet.srcAttr)}},// Add counter and age(a,b)=>(a._1+b._1,a._2+b._2)// Reduce Function)// Divide total age by number of older followers to get average age of older followersvalavgAgeOfOlderFollowers:VertexRDD[Double]=olderFollowers.mapValues((id,value)=>valuematch{case(count,totalAge)=>totalAge/count})// Display the resultsavgAgeOfOlderFollowers.collect.foreach(println(_))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala" in the Spark repo.
aggregateMessages當(dāng)消息(和消息的總和)是恒定大小(例如:浮動和加法而不是列表和級聯(lián))時,該操作最佳地執(zhí)行。
Map Reduce Triplets Transition Guide (Legacy)
在早期版本的 GraphX 中,鄰域聚合是使用mapReduceTriplets運算符完成的 :
classGraph[VD,ED]{defmapReduceTriplets[Msg](map:EdgeTriplet[VD,ED]=>Iterator[(VertexId,Msg)],reduce:(Msg,Msg)=>Msg):VertexRDD[Msg]}
mapReduceTriplets操作符接受用戶定義的映射函數(shù),該函數(shù)應(yīng)用于每個三元組,并且可以使用用戶定義的縮減函數(shù)來生成聚合的消息。 然而,我們發(fā)現(xiàn)返回的迭代器的用戶是昂貴的,并且它阻止了我們應(yīng)用其他優(yōu)化(例如:局部頂點重新編號)的能力。 在aggregateMessages中,我們引入了 EdgeContext ,它暴露了三元組字段,并且還顯示了向源和目標(biāo)頂點發(fā)送消息的功能。 此外,我們刪除了字節(jié)碼檢查,而是要求用戶指出三元組中實際需要哪些字段。
以下代碼塊使用mapReduceTriplets:
valgraph:Graph[Int,Float]=...defmsgFun(triplet:Triplet[Int,Float]):Iterator[(Int,String)]={Iterator((triplet.dstId,"Hi"))}defreduceFun(a:String,b:String):String=a+" "+bvalresult=graph.mapReduceTriplets[String](msgFun,reduceFun)
可以使用aggregateMessages:?
valgraph:Graph[Int,Float]=...defmsgFun(triplet:EdgeContext[Int,Float,String]){triplet.sendToDst("Hi")}defreduceFun(a:String,b:String):String=a+" "+bvalresult=graph.aggregateMessages[String](msgFun,reduceFun)
常見的聚合任務(wù)是計算每個頂點的程度:與每個頂點相鄰的邊數(shù)。在有向圖的上下文中,通常需要知道每個頂點的度數(shù),外部程度和總程度。本GraphOps類包含運營商計算度數(shù)每個頂點的集合。例如,在下面我們將計算最大值,最大和最大級別:
// Define a reduce operation to compute the highest degree vertexdefmax(a:(VertexId,Int),b:(VertexId,Int)):(VertexId,Int)={if(a._2>b._2)aelseb}// Compute the max degreesvalmaxInDegree:(VertexId,Int)=graph.inDegrees.reduce(max)valmaxOutDegree:(VertexId,Int)=graph.outDegrees.reduce(max)valmaxDegrees:(VertexId,Int)=graph.degrees.reduce(max)
在某些情況下,通過在每個頂點處收集相鄰頂點及其屬性可以更容易地表達計算。這可以使用collectNeighborIds和collectNeighbors運算符輕松實現(xiàn) 。
classGraphOps[VD,ED]{defcollectNeighborIds(edgeDirection:EdgeDirection):VertexRDD[Array[VertexId]]defcollectNeighbors(edgeDirection:EdgeDirection):VertexRDD[Array[(VertexId,VD)]]}
這些操作可能相當(dāng)昂貴,因為它們重復(fù)信息并需要大量通信。 如果可能,請直接使用aggregateMessages操作來表達相同的計算。
在 Spark 中,默認情況下,RDD 不會保留在內(nèi)存中。為了避免重新計算,在多次使用它們時,必須明確緩存它們(參見Spark Programming Guide)。GraphX 中的圖形表現(xiàn)方式相同。當(dāng)多次使用圖表時,請務(wù)必先調(diào)用Graph.cache()。
在迭代計算中,uncaching也可能是最佳性能所必需的。默認情況下,緩存的 RDD 和圖形將保留在內(nèi)存中,直到內(nèi)存壓力迫使它們以 LRU 順序逐出。對于迭代計算,來自先前迭代的中間結(jié)果將填滿緩存。雖然它們最終被驅(qū)逐出來,但存儲在內(nèi)存中的不必要的數(shù)據(jù)會減慢垃圾收集速度。一旦不再需要中間結(jié)果,就會更有效率。這涉及每次迭代實現(xiàn)(緩存和強制)圖形或 RDD ,取消所有其他數(shù)據(jù)集,并且僅在將來的迭代中使用實例化數(shù)據(jù)集。然而,由于圖形由多個 RDD 組成,所以很難將它們正確地分開。對于迭代計算,我們建議使用Pregel API,它可以正確地解析中間結(jié)果。
圖形是固有的遞歸數(shù)據(jù)結(jié)構(gòu),因為頂點的屬性取決于其鄰居的屬性,而鄰居的屬性又依賴于其鄰居的屬性。因此,許多重要的圖算法迭代地重新計算每個頂點的屬性,直到達到一個固定點條件。已經(jīng)提出了一系列圖并行抽象來表達這些迭代算法。GraphX 公開了 Pregel API 的變體。
在高層次上,GraphX 中的 Pregel 運算符是限制到圖形拓撲的批量同步并行消息抽象。 Pregel 操作符在一系列超級步驟中執(zhí)行,其中頂點接收來自先前超級步驟的入站消息的總和,計算頂點屬性的新值,然后在下一個超級步驟中將消息發(fā)送到相鄰頂點。與 Pregel 不同,消息作為邊緣三元組的函數(shù)并行計算,消息計算可以訪問源和目標(biāo)頂點屬性。在超級步驟中跳過不接收消息的頂點。 Pregel 運算符終止迭代,并在沒有剩余的消息時返回最終的圖。
注意,與更多的標(biāo)準(zhǔn) Pregel 實現(xiàn)不同,GraphX 中的頂點只能將消息發(fā)送到相鄰頂點,并且使用用戶定義的消息傳遞功能并行完成消息構(gòu)造。這些約束允許在 GraphX 中進行額外優(yōu)化。
以下是Pregel 運算符的類型簽名以及 其實現(xiàn)的草圖(注意:為了避免由于長譜系鏈引起的 stackOverflowError , pregel 支持周期性檢查點圖和消息,將 “spark.graphx.pregel.checkpointInterval” 設(shè)置為正數(shù),說10。并使用 SparkContext.setCheckpointDir(directory: String)) 設(shè)置 checkpoint 目錄):
classGraphOps[VD,ED]{defpregel[A](initialMsg:A,maxIter:Int=Int.MaxValue,activeDir:EdgeDirection=EdgeDirection.Out)(vprog:(VertexId,VD,A)=>VD,sendMsg:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],mergeMsg:(A,A)=>A):Graph[VD,ED]={// Receive the initial message at each vertexvarg=mapVertices((vid,vdata)=>vprog(vid,vdata,initialMsg)).cache()// compute the messagesvarmessages=g.mapReduceTriplets(sendMsg,mergeMsg)varactiveMessages=messages.count()// Loop until no messages remain or maxIterations is achievedvari=0while(activeMessages>0&&i
請注意,Pregel 需要兩個參數(shù)列表(即:graph.pregel(list1)(list2)。第一個參數(shù)列表包含配置參數(shù),包括初始消息,最大迭代次數(shù)以及發(fā)送消息的邊緣方向(默認情況下為邊緣)。第二個參數(shù)列表包含用于接收消息(頂點程序vprog),計算消息(sendMsg)和組合消息的用戶定義函數(shù)mergeMsg。
在以下示例中,我們可以使用 Pregel 運算符來表達單源最短路徑的計算。
importorg.apache.spark.graphx.{Graph,VertexId}importorg.apache.spark.graphx.util.GraphGenerators// A graph with edge attributes containing distancesvalgraph:Graph[Long,Double]=GraphGenerators.logNormalGraph(sc,numVertices=100).mapEdges(e=>e.attr.toDouble)valsourceId:VertexId=42// The ultimate source// Initialize the graph such that all vertices except the root have distance infinity.valinitialGraph=graph.mapVertices((id,_)=>if(id==sourceId)0.0elseDouble.PositiveInfinity)valsssp=initialGraph.pregel(Double.PositiveInfinity)((id,dist,newDist)=>math.min(dist,newDist),// Vertex Programtriplet=>{// Send Messageif(triplet.srcAttr+triplet.attrmath.min(a,b)// Merge Message)println(sssp.vertices.collect.mkString("\n"))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala" in the Spark repo.
GraphX 提供了從 RDD 或磁盤上的頂點和邊的集合構(gòu)建圖形的幾種方法。默認情況下,圖形構(gòu)建器都不會重新分配圖形邊; 相反,邊緣保留在其默認分區(qū)(例如 HDFS 中的原始塊)中。Graph.groupEdges需要重新分區(qū)圖,因為它假定相同的邊將被共同定位在同一分區(qū)上,因此您必須在調(diào)用Graph.partitionBy之前調(diào)用groupEdges。
objectGraphLoader{defedgeListFile(sc:SparkContext,path:String,canonicalOrientation:Boolean=false,minEdgePartitions:Int=1):Graph[Int,Int]}
GraphLoader.edgeListFile提供了從磁盤邊緣列表中加載圖形的方法。它解析以下形式的(源頂點 ID ,目標(biāo)頂點 ID )對的鄰接列表,跳過以下開始的注釋行#:
# This is a comment
2 1
4 1
1 2
它Graph從指定的邊緣創(chuàng)建一個,自動創(chuàng)建邊緣提到的任何頂點。所有頂點和邊緣屬性默認為1.canonicalOrientation參數(shù)允許在正方向 (srcId < dstId) 重新定向邊,這是連接的組件算法所要求的。該minEdgePartitions參數(shù)指定要生成的邊緣分區(qū)的最小數(shù)量; 如果例如 HDFS 文件具有更多塊,則可能存在比指定更多的邊緣分區(qū)。
objectGraph{defapply[VD,ED](vertices:RDD[(VertexId,VD)],edges:RDD[Edge[ED]],defaultVertexAttr:VD=null):Graph[VD,ED]deffromEdges[VD,ED](edges:RDD[Edge[ED]],defaultValue:VD):Graph[VD,ED]deffromEdgeTuples[VD](rawEdges:RDD[(VertexId,VertexId)],defaultValue:VD,uniqueEdges:Option[PartitionStrategy]=None):Graph[VD,Int]}
Graph.apply允許從頂點和邊緣的 RDD 創(chuàng)建圖形。重復(fù)的頂點被任意挑選,并且邊緣 RDD 中找到的頂點,而不是頂點 RDD 被分配了默認屬性。
Graph.fromEdges允許僅從 RDD 的邊緣創(chuàng)建圖形,自動創(chuàng)建邊緣提到的任何頂點并將其分配給默認值。
Graph.fromEdgeTuples允許僅從邊緣元組的 RDD 創(chuàng)建圖形,將邊緣分配為值1,并自動創(chuàng)建邊緣提到的任何頂點并將其分配給默認值。 它還支持重復(fù)數(shù)據(jù)刪除邊緣; 重復(fù)數(shù)據(jù)刪除,將某些PartitionStrategy作為uniqueEdges參數(shù)傳遞(例如:uniqueEdges = Some(PartitionStrategy.RandomVertexCut))。 分區(qū)策略是必須的,以便在相同的分區(qū)上共同使用相同的邊,以便可以進行重復(fù)數(shù)據(jù)刪除。
GraphX 公開RDD了圖中存儲的頂點和邊的視圖。然而,由于 GraphX 在優(yōu)化的數(shù)據(jù)結(jié)構(gòu)中維護頂點和邊,并且這些數(shù)據(jù)結(jié)構(gòu)提供了附加功能,所以頂點和邊分別作為VertexRDD和EdgeRDD返回 。在本節(jié)中,我們將回顧一些這些類型中的其他有用功能。請注意,這只是一個不完整的列表,請參閱API文檔中的正式操作列表。
該VertexRDD[A]擴展RDD[(VertexId, A)]并增加了額外的限制,每個VertexId只發(fā)生一次。此外,VertexRDD[A]表示一組頂點,每個頂點的屬性類型A。在內(nèi)部,這是通過將頂點屬性存儲在可重用的散列圖數(shù)據(jù)結(jié)構(gòu)中來實現(xiàn)的。因此,如果兩個VertexRDD派生自相同的基礎(chǔ)VertexRDD(例如:filter或mapValues),則可以在不使用散列評估的情況下連續(xù)連接。為了利用這個索引的數(shù)據(jù)結(jié)構(gòu),VertexRDD公開了以下附加功能:
classVertexRDD[VD]extendsRDD[(VertexId,VD)]{// Filter the vertex set but preserves the internal indexdeffilter(pred:Tuple2[VertexId,VD]=>Boolean):VertexRDD[VD]// Transform the values without changing the ids (preserves the internal index)defmapValues[VD2](map:VD=>VD2):VertexRDD[VD2]defmapValues[VD2](map:(VertexId,VD)=>VD2):VertexRDD[VD2]// Show only vertices unique to this set based on their VertexId'sdefminus(other:RDD[(VertexId,VD)])// Remove vertices from this set that appear in the other setdefdiff(other:VertexRDD[VD]):VertexRDD[VD]// Join operators that take advantage of the internal indexing to accelerate joins (substantially)defleftJoin[VD2,VD3](other:RDD[(VertexId,VD2)])(f:(VertexId,VD,Option[VD2])=>VD3):VertexRDD[VD3]definnerJoin[U,VD2](other:RDD[(VertexId,U)])(f:(VertexId,VD,U)=>VD2):VertexRDD[VD2]// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.defaggregateUsingIndex[VD2](other:RDD[(VertexId,VD2)],reduceFunc:(VD2,VD2)=>VD2):VertexRDD[VD2]}
請注意,例如,filter運算符 如何返回VertexRDD。過濾器實際上是通過BitSet使用索引重新實現(xiàn)的,并保留與其他VertexRDD進行快速連接的能力。同樣,mapValues運算符不允許map功能改變,VertexId從而使相同的HashMap數(shù)據(jù)結(jié)構(gòu)能夠被重用。無論是leftJoin和innerJoin能夠連接兩個時識別VertexRDD來自同一來源的小號HashMap和落實線性掃描,而不是昂貴的點查找的加入。
aggregateUsingIndex運算符對于從RDD[(VertexId, A)]有效構(gòu)建新的VertexRDD非常有用。 在概念上,如果我在一組頂點上構(gòu)造了一個VertexRDD[B],這是一些RDD[(VertexId, A)]中的頂點的超集,那么我可以重用索引來聚合然后再索引RDD[(VertexId, A)]。 例如:
valsetA:VertexRDD[Int]=VertexRDD(sc.parallelize(0Luntil100L).map(id=>(id,1)))valrddB:RDD[(VertexId,Double)]=sc.parallelize(0Luntil100L).flatMap(id=>List((id,1.0),(id,2.0)))// There should be 200 entries in rddBrddB.countvalsetB:VertexRDD[Double]=setA.aggregateUsingIndex(rddB,_+_)// There should be 100 entries in setBsetB.count// Joining A and B should now be fast!valsetC:VertexRDD[Double]=setA.innerJoin(setB)((id,a,b)=>a+b)
該EdgeRDD[ED]擴展RDD[Edge[ED]]使用其中定義的各種分區(qū)策略之一來組織塊中的邊PartitionStrategy。在每個分區(qū)中,邊緣屬性和鄰接結(jié)構(gòu)分別存儲,可以在更改屬性值時進行最大限度的重用。
EdgeRDDEdgeRDD公開的三個附加功能是:
// Transform the edge attributes while preserving the structuredefmapValues[ED2](f:Edge[ED]=>ED2):EdgeRDD[ED2]// Reverse the edges reusing both attributes and structuredefreverse:EdgeRDD[ED]// Join two `EdgeRDD`s partitioned using the same partitioning strategy.definnerJoin[ED2,ED3](other:EdgeRDD[ED2])(f:(VertexId,VertexId,ED,ED2)=>ED3):EdgeRDD[ED3]
在大多數(shù)應(yīng)用中,我們發(fā)現(xiàn)在EdgeRDDEdgeRDD上的操作是通過圖形運算符完成的,或者依賴基RDD類中定義的操作。
雖然在分布式圖形的 GraphX 表示中使用的優(yōu)化的詳細描述超出了本指南的范圍,但一些高級理解可能有助于可擴展算法的設(shè)計以及 API 的最佳使用。GraphX 采用頂點切分方式進行分布式圖分割:

GraphX 不是沿著邊沿分割圖形,而是沿著頂點分割圖形,這可以減少通信和存儲開銷。在邏輯上,這對應(yīng)于將邊緣分配給機器并允許頂點跨越多臺機器。分配邊緣的確切方法取決于PartitionStrategy各種啟發(fā)式的幾種折衷。用戶可以通過與Graph.partitionBy運算符重新分區(qū)圖來選擇不同的策略。默認分區(qū)策略是使用圖形構(gòu)建中提供的邊的初始分區(qū)。然而,用戶可以輕松切換到 GraphX 中包含的 2D 劃分或其他啟發(fā)式算法。

一旦邊緣被劃分,高效的圖形并行計算的關(guān)鍵挑戰(zhàn)就是有效地將頂點屬性與邊緣連接起來。因為真實世界的圖形通常具有比頂點更多的邊緣,所以我們將頂點屬性移動到邊緣。因為不是所有的分區(qū)都將包含鄰近的所有頂點的邊緣,我們內(nèi)部維護標(biāo)識在哪里執(zhí)行所需的連接像操作時,廣播頂點的路由表triplets和aggregateMessages。
GraphX 包括一組簡化分析任務(wù)的圖算法。該算法被包含在org.apache.spark.graphx.lib包可直接作為方法來訪問Graph通過GraphOps。本節(jié)介紹算法及其使用方法。
PageRank 測量在圖中每個頂點的重要性,假設(shè)從邊緣u到v表示的認可v通過的重要性u。例如,如果 Twitter 用戶遵循許多其他用戶,則用戶將被高度排名。
GraphX 附帶了 PageRank 的靜態(tài)和動態(tài)實現(xiàn)方法作PageRank 對象上的方法。靜態(tài) PageRank 運行固定次數(shù)的迭代,而動態(tài) PageRank 運行直到排列收斂(即,停止改變超過指定的公差)。GraphOps允許直接調(diào)用這些算法作為方法Graph。
GraphX還包括一個可以運行 PageRank 的社交網(wǎng)絡(luò)數(shù)據(jù)集示例。給出了一組用戶data/graphx/users.txt,并給出了一組用戶之間的關(guān)系data/graphx/followers.txt。我們計算每個用戶的 PageRank 如下:
importorg.apache.spark.graphx.GraphLoader// Load the edges as a graphvalgraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt")// Run PageRankvalranks=graph.pageRank(0.0001).vertices// Join the ranks with the usernamesvalusers=sc.textFile("data/graphx/users.txt").map{line=>valfields=line.split(",")(fields(0).toLong,fields(1))}valranksByUsername=users.join(ranks).map{case(id,(username,rank))=>(username,rank)}// Print the resultprintln(ranksByUsername.collect().mkString("\n"))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala" in the Spark repo.
連接的組件算法將圖中每個連接的組件與其最低編號頂點的ID進行標(biāo)記。例如,在社交網(wǎng)絡(luò)中,連接的組件可以近似群集。GraphX包含ConnectedComponents object中算法的實現(xiàn),我們從PageRank 部分計算示例社交網(wǎng)絡(luò)數(shù)據(jù)集的連接組件如下:
importorg.apache.spark.graphx.GraphLoader// Load the graph as in the PageRank examplevalgraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt")// Find the connected componentsvalcc=graph.connectedComponents().vertices// Join the connected components with the usernamesvalusers=sc.textFile("data/graphx/users.txt").map{line=>valfields=line.split(",")(fields(0).toLong,fields(1))}valccByUsername=users.join(cc).map{case(id,(username,cc))=>(username,cc)}// Print the resultprintln(ccByUsername.collect().mkString("\n"))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala" in the Spark repo.
頂點是三角形的一部分,當(dāng)它有兩個相鄰的頂點之間有一個邊。GraphX 在TriangleCount 對象中實現(xiàn)一個三角計數(shù)算法,用于確定通過每個頂點的三角形數(shù)量,提供聚類度量。我們從PageRank 部分計算社交網(wǎng)絡(luò)數(shù)據(jù)集的三角形數(shù)。需要注意的是TriangleCount邊緣要處于規(guī)范方向 (srcId < dstId),而圖形要使用Graph.partitionBy。
importorg.apache.spark.graphx.{GraphLoader,PartitionStrategy}// Load the edges in canonical order and partition the graph for triangle countvalgraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt",true).partitionBy(PartitionStrategy.RandomVertexCut)// Find the triangle count for each vertexvaltriCounts=graph.triangleCount().vertices// Join the triangle counts with the usernamesvalusers=sc.textFile("data/graphx/users.txt").map{line=>valfields=line.split(",")(fields(0).toLong,fields(1))}valtriCountByUsername=users.join(triCounts).map{case(id,(username,tc))=>(username,tc)}// Print the resultprintln(triCountByUsername.collect().mkString("\n"))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala" in the Spark repo.
假設(shè)我想從一些文本文件中構(gòu)建圖形,將圖形限制為重要的關(guān)系和用戶,在 sub-graph 上運行 page-rank ,然后返回與頂級用戶關(guān)聯(lián)的屬性。我可以用 GraphX 在幾行內(nèi)完成所有這些:
importorg.apache.spark.graphx.GraphLoader// Load my user data and parse into tuples of user id and attribute listvalusers=(sc.textFile("data/graphx/users.txt").map(line=>line.split(",")).map(parts=>(parts.head.toLong,parts.tail)))// Parse the edge data which is already in userId -> userId formatvalfollowerGraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt")// Attach the user attributesvalgraph=followerGraph.outerJoinVertices(users){case(uid,deg,Some(attrList))=>attrList// Some users may not have attributes so we set them as emptycase(uid,deg,None)=>Array.empty[String]}// Restrict the graph to users with usernames and namesvalsubgraph=graph.subgraph(vpred=(vid,attr)=>attr.size==2)// Compute the PageRankvalpagerankGraph=subgraph.pageRank(0.001)// Get the attributes of the top pagerank usersvaluserInfoWithPageRank=subgraph.outerJoinVertices(pagerankGraph.vertices){case(uid,attrList,Some(pr))=>(pr,attrList.toList)case(uid,attrList,None)=>(0.0,attrList.toList)}println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala" in the Spark repo.

原文地址: http://spark.apachecn.org/docs/cn/2.2.0/graphx-programming-guide.html
網(wǎng)頁地址: http://spark.apachecn.org/
github: https://github.com/apachecn/spark-doc-zh(覺得不錯麻煩給個 Star,謝謝!~)