Spark圖計(jì)算API簡介

一、屬性操作符:

graph中提供了對vertex,edge和triplet的map操作,類似于RDD中的map操作:

def mapVertices[VD2](map:(VertexId, VD)=> VD2): Graph[VD2, ED]

def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]

def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

使用這些方法不會改變圖的結(jié)構(gòu),所以這些操作符可以利用原有的圖的structural indicies。所以不要用graph.vertices.map的方法來實(shí)現(xiàn)同樣的操作。

mapEdges: transform each edge attribute in the graph using the map function.

實(shí)例:注意在mapEdges中使用的函數(shù)里,輸入?yún)?shù)x是一個Edge對象,返回對象則是Edge的屬性對象。在例子中,屬性對象的類型并沒有改變,(都是String)但屬性的值有所變化。也可以變成其它的類型的對象。

val sheyouGraph = graph.mapEdges(x => {if("roommate".equals(x.attr)) "sheyou" else x.attr})


mapVertices: transform each vertex attribute in the graph using the map function

跟mapEdges類似,mapVerticies中傳入的對象也是Vertex的實(shí)例化對象,返回值也是頂點(diǎn)的屬性對象:

val oneAttrGraph = graph.mapVertices((id, attr) => {attr._1+ " is:"+attr._2})

mapTriplets: Transforms each edge attribute using the map function, passing it the adjacent(臨近的) vertex attributes as well.

也就是在mapTriplets中,與mapEdges不同的地方僅僅在于可以使用的作為map條件的東西多了鄰近的頂點(diǎn)的屬性,最終改變的東西仍然是edge的屬性。如果轉(zhuǎn)換中不需要根據(jù)頂點(diǎn)的屬性,就直接用mapEdges就行了。

什么是Triplet:

Triplet的全稱是EdgeTriplet,繼承自Edge,所代表的entity是:An edge along with the vertex attributes of its neighboring vertices. 一個EdgeTriplet中包含srcId, dstId, attr(繼承自Edge)和srcAttr和dstAttr五個屬性。

graph.mapTriplets(triplet => {.....})

二、Structural Operators:

1. subgraph:

方法的定義:

def subgraph(

????epred: EdgeTriplet[VD, ED] => Boolean = (x => true),

????vpred: (VertexId, VD) => Boolean = ((v, d) => true)

): Graph[VD, ED]

返回的對象是一個圖,圖中包含著的頂點(diǎn)和邊分別要滿足vpred和epred兩個函數(shù)。(要注意,頂點(diǎn)和邊是完全不同的概念,如果一個邊被砍掉了,這個邊關(guān)聯(lián)的兩個頂點(diǎn)并不會受影響)

要注意,在圖里,如果一個頂點(diǎn)沒了,其對應(yīng)的邊也就沒了,但邊沒了之后,點(diǎn)不會受影響。

所以,subgraph一般用于:restrict the graph to the vertices and edges of interest或者eliminate broken links.

2. joinVertices/outerJoinVerticies:

有時候需要從外部的RDD中跟Graph做數(shù)據(jù)的連接操作。例如:外部的user屬性想要跟現(xiàn)有的graph做一個合并,或者想把圖的頂點(diǎn)的屬性從一個圖遷移到另一個圖中。這些可以用join來完成。

def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]

def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED]

joinVertices: 將頂點(diǎn)跟輸入的RDD[(VertexId, U)]做關(guān)聯(lián),返回一個新的圖。新的圖的屬性的類型跟原圖是一樣的,但值可以改變;在mapFunc中,可以使用原來的圖的頂點(diǎn)屬性和輸入的RDD的頂點(diǎn)屬性U來計(jì)算新的頂點(diǎn)屬性。輸入的RDD中每個vertex最多只能有一個vertex。如果原圖在input table中沒有對應(yīng)的entry,則原來的屬性不做改變。

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]

從函數(shù)的定義可以看出,該操作不會改變vertex的屬性的類型,但值是可以改變的。比如first name需要加上last name。

事實(shí)上,joinVerticies方法的實(shí)現(xiàn)中就使用了outerJoinVerticies方法:

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD) : Graph[VD, ED] = {

????val uf = (id: VertexId, data: VD, o: Option[U]) => {

????????o match {

????????????case Some(u) => mapFunc(id, data, u) case None => data

????????}

????}

????graph.outerJoinVertices(table)(uf)

}

