Apache Spark 2.2.0 中文文檔 - GraphX Programming Guide | ApacheCN

GraphX Programming Guide

概述

入門

屬性 Graph

示例屬性 Graph

Graph 運(yùn)算符

運(yùn)算符的匯總表

Property 運(yùn)算符

Structural 運(yùn)算符

Join 運(yùn)算符

鄰域聚合

聚合消息 (aggregateMessages)

Map Reduce Triplets Transition Guide (Legacy)

計(jì)算級(jí)別信息

收集相鄰點(diǎn)

Caching and Uncaching

Pregel API

Graph 建造者

Vertex and Edge RDDs

VertexRDDs

EdgeRDDs

優(yōu)化表示

Graph 算法

PageRank

連接組件

Triangle 計(jì)數(shù)

示例

概述

GraphX 是 Spark 中用于圖形和圖形并行計(jì)算的新組件。在高層次上, GraphX 通過(guò)引入一個(gè)新的圖形抽象來(lái)擴(kuò)展 SparkRDD:一種具有附加到每個(gè)頂點(diǎn)和邊緣的屬性的定向多重圖形。為了支持圖形計(jì)算,GraphX 公開了一組基本運(yùn)算符(例如:subgraph,joinVerticesaggregateMessages)以及PregelAPI 的優(yōu)化變體。此外,GraphX 還包括越來(lái)越多的圖形算法構(gòu)建器,以簡(jiǎn)化圖形分析任務(wù)。

入門

首先需要將 Spark 和 GraphX 導(dǎo)入到項(xiàng)目中,如下所示:

importorg.apache.spark._importorg.apache.spark.graphx._// To make some of the examples work we will also need RDDimportorg.apache.spark.rdd.RDD

如果您不使用 Spark 外殼,您還需要一個(gè)SparkContext。要了解有關(guān) Spark 入門的更多信息,請(qǐng)參考Spark快速入門指南

屬性 Graph

屬性 Graph是一個(gè)定向多重圖形,用戶定義的對(duì)象附加到每個(gè)頂點(diǎn)和邊緣。定向多圖是具有共享相同源和目標(biāo)頂點(diǎn)的潛在多個(gè)平行邊緣的有向圖。支持平行邊緣的能力簡(jiǎn)化了在相同頂點(diǎn)之間可以有多個(gè)關(guān)系(例如: 同事和朋友)的建模場(chǎng)景。每個(gè)頂點(diǎn)都由唯一的64位長(zhǎng)標(biāo)識(shí)符(VertexId)鍵入。 GraphX 不對(duì)頂點(diǎn)標(biāo)識(shí)符施加任何排序約束。類似地,邊緣具有對(duì)應(yīng)的源和目標(biāo)頂點(diǎn)標(biāo)識(shí)符。

屬性圖是通過(guò) vertex (VD)和 edge (ED) 類型進(jìn)行參數(shù)化的。這些是分別與每個(gè)頂點(diǎn)和邊緣相關(guān)聯(lián)的對(duì)象的類型。

當(dāng)它們是原始數(shù)據(jù)類型(例如: int ,double 等等)時(shí),GraphX 優(yōu)化頂點(diǎn)和邊緣類型的表示,通過(guò)將其存儲(chǔ)在專門的數(shù)組中來(lái)減少內(nèi)存占用。

在某些情況下,可能希望在同一個(gè)圖形中具有不同屬性類型的頂點(diǎn)。這可以通過(guò)繼承來(lái)實(shí)現(xiàn)。例如,將用戶和產(chǎn)品建模為二分圖,我們可能會(huì)執(zhí)行以下操作:

classVertexProperty()caseclassUserProperty(valname:String)extendsVertexPropertycaseclassProductProperty(valname:String,valprice:Double)extendsVertexProperty// The graph might then have the type:vargraph:Graph[VertexProperty,String]=null

像 RDD 一樣,屬性圖是不可變的,分布式的和容錯(cuò)的。通過(guò)生成具有所需更改的新圖形來(lái)完成對(duì)圖表的值或結(jié)構(gòu)的更改。請(qǐng)注意,原始圖形的大部分(即,未受影響的結(jié)構(gòu),屬性和索引)在新圖表中重復(fù)使用,可降低此內(nèi)在功能數(shù)據(jù)結(jié)構(gòu)的成本。使用一系列頂點(diǎn)分割啟發(fā)式方法,在執(zhí)行器之間劃分圖形。與 RDD 一樣,在發(fā)生故障的情況下,可以在不同的機(jī)器上重新創(chuàng)建圖形的每個(gè)分區(qū)。

邏輯上,屬性圖對(duì)應(yīng)于一對(duì)編碼每個(gè)頂點(diǎn)和邊緣的屬性的類型集合( RDD )。因此,圖類包含訪問(wèn)圖形頂點(diǎn)和邊的成員:

classGraph[VD,ED]{valvertices:VertexRDD[VD]valedges:EdgeRDD[ED]}

VertexRDD[VD]和EdgeRDD[ED]分別擴(kuò)展了RDD[(VertexId, VD)]和RDD[Edge[ED]]的優(yōu)化版本。VertexRDD[VD]和EdgeRDD[ED]都提供了圍繞圖形計(jì)算和利用內(nèi)部?jī)?yōu)化的附加功能。 我們?cè)?a target="_blank" rel="nofollow">頂點(diǎn)和邊緣 RDD部分更詳細(xì)地討論了VertexRDDEdgeRDDAPI,但現(xiàn)在它們可以被認(rèn)為是RDD[(VertexId, VD)]和RDD[Edge[ED]]的簡(jiǎn)單 RDD。

示例屬性 Graph

假設(shè)我們要構(gòu)建一個(gè)由 GraphX 項(xiàng)目中的各種協(xié)作者組成的屬性圖。頂點(diǎn)屬性可能包含用戶名和職業(yè)。我們可以用描述協(xié)作者之間關(guān)系的字符串來(lái)注釋邊:

生成的圖形將具有類型簽名:

valuserGraph:Graph[(String,String),String]

從原始文件, RDD 甚至合成生成器構(gòu)建屬性圖有許多方法,這些在圖形構(gòu)建器的一節(jié)中有更詳細(xì)的討論 。最普遍的方法是使用Graph 對(duì)象。例如,以下代碼從 RDD 集合中構(gòu)建一個(gè)圖:

// Assume the SparkContext has already been constructedvalsc:SparkContext// Create an RDD for the verticesvalusers:RDD[(VertexId,(String,String))]=sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","prof")),(2L,("istoica","prof"))))// Create an RDD for edgesvalrelationships:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"pi")))// Define a default user in case there are relationship with missing uservaldefaultUser=("John Doe","Missing")// Build the initial Graphvalgraph=Graph(users,relationships,defaultUser)

