spark graphx 圖計算

寫在前面

  • 態(tài)度決定高度!讓優(yōu)秀成為一種習慣!
  • 世界上沒有什么事兒是加一次班解決不了的,如果有,就加兩次?。? - -茂強)

什么是一個圖

  • 一個網(wǎng)絡


    Network
  • 一個樹


    Tree
  • 一個RDBMS


    RDMBMS
  • 一個稀疏矩陣


    稀疏矩陣網(wǎng)絡
  • 或者


    Kitchen sink

屬性圖

  • 頂點


    頂點

GRAPHX

graphx是一個圖計算引擎,而不是一個圖數(shù)據(jù)庫,它可以處理像倒排索引,推薦系統(tǒng),最短路徑,群體檢測等等

  • 有向圖與無向圖


    有向圖無向圖
  • 有環(huán)圖與無環(huán)圖
    兩者的區(qū)別在于是否能夠沿著方向構(gòu)成一個閉環(huán)


    有環(huán)圖無環(huán)圖
  • 有標簽圖與無標簽圖


    有標簽無標簽圖
  • 偽圖與循環(huán)
    從簡單的圖開始,當允許兩個節(jié)點之間有多個邊的時候,就是一個復合圖,如果在某個節(jié)點上加個循環(huán)就成了偽圖,GRAPHX中的圖都是偽圖


    偽圖與循環(huán)
  • 二部圖/偶圖
    偶圖有個特殊的結(jié)構(gòu),就是所有的頂點分為兩個數(shù)據(jù)集,所有的邊都是建立在這兩個數(shù)據(jù)集之間的,在一個數(shù)據(jù)集中不會存在邊


    偶圖
  • RDF(Resource Description Framework )圖與屬性圖


    RDF圖與屬性圖
  • 鄰接矩陣


    鄰接矩陣

SPARK GRAPHX

  • RDD


    DATA IN GRAPHX

    graphx中的Graph有兩個RDD,一個是邊RDD,一個是點RDD
    其中UML如下


    Graph UML
  • 理解三元組
    其實就是由(點、邊,點)的一個有效組合,由triplets()接口獲取


    三元組

    其中triplets()返回的結(jié)果是EdgeTriplet[VD,ED],EdgeTriplet[VD,ED]的屬性接口有:


    屬性接口
  • 理解aggregateMessages
    首先看下源碼:

def aggregateMessages[A: ClassTag](
         sendMsg: EdgeContext[VD, ED, A] => Unit,
         mergeMsg: (A, A) => A,
         tripletFields: TripletFields = TripletFields.All): VertexRDD[A] = {
  aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}

EdgeContext


EdgeContext

主要考慮


sendmsg

這兩個方法
這兩個方法一個吧triplets中數(shù)據(jù)發(fā)送到源節(jié)點

一個是把triplets中的數(shù)據(jù)發(fā)送到目的節(jié)點
這樣就可以在源或者目的節(jié)點進行聚合操作了
看個例子:

graph.aggregateMessages[Int](_.sendToSrc(1), _ + _).foreach(println)

這個例子就是求出圖的出度
sendToSrc(1)會針對每一個triplets向源節(jié)點發(fā)送1
如圖


三元組

會向2節(jié)點發(fā)送一個1
_ + _ :表示針對每個節(jié)點做相加的聚合
比如下圖5節(jié)點有4個triplets,采用sendToSrc方法后,它的聚合就是1+1 = 2
也就是它的出度