outerJoinVertices與joinVertices很類似,但在map方法中可以修改vertex的屬性類型。由于并非所有的圖的頂點(diǎn)都一定能跟傳入的RDD匹配上,所以定義mapFunc的時候使用了option選項(xiàng)。對于joinVerticies方法,如果某個頂點(diǎn)沒有跟傳入的RDD匹配上,就直接用原有的值。因?yàn)閖oinVerticies并不改變頂點(diǎn)的數(shù)據(jù)類型(有沒有忘了option跟Some、None之間的愛恨情仇?使用Option的時候一定離不開match,要注意match的語法)。

val outDegGraph = graph.outDegrees

val degGraph = graph.outerJoinVertices(outDegGraph){????

????(id, oldAttr, outDeg) => {????

????????outDeg match{

????????????case Some(outDeg) => outDegcase None => 0

????????}

????}

}

3. aggregateMessages(原來的名字叫做mapReduceTriplets):

如果需要將頂點(diǎn)跟其鄰居的信息集成起來,可以使用aggregateMessages方法。比如,想要知道有多少人follow了一個用戶,或者follow用戶的平均年齡。

函數(shù)的定義:

def aggregateMessages[A: ClassTag](

????sendMsg: EdgeContext[VD, ED, A] => Unit,

????mergeMessage: (A, A) => A,

????tripletFields: TripletFields = TripletFields.ALL

): VertexRDD[A]

跟mapReduceTriplets的定義很類似:

def mapReduceTriplets[Msg](

????map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],

????reduce: (Msg, Msg) => Msg

):VertexRDD[Msg]

VertexRDD繼承了RDD[(VertexId, VD)],本身自帶的泛型VD。

API:aggregate values from the neighboring edges and vertices of each vertex.

方法中的sendMsg是在圖中的每個邊都會被調(diào)用的,用于將message發(fā)送給相鄰頂點(diǎn)。

mergeMsg用于將sendMsg中發(fā)送給同一個頂點(diǎn)的message做組合。

tripletFields: 那些fields可以用于EdgeContext中,可用的值包括TripletFields.None, TripletFields.EdgeOnly, TripletFields.Src, TripletFields.Dst, TripletFields.ALL。默認(rèn)為ALL,也就是所有的信息都要用,如果只需要用部分?jǐn)?shù)據(jù),可以單獨(dú)選擇部分屬性發(fā)送,可以提升計(jì)算效率。

其中,Src和Dst分別會將source和destination field進(jìn)行傳遞,而且都會添加edge fields:

如果TripletFields中傳入的資源少了,也就是在sendMsg中需要使用到的信息并沒有包含在TripletFields中,可能會報空指針異常。

使用實(shí)例:計(jì)算graph的出度或入度:

val inDeg: RDD[(VertexId, Int)] = graph.aggregateMessages[Int](edgeContext => edgeContext.sendToDst(1), _+_)

由于這里我們并沒有用到edgeContext中的任何屬性,所以其實(shí)也可以在參數(shù)中添加TripletFields.None,從而提高一點(diǎn)執(zhí)行效率:

graph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _+_ , TripletFields.None)

TIPS: 什么是EdgeContext:EdgeContext中會將source和destination屬性以及Edge屬性都暴露出來,包含sendToSrc和sendToDst來將信息發(fā)送給source和destination屬性。

4. reverse: return a new graph with all edge directions reversed.

調(diào)用方法:

val reverseGraph = graph.reverse

5. mask

mask用于創(chuàng)建一個子圖,子圖中包含在輸入的圖中也包含的頂點(diǎn)和邊。該方法通常跟subgraph方法一起使用,來根據(jù)另一個關(guān)聯(lián)的圖來過濾當(dāng)前的圖中展示的數(shù)據(jù)。

6. groupEdges:

groupEdges用于將多重圖中的相同的頂點(diǎn)之間的邊做合并(除了屬性其實(shí)沒其他可以合并的)。

def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

7. 一組collect:

graph中有多個collect算法,包括collectEdges, collectNeighbours, collectNeighbourIds等

collectEdges: return an RDD that contains for each vertex its local edges

8. Pregel API:

圖本身就是內(nèi)在的遞歸的數(shù)據(jù)結(jié)構(gòu),因?yàn)橐粋€頂點(diǎn)的屬性可能依賴于其neighbor,而neighbor的屬性又依賴于他們的neighbour。所以很多重要的圖算法都會迭代計(jì)算每個頂點(diǎn)的屬性,直到達(dá)到一個穩(wěn)定狀態(tài)。

GraphX中的Pregel操作符是一個批量同步并行(bulk-synchronous parallel message abstraction)的messaging abstraction,用于圖的拓?fù)浣Y(jié)構(gòu)(topology of the graph)。The Pregel operator executes in a series of super steps in whichvertices receive the sum of their inbound messagesfrom the previous super step,compute a new valuefor the vertex property, and thensend messages to neighboring verticesin the next super step. Message是作為edge triplet的一個函數(shù)并行計(jì)算的,message的計(jì)算可以使用source和dest頂點(diǎn)的屬性。沒有收到message的頂點(diǎn)在super step中被跳過。迭代會在么有剩余的信息之后停止,并返回最終的圖。

pregel的定義:

def pregel[A]

????(initialMsg: A,//在第一次迭代中每個頂點(diǎn)獲取的起始

????msgmaxIter: Int = Int.MaxValue,//迭代計(jì)算的次數(shù)

????activeDir: EdgeDirection = EdgeDirection.Out

)(

????vprog: (VertexId, VD, A) => VD,//頂點(diǎn)的計(jì)算函數(shù),在每個頂點(diǎn)運(yùn)行,根據(jù)頂點(diǎn)的ID,屬性和獲取的inbound message來計(jì)算頂點(diǎn)的新屬性值。頂一次迭代的時候,inbound message為initialMsg,且每個頂點(diǎn)都會執(zhí)行一遍該函數(shù)。以后只有上次迭代中接收到信息的頂點(diǎn)會執(zhí)行。

????sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],//應(yīng)用于頂點(diǎn)的出邊(out edges)用于接收頂點(diǎn)發(fā)出的信息

????mergeMsg: (A, A) => A//合并信息的算法

)

算法實(shí)現(xiàn)的大致過程:

var g = mapVertices((vid, vdata) => vprog(vid, vdata, initMsg)).cache //第一步是根據(jù)initMsg在每個頂點(diǎn)執(zhí)行一次vprog算法,從而每個頂點(diǎn)的屬性都會迭代一次。

var messages = g.mapReduceTriplets(sendMsg, mergeMsg)

var messagesCount = messages.count

var i = 0

while(activeMessages > 0 && i < maxIterations){

????g = g.joinVertices(messages)(vprog).cache

????val oldMessages = messages

????messages = g.mapReduceTriplets(

????????sendMsg,

? ? ? ? mergeMsg,

????????Some((oldMessages, activeDirection))

????).cache()

????activeMessages = messages.count

????i += 1

}

g

pregel算法的一個實(shí)例:將圖跟一些一些初始的score做關(guān)聯(lián),然后將頂點(diǎn)分?jǐn)?shù)根據(jù)出度大小向外發(fā)散,并自己保留一份:

//將圖中頂點(diǎn)添加上該頂點(diǎn)的出度屬性

val graphWithDegree = graph.outerJoinVertices(graph.outDegrees){

????case (vid, name, deg) => (name, deg match {

????????case Some(deg) => deg+0.0

????????case None => 1.25}

????)

}//將圖與初始分?jǐn)?shù)做關(guān)聯(lián)

val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){

????case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))

}

graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))//將圖與初始分?jǐn)?shù)做關(guān)聯(lián)

val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){

????case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))

}

graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))

算法的第一步:將0.0(也就是傳入的初始值initMsg)跟各個頂點(diǎn)的值相加(還是原來的值),然后除以頂點(diǎn)的出度。這一步很重要,不能忽略。 并且在設(shè)計(jì)的時候也要考慮結(jié)果會不會被這一步所影響。

9. 計(jì)算圖的度、入度和出度:

graph.degrees

graph.outDegrees

graph.inDegrees

返回的對象是VertexRDD[Int]

注意的是返回的RDD對象中,度為0的頂點(diǎn)并不包含在內(nèi)。

10. filter方法:先計(jì)算一些用于過濾的值(preprocess),然后在使用predicate進(jìn)行過濾。

def filter[VD2: ClassTag, ED2: ClassTag](

????preprocess: Graph[VD, ED] => Graph[VD2, ED2],

????epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,

????vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true

):Graph[VD, ED] = {

????graph.mask(

????????preprocess(graph).subgraph(epred, vpred)

????)

}