在上面的例子中,我們使用了Edgecase 類。邊緣具有srcId和dstId對(duì)應(yīng)于源和目標(biāo)頂點(diǎn)標(biāo)識(shí)符。此外,Edge該類有一個(gè)attr存儲(chǔ)邊緣屬性的成員。

我們可以分別使用graph.vertices和graph.edges成員將圖形解構(gòu)成相應(yīng)的頂點(diǎn)和邊緣視圖。

valgraph:Graph[(String,String),String]// Constructed from above// Count all users which are postdocsgraph.vertices.filter{case(id,(name,pos))=>pos=="postdoc"}.count// Count all the edges where src > dstgraph.edges.filter(e=>e.srcId>e.dstId).count

注意,graph.vertices返回一個(gè)VertexRDD[(String, String)]擴(kuò)展RDD[(VertexId, (String, String))],所以我們使用 scalacase表達(dá)式來(lái)解構(gòu)元組。另一方面,graph.edges返回一個(gè)EdgeRDD包含Edge[String]對(duì)象。我們也可以使用 case 類型構(gòu)造函數(shù),如下所示:

graph.edges.filter{caseEdge(src,dst,prop)=>src>dst}.count

除了屬性圖的頂點(diǎn)和邊緣視圖之外, GraphX 還暴露了三元組視圖。三元組視圖邏輯上連接頂點(diǎn)和邊緣屬性,生成RDD[EdgeTriplet[VD, ED]]包含EdgeTriplet該類的實(shí)例。此 連接可以用以下SQL表達(dá)式表示:

SELECTsrc.id,dst.id,src.attr,e.attr,dst.attrFROMedgesASeLEFTJOINverticesASsrc,verticesASdstONe.srcId=src.IdANDe.dstId=dst.Id

或圖形為:

EdgeTriplet類通過(guò)分別添加包含源和目標(biāo)屬性的srcAttr和dstAttr成員來(lái)擴(kuò)展Edge類。 我們可以使用圖形的三元組視圖來(lái)渲染描述用戶之間關(guān)系的字符串集合。

valgraph:Graph[(String,String),String]// Constructed from above// Use the triplets view to create an RDD of facts.valfacts:RDD[String]=graph.triplets.map(triplet=>triplet.srcAttr._1+" is the "+triplet.attr+" of "+triplet.dstAttr._1)facts.collect.foreach(println(_))

Graph 運(yùn)算符

正如 RDDs 有這樣的基本操作map,filter, 以及reduceByKey,性能圖表也有采取用戶定義的函數(shù)基本運(yùn)算符的集合,產(chǎn)生具有轉(zhuǎn)化特性和結(jié)構(gòu)的新圖。定義了優(yōu)化實(shí)現(xiàn)的核心運(yùn)算符,并定義了Graph表示為核心運(yùn)算符組合的方便運(yùn)算符GraphOps。不過(guò),由于 Scala 的含義,操作員GraphOps可自動(dòng)作為成員使用Graph。例如,我們可以通過(guò)以下方法計(jì)算每個(gè)頂點(diǎn)的入度(定義GraphOps):

valgraph:Graph[(String,String),String]// Use the implicit GraphOps.inDegrees operatorvalinDegrees:VertexRDD[Int]=graph.inDegrees

區(qū)分核心圖形操作的原因GraphOps是能夠在將來(lái)支持不同的圖形表示。每個(gè)圖形表示必須提供核心操作的實(shí)現(xiàn),并重用許多有用的操作GraphOps。

運(yùn)算符的匯總表

以下是兩個(gè)定義的功能的簡(jiǎn)要摘要,但為簡(jiǎn)單起見(jiàn)Graph,GraphOps它作為 Graph 的成員呈現(xiàn)。請(qǐng)注意,已經(jīng)簡(jiǎn)化了一些功能簽名(例如,刪除了默認(rèn)參數(shù)和類型約束),并且已經(jīng)刪除了一些更高級(jí)的功能,因此請(qǐng)參閱 API 文檔以獲取正式的操作列表。

/** Summary of the functionality in the property graph */classGraph[VD,ED]{// Information about the Graph ===================================================================valnumEdges:LongvalnumVertices:LongvalinDegrees:VertexRDD[Int]valoutDegrees:VertexRDD[Int]valdegrees:VertexRDD[Int]// Views of the graph as collections =============================================================valvertices:VertexRDD[VD]valedges:EdgeRDD[ED]valtriplets:RDD[EdgeTriplet[VD,ED]]// Functions for caching graphs ==================================================================defpersist(newLevel:StorageLevel=StorageLevel.MEMORY_ONLY):Graph[VD,ED]defcache():Graph[VD,ED]defunpersistVertices(blocking:Boolean=true):Graph[VD,ED]// Change the partitioning heuristic? ============================================================defpartitionBy(partitionStrategy:PartitionStrategy):Graph[VD,ED]// Transform vertex and edge attributes ==========================================================defmapVertices[VD2](map:(VertexId,VD)=>VD2):Graph[VD2,ED]defmapEdges[ED2](map:Edge[ED]=>ED2):Graph[VD,ED2]defmapEdges[ED2](map:(PartitionID,Iterator[Edge[ED]])=>Iterator[ED2]):Graph[VD,ED2]defmapTriplets[ED2](map:EdgeTriplet[VD,ED]=>ED2):Graph[VD,ED2]defmapTriplets[ED2](map:(PartitionID,Iterator[EdgeTriplet[VD,ED]])=>Iterator[ED2]):Graph[VD,ED2]// Modify the graph structure ====================================================================defreverse:Graph[VD,ED]defsubgraph(epred:EdgeTriplet[VD,ED]=>Boolean=(x=>true),vpred:(VertexId,VD)=>Boolean=((v,d)=>true)):Graph[VD,ED]defmask[VD2,ED2](other:Graph[VD2,ED2]):Graph[VD,ED]defgroupEdges(merge:(ED,ED)=>ED):Graph[VD,ED]// Join RDDs with the graph ======================================================================defjoinVertices[U](table:RDD[(VertexId,U)])(mapFunc:(VertexId,VD,U)=>VD):Graph[VD,ED]defouterJoinVertices[U,VD2](other:RDD[(VertexId,U)])(mapFunc:(VertexId,VD,Option[U])=>VD2):Graph[VD2,ED]// Aggregate information about adjacent triplets =================================================defcollectNeighborIds(edgeDirection:EdgeDirection):VertexRDD[Array[VertexId]]defcollectNeighbors(edgeDirection:EdgeDirection):VertexRDD[Array[(VertexId,VD)]]defaggregateMessages[Msg:ClassTag](sendMsg:EdgeContext[VD,ED,Msg]=>Unit,mergeMsg:(Msg,Msg)=>Msg,tripletFields:TripletFields=TripletFields.All):VertexRDD[A]// Iterative graph-parallel computation ==========================================================defpregel[A](initialMsg:A,maxIterations:Int,activeDirection:EdgeDirection)(vprog:(VertexId,VD,A)=>VD,sendMsg:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],mergeMsg:(A,A)=>A):Graph[VD,ED]// Basic graph algorithms ========================================================================defpageRank(tol:Double,resetProb:Double=0.15):Graph[Double,Double]defconnectedComponents():Graph[VertexId,ED]deftriangleCount():Graph[Int,ED]defstronglyConnectedComponents(numIter:Int):Graph[VertexId,ED]}

Property 運(yùn)算符

與 RDDmap運(yùn)算符一樣,屬性圖包含以下內(nèi)容:

classGraph[VD,ED]{defmapVertices[VD2](map:(VertexId,VD)=>VD2):Graph[VD2,ED]defmapEdges[ED2](map:Edge[ED]=>ED2):Graph[VD,ED2]defmapTriplets[ED2](map:EdgeTriplet[VD,ED]=>ED2):Graph[VD,ED2]}

這些運(yùn)算符中的每一個(gè)產(chǎn)生一個(gè)新的圖形,其中頂點(diǎn)或邊緣屬性被用戶定義的map函數(shù)修改。

請(qǐng)注意,在每種情況下,圖形結(jié)構(gòu)都不受影響。這是這些運(yùn)算符的一個(gè)關(guān)鍵特征,它允許生成的圖形重用原始圖形的結(jié)構(gòu)索引。以下代碼段在邏輯上是等效的,但是第一個(gè)代碼片段不保留結(jié)構(gòu)索引,并且不會(huì)從GraphX系統(tǒng)優(yōu)化中受益:

valnewVertices=graph.vertices.map{case(id,attr)=>(id,mapUdf(id,attr))}valnewGraph=Graph(newVertices,graph.edges)

而是mapVertices用來(lái)保存索引:

valnewGraph=graph.mapVertices((id,attr)=>mapUdf(id,attr))

這些運(yùn)算符通常用于初始化特定計(jì)算或項(xiàng)目的圖形以避免不必要的屬性。例如,給出一個(gè)以度為頂點(diǎn)屬性的圖(我們稍后將描述如何構(gòu)建這樣一個(gè)圖),我們?yōu)镻ageRank初始化它:

// Given a graph where the vertex property is the out degreevalinputGraph:Graph[Int,String]=graph.outerJoinVertices(graph.outDegrees)((vid,_,degOpt)=>degOpt.getOrElse(0))// Construct a graph where each edge contains the weight// and each vertex is the initial PageRankvaloutputGraph:Graph[Double,Double]=inputGraph.mapTriplets(triplet=>1.0/triplet.srcAttr).mapVertices((id,_)=>1.0)

Structural 運(yùn)算符

目前GraphX只支持一套簡(jiǎn)單的常用結(jié)構(gòu)運(yùn)算符,我們預(yù)計(jì)將來(lái)會(huì)增加更多。以下是基本結(jié)構(gòu)運(yùn)算符的列表。

classGraph[VD,ED]{defreverse:Graph[VD,ED]defsubgraph(epred:EdgeTriplet[VD,ED]=>Boolean,vpred:(VertexId,VD)=>Boolean):Graph[VD,ED]defmask[VD2,ED2](other:Graph[VD2,ED2]):Graph[VD,ED]defgroupEdges(merge:(ED,ED)=>ED):Graph[VD,ED]}

reverse運(yùn)算符將返回逆轉(zhuǎn)的所有邊緣方向上的新圖。這在例如嘗試計(jì)算逆PageRank時(shí)是有用的。由于反向操作不會(huì)修改頂點(diǎn)或邊緣屬性或更改邊緣數(shù)量,因此可以在沒(méi)有數(shù)據(jù)移動(dòng)或重復(fù)的情況下高效地實(shí)現(xiàn)。

subgraph操作者需要的頂點(diǎn)和邊緣的謂詞,并返回包含只有滿足謂詞頂點(diǎn)的頂點(diǎn)的曲線圖(評(píng)估為真),并且滿足謂詞邊緣邊緣并連接滿足頂點(diǎn)謂詞頂點(diǎn)。所述subgraph操作員可在情況編號(hào)被用來(lái)限制圖形以頂點(diǎn)和感興趣的邊緣或消除斷開的鏈接。例如,在以下代碼中,我們刪除了斷開的鏈接:

// Create an RDD for the verticesvalusers:RDD[(VertexId,(String,String))]=sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","prof")),(2L,("istoica","prof")),(4L,("peter","student"))))// Create an RDD for edgesvalrelationships:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"pi"),Edge(4L,0L,"student"),Edge(5L,0L,"colleague")))// Define a default user in case there are relationship with missing uservaldefaultUser=("John Doe","Missing")// Build the initial Graphvalgraph=Graph(users,relationships,defaultUser)// Notice that there is a user 0 (for which we have no information) connected to users// 4 (peter) and 5 (franklin).graph.triplets.map(triplet=>triplet.srcAttr._1+" is the "+triplet.attr+" of "+triplet.dstAttr._1).collect.foreach(println(_))// Remove missing vertices as well as the edges to connected to themvalvalidGraph=graph.subgraph(vpred=(id,attr)=>attr._2!="Missing")// The valid subgraph will disconnect users 4 and 5 by removing user 0validGraph.vertices.collect.foreach(println(_))validGraph.triplets.map(triplet=>triplet.srcAttr._1+" is the "+triplet.attr+" of "+triplet.dstAttr._1).collect.foreach(println(_))

注意在上面的例子中只提供了頂點(diǎn)謂詞。 如果未提供頂點(diǎn)或邊緣謂詞,則subgraph運(yùn)算符默認(rèn)為true。

mask操作者通過(guò)返回包含該頂點(diǎn)和邊,它們也在輸入圖形中發(fā)現(xiàn)的曲線構(gòu)造一個(gè)子圖。這可以與subgraph運(yùn)算符一起使用, 以便根據(jù)另一個(gè)相關(guān)圖中的屬性限制圖形。例如,我們可以使用缺少頂點(diǎn)的圖運(yùn)行連接的組件,然后將答案限制為有效的子圖。

// Run Connected ComponentsvalccGraph=graph.connectedComponents()// No longer contains missing field// Remove missing vertices as well as the edges to connected to themvalvalidGraph=graph.subgraph(vpred=(id,attr)=>attr._2!="Missing")// Restrict the answer to the valid subgraphvalvalidCCGraph=ccGraph.mask(validGraph)

groupEdges操作符將多邊形中的平行邊(即,頂點(diǎn)對(duì)之間的重復(fù)邊)合并。 在許多數(shù)值應(yīng)用中,可以將平行邊緣(它們的權(quán)重組合)合并成單個(gè)邊緣,從而減小圖形的大小。

Join 運(yùn)算符