結(jié)果是
(4,1)
(3,1)
(5,3)
(2,1)
  • Pregel
    先看源碼
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
     (graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)
     (vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
(1)graph:
輸入的圖
(2) initialMsg:
初始化消息,在第一次迭代的時候,這個初始消息會被用來初始化圖中的每個節(jié)點,在pregel進行調(diào)用時,會首先在圖上使用mapVertices來根據(jù)initialMsg的值更新每個節(jié)點的值,至于如何更新,則由vprog參數(shù)而定,vprog函數(shù)就接收了initialMsg消息做為參數(shù)來更新對應節(jié)點的值
(3) maxIterations:
最大迭代的次數(shù)
(4) activeDirection: 
活躍方向,首先理解活躍消息與活躍頂點,活躍節(jié)點是指在某一輪迭代中pregel以sendMsg和mergeMsg為參數(shù)來調(diào)用graph的aggregateMessage方法后收到消息的節(jié)點,活躍消息就是這輪迭代中所有被成功收到的消息。這樣一來,有的邊的src節(jié)點是活躍節(jié)點,有的dst節(jié)點是活躍節(jié)點,而有的邊兩端節(jié)點都是活躍節(jié)點。如果activeDirection參數(shù)指定為“EdgeDirection.Out”,則在下一輪迭代時,只有接收消息的出邊(src—>dst)才會執(zhí)行sendMsg函數(shù),也就是說,sendMsg回調(diào)函數(shù)會過濾掉”dst—>src”的edgeTriplet上下文參數(shù)
EdgeDirection.Out —sendMsg gets called if srcId received a message during the previous iteration, meaning this edge is considered an “out-edge” of srcId.
EdgeDirection.In—sendMsg gets called if dstId received a message during the previous iteration, meaning this edge is considered an “in-edge” of dstId.
EdgeDirection.Either—sendMsg gets called if either srcId or dstId received a message during the previous iteration.
EdgeDirection.Both —sendMsg gets called if both srcId and dstId received mes- sages during the previous iteration.
(5) vprog:
節(jié)點變換函數(shù),在初始時,在每輪迭代后,pregel會根據(jù)上一輪使用的msg和這里的vprod函數(shù)在圖上調(diào)用joinVertices方法變化每個收到消息的節(jié)點,注意這個函數(shù)除初始時外,都是僅在接收到消息的節(jié)點上運行,這一點可以從源碼中看到,源碼中用的是joinVertices(message)(vprog),因此,沒有收到消息的節(jié)點在join之后就濾掉了
(6) sendMsg: 
消息發(fā)送函數(shù),該函數(shù)的運行參數(shù)是一個代表邊的上下文,pregel在調(diào)用aggregateMessages時,會將EdgeContext轉(zhuǎn)換成EdgeTriplet對象(ctx.toEdgeTriplet)來使用,用戶需要通過Iterator[(VertexId,A)]指定發(fā)送哪些消息,發(fā)給那些節(jié)點,發(fā)送的內(nèi)容是什么,因為在一條邊上可以發(fā)送多個消息,有sendToDst和sendToSrc,所以這里是個Iterator,每一個元素是一個tuple,其中的vertexId表示要接收此消息的節(jié)點的id,它只能是該邊上的srcId或dstId,而A就是要發(fā)送的內(nèi)容,因此如果是需要由src發(fā)送一條消息A給dst,則有:Iterator((dstId,A)),如果什么消息也不發(fā)送,則可以返回一個空的Iterator:Iterator.empty
(7) mergeMsg: 
鄰居節(jié)點收到多條消息時的合并邏輯,注意它區(qū)別于vprog函數(shù),mergeMsg僅能合并消息內(nèi)容,但合并后并不會更新到節(jié)點中去,而vprog函數(shù)可以根據(jù)收到的消息(就是mergeMsg產(chǎn)生的結(jié)果)更新節(jié)點屬性
  • (最小路徑算法)
    從圖上可以看出最小路徑算法Dijkstra的原理
    a. 初始時,S只包含源點,即S={v},v的距離為0。U包含除v外的其他頂點,即:U={其余頂點},若v與U中頂點u有邊,則<u,v>正常有權(quán)值,若u不是v的出邊鄰接點,則<u,v>權(quán)值為∞。
    b. 從U中選取一個距離v最小的頂點k,把k,加入S中(該選定的距離就是v到k的最短路徑長度)。
    c. 以k為新考慮的中間點,修改U中各頂點的距離;若從源點v到頂點u的距離(經(jīng)過頂點k)比原來距離(不經(jīng)過頂點k)短,則修改頂點u的距離值,修改后的距離值的頂點k的距離加上邊上的權(quán)。
    d. 重復步驟b和c直到所有頂點都包含在S中。


    最小路徑
  • 在GRAPHX中
    GraphX 采用頂點切分方式進行分布式圖分割


    邊切分與頂點切分

    GraphX 不是沿著邊沿分割圖形,而是沿著頂點分割圖形,這可以減少通信和存儲開銷,在邏輯上,這對應于將邊緣分配給機器并允許頂點跨越多臺機器。分配邊緣的確切方法取決于PartitionStrategy各種啟發(fā)式的幾種折衷。用戶可以通過與Graph.partitionBy運算符重新分區(qū)圖來選擇不同的策略。默認分區(qū)策略是使用圖形構(gòu)建中提供的邊的初始分區(qū)(使用邊的 srcId 進行哈希分區(qū),將邊數(shù)據(jù)以多分區(qū)形式分布在集群),另外,頂點 RDD 中還擁有頂點到邊 RDD 分區(qū)的路由信息——路由表.路由表存在頂點 RDD 的分區(qū)中,它記錄分區(qū)內(nèi)頂點跟所有邊 RDD 分區(qū)的關(guān)系.在邊 RDD 需要頂點數(shù)據(jù)時(如構(gòu)造邊三元組),頂點 RDD 會根據(jù)路由表把頂點數(shù)據(jù)發(fā)送至邊 RDD 分區(qū)。


    分區(qū)

    如下圖按頂點分割方法將圖分解后得到頂點 RDD、邊 RDD 和路由表
    分區(qū)解釋圖

    GraphX 會依據(jù)路由表,從頂點 RDD 中生成與邊 RDD 分區(qū)相對應的重復頂點視圖( ReplicatedVertexView),它的作用是作為中間 RDD,將頂點數(shù)據(jù)傳送至邊 RDD 分區(qū)。重復頂點視圖按邊 RDD 分區(qū)并攜帶頂點數(shù)據(jù)的 RDD,如圖下圖所示,重復頂點分區(qū) A 中便攜了帶邊 RDD 分區(qū) A 中的所有的頂點,它與邊 RDD 中的頂點是 co-partition(即分區(qū)個數(shù)相同,且分區(qū)方法相同),在圖計算時, GraphX 將重復頂點視圖和邊 RDD 按分區(qū)進行拉鏈( zipPartition)操作,即將重復頂點視圖和邊 RDD 的分區(qū)一一對應地組合起來,從而將邊與頂點數(shù)據(jù)連接起來,使邊分區(qū)擁有頂點數(shù)據(jù)。在整個形成邊三元組過程中,只有在頂點 RDD 形成的重復頂點視圖中存在分區(qū)間數(shù)據(jù)移動,拉鏈操作不需要移動頂點數(shù)據(jù)和邊數(shù)據(jù).由于頂點數(shù)據(jù)一般比邊數(shù)據(jù)要少的多,而且隨著迭代次數(shù)的增加,需要更新的頂點數(shù)目也越來越少,重復頂點視圖中攜帶的頂點數(shù)據(jù)也會相應減少,這樣就可以大大減少集群中數(shù)據(jù)的移動量,加快執(zhí)行速度。
    重復頂點視圖

    重復頂點視圖有四種模式
    (1)bothAttr: 計算中需要每條邊的源頂點和目的頂點的數(shù)據(jù)
    (2)srcAttrOnly:計算中只需要每條邊的源頂點的數(shù)據(jù)
    (3)destAttrOnly:計算中只需要每條邊的目的頂點的數(shù)據(jù)
    (4)noAttr:計算中不需要頂點的數(shù)據(jù)


    bothAttr

    srcAttrOnly

    destAttrOnly

    noAttr

    重復頂點視圖創(chuàng)建之后就會被加載到內(nèi)存,因為圖計算過程中,他可能會被多次使用,如果程序不再使用重復頂點視圖,那么就需要手動調(diào)用GraphImpl中的unpersistVertices,將其從內(nèi)存中刪除。
    生成重復頂點視圖時,在邊RDD的每個分區(qū)中創(chuàng)建集合,存儲該分區(qū)包含的源頂點和目的頂點的ID集合,該集合被稱作本地頂點ID映射(local VertexId Map),在生成重復頂點視圖時,若重復頂點視圖時第一次被創(chuàng)建,則把本地頂點ID映射和發(fā)送給邊RDD各分區(qū)的頂點數(shù)據(jù)組合起來,在每個分區(qū)中以分區(qū)的本地頂點ID映射為索引存儲頂點數(shù)據(jù),生成新的頂點分區(qū),最后得到一個新的頂點RDD,若重復頂點視圖不是第一次被創(chuàng)建,則使用之前重復頂點視圖創(chuàng)建的頂點RDD預發(fā)送給邊RDD各分區(qū)的丁帶你更新數(shù)據(jù)進行連接(join)操作,更新頂點RDD中頂點的數(shù)據(jù),生成新的頂點RDD。

GraphX 在頂點 RDD 和邊 RDD 的分區(qū)中以數(shù)組形式存儲頂點數(shù)據(jù)和邊數(shù)據(jù),目的是為了不損失元素訪問性能。同時,GraphX 在分區(qū)里建立了眾多索引結(jié)構(gòu),高效地實現(xiàn)快速訪問頂點數(shù)據(jù)或邊數(shù)據(jù)。在迭代過程中,圖的結(jié)構(gòu)不會發(fā)生變化,因而頂點 RDD、邊 RDD 以及重復頂點視圖中的索引結(jié)構(gòu)全部可以重用,當由一個圖生成另一個圖時,只須更新頂點 RDD 和邊 RDD 的數(shù)據(jù)存儲數(shù)組,因此,索引結(jié)構(gòu)的重用保持了GraphX 高性能,也是相對于原生 RDD 實現(xiàn)圖模型性能能夠大幅提高的主要原因。

-分區(qū)方式簡介


分區(qū)方式

算法

  • 最小路徑算法
val sourceId: VertexId = 5L
    val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = initialGraph.pregel(
      Double.PositiveInfinity,
      activeDirection = EdgeDirection.Out
    )(
      (vertexId, vertexValue, msg) =>
        math.min(vertexValue, msg),//vprog,作用是處理到達頂點的參數(shù),取較小的那個作為頂點的值
      triplet => { //sendMsg,計算權(quán)重,如果鄰居節(jié)點的屬性加上邊上的距離小于該節(jié)點的屬性,說明從源節(jié)點比從鄰居節(jié)點到該頂點的距離更小,更新值
          if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
            Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
          } else {
            Iterator.empty
          }
      },
      (a, b) => math.min(a, b) //mergeMsg,合并到達頂點的所有信息
    )
    println(sssp.vertices.collect.mkString("\n"))