preprocess:a function to compute new vertex and edge data before filtering

要注意最后返回的圖跟傳入的圖的頂點(diǎn)和邊的屬性類型是一樣的。

該方法可以用于在不改變頂點(diǎn)和邊的屬性值(要注意的是,在preprocess中,使用graph的時候可能會有類似于修改graph操作的api調(diào)用,但在調(diào)用的過程中,graph本身的值不會發(fā)生改變。比如在下邊的例子的中,graph做了一個跟其degree關(guān)聯(lián)的操作,但graph本身的值沒有任何變化)的情況下對圖進(jìn)行基于某些屬性的過濾。這些屬性的值可以是計(jì)算得來的。例如,刪除圖中沒有出度的頂點(diǎn):

graph.filter(

????graph => {

????????val degrees: VertexRDD[Int] = graph.outDegrees

????????graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}

????},

????vpred = (vid: VertexId, deg:Int) => deg > 0

)

11. groupEdges:合并兩個頂點(diǎn)中的多條邊稱為一條邊。要獲取正確的結(jié)果,graph必須調(diào)用partitionBy來做partition。這是因?yàn)樵摬僮骷俣ㄐ枰黄鸷喜⒌倪叾挤植荚谕粋€partition上。所以在調(diào)用groupEdges之前必須調(diào)用partitionBy。

def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED],也就是對邊的屬性做合并操作。

partitionBy: repartitions the edges in the graph according to partitionStrategy.

def partitionBy(partitionStrtegy: PartitionStrategy): Graph[VD, ED]

12. convertToCanonicalEdges:將雙向變轉(zhuǎn)化為單向邊。

具體的算法是:將所有邊都轉(zhuǎn)化成srcId小于dstId的邊,然后合并多余的邊。

二、Graph Builders:

http://spark.apache.org/docs/latest/graphx-programming-guide.html#graph_builders

GraphX提供了一組使用vertex和edge的集合來構(gòu)建一個圖的方法。這些Graph Builder默認(rèn)不會對邊做repartition,邊一般留在其原來的默認(rèn)的partition中,例如其原來的HDFS的block。

1. GraphLoader.edgeListFile:

用于從一組edge(每個edge中包括簡單的source id和destination id)中來構(gòu)建一個graph,自動創(chuàng)建其中涉及的頂點(diǎn),頂點(diǎn)和邊的屬性都設(shè)置為1。

def edgeListFile(

????sc: SparkContext,

????path: String,

????canonicalOrientation: Boolean = false,

????minEdgePartitions: Int = 1,

????edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

????vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[Int, Int]

canonicalOrientation參數(shù)可以強(qiáng)制讓邊按照srcId

2. Graph.apply:使用頂點(diǎn)和邊的RDD對象來創(chuàng)建一個圖。重復(fù)的頂點(diǎn)被任意拋棄,edgeRDD中有而verticiesRDD中沒有的頂點(diǎn)會被賦予一個默認(rèn)的屬性值。

def apply[VD, ED](

????vertices: RDD[(VertexId, VD)],

????edges: RDD[Edge[ED]],

????defaultVertexAttr: VD = null,

????edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

????vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[VD, ED]

3. Graph.fromEdges: 從單獨(dú)的一個邊的RDD中構(gòu)建一個圖。自動創(chuàng)建邊中使用的頂點(diǎn),并賦予默認(rèn)值。

def fromEdges[VD, ED](

????edges: RDD[Edge[ED]],

????defaultValue: VD,

????edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

????vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[VD, ED]

4. Graph.fromEdgeTuples: 使用一個edge tuple的RDD創(chuàng)建圖。邊的值默認(rèn)為1,頂點(diǎn)自動創(chuàng)建并賦予默認(rèn)值。該方法也支持對邊的deduplication(也就是去重)。如果發(fā)現(xiàn)多個相同的邊,就將他們合并,屬性值計(jì)算他們的和?;蛘邔⒅貜?fù)的邊當(dāng)做多條邊。

def fromEdgeTuples[VD](

????rawEdges: RDD[(VertexId, VertexId)],

????defaultValue: VD,

????uniqueEdges: Option[PartitionStrategy] = None,

????edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

????vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[VD, Int]

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容