在許多情況下,有必要使用圖形連接來(lái)自外部收集( RDD )的數(shù)據(jù)。例如,我們可能有額外的用戶屬性,我們要與現(xiàn)有的圖形合并,或者我們可能希望將頂點(diǎn)屬性從一個(gè)圖形拉到另一個(gè)。這些任務(wù)可以使用join運(yùn)算符完成。下面我們列出關(guān)鍵 join 運(yùn)算符:

classGraph[VD,ED]{defjoinVertices[U](table:RDD[(VertexId,U)])(map:(VertexId,VD,U)=>VD):Graph[VD,ED]defouterJoinVertices[U,VD2](table:RDD[(VertexId,U)])(map:(VertexId,VD,Option[U])=>VD2):Graph[VD2,ED]}

joinVertices操作符將頂點(diǎn)與輸入 RDD 相連,并返回一個(gè)新的圖形,其中通過(guò)將用戶定義的map函數(shù)應(yīng)用于已連接頂點(diǎn)的結(jié)果而獲得的頂點(diǎn)屬性。 RDD 中沒(méi)有匹配值的頂點(diǎn)保留其原始值。

請(qǐng)注意,如果 RDD 包含給定頂點(diǎn)的多個(gè)值,則只能使用一個(gè)值。因此,建議使用以下命令使輸入 RDD 變得獨(dú)一無(wú)二,這也將對(duì)結(jié)果值進(jìn)行pre-index,以顯著加速后續(xù)連接。

valnonUniqueCosts:RDD[(VertexId,Double)]valuniqueCosts:VertexRDD[Double]=graph.vertices.aggregateUsingIndex(nonUnique,(a,b)=>a+b)valjoinedGraph=graph.joinVertices(uniqueCosts)((id,oldCost,extraCost)=>oldCost+extraCost)

除了將用戶定義的map函數(shù)應(yīng)用于所有頂點(diǎn)并且可以更改頂點(diǎn)屬性類型之外,更一般的outerJoinVertices的行為類似于joinVertices。 因?yàn)椴皇撬械捻旤c(diǎn)都可能在輸入 RDD 中具有匹配的值,所以map函數(shù)采用Option類型。 例如,我們可以通過(guò)使用outDegree初始化頂點(diǎn)屬性來(lái)為 PageRank 設(shè)置一個(gè)圖。

valoutDegrees:VertexRDD[Int]=graph.outDegreesvaldegreeGraph=graph.outerJoinVertices(outDegrees){(id,oldAttr,outDegOpt)=>outDegOptmatch{caseSome(outDeg)=>outDegcaseNone=>0// No outDegree means zero outDegree}}

您可能已經(jīng)注意到上述示例中使用的多個(gè)參數(shù)列表(例如:f(a)(b)curried 函數(shù)模式。 雖然我們可以將f(a)(b)同樣地寫成f(a,b),這意味著b上的類型推斷不依賴于a。 因此,用戶需要為用戶定義的函數(shù)提供類型注釋:

valjoinedGraph=graph.joinVertices(uniqueCosts,(id:VertexId,oldCost:Double,extraCost:Double)=>oldCost+extraCost)

鄰域聚合

許多圖形分析任務(wù)的關(guān)鍵步驟是聚合關(guān)于每個(gè)頂點(diǎn)鄰域的信息。 例如,我們可能想知道每個(gè)用戶擁有的關(guān)注者數(shù)量或每個(gè)用戶的追隨者的平均年齡。 許多迭代圖表算法(例如:網(wǎng)頁(yè)級(jí)別,最短路徑,以及連接成分)相鄰頂點(diǎn)(例如:電流值的 PageRank ,最短到源路徑,和最小可達(dá)頂點(diǎn) ID )的重復(fù)聚合性質(zhì)。

為了提高性能,主聚合操作員graph.mapReduceTriplets從新的更改graph.AggregateMessages。雖然 API 的變化相對(duì)較小,但我們?cè)谙旅嫣峁┝艘粋€(gè)轉(zhuǎn)換指南。

聚合消息 (aggregateMessages)

GraphX 中的核心聚合操作是aggregateMessages。該運(yùn)算符將用戶定義的sendMsg函數(shù)應(yīng)用于圖中的每個(gè)邊緣三元組,然后使用該mergeMsg函數(shù)在其目標(biāo)頂點(diǎn)聚合這些消息。

classGraph[VD,ED]{defaggregateMessages[Msg:ClassTag](sendMsg:EdgeContext[VD,ED,Msg]=>Unit,mergeMsg:(Msg,Msg)=>Msg,tripletFields:TripletFields=TripletFields.All):VertexRDD[Msg]}

用戶定義的sendMsg函數(shù)接受一個(gè)EdgeContext,它將源和目標(biāo)屬性以及 edge 屬性和函數(shù) (sendToSrc, 和sendToDst) 一起發(fā)送到源和目標(biāo)屬性。 在 map-reduce 中,將sendMsg作為map函數(shù)。 用戶定義的mergeMsg函數(shù)需要兩個(gè)發(fā)往同一頂點(diǎn)的消息,并產(chǎn)生一條消息。 想想mergeMsg是 map-reduce 中的reduce函數(shù)。aggregateMessages運(yùn)算符返回一個(gè)VertexRDD[Msg],其中包含去往每個(gè)頂點(diǎn)的聚合消息(Msg類型)。 沒(méi)有收到消息的頂點(diǎn)不包括在返回的VertexRDDVertexRDD中。

另外,aggregateMessages采用一個(gè)可選的tripletsFields,它們指示在EdgeContext中訪問(wèn)哪些數(shù)據(jù)(即源頂點(diǎn)屬性,而不是目標(biāo)頂點(diǎn)屬性)。tripletsFields定義的可能選項(xiàng),TripletFields默認(rèn)值是TripletFields.All指示用戶定義的sendMsg函數(shù)可以訪問(wèn)的任何字段EdgeContext。該tripletFields參數(shù)可用于通知 GraphX ,只有部分EdgeContext需要允許 GraphX 選擇優(yōu)化的連接策略。例如,如果我們計(jì)算每個(gè)用戶的追隨者的平均年齡,我們只需要源字段,因此我們將用于TripletFields.Src表示我們只需要源字段。

在早期版本的 GraphX 中,我們使用字節(jié)碼檢測(cè)來(lái)推斷,TripletFields但是我們發(fā)現(xiàn)字節(jié)碼檢查稍微不可靠,而是選擇了更明確的用戶控制。

在下面的例子中,我們使用aggregateMessages運(yùn)算符來(lái)計(jì)算每個(gè)用戶的資深追蹤者的平均年齡。

importorg.apache.spark.graphx.{Graph,VertexRDD}importorg.apache.spark.graphx.util.GraphGenerators// Create a graph with "age" as the vertex property.// Here we use a random graph for simplicity.valgraph:Graph[Double,Int]=GraphGenerators.logNormalGraph(sc,numVertices=100).mapVertices((id,_)=>id.toDouble)// Compute the number of older followers and their total agevalolderFollowers:VertexRDD[(Int,Double)]=graph.aggregateMessages[(Int,Double)](triplet=>{// Map Functionif(triplet.srcAttr>triplet.dstAttr){// Send message to destination vertex containing counter and agetriplet.sendToDst(1,triplet.srcAttr)}},// Add counter and age(a,b)=>(a._1+b._1,a._2+b._2)// Reduce Function)// Divide total age by number of older followers to get average age of older followersvalavgAgeOfOlderFollowers:VertexRDD[Double]=olderFollowers.mapValues((id,value)=>valuematch{case(count,totalAge)=>totalAge/count})// Display the resultsavgAgeOfOlderFollowers.collect.foreach(println(_))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala" in the Spark repo.

aggregateMessages當(dāng)消息(和消息的總和)是恒定大?。ɡ纾焊?dòng)和加法而不是列表和級(jí)聯(lián))時(shí),該操作最佳地執(zhí)行。