以上代碼是求節(jié)點ID為5的所有可到達節(jié)點的最短路徑
算法詳解:首先initialGraph就先遍歷所有的節(jié)點吧我們設置的目標節(jié)點設置的屬性值設置成0.0其他的所有節(jié)點設置成正無窮,pregel中的Double.PositiveInfinity是初始化參數(shù),在pregel執(zhí)行的過程中的第一次迭代時,會初始化所有的節(jié)點屬性值,會根據(jù)下邊的vprog = (vertexId, vertexValue, msg) => math.min(vertexValue, msg),//(vprog,作用是處理到達頂點的參數(shù),取較小的那個作為頂點的值)去處理所有的節(jié)點,所以,初始化后除了5節(jié)點的屬性值為0.0外,其他的都是正無窮。activeDirection = EdgeDirection.Out限定所有的有效方向是出邊,triplet限定了只有在每次迭代中滿足triplet.srcAttr + triplet.attr < triplet.dstAttr條件的才會更新當前節(jié)點值,最后(a, b) => math.min(a, b)方法合并了迭代到當前所有接受到消息的頂點的屬性值,也就是說找到源頂點到可達頂點中的路徑最小的那個可達頂點。不斷的迭代下去,最后掃描完整個圖,最終得出到所有可達頂點最短路徑。

  • 找出目標節(jié)點所有的2跳節(jié)點
