Graphx圖算法【1】三角形TriangleCount

Graphx的數(shù)三角形算法TriangleCount用于統(tǒng)計(jì)每個(gè)頂點(diǎn)所在的三角形個(gè)數(shù)。

1.1 簡介

對網(wǎng)絡(luò)圖中進(jìn)行三角形個(gè)數(shù)計(jì)數(shù)可以根據(jù)三角形數(shù)量反應(yīng)網(wǎng)絡(luò)中的稠密程度和質(zhì)量。

1.2 應(yīng)用場景

(一)用于社區(qū)發(fā)現(xiàn)

如微博中你關(guān)注的人也關(guān)注你,大家的關(guān)注關(guān)系中有很多三角形,說明社區(qū)很強(qiáng)很穩(wěn)定,大家聯(lián)系比較緊密;如果一個(gè)人只關(guān)注了很多人,卻沒有形成三角形,則說明社交群體很小很松散。

(二)衡量社群耦合關(guān)系的緊密程度

通過三角形數(shù)量來反應(yīng)社區(qū)內(nèi)部的緊密程度,作為一項(xiàng)參考指標(biāo)。

1.3 算法思路

計(jì)算規(guī)則:

如果一條邊的兩個(gè)頂點(diǎn)有共同的鄰居,則這三個(gè)點(diǎn)構(gòu)成三角形。

計(jì)算步驟:

   1.  為每個(gè)節(jié)點(diǎn)計(jì)算鄰居集合
   2.  對于每條邊,計(jì)算兩端節(jié)點(diǎn)鄰居集合的交集,將交集中元素個(gè)數(shù)告知兩端節(jié)點(diǎn),
       該個(gè)數(shù)即對應(yīng)著節(jié)點(diǎn)關(guān)聯(lián)的三角形數(shù)。
   3.  對每個(gè)節(jié)點(diǎn)合并三角形數(shù)目統(tǒng)計(jì)總數(shù),由于三角形中一個(gè)頂點(diǎn)關(guān)聯(lián)兩條邊,所以
       對于同一個(gè)三角形而言,一個(gè)頂點(diǎn)計(jì)算了兩次,故最終結(jié)果需要除以2。

1.4 源碼解析


object TriangleCount {

  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
    // Transform the edge data something cheap to shuffle and then canonicalize
    //得到的是一個(gè)無自連邊且無重復(fù)邊的、邊是從小id指向大id的圖
    val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges()
    // Get the triangle counts
    val counters = runPreCanonicalized(canonicalGraph).vertices
    // Join them bath with the original graph
    graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
      optCounter.getOrElse(0)
    }
  }


  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
    // 構(gòu)建鄰居集合
    val nbrSets: VertexRDD[VertexSet] =
      // 收集鄰居節(jié)點(diǎn),邊方向?yàn)镋ither,保證點(diǎn)的入邊和出邊連接的鄰居點(diǎn)都會(huì)被收集
      graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
        val set = new VertexSet(nbrs.length)
        var i = 0
        while (i < nbrs.length) {
          // prevent self cycle
          if (nbrs(i) != vid) {
            set.add(nbrs(i))
          }
          i += 1
        }
        set
      }

    // 更新圖中頂點(diǎn)的屬性為鄰居點(diǎn)集合
    val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
      (vid, _, optSet) => optSet.getOrElse(null)
    }

    def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
      // 在邊上操作源點(diǎn)和終點(diǎn)的鄰居集合是,遍歷較小的集合,加快遍歷速度
      val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
        (ctx.srcAttr, ctx.dstAttr)
      } else {
        (ctx.dstAttr, ctx.srcAttr)
      }
      val iter = smallSet.iterator
      var counter: Int = 0
      while (iter.hasNext) {
        val vid = iter.next()
        if (vid != ctx.srcId && vid != ctx.dstId && largeSet.contains(vid)) {
          counter += 1
        }
      }
      ctx.sendToSrc(counter)
      ctx.sendToDst(counter)
    }

    // 沿著圖中的邊計(jì)算兩個(gè)頂點(diǎn)的鄰居集合的交集,并為每個(gè)頂點(diǎn)合并消息(消息為三角形個(gè)數(shù))
    val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
    graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) =>
      val dblCount = optCounter.getOrElse(0)
      // 算法為每個(gè)三角形計(jì)算了兩次,所以結(jié)果是偶數(shù)
      require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.")
      dblCount / 2        //注意最后需要除以2,每個(gè)三角形被計(jì)算了兩遍
    }
  }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容