Map Reduce Triplets Transition Guide (Legacy)

在早期版本的 GraphX 中,鄰域聚合是使用mapReduceTriplets運(yùn)算符完成的 :

classGraph[VD,ED]{defmapReduceTriplets[Msg](map:EdgeTriplet[VD,ED]=>Iterator[(VertexId,Msg)],reduce:(Msg,Msg)=>Msg):VertexRDD[Msg]}

mapReduceTriplets操作符接受用戶定義的映射函數(shù),該函數(shù)應(yīng)用于每個(gè)三元組,并且可以使用用戶定義的縮減函數(shù)來(lái)生成聚合的消息。 然而,我們發(fā)現(xiàn)返回的迭代器的用戶是昂貴的,并且它阻止了我們應(yīng)用其他優(yōu)化(例如:局部頂點(diǎn)重新編號(hào))的能力。 在aggregateMessages中,我們引入了 EdgeContext ,它暴露了三元組字段,并且還顯示了向源和目標(biāo)頂點(diǎn)發(fā)送消息的功能。 此外,我們刪除了字節(jié)碼檢查,而是要求用戶指出三元組中實(shí)際需要哪些字段。

以下代碼塊使用mapReduceTriplets:

valgraph:Graph[Int,Float]=...defmsgFun(triplet:Triplet[Int,Float]):Iterator[(Int,String)]={Iterator((triplet.dstId,"Hi"))}defreduceFun(a:String,b:String):String=a+" "+bvalresult=graph.mapReduceTriplets[String](msgFun,reduceFun)

可以使用aggregateMessages:?

valgraph:Graph[Int,Float]=...defmsgFun(triplet:EdgeContext[Int,Float,String]){triplet.sendToDst("Hi")}defreduceFun(a:String,b:String):String=a+" "+bvalresult=graph.aggregateMessages[String](msgFun,reduceFun)

計(jì)算級(jí)別信息

常見(jiàn)的聚合任務(wù)是計(jì)算每個(gè)頂點(diǎn)的程度:與每個(gè)頂點(diǎn)相鄰的邊數(shù)。在有向圖的上下文中,通常需要知道每個(gè)頂點(diǎn)的度數(shù),外部程度和總程度。本GraphOps類包含運(yùn)營(yíng)商計(jì)算度數(shù)每個(gè)頂點(diǎn)的集合。例如,在下面我們將計(jì)算最大值,最大和最大級(jí)別:

// Define a reduce operation to compute the highest degree vertexdefmax(a:(VertexId,Int),b:(VertexId,Int)):(VertexId,Int)={if(a._2>b._2)aelseb}// Compute the max degreesvalmaxInDegree:(VertexId,Int)=graph.inDegrees.reduce(max)valmaxOutDegree:(VertexId,Int)=graph.outDegrees.reduce(max)valmaxDegrees:(VertexId,Int)=graph.degrees.reduce(max)

收集相鄰點(diǎn)

在某些情況下,通過(guò)在每個(gè)頂點(diǎn)處收集相鄰頂點(diǎn)及其屬性可以更容易地表達(dá)計(jì)算。這可以使用collectNeighborIdscollectNeighbors運(yùn)算符輕松實(shí)現(xiàn) 。

classGraphOps[VD,ED]{defcollectNeighborIds(edgeDirection:EdgeDirection):VertexRDD[Array[VertexId]]defcollectNeighbors(edgeDirection:EdgeDirection):VertexRDD[Array[(VertexId,VD)]]}

這些操作可能相當(dāng)昂貴,因?yàn)樗鼈冎貜?fù)信息并需要大量通信。 如果可能,請(qǐng)直接使用aggregateMessages操作來(lái)表達(dá)相同的計(jì)算。

Caching and Uncaching

在 Spark 中,默認(rèn)情況下,RDD 不會(huì)保留在內(nèi)存中。為了避免重新計(jì)算,在多次使用它們時(shí),必須明確緩存它們(參見(jiàn)Spark Programming Guide)。GraphX 中的圖形表現(xiàn)方式相同。當(dāng)多次使用圖表時(shí),請(qǐng)務(wù)必先調(diào)用Graph.cache()

在迭代計(jì)算中,uncaching也可能是最佳性能所必需的。默認(rèn)情況下,緩存的 RDD 和圖形將保留在內(nèi)存中,直到內(nèi)存壓力迫使它們以 LRU 順序逐出。對(duì)于迭代計(jì)算,來(lái)自先前迭代的中間結(jié)果將填滿緩存。雖然它們最終被驅(qū)逐出來(lái),但存儲(chǔ)在內(nèi)存中的不必要的數(shù)據(jù)會(huì)減慢垃圾收集速度。一旦不再需要中間結(jié)果,就會(huì)更有效率。這涉及每次迭代實(shí)現(xiàn)(緩存和強(qiáng)制)圖形或 RDD ,取消所有其他數(shù)據(jù)集,并且僅在將來(lái)的迭代中使用實(shí)例化數(shù)據(jù)集。然而,由于圖形由多個(gè) RDD 組成,所以很難將它們正確地分開。對(duì)于迭代計(jì)算,我們建議使用Pregel API,它可以正確地解析中間結(jié)果。

Pregel API

圖形是固有的遞歸數(shù)據(jù)結(jié)構(gòu),因?yàn)轫旤c(diǎn)的屬性取決于其鄰居的屬性,而鄰居的屬性又依賴于其鄰居的屬性。因此,許多重要的圖算法迭代地重新計(jì)算每個(gè)頂點(diǎn)的屬性,直到達(dá)到一個(gè)固定點(diǎn)條件。已經(jīng)提出了一系列圖并行抽象來(lái)表達(dá)這些迭代算法。GraphX 公開了 Pregel API 的變體。