val friends = Pregel(
      graph.mapVertices((vid,value) => if(vid ==2) 2 else -1),//初始化信息,源節(jié)點為2,其他節(jié)點為-1
      -1,
      2,
      EdgeDirection.Either
    )(
      vprog = (vid,attr,msg) =>math.max(attr, msg),//頂點操作,到來的屬性和原屬性比較,較大的作為該節(jié)點的屬性
      edge => {
        if (edge.srcAttr <= 0) {
          if (edge.dstAttr <= 0) {
            Iterator.empty//都小于0,說明從源節(jié)點還沒有傳遞到這里
          }else {
            Iterator((edge.srcId,edge.dstAttr - 1))//目的節(jié)點大于0,將目的節(jié)點屬性減一賦值給源節(jié)點
          }
        }else {
          if(edge.dstAttr <= 0) {
            Iterator((edge.dstId,edge.srcAttr -1))//源節(jié)點大于0,將源節(jié)點屬性減一賦值給目的節(jié)點
          }else {
            Iterator.empty//都大于0,說明在二跳節(jié)點以內(nèi),不操作
          }
        }
      },
      (a,b) => math.max(a, b)//當有多個屬性傳遞到一個節(jié)點,取大的,因為大的離源節(jié)點更近
    ).subgraph(vpred =(vid,v) =>v >= 0)
    friends.vertices.collect.foreach(println(_))

