Graphx的數(shù)三角形算法TriangleCount用于統(tǒng)計(jì)每個(gè)頂點(diǎn)所在的三角形個(gè)數(shù)。
1.1 簡介
對網(wǎng)絡(luò)圖中進(jìn)行三角形個(gè)數(shù)計(jì)數(shù)可以根據(jù)三角形數(shù)量反應(yīng)網(wǎng)絡(luò)中的稠密程度和質(zhì)量。
1.2 應(yīng)用場景
(一)用于社區(qū)發(fā)現(xiàn)
如微博中你關(guān)注的人也關(guān)注你,大家的關(guān)注關(guān)系中有很多三角形,說明社區(qū)很強(qiáng)很穩(wěn)定,大家聯(lián)系比較緊密;如果一個(gè)人只關(guān)注了很多人,卻沒有形成三角形,則說明社交群體很小很松散。
(二)衡量社群耦合關(guān)系的緊密程度
通過三角形數(shù)量來反應(yīng)社區(qū)內(nèi)部的緊密程度,作為一項(xiàng)參考指標(biāo)。
1.3 算法思路
計(jì)算規(guī)則:
如果一條邊的兩個(gè)頂點(diǎn)有共同的鄰居,則這三個(gè)點(diǎn)構(gòu)成三角形。
計(jì)算步驟:
1. 為每個(gè)節(jié)點(diǎn)計(jì)算鄰居集合
2. 對于每條邊,計(jì)算兩端節(jié)點(diǎn)鄰居集合的交集,將交集中元素個(gè)數(shù)告知兩端節(jié)點(diǎn),
該個(gè)數(shù)即對應(yīng)著節(jié)點(diǎn)關(guān)聯(lián)的三角形數(shù)。
3. 對每個(gè)節(jié)點(diǎn)合并三角形數(shù)目統(tǒng)計(jì)總數(shù),由于三角形中一個(gè)頂點(diǎn)關(guān)聯(lián)兩條邊,所以
對于同一個(gè)三角形而言,一個(gè)頂點(diǎn)計(jì)算了兩次,故最終結(jié)果需要除以2。
1.4 源碼解析
object TriangleCount {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Transform the edge data something cheap to shuffle and then canonicalize
//得到的是一個(gè)無自連邊且無重復(fù)邊的、邊是從小id指向大id的圖
val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges()
// Get the triangle counts
val counters = runPreCanonicalized(canonicalGraph).vertices
// Join them bath with the original graph
graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
optCounter.getOrElse(0)
}
}
def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// 構(gòu)建鄰居集合
val nbrSets: VertexRDD[VertexSet] =
// 收集鄰居節(jié)點(diǎn),邊方向?yàn)镋ither,保證點(diǎn)的入邊和出邊連接的鄰居點(diǎn)都會(huì)被收集
graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
val set = new VertexSet(nbrs.length)
var i = 0
while (i < nbrs.length) {
// prevent self cycle
if (nbrs(i) != vid) {
set.add(nbrs(i))
}
i += 1
}
set
}
// 更新圖中頂點(diǎn)的屬性為鄰居點(diǎn)集合
val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
}
def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
// 在邊上操作源點(diǎn)和終點(diǎn)的鄰居集合是,遍歷較小的集合,加快遍歷速度
val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
(ctx.srcAttr, ctx.dstAttr)
} else {
(ctx.dstAttr, ctx.srcAttr)
}
val iter = smallSet.iterator
var counter: Int = 0
while (iter.hasNext) {
val vid = iter.next()
if (vid != ctx.srcId && vid != ctx.dstId && largeSet.contains(vid)) {
counter += 1
}
}
ctx.sendToSrc(counter)
ctx.sendToDst(counter)
}
// 沿著圖中的邊計(jì)算兩個(gè)頂點(diǎn)的鄰居集合的交集,并為每個(gè)頂點(diǎn)合并消息(消息為三角形個(gè)數(shù))
val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) =>
val dblCount = optCounter.getOrElse(0)
// 算法為每個(gè)三角形計(jì)算了兩次,所以結(jié)果是偶數(shù)
require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.")
dblCount / 2 //注意最后需要除以2,每個(gè)三角形被計(jì)算了兩遍
}
}
}