在高層次上,GraphX 中的 Pregel 運(yùn)算符是限制到圖形拓?fù)涞呐客讲⑿邢⒊橄蟆?Pregel 操作符在一系列超級(jí)步驟中執(zhí)行,其中頂點(diǎn)接收來(lái)自先前超級(jí)步驟的入站消息的總和,計(jì)算頂點(diǎn)屬性的新值,然后在下一個(gè)超級(jí)步驟中將消息發(fā)送到相鄰頂點(diǎn)。與 Pregel 不同,消息作為邊緣三元組的函數(shù)并行計(jì)算,消息計(jì)算可以訪問(wèn)源和目標(biāo)頂點(diǎn)屬性。在超級(jí)步驟中跳過(guò)不接收消息的頂點(diǎn)。 Pregel 運(yùn)算符終止迭代,并在沒(méi)有剩余的消息時(shí)返回最終的圖。

注意,與更多的標(biāo)準(zhǔn) Pregel 實(shí)現(xiàn)不同,GraphX 中的頂點(diǎn)只能將消息發(fā)送到相鄰頂點(diǎn),并且使用用戶定義的消息傳遞功能并行完成消息構(gòu)造。這些約束允許在 GraphX 中進(jìn)行額外優(yōu)化。

以下是Pregel 運(yùn)算符的類型簽名以及 其實(shí)現(xiàn)的草圖(注意:為了避免由于長(zhǎng)譜系鏈引起的 stackOverflowError , pregel 支持周期性檢查點(diǎn)圖和消息,將 “spark.graphx.pregel.checkpointInterval” 設(shè)置為正數(shù),說(shuō)10。并使用 SparkContext.setCheckpointDir(directory: String)) 設(shè)置 checkpoint 目錄):

classGraphOps[VD,ED]{defpregel[A](initialMsg:A,maxIter:Int=Int.MaxValue,activeDir:EdgeDirection=EdgeDirection.Out)(vprog:(VertexId,VD,A)=>VD,sendMsg:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],mergeMsg:(A,A)=>A):Graph[VD,ED]={// Receive the initial message at each vertexvarg=mapVertices((vid,vdata)=>vprog(vid,vdata,initialMsg)).cache()// compute the messagesvarmessages=g.mapReduceTriplets(sendMsg,mergeMsg)varactiveMessages=messages.count()// Loop until no messages remain or maxIterations is achievedvari=0while(activeMessages>0&&i

請(qǐng)注意,Pregel 需要兩個(gè)參數(shù)列表(即:graph.pregel(list1)(list2)。第一個(gè)參數(shù)列表包含配置參數(shù),包括初始消息,最大迭代次數(shù)以及發(fā)送消息的邊緣方向(默認(rèn)情況下為邊緣)。第二個(gè)參數(shù)列表包含用于接收消息(頂點(diǎn)程序vprog),計(jì)算消息(sendMsg)和組合消息的用戶定義函數(shù)mergeMsg。

在以下示例中,我們可以使用 Pregel 運(yùn)算符來(lái)表達(dá)單源最短路徑的計(jì)算。

importorg.apache.spark.graphx.{Graph,VertexId}importorg.apache.spark.graphx.util.GraphGenerators// A graph with edge attributes containing distancesvalgraph:Graph[Long,Double]=GraphGenerators.logNormalGraph(sc,numVertices=100).mapEdges(e=>e.attr.toDouble)valsourceId:VertexId=42// The ultimate source// Initialize the graph such that all vertices except the root have distance infinity.valinitialGraph=graph.mapVertices((id,_)=>if(id==sourceId)0.0elseDouble.PositiveInfinity)valsssp=initialGraph.pregel(Double.PositiveInfinity)((id,dist,newDist)=>math.min(dist,newDist),// Vertex Programtriplet=>{// Send Messageif(triplet.srcAttr+triplet.attrmath.min(a,b)// Merge Message)println(sssp.vertices.collect.mkString("\n"))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala" in the Spark repo.

Graph 建造者

GraphX 提供了從 RDD 或磁盤上的頂點(diǎn)和邊的集合構(gòu)建圖形的幾種方法。默認(rèn)情況下,圖形構(gòu)建器都不會(huì)重新分配圖形邊; 相反,邊緣保留在其默認(rèn)分區(qū)(例如 HDFS 中的原始?jí)K)中。Graph.groupEdges需要重新分區(qū)圖,因?yàn)樗俣ㄏ嗤倪厡⒈还餐ㄎ辉谕环謪^(qū)上,因此您必須在調(diào)用Graph.partitionBy之前調(diào)用groupEdges。

objectGraphLoader{defedgeListFile(sc:SparkContext,path:String,canonicalOrientation:Boolean=false,minEdgePartitions:Int=1):Graph[Int,Int]}

GraphLoader.edgeListFile提供了從磁盤邊緣列表中加載圖形的方法。它解析以下形式的(源頂點(diǎn) ID ,目標(biāo)頂點(diǎn) ID )對(duì)的鄰接列表,跳過(guò)以下開始的注釋行#:

# This is a comment

2 1

4 1

1 2

它Graph從指定的邊緣創(chuàng)建一個(gè),自動(dòng)創(chuàng)建邊緣提到的任何頂點(diǎn)。所有頂點(diǎn)和邊緣屬性默認(rèn)為1.canonicalOrientation參數(shù)允許在正方向 (srcId < dstId) 重新定向邊,這是連接的組件算法所要求的。該minEdgePartitions參數(shù)指定要生成的邊緣分區(qū)的最小數(shù)量; 如果例如 HDFS 文件具有更多塊,則可能存在比指定更多的邊緣分區(qū)。

objectGraph{defapply[VD,ED](vertices:RDD[(VertexId,VD)],edges:RDD[Edge[ED]],defaultVertexAttr:VD=null):Graph[VD,ED]deffromEdges[VD,ED](edges:RDD[Edge[ED]],defaultValue:VD):Graph[VD,ED]deffromEdgeTuples[VD](rawEdges:RDD[(VertexId,VertexId)],defaultValue:VD,uniqueEdges:Option[PartitionStrategy]=None):Graph[VD,Int]}

Graph.apply允許從頂點(diǎn)和邊緣的 RDD 創(chuàng)建圖形。重復(fù)的頂點(diǎn)被任意挑選,并且邊緣 RDD 中找到的頂點(diǎn),而不是頂點(diǎn) RDD 被分配了默認(rèn)屬性。

Graph.fromEdges允許僅從 RDD 的邊緣創(chuàng)建圖形,自動(dòng)創(chuàng)建邊緣提到的任何頂點(diǎn)并將其分配給默認(rèn)值。

Graph.fromEdgeTuples允許僅從邊緣元組的 RDD 創(chuàng)建圖形,將邊緣分配為值1,并自動(dòng)創(chuàng)建邊緣提到的任何頂點(diǎn)并將其分配給默認(rèn)值。 它還支持重復(fù)數(shù)據(jù)刪除邊緣; 重復(fù)數(shù)據(jù)刪除,將某些PartitionStrategy作為uniqueEdges參數(shù)傳遞(例如:uniqueEdges = Some(PartitionStrategy.RandomVertexCut))。 分區(qū)策略是必須的,以便在相同的分區(qū)上共同使用相同的邊,以便可以進(jìn)行重復(fù)數(shù)據(jù)刪除。

