一、屬性操作符:
graph中提供了對vertex,edge和triplet的map操作,類似于RDD中的map操作:
def mapVertices[VD2](map:(VertexId, VD)=> VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
使用這些方法不會改變圖的結(jié)構(gòu),所以這些操作符可以利用原有的圖的structural indicies。所以不要用graph.vertices.map的方法來實(shí)現(xiàn)同樣的操作。
mapEdges: transform each edge attribute in the graph using the map function.
實(shí)例:注意在mapEdges中使用的函數(shù)里,輸入?yún)?shù)x是一個Edge對象,返回對象則是Edge的屬性對象。在例子中,屬性對象的類型并沒有改變,(都是String)但屬性的值有所變化。也可以變成其它的類型的對象。
val sheyouGraph = graph.mapEdges(x => {if("roommate".equals(x.attr)) "sheyou" else x.attr})
mapVertices: transform each vertex attribute in the graph using the map function
跟mapEdges類似,mapVerticies中傳入的對象也是Vertex的實(shí)例化對象,返回值也是頂點(diǎn)的屬性對象:
val oneAttrGraph = graph.mapVertices((id, attr) => {attr._1+ " is:"+attr._2})
mapTriplets: Transforms each edge attribute using the map function, passing it the adjacent(臨近的) vertex attributes as well.
也就是在mapTriplets中,與mapEdges不同的地方僅僅在于可以使用的作為map條件的東西多了鄰近的頂點(diǎn)的屬性,最終改變的東西仍然是edge的屬性。如果轉(zhuǎn)換中不需要根據(jù)頂點(diǎn)的屬性,就直接用mapEdges就行了。
什么是Triplet:
Triplet的全稱是EdgeTriplet,繼承自Edge,所代表的entity是:An edge along with the vertex attributes of its neighboring vertices. 一個EdgeTriplet中包含srcId, dstId, attr(繼承自Edge)和srcAttr和dstAttr五個屬性。
graph.mapTriplets(triplet => {.....})
二、Structural Operators:
1. subgraph:
方法的定義:
def subgraph(
????epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
????vpred: (VertexId, VD) => Boolean = ((v, d) => true)
): Graph[VD, ED]
返回的對象是一個圖,圖中包含著的頂點(diǎn)和邊分別要滿足vpred和epred兩個函數(shù)。(要注意,頂點(diǎn)和邊是完全不同的概念,如果一個邊被砍掉了,這個邊關(guān)聯(lián)的兩個頂點(diǎn)并不會受影響)
要注意,在圖里,如果一個頂點(diǎn)沒了,其對應(yīng)的邊也就沒了,但邊沒了之后,點(diǎn)不會受影響。
所以,subgraph一般用于:restrict the graph to the vertices and edges of interest或者eliminate broken links.
2. joinVertices/outerJoinVerticies:
有時候需要從外部的RDD中跟Graph做數(shù)據(jù)的連接操作。例如:外部的user屬性想要跟現(xiàn)有的graph做一個合并,或者想把圖的頂點(diǎn)的屬性從一個圖遷移到另一個圖中。這些可以用join來完成。
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED]
joinVertices: 將頂點(diǎn)跟輸入的RDD[(VertexId, U)]做關(guān)聯(lián),返回一個新的圖。新的圖的屬性的類型跟原圖是一樣的,但值可以改變;在mapFunc中,可以使用原來的圖的頂點(diǎn)屬性和輸入的RDD的頂點(diǎn)屬性U來計(jì)算新的頂點(diǎn)屬性。輸入的RDD中每個vertex最多只能有一個vertex。如果原圖在input table中沒有對應(yīng)的entry,則原來的屬性不做改變。
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
從函數(shù)的定義可以看出,該操作不會改變vertex的屬性的類型,但值是可以改變的。比如first name需要加上last name。
事實(shí)上,joinVerticies方法的實(shí)現(xiàn)中就使用了outerJoinVerticies方法:
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD) : Graph[VD, ED] = {
????val uf = (id: VertexId, data: VD, o: Option[U]) => {
????????o match {
????????????case Some(u) => mapFunc(id, data, u) case None => data
????????}
????}
????graph.outerJoinVertices(table)(uf)
}
outerJoinVertices與joinVertices很類似,但在map方法中可以修改vertex的屬性類型。由于并非所有的圖的頂點(diǎn)都一定能跟傳入的RDD匹配上,所以定義mapFunc的時候使用了option選項(xiàng)。對于joinVerticies方法,如果某個頂點(diǎn)沒有跟傳入的RDD匹配上,就直接用原有的值。因?yàn)閖oinVerticies并不改變頂點(diǎn)的數(shù)據(jù)類型(有沒有忘了option跟Some、None之間的愛恨情仇?使用Option的時候一定離不開match,要注意match的語法)。
val outDegGraph = graph.outDegrees
val degGraph = graph.outerJoinVertices(outDegGraph){????
????(id, oldAttr, outDeg) => {????
????????outDeg match{
????????????case Some(outDeg) => outDegcase None => 0
????????}
????}
}
3. aggregateMessages(原來的名字叫做mapReduceTriplets):
如果需要將頂點(diǎn)跟其鄰居的信息集成起來,可以使用aggregateMessages方法。比如,想要知道有多少人follow了一個用戶,或者follow用戶的平均年齡。
函數(shù)的定義:
def aggregateMessages[A: ClassTag](
????sendMsg: EdgeContext[VD, ED, A] => Unit,
????mergeMessage: (A, A) => A,
????tripletFields: TripletFields = TripletFields.ALL
): VertexRDD[A]
跟mapReduceTriplets的定義很類似:
def mapReduceTriplets[Msg](
????map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
????reduce: (Msg, Msg) => Msg
):VertexRDD[Msg]
VertexRDD繼承了RDD[(VertexId, VD)],本身自帶的泛型VD。
API:aggregate values from the neighboring edges and vertices of each vertex.
方法中的sendMsg是在圖中的每個邊都會被調(diào)用的,用于將message發(fā)送給相鄰頂點(diǎn)。
mergeMsg用于將sendMsg中發(fā)送給同一個頂點(diǎn)的message做組合。
tripletFields: 那些fields可以用于EdgeContext中,可用的值包括TripletFields.None, TripletFields.EdgeOnly, TripletFields.Src, TripletFields.Dst, TripletFields.ALL。默認(rèn)為ALL,也就是所有的信息都要用,如果只需要用部分?jǐn)?shù)據(jù),可以單獨(dú)選擇部分屬性發(fā)送,可以提升計(jì)算效率。
其中,Src和Dst分別會將source和destination field進(jìn)行傳遞,而且都會添加edge fields:
如果TripletFields中傳入的資源少了,也就是在sendMsg中需要使用到的信息并沒有包含在TripletFields中,可能會報空指針異常。
使用實(shí)例:計(jì)算graph的出度或入度:
val inDeg: RDD[(VertexId, Int)] = graph.aggregateMessages[Int](edgeContext => edgeContext.sendToDst(1), _+_)
由于這里我們并沒有用到edgeContext中的任何屬性,所以其實(shí)也可以在參數(shù)中添加TripletFields.None,從而提高一點(diǎn)執(zhí)行效率:
graph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _+_ , TripletFields.None)
TIPS: 什么是EdgeContext:EdgeContext中會將source和destination屬性以及Edge屬性都暴露出來,包含sendToSrc和sendToDst來將信息發(fā)送給source和destination屬性。
4. reverse: return a new graph with all edge directions reversed.
調(diào)用方法:
val reverseGraph = graph.reverse
5. mask
mask用于創(chuàng)建一個子圖,子圖中包含在輸入的圖中也包含的頂點(diǎn)和邊。該方法通常跟subgraph方法一起使用,來根據(jù)另一個關(guān)聯(lián)的圖來過濾當(dāng)前的圖中展示的數(shù)據(jù)。
6. groupEdges:
groupEdges用于將多重圖中的相同的頂點(diǎn)之間的邊做合并(除了屬性其實(shí)沒其他可以合并的)。
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
7. 一組collect:
graph中有多個collect算法,包括collectEdges, collectNeighbours, collectNeighbourIds等
collectEdges: return an RDD that contains for each vertex its local edges
8. Pregel API:
圖本身就是內(nèi)在的遞歸的數(shù)據(jù)結(jié)構(gòu),因?yàn)橐粋€頂點(diǎn)的屬性可能依賴于其neighbor,而neighbor的屬性又依賴于他們的neighbour。所以很多重要的圖算法都會迭代計(jì)算每個頂點(diǎn)的屬性,直到達(dá)到一個穩(wěn)定狀態(tài)。
GraphX中的Pregel操作符是一個批量同步并行(bulk-synchronous parallel message abstraction)的messaging abstraction,用于圖的拓?fù)浣Y(jié)構(gòu)(topology of the graph)。The Pregel operator executes in a series of super steps in whichvertices receive the sum of their inbound messagesfrom the previous super step,compute a new valuefor the vertex property, and thensend messages to neighboring verticesin the next super step. Message是作為edge triplet的一個函數(shù)并行計(jì)算的,message的計(jì)算可以使用source和dest頂點(diǎn)的屬性。沒有收到message的頂點(diǎn)在super step中被跳過。迭代會在么有剩余的信息之后停止,并返回最終的圖。
pregel的定義:
def pregel[A]
????(initialMsg: A,//在第一次迭代中每個頂點(diǎn)獲取的起始
????msgmaxIter: Int = Int.MaxValue,//迭代計(jì)算的次數(shù)
????activeDir: EdgeDirection = EdgeDirection.Out
)(
????vprog: (VertexId, VD, A) => VD,//頂點(diǎn)的計(jì)算函數(shù),在每個頂點(diǎn)運(yùn)行,根據(jù)頂點(diǎn)的ID,屬性和獲取的inbound message來計(jì)算頂點(diǎn)的新屬性值。頂一次迭代的時候,inbound message為initialMsg,且每個頂點(diǎn)都會執(zhí)行一遍該函數(shù)。以后只有上次迭代中接收到信息的頂點(diǎn)會執(zhí)行。
????sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],//應(yīng)用于頂點(diǎn)的出邊(out edges)用于接收頂點(diǎn)發(fā)出的信息
????mergeMsg: (A, A) => A//合并信息的算法
)
算法實(shí)現(xiàn)的大致過程:
var g = mapVertices((vid, vdata) => vprog(vid, vdata, initMsg)).cache //第一步是根據(jù)initMsg在每個頂點(diǎn)執(zhí)行一次vprog算法,從而每個頂點(diǎn)的屬性都會迭代一次。
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var messagesCount = messages.count
var i = 0
while(activeMessages > 0 && i < maxIterations){
????g = g.joinVertices(messages)(vprog).cache
????val oldMessages = messages
????messages = g.mapReduceTriplets(
????????sendMsg,
? ? ? ? mergeMsg,
????????Some((oldMessages, activeDirection))
????).cache()
????activeMessages = messages.count
????i += 1
}
g
pregel算法的一個實(shí)例:將圖跟一些一些初始的score做關(guān)聯(lián),然后將頂點(diǎn)分?jǐn)?shù)根據(jù)出度大小向外發(fā)散,并自己保留一份:
//將圖中頂點(diǎn)添加上該頂點(diǎn)的出度屬性
val graphWithDegree = graph.outerJoinVertices(graph.outDegrees){
????case (vid, name, deg) => (name, deg match {
????????case Some(deg) => deg+0.0
????????case None => 1.25}
????)
}//將圖與初始分?jǐn)?shù)做關(guān)聯(lián)
val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){
????case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))
}
graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))//將圖與初始分?jǐn)?shù)做關(guān)聯(lián)
val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){
????case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))
}
graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))
算法的第一步:將0.0(也就是傳入的初始值initMsg)跟各個頂點(diǎn)的值相加(還是原來的值),然后除以頂點(diǎn)的出度。這一步很重要,不能忽略。 并且在設(shè)計(jì)的時候也要考慮結(jié)果會不會被這一步所影響。
9. 計(jì)算圖的度、入度和出度:
graph.degrees
graph.outDegrees
graph.inDegrees
返回的對象是VertexRDD[Int]
注意的是返回的RDD對象中,度為0的頂點(diǎn)并不包含在內(nèi)。
10. filter方法:先計(jì)算一些用于過濾的值(preprocess),然后在使用predicate進(jìn)行過濾。
def filter[VD2: ClassTag, ED2: ClassTag](
????preprocess: Graph[VD, ED] => Graph[VD2, ED2],
????epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
????vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true
):Graph[VD, ED] = {
????graph.mask(
????????preprocess(graph).subgraph(epred, vpred)
????)
}
preprocess:a function to compute new vertex and edge data before filtering
要注意最后返回的圖跟傳入的圖的頂點(diǎn)和邊的屬性類型是一樣的。
該方法可以用于在不改變頂點(diǎn)和邊的屬性值(要注意的是,在preprocess中,使用graph的時候可能會有類似于修改graph操作的api調(diào)用,但在調(diào)用的過程中,graph本身的值不會發(fā)生改變。比如在下邊的例子的中,graph做了一個跟其degree關(guān)聯(lián)的操作,但graph本身的值沒有任何變化)的情況下對圖進(jìn)行基于某些屬性的過濾。這些屬性的值可以是計(jì)算得來的。例如,刪除圖中沒有出度的頂點(diǎn):
graph.filter(
????graph => {
????????val degrees: VertexRDD[Int] = graph.outDegrees
????????graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
????},
????vpred = (vid: VertexId, deg:Int) => deg > 0
)
11. groupEdges:合并兩個頂點(diǎn)中的多條邊稱為一條邊。要獲取正確的結(jié)果,graph必須調(diào)用partitionBy來做partition。這是因?yàn)樵摬僮骷俣ㄐ枰黄鸷喜⒌倪叾挤植荚谕粋€partition上。所以在調(diào)用groupEdges之前必須調(diào)用partitionBy。
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED],也就是對邊的屬性做合并操作。
partitionBy: repartitions the edges in the graph according to partitionStrategy.
def partitionBy(partitionStrtegy: PartitionStrategy): Graph[VD, ED]
12. convertToCanonicalEdges:將雙向變轉(zhuǎn)化為單向邊。
具體的算法是:將所有邊都轉(zhuǎn)化成srcId小于dstId的邊,然后合并多余的邊。
二、Graph Builders:
http://spark.apache.org/docs/latest/graphx-programming-guide.html#graph_builders
GraphX提供了一組使用vertex和edge的集合來構(gòu)建一個圖的方法。這些Graph Builder默認(rèn)不會對邊做repartition,邊一般留在其原來的默認(rèn)的partition中,例如其原來的HDFS的block。
1. GraphLoader.edgeListFile:
用于從一組edge(每個edge中包括簡單的source id和destination id)中來構(gòu)建一個graph,自動創(chuàng)建其中涉及的頂點(diǎn),頂點(diǎn)和邊的屬性都設(shè)置為1。
def edgeListFile(
????sc: SparkContext,
????path: String,
????canonicalOrientation: Boolean = false,
????minEdgePartitions: Int = 1,
????edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
????vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[Int, Int]
canonicalOrientation參數(shù)可以強(qiáng)制讓邊按照srcId
2. Graph.apply:使用頂點(diǎn)和邊的RDD對象來創(chuàng)建一個圖。重復(fù)的頂點(diǎn)被任意拋棄,edgeRDD中有而verticiesRDD中沒有的頂點(diǎn)會被賦予一個默認(rèn)的屬性值。
def apply[VD, ED](
????vertices: RDD[(VertexId, VD)],
????edges: RDD[Edge[ED]],
????defaultVertexAttr: VD = null,
????edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
????vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[VD, ED]
3. Graph.fromEdges: 從單獨(dú)的一個邊的RDD中構(gòu)建一個圖。自動創(chuàng)建邊中使用的頂點(diǎn),并賦予默認(rèn)值。
def fromEdges[VD, ED](
????edges: RDD[Edge[ED]],
????defaultValue: VD,
????edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
????vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[VD, ED]
4. Graph.fromEdgeTuples: 使用一個edge tuple的RDD創(chuàng)建圖。邊的值默認(rèn)為1,頂點(diǎn)自動創(chuàng)建并賦予默認(rèn)值。該方法也支持對邊的deduplication(也就是去重)。如果發(fā)現(xiàn)多個相同的邊,就將他們合并,屬性值計(jì)算他們的和?;蛘邔⒅貜?fù)的邊當(dāng)做多條邊。
def fromEdgeTuples[VD](
????rawEdges: RDD[(VertexId, VertexId)],
????defaultValue: VD,
????uniqueEdges: Option[PartitionStrategy] = None,
????edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
????vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[VD, Int]