算法詳解:首先,把目標節(jié)點的屬性值置為2,初始化其他的所有的節(jié)點的屬性值為-1,第一次迭代消息(-1)初始化就是根據(jù)vprog = (vid, attr, msg) => math.max(attr, msg)再過濾一遍節(jié)點,在剩下的迭代過程中,edge中的條件限定只掃描:
(1)如果源小于0,目標也小于0,則不發(fā)消息
(2)如果源小于0,目標大于0,則目標值-1賦給源節(jié)點
(3)如果源大于0,目標值也大于0,則不發(fā)消息
(4)如果源大于0,目標值小于0,則把源-1賦給目標節(jié)點
也就是說只會在有正負差距的的節(jié)點之間才會有消息傳遞


初始化圖

條件遍歷

算法

  • pageRank
    該算法就不過多介紹了,直接上代碼,基于graphx的實現(xiàn),想了解具體算法的請百度或者google一大堆
    這里首先假設了你已經(jīng)加載了一個圖
graph.pageRank(0.001,0.15)
      .vertices   //列出所有點
      .sortBy(_._2, false) //根據(jù)pagerank降序排序
      .take(20)  //取出前20個
      .foreach(println)

很簡單,解釋下參數(shù):0.001是個容忍度,是在對下邊公式進行迭代過程中退出迭代的條件,0.15也是默認的初始跳轉(zhuǎn)概率,也就是公式中的resetProb


公式
  • 個性化pageRank
    該算法主要用于推薦中,比如社交網(wǎng)絡中,對于某個人來說,你想給他再推薦一個人,當然這個被推薦的這個人肯定是那個某人感興趣的?;蛘邔τ谟脩羯唐返耐扑]中,用戶商品兩個實體可以形成一個圖,我們就可以根據(jù)具體的某個用戶來給他推薦一些商品
       graph.personalizedPageRank(34175, 0.001) //某人是34175
      .vertices
      .filter(_._1 != 34175)
      .reduce((a,b) => if (a._2 > b._2) a else b)  //找出那個34175感興趣的人
  • 三角環(huán)統(tǒng)計
    三角環(huán)統(tǒng)計應用場景:大規(guī)模的社區(qū)發(fā)現(xiàn),通過該算法可以做群體性檢測,社交網(wǎng)絡中就是那種組團的、關(guān)系復雜的,互相有一腿情況比較多的。也就是說,在某個用戶下邊,這個人擁有越多的三角形環(huán),那么這個人就擁有越多的連接,這樣就可以檢測一些小團體,小派系等,同時也可以支持一些推薦,確認一些造謠生事者(能夠根據(jù)圖去找到謠言的散播者),只要是跟大規(guī)模小團體檢測方面該算法都可以很好的支持
graph.triangleCount()
      .vertices
      .sortBy(_._2, false)
      .take(20)
      .foreach(println)

找出擁有三角形環(huán)關(guān)系的最多的頂點

  • 最短路徑算法
    最酸路徑算法的原理上面已經(jīng)說過了,現(xiàn)在利用graphx內(nèi)置的方式實現(xiàn)
ShortestPaths.run(diseaseSymptom,Array(19328L))
      .vertices
      .filter(!_._2.isEmpty)
      .foreach(println)

其中19328L是自定義的起始點

(266,Map(19328 -> 15))
(282,Map(19328 -> 12))
(770,Map(19328 -> 9))
(1730,Map(19328 -> 11))
(2170,Map(19328 -> 6))
(1530,Map(19328 -> 13))
(1346,Map(19328 -> 14))
(378,Map(19328 -> 3))
(1378,Map(19328 -> 11))
(970,Map(19328 -> 10))
...