Vertex and Edge RDDs

GraphX 公開RDD了圖中存儲(chǔ)的頂點(diǎn)和邊的視圖。然而,由于 GraphX 在優(yōu)化的數(shù)據(jù)結(jié)構(gòu)中維護(hù)頂點(diǎn)和邊,并且這些數(shù)據(jù)結(jié)構(gòu)提供了附加功能,所以頂點(diǎn)和邊分別作為VertexRDDEdgeRDD返回 。在本節(jié)中,我們將回顧一些這些類型中的其他有用功能。請(qǐng)注意,這只是一個(gè)不完整的列表,請(qǐng)參閱API文檔中的正式操作列表。

VertexRDDs

該VertexRDD[A]擴(kuò)展RDD[(VertexId, A)]并增加了額外的限制,每個(gè)VertexId只發(fā)生一次。此外,VertexRDD[A]表示一組頂點(diǎn),每個(gè)頂點(diǎn)的屬性類型A。在內(nèi)部,這是通過(guò)將頂點(diǎn)屬性存儲(chǔ)在可重用的散列圖數(shù)據(jù)結(jié)構(gòu)中來(lái)實(shí)現(xiàn)的。因此,如果兩個(gè)VertexRDD派生自相同的基礎(chǔ)VertexRDD(例如:filter或mapValues),則可以在不使用散列評(píng)估的情況下連續(xù)連接。為了利用這個(gè)索引的數(shù)據(jù)結(jié)構(gòu),VertexRDD公開了以下附加功能:

classVertexRDD[VD]extendsRDD[(VertexId,VD)]{// Filter the vertex set but preserves the internal indexdeffilter(pred:Tuple2[VertexId,VD]=>Boolean):VertexRDD[VD]// Transform the values without changing the ids (preserves the internal index)defmapValues[VD2](map:VD=>VD2):VertexRDD[VD2]defmapValues[VD2](map:(VertexId,VD)=>VD2):VertexRDD[VD2]// Show only vertices unique to this set based on their VertexId'sdefminus(other:RDD[(VertexId,VD)])// Remove vertices from this set that appear in the other setdefdiff(other:VertexRDD[VD]):VertexRDD[VD]// Join operators that take advantage of the internal indexing to accelerate joins (substantially)defleftJoin[VD2,VD3](other:RDD[(VertexId,VD2)])(f:(VertexId,VD,Option[VD2])=>VD3):VertexRDD[VD3]definnerJoin[U,VD2](other:RDD[(VertexId,U)])(f:(VertexId,VD,U)=>VD2):VertexRDD[VD2]// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.defaggregateUsingIndex[VD2](other:RDD[(VertexId,VD2)],reduceFunc:(VD2,VD2)=>VD2):VertexRDD[VD2]}

請(qǐng)注意,例如,filter運(yùn)算符 如何返回VertexRDD。過(guò)濾器實(shí)際上是通過(guò)BitSet使用索引重新實(shí)現(xiàn)的,并保留與其他VertexRDD進(jìn)行快速連接的能力。同樣,mapValues運(yùn)算符不允許map功能改變,VertexId從而使相同的HashMap數(shù)據(jù)結(jié)構(gòu)能夠被重用。無(wú)論是leftJoin和innerJoin能夠連接兩個(gè)時(shí)識(shí)別VertexRDD來(lái)自同一來(lái)源的小號(hào)HashMap和落實(shí)線性掃描,而不是昂貴的點(diǎn)查找的加入。

aggregateUsingIndex運(yùn)算符對(duì)于從RDD[(VertexId, A)]有效構(gòu)建新的VertexRDD非常有用。 在概念上,如果我在一組頂點(diǎn)上構(gòu)造了一個(gè)VertexRDD[B],這是一些RDD[(VertexId, A)]中的頂點(diǎn)的超集,那么我可以重用索引來(lái)聚合然后再索引RDD[(VertexId, A)]。 例如:

valsetA:VertexRDD[Int]=VertexRDD(sc.parallelize(0Luntil100L).map(id=>(id,1)))valrddB:RDD[(VertexId,Double)]=sc.parallelize(0Luntil100L).flatMap(id=>List((id,1.0),(id,2.0)))// There should be 200 entries in rddBrddB.countvalsetB:VertexRDD[Double]=setA.aggregateUsingIndex(rddB,_+_)// There should be 100 entries in setBsetB.count// Joining A and B should now be fast!valsetC:VertexRDD[Double]=setA.innerJoin(setB)((id,a,b)=>a+b)

EdgeRDDs

該EdgeRDD[ED]擴(kuò)展RDD[Edge[ED]]使用其中定義的各種分區(qū)策略之一來(lái)組織塊中的邊PartitionStrategy。在每個(gè)分區(qū)中,邊緣屬性和鄰接結(jié)構(gòu)分別存儲(chǔ),可以在更改屬性值時(shí)進(jìn)行最大限度的重用。

EdgeRDDEdgeRDD公開的三個(gè)附加功能是:

// Transform the edge attributes while preserving the structuredefmapValues[ED2](f:Edge[ED]=>ED2):EdgeRDD[ED2]// Reverse the edges reusing both attributes and structuredefreverse:EdgeRDD[ED]// Join two `EdgeRDD`s partitioned using the same partitioning strategy.definnerJoin[ED2,ED3](other:EdgeRDD[ED2])(f:(VertexId,VertexId,ED,ED2)=>ED3):EdgeRDD[ED3]

在大多數(shù)應(yīng)用中,我們發(fā)現(xiàn)在EdgeRDDEdgeRDD上的操作是通過(guò)圖形運(yùn)算符完成的,或者依賴基RDD類中定義的操作。

優(yōu)化表示

雖然在分布式圖形的 GraphX 表示中使用的優(yōu)化的詳細(xì)描述超出了本指南的范圍,但一些高級(jí)理解可能有助于可擴(kuò)展算法的設(shè)計(jì)以及 API 的最佳使用。GraphX 采用頂點(diǎn)切分方式進(jìn)行分布式圖分割:

GraphX 不是沿著邊沿分割圖形,而是沿著頂點(diǎn)分割圖形,這可以減少通信和存儲(chǔ)開銷。在邏輯上,這對(duì)應(yīng)于將邊緣分配給機(jī)器并允許頂點(diǎn)跨越多臺(tái)機(jī)器。分配邊緣的確切方法取決于PartitionStrategy各種啟發(fā)式的幾種折衷。用戶可以通過(guò)與Graph.partitionBy運(yùn)算符重新分區(qū)圖來(lái)選擇不同的策略。默認(rèn)分區(qū)策略是使用圖形構(gòu)建中提供的邊的初始分區(qū)。然而,用戶可以輕松切換到 GraphX 中包含的 2D 劃分或其他啟發(fā)式算法。

一旦邊緣被劃分,高效的圖形并行計(jì)算的關(guān)鍵挑戰(zhàn)就是有效地將頂點(diǎn)屬性與邊緣連接起來(lái)。因?yàn)檎鎸?shí)世界的圖形通常具有比頂點(diǎn)更多的邊緣,所以我們將頂點(diǎn)屬性移動(dòng)到邊緣。因?yàn)椴皇撬械姆謪^(qū)都將包含鄰近的所有頂點(diǎn)的邊緣,我們內(nèi)部維護(hù)標(biāo)識(shí)在哪里執(zhí)行所需的連接像操作時(shí),廣播頂點(diǎn)的路由表triplets和aggregateMessages。