結(jié)果如上,(266,Map(19328 -> 15))表示19328到266的最短路徑為15

  • 獨立群體檢測:
    獨立群體檢測就是發(fā)現(xiàn)那些不合群的成分,如下圖:


    獨立成分
val g = Graph(sc.makeRDD((1L to 7L).map((_,""))),
      sc.makeRDD(Array(Edge(2L,5L,""), Edge(5L,3L,""), Edge(3L,2L,""),
        Edge(4L,5L,""), Edge(6L,7L,""))))
    g.connectedComponents
      .vertices
      .map(_.swap)
      .groupByKey()
      .map(_._2)
      .foreach(println)

輸出結(jié)果:

CompactBuffer(6, 7)
CompactBuffer(4, 2, 3, 5)
CompactBuffer(1)
  • 強連接網(wǎng)絡
    所謂的強連接網(wǎng)絡就是:在這個網(wǎng)絡中無論你從哪個頂點開始,其他所有頂點都是可達的,就如下圖:


    強連接網(wǎng)絡
g.stronglyConnectedComponents(3)
      .vertices.map(_.swap)
      .groupByKey()
      .map(_._2)
      .foreach(println)

其中3是最大迭代次數(shù),在上邊圖中,迭代三次剛好,也可以設置的大一點,不過結(jié)果都是一樣的

  • 標簽傳播算法(LPA)
    主要是用于團體檢測,LPA能夠以接近線性復雜度去檢測一個大規(guī)模圖中的團體結(jié)構(gòu),主要思想是給所有頂點中的密集連接組打上一個唯一標簽,這些擁有相同標簽的組就是所謂的團體
    該算法常常是不收斂的,如下圖


    標簽傳播算法

    該算法也可以用于半監(jiān)督學習(大部分沒有標簽,小部分有標簽),給那些沒有標簽的通過標簽傳播算法進行打標簽。也可以應用于風控,對于通過已有風險評估的人,通過社交網(wǎng)絡去評估何其有關(guān)系的人的風險

  • Dijkstra算法的實現(xiàn)


    算法圖

    就拿這個圖為例
    算法步驟就是:
    (1)首先初始化圖,把起始目標節(jié)點屬性值設置成0,其他的節(jié)點設置成正無窮,同時把節(jié)點狀態(tài)全部設置成未激活狀態(tài)
    (2)然后進入迭代操作,迭代的次數(shù)為所有頂點的個數(shù),進入迭代過程:找到當前的節(jié)點(就是每次迭代過程中紅色的點),每次迭代都會生成一個新的圖,主要是因為RDD是不可變的,如果想更新一個RDD就必須生成一個新的RDD然后把兩個RDD再join起來,所以接下來就是生成新圖的過程,針對剛才找到的當前節(jié)點,我們向它的目的指向頂點發(fā)送消息,消息就是當前節(jié)點的屬性值加上指向邊上的權(quán)重,然后再合并目的節(jié)點的屬性值,取其中最小的屬性值,其實就是選擇當前節(jié)點的目的一個最優(yōu)目的節(jié)點作為下一輪迭代的當前節(jié)點。在當前節(jié)點中,發(fā)送消以及合并目的節(jié)點的屬性值以后就會生成一個新的圖,為了更新初始圖,我們這里只能outerJoinVertices,把兩個圖join起來,這樣不停的迭代,直到所有頂點都是激活的

def dijkstra[VD](g:Graph[VD,Double], origin:VertexId) = {
    //初始化起始節(jié)點的屬性值
    var g2 = g.mapVertices(
      (vid,vd) => (false, if (vid == origin) 0 else Double.MaxValue))
    for (i <- 1L to g.vertices.count-1) {
      val currentVertexId =
        g2.vertices.filter(!_._2._1)
          .fold((0L,(false,Double.MaxValue)))((a,b) =>
            if (a._2._2 < b._2._2) a else b)
          ._1
      val newDistances = g2.aggregateMessages[Double](
        ctx => if (ctx.srcId == currentVertexId)
          ctx.sendToDst(ctx.srcAttr._2 + ctx.attr),
        (a,b) => math.min(a,b))
      g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) =>
        (vd._1 || vid == currentVertexId,
          math.min(vd._2, newSum.getOrElse(Double.MaxValue))))
    }
    g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
      (vd, dist.getOrElse((false,Double.MaxValue))._2))
  }

待續(xù)-------

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容