Graph 算法

GraphX 包括一組簡(jiǎn)化分析任務(wù)的圖算法。該算法被包含在org.apache.spark.graphx.lib包可直接作為方法來(lái)訪問(wèn)Graph通過(guò)GraphOps。本節(jié)介紹算法及其使用方法。

PageRank

PageRank 測(cè)量在圖中每個(gè)頂點(diǎn)的重要性,假設(shè)從邊緣u到v表示的認(rèn)可v通過(guò)的重要性u(píng)。例如,如果 Twitter 用戶遵循許多其他用戶,則用戶將被高度排名。

GraphX 附帶了 PageRank 的靜態(tài)和動(dòng)態(tài)實(shí)現(xiàn)方法作PageRank 對(duì)象上的方法。靜態(tài) PageRank 運(yùn)行固定次數(shù)的迭代,而動(dòng)態(tài) PageRank 運(yùn)行直到排列收斂(即,停止改變超過(guò)指定的公差)。GraphOps允許直接調(diào)用這些算法作為方法Graph。

GraphX還包括一個(gè)可以運(yùn)行 PageRank 的社交網(wǎng)絡(luò)數(shù)據(jù)集示例。給出了一組用戶data/graphx/users.txt,并給出了一組用戶之間的關(guān)系data/graphx/followers.txt。我們計(jì)算每個(gè)用戶的 PageRank 如下:

importorg.apache.spark.graphx.GraphLoader// Load the edges as a graphvalgraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt")// Run PageRankvalranks=graph.pageRank(0.0001).vertices// Join the ranks with the usernamesvalusers=sc.textFile("data/graphx/users.txt").map{line=>valfields=line.split(",")(fields(0).toLong,fields(1))}valranksByUsername=users.join(ranks).map{case(id,(username,rank))=>(username,rank)}// Print the resultprintln(ranksByUsername.collect().mkString("\n"))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala" in the Spark repo.

連接組件

連接的組件算法將圖中每個(gè)連接的組件與其最低編號(hào)頂點(diǎn)的ID進(jìn)行標(biāo)記。例如,在社交網(wǎng)絡(luò)中,連接的組件可以近似群集。GraphX包含ConnectedComponents object中算法的實(shí)現(xiàn),我們從PageRank 部分計(jì)算示例社交網(wǎng)絡(luò)數(shù)據(jù)集的連接組件如下:

importorg.apache.spark.graphx.GraphLoader// Load the graph as in the PageRank examplevalgraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt")// Find the connected componentsvalcc=graph.connectedComponents().vertices// Join the connected components with the usernamesvalusers=sc.textFile("data/graphx/users.txt").map{line=>valfields=line.split(",")(fields(0).toLong,fields(1))}valccByUsername=users.join(cc).map{case(id,(username,cc))=>(username,cc)}// Print the resultprintln(ccByUsername.collect().mkString("\n"))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala" in the Spark repo.

Triangle 計(jì)數(shù)

頂點(diǎn)是三角形的一部分,當(dāng)它有兩個(gè)相鄰的頂點(diǎn)之間有一個(gè)邊。GraphX 在TriangleCount 對(duì)象中實(shí)現(xiàn)一個(gè)三角計(jì)數(shù)算法,用于確定通過(guò)每個(gè)頂點(diǎn)的三角形數(shù)量,提供聚類度量。我們從PageRank 部分計(jì)算社交網(wǎng)絡(luò)數(shù)據(jù)集的三角形數(shù)。需要注意的是TriangleCount邊緣要處于規(guī)范方向 (srcId < dstId),而圖形要使用Graph.partitionBy

importorg.apache.spark.graphx.{GraphLoader,PartitionStrategy}// Load the edges in canonical order and partition the graph for triangle countvalgraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt",true).partitionBy(PartitionStrategy.RandomVertexCut)// Find the triangle count for each vertexvaltriCounts=graph.triangleCount().vertices// Join the triangle counts with the usernamesvalusers=sc.textFile("data/graphx/users.txt").map{line=>valfields=line.split(",")(fields(0).toLong,fields(1))}valtriCountByUsername=users.join(triCounts).map{case(id,(username,tc))=>(username,tc)}// Print the resultprintln(triCountByUsername.collect().mkString("\n"))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala" in the Spark repo.

示例

假設(shè)我想從一些文本文件中構(gòu)建圖形,將圖形限制為重要的關(guān)系和用戶,在 sub-graph 上運(yùn)行 page-rank ,然后返回與頂級(jí)用戶關(guān)聯(lián)的屬性。我可以用 GraphX 在幾行內(nèi)完成所有這些:

importorg.apache.spark.graphx.GraphLoader// Load my user data and parse into tuples of user id and attribute listvalusers=(sc.textFile("data/graphx/users.txt").map(line=>line.split(",")).map(parts=>(parts.head.toLong,parts.tail)))// Parse the edge data which is already in userId -> userId formatvalfollowerGraph=GraphLoader.edgeListFile(sc,"data/graphx/followers.txt")// Attach the user attributesvalgraph=followerGraph.outerJoinVertices(users){case(uid,deg,Some(attrList))=>attrList// Some users may not have attributes so we set them as emptycase(uid,deg,None)=>Array.empty[String]}// Restrict the graph to users with usernames and namesvalsubgraph=graph.subgraph(vpred=(vid,attr)=>attr.size==2)// Compute the PageRankvalpagerankGraph=subgraph.pageRank(0.001)// Get the attributes of the top pagerank usersvaluserInfoWithPageRank=subgraph.outerJoinVertices(pagerankGraph.vertices){case(uid,attrList,Some(pr))=>(pr,attrList.toList)case(uid,attrList,None)=>(0.0,attrList.toList)}println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala" in the Spark repo.

我們一直在努力

apachecn/spark-doc-zh

原文地址: http://spark.apachecn.org/docs/cn/2.2.0/graphx-programming-guide.html

網(wǎng)頁(yè)地址: http://spark.apachecn.org/

github: https://github.com/apachecn/spark-doc-zh(覺(jué)得不錯(cuò)麻煩給個(gè) Star,謝謝!~)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容