Spark GraphX概述
GraphX是Spark的一個組件,專門用來表示圖以及進(jìn)行圖的并行計算。GraphX通過重新定義了圖的抽象概念來拓展了RDD:定向多圖,其屬性附加到每個頂點(diǎn)和邊。為了支持圖計算,GraphX公開了一系列基本運(yùn)算符(比如:mapVertices、mapEdges、subgraph)已經(jīng)優(yōu)化后的Pregel API變種。此外,還包含越來越多的圖計算和構(gòu)建器,以簡化圖形分析任務(wù)。GraphX在圖頂點(diǎn)信息和邊信息存儲上做了優(yōu)化,使得圖計算框架性能相對于原生RDD是想較大的提升,接近或達(dá)到GraphLab等專業(yè)圖計算平臺性能。GraphX最大的貢獻(xiàn)是,在Spark之上提供一站式數(shù)據(jù)解決方案,可以方便且高效地完成圖計算的一整套流水作業(yè)。

圖的相關(guān)術(shù)語
圖是一種較線性表和樹更為復(fù)雜的數(shù)據(jù)結(jié)構(gòu),圖表達(dá)的是多對多的關(guān)系。
如下圖所示,G1是一個簡單的圖,其中V1、V2、V3、V4被稱為頂點(diǎn)(Vertex),任意兩個頂點(diǎn)之間的通路被稱為邊(Edge),它可以由(V1,V2)有序?qū)肀硎荆@時稱G1位有向圖,意味著邊是有方向的,若以無序?qū)肀硎疽粭l表,則該圖為無向圖,如G2

在G1中,與頂點(diǎn)相關(guān)聯(lián)的邊的數(shù)量,被稱為頂點(diǎn)的度(Degree)。其中,以頂點(diǎn)為起點(diǎn)的邊的數(shù)量被稱為該訂單的出度(OutDegree),以頂點(diǎn)為終點(diǎn)的邊的數(shù)量被稱為該頂點(diǎn)的入度(InDegree)。
以G1的V1舉例,V1的度是3,啟動出度為2,入度為1。在無向圖G2中,如過任意兩個頂點(diǎn)之間是聯(lián)通的,則稱G2為連通圖(Connected Graph)。在有向圖G1中,如果任意兩個訂單Vm、Vn且m != n,從Vm到Vn以及Vn到Vm之間都存在通路,則稱G1為強(qiáng)連通圖(Strongly Conneted Graph)。如果任意兩個頂點(diǎn)之間若存在通路,則稱為路徑(Path),用一個頂點(diǎn)序列表示,若第一個頂點(diǎn)和最后一個頂點(diǎn)相同,則稱為回路或者環(huán)(Cycle)
圖數(shù)據(jù)庫與圖計算
Neo4j 是一個比較老牌的開源圖數(shù)據(jù)庫,目前在業(yè)界的使用也較為廣泛,它提供了一種簡單易學(xué)的查詢語言 Cypher。Neo4j 支持交互式查詢,查詢效率很高。能夠迅速從整網(wǎng)中找出符合特定模式的子網(wǎng),供隨后分析之用,適用于OLTP 場景。
Neo4j 是圖數(shù)據(jù)庫,偏向于存儲和查詢。能存儲關(guān)聯(lián)關(guān)系比較復(fù)雜,實體之間的連接豐富。比如社交網(wǎng)絡(luò)、知識圖譜、金融風(fēng)控等領(lǐng)域的數(shù)據(jù)。擅長從某個點(diǎn)或某些點(diǎn)出發(fā),根據(jù)特定條件在復(fù)雜的關(guān)聯(lián)關(guān)系中找到目標(biāo)點(diǎn)或邊。如在社交網(wǎng)絡(luò)中找到某個點(diǎn)三步以內(nèi)能認(rèn)識的人,這些人可以認(rèn)為是潛在朋友。數(shù)據(jù)量限定在一定范圍內(nèi),能短時完成的查詢就是所謂的OLTP操作。
Neo4j 查詢與插入速度較快,沒有分布式版本,容量有限,而且一旦圖變得非常大, 如數(shù)十億頂點(diǎn),數(shù)百億邊,查詢速度將變得緩慢。Neo4j 分為社區(qū)版和企業(yè)版,企業(yè)版有一些高級功能,需要授權(quán),價格昂貴。
比較復(fù)雜的分析和算法,如基于圖的聚類,PageRank 算法等,這類計算任務(wù)對于圖數(shù)據(jù)庫來說就很難勝任了,主要由一些圖挖掘技術(shù)來負(fù)責(zé)。
Pregel 是 Google 于 2010 年在 SIGMOD 會議上發(fā)表的《Pregel: A System for Large-Scale Graph Processing》論文中提到的海量并行圖挖掘的抽象框架,Pregel 與 Dremel 一樣,是 Google 新三駕馬車之一,它基于 BSP 模型(Bulk Synchronous Parallel,整體同步并行計算模型),將計算分為若干個超步(super step),在超步內(nèi),通過消息來傳播頂點(diǎn)之間的狀態(tài)。Pregel 可以看成是同步計 算,即等所有頂點(diǎn)完成處理后再進(jìn)行下一輪的超步,Spark 基于 Pregel 論文實現(xiàn)的 海量并行圖挖掘框架 GraphX。
圖計算模式
目前基于圖的并行計算框架已經(jīng)有很多,比如來自Google的Pregel、來自Apache開源的圖計算框架Giraph / HAMA以及最為著名的GraphLab,其中Pregel、HAMA和 Giraph都是非常類似的,都是基于BSP模式。
BSP即整體同步并行,它將計算分成一系列超步的迭代。從縱向上看,它是一個串行模式,而從橫向上看,它是一個并行的模式,每兩個超步之間設(shè)置一個柵欄 (barrier),即整體同步點(diǎn),確定所有并行的計算都完成后再啟動下一輪超步。

每一個超步包含三部分內(nèi)容:
- 計算 cumpute:每一個Processor利用上一個超步傳過來的消息和本地的數(shù)據(jù)進(jìn)行本地計算
- 消息傳遞:每一個Processor計算完畢后,將消息傳遞給與之關(guān)聯(lián)的其他Processors
- 整體同步點(diǎn):用整體同步,確定所有的計算和消息傳遞都進(jìn)行完畢后,進(jìn)入下一個超步
Spark GraphX 基礎(chǔ)
架構(gòu)
存儲模式
核心數(shù)據(jù)結(jié)構(gòu)
GraphX 與 Spark 其他組件相比相對獨(dú)立,擁有自己的核心數(shù)據(jù)結(jié)構(gòu)與算子。
GraphX架構(gòu)

GraphX的整體架構(gòu)可以分為三個部分:
- 算法層:基于Pregel接口實現(xiàn)了常用的圖算法,包括PageRank、SVDPlusPlus、TriangeleCount、ConnectedComponents、StonglyConnectedConponents等算法
- 接口層:在底層RDD基礎(chǔ)之上實現(xiàn)了Pregel模型BSP模式的計算接口
- 底層:圖計算的核心類,包含:VertexRDD、EdgeRDD、RDD[EdgeTriplet]
存儲模式
巨型圖的存儲總體上有邊分割和點(diǎn)分割兩種存儲方式。2013年,GraphLab2.0將其 存儲方式由邊分割變?yōu)辄c(diǎn)分割,在性能上取得重大提升,目前基本上被業(yè)界廣泛接受 并使用。
- 邊分割(Edge-Cut):每個頂點(diǎn)存儲一次,但有的邊會被打斷分到兩臺機(jī)器上。這樣做的好處就是節(jié)省存儲空間;壞處是對圖進(jìn)行基于邊計算時,對于一條兩個頂點(diǎn)被分到不同的機(jī)器上的邊來說,需要跨機(jī)器傳輸數(shù)據(jù),內(nèi)網(wǎng)通信流量大
- 點(diǎn)分割(Vertex-Cut):每條邊只存儲一次,都會出現(xiàn)一臺機(jī)器上,鄰居多的點(diǎn)會被復(fù)制到多臺機(jī)器上,增加了存儲開銷,同時會引發(fā)數(shù)據(jù)同步問題。好處是可以大幅減少內(nèi)網(wǎng)通信量。

雖然兩種方法互有利弊,但現(xiàn)在是點(diǎn)分割占上風(fēng),各種分布式圖計算框架都將自己底層的存儲形式變成了點(diǎn)分割。主要原因有以下兩個:
- 磁盤價格下降,存儲空間不再是問題,而內(nèi)網(wǎng)的通信資源沒有突破性進(jìn)展,集群計算時內(nèi)網(wǎng)帶寬是寶貴的,時間比磁盤更珍貴。這點(diǎn)就類似于常見的空間換時間的策略;
- 在當(dāng)前的應(yīng)用場景中,絕大多數(shù)網(wǎng)絡(luò)都是“無尺度網(wǎng)絡(luò)”,遵循冪律分布,不同點(diǎn)的鄰居數(shù)量相差非常懸殊。而邊分割會使那些多鄰居的點(diǎn)所相連的邊大多數(shù)被分到不同的機(jī)器上,這樣的數(shù)據(jù)分布會使得內(nèi)網(wǎng)帶寬更加捉襟見肘,于是邊分割存儲方式被漸漸拋棄了;
核心數(shù)據(jù)結(jié)構(gòu)
核心數(shù)據(jù)結(jié)構(gòu)包括:graph、vertices、edges、triplets
GraphX API 的開發(fā)語言目前僅支持 Scala。GraphX 的核心數(shù)據(jù)結(jié)構(gòu) Graph 由 RDD 封裝而成。
Graph
GraphX用屬性圖的方法表示圖,頂點(diǎn)有屬性,邊有屬性。存儲結(jié)構(gòu)采用邊集數(shù)據(jù)的形式,即一個頂點(diǎn)表,一個邊表,如下圖所示:

頂點(diǎn)ID是非常重要的字段,他不光是頂點(diǎn)的唯一標(biāo)識符,也是描述邊的唯一手段,訂單表與邊表實際上就是RDD,他們分別是VertexRDD與EdgeRDD。在Spark的源碼中,Graph類如下:

- vertices為頂點(diǎn)表,VD為訂單屬性類型
- edges為邊表,ED為邊屬性類型
- 可以通過Graph的vertices與edges成員直接得到頂點(diǎn)RDD與邊RDD
- 頂點(diǎn)RDD類型是VerticeRDD,繼承自RDD[(VertexId,VD)]
- 邊RDD類型為EdgeRDD,繼承自RDD[Edge[ED]]
vertices
vertices對應(yīng)著名為 VertexRDD 的RDD。這個RDD由頂點(diǎn)id和頂點(diǎn)屬性兩個成員變量。

VertexRDD繼承自 RDD[(VertexId, VD)],這里VertexId表示頂點(diǎn)id,VD表示頂點(diǎn)所 帶的屬性的類別。
VertexId實際上是一個Long類型

edges
edges對應(yīng)著EdgeRDD。這個RDD擁有三個成員變量,分別是源頂點(diǎn)id、目標(biāo)頂點(diǎn)id以及邊屬性。

Edge代表邊,由 源頂點(diǎn)id、目標(biāo)頂點(diǎn)id、以及邊的屬性構(gòu)成。

triplets
triplets 表示邊點(diǎn)三元組,如下圖所示(其中圓柱形分別代表頂點(diǎn)屬性與邊屬性):

通過 triplets 成員,用戶可以直接獲取到起點(diǎn)頂點(diǎn)、起點(diǎn)頂點(diǎn)屬性、終點(diǎn)頂點(diǎn)、終點(diǎn)頂點(diǎn)屬性、邊與邊屬性信息。triplets 的生成可以由邊表與頂點(diǎn)表通過 ScrId 與 DstId 連接而成。
triplets對應(yīng)著EdgeTriplet。它是一個三元組視圖,這個視圖邏輯上將頂點(diǎn)和邊的屬 性保存為一個RDD[EdgeTriplet[VD, ED]]。

Spark GraphX 計算
圖的定義
屬性操作
轉(zhuǎn)換操作
結(jié)構(gòu)操作
關(guān)聯(lián)操作
聚合操作
Pregel API
引入依賴:
<!-- graphx -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
案例一:圖的基本操作

找到 出度=入度 的人員\找出5到各頂點(diǎn)的最短距離 即鏈接操作與聚合操作需要重新聽課
package com.hhb.spark.graphx
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.reflect.ClassTag
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-25 14:25
**/
object GraphXExample1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("warn")
//定義頂點(diǎn)的數(shù)據(jù)(訂單,信息)
val vertexArr: Array[(VertexId, (String, Int))] = Array(
(1L, ("Alice", 28)),
(2L, ("Bob", 27)),
(3L, ("Charlie", 65)),
(4L, ("David", 42)),
(5L, ("Ed", 55)),
(6L, ("Fran", 50)))
//定義邊的數(shù)據(jù)(起點(diǎn),終點(diǎn),邊屬性)
val edgeArray: Array[Edge[Int]] = Array(
Edge(2L, 1L, 7),
Edge(2L, 4L, 2),
Edge(3L, 2L, 4),
Edge(3L, 6L, 6),
Edge(4L, 1L, 1),
Edge(5L, 2L, 2),
Edge(5L, 3L, 8),
Edge(5L, 6L, 3)
)
//根據(jù)頂點(diǎn)數(shù)據(jù)生成RDD
val vertexRDD = sc.makeRDD(vertexArr)
//根據(jù)邊數(shù)據(jù)生成RDD
val edgeRDD = sc.makeRDD(edgeArray)
//生成圖信息
val graph: Graph[(String, Int), Int] = Graph.apply(vertexRDD, edgeRDD)
println("=======================找出圖中所有年年齡大于30的頂點(diǎn)============================")
//屬性操作示例,
//找出圖中所有年年齡大于30的頂點(diǎn)
//所有頂點(diǎn)的屬性信息
// val vertices: VertexRDD[(String, Int)] = graph.vertices
// filter(pred: Tuple2[VertexId, VD] => Boolean)
graph.vertices
.filter { case (_, (_, point)) => point > 30 }
.foreach(println)
println("=======================找出圖中屬性大于5的邊============================")
//找出圖中屬性大于5的邊
// val edges: EdgeRDD[Int] = graph.edges
graph.edges
.filter(_.attr > 5)
.foreach(println)
println("=======================列出邊屬性>5的tripltes============================")
//列出邊屬性>5的tripltes
// val triplets: RDD[EdgeTriplet[(String, Int), Int]] = graph.triplets
graph.triplets
.filter(_.attr > 5)
.foreach(println)
//屬性操作,degress操作
//找出圖中最大的出度、入度、度數(shù)
println("=======================找出圖中最大的出度============================")
//所有的
// val degrees: VertexRDD[Int] = graph.outDegrees
val outMaxDegrees: (VertexId, Int) = graph.outDegrees
.reduce((x, y) => if (x._2 > y._2) x else y)
println(outMaxDegrees)
println("=======================找出圖中最大的入度============================")
val inMaxDegrees = graph.inDegrees
.reduce((x, y) => if (x._2 > y._2) x else y)
println(inMaxDegrees)
println("=======================找出圖中最大的度數(shù)============================")
val maxDegrees: (VertexId, Int) = graph.degrees
.reduce((x, y) => if (x._2 > y._2) x else y)
println(maxDegrees)
// 轉(zhuǎn)換操作
println("=======================頂點(diǎn)的轉(zhuǎn)換操作。所有人的年齡加 10============================")
// 頂點(diǎn)的轉(zhuǎn)換操作。所有人的年齡加 10
graph.vertices
.map { case (id, (name, age)) => (id, (name, age + 10)) }
.foreach(println)
println("=======================頂點(diǎn)的轉(zhuǎn)換操作。所有人的年齡加 10============================")
graph.mapVertices { case (id, (name, age)) => (id, (name, age + 10)) }.vertices.foreach(println)
// 邊的轉(zhuǎn)換操作。邊的屬性*2
println("=======================邊的轉(zhuǎn)換操作。邊的屬性*2============================")
graph.edges
.map(x => x.attr * 2)
.foreach(println(_))
// 結(jié)構(gòu)操作
// 頂點(diǎn)年齡 > 30 的子圖
// def subgraph(
// epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
// vpred: (VertexId, VD) => Boolean = ((v, d) => true))
// : Graph[VD, ED]
println("=======================頂點(diǎn)年齡 > 30 的子圖============================")
graph.subgraph(vpred = (_, VD) => VD._2 > 30).triplets.foreach(println(_))
//找出出度 == 入度的人員,鏈接操作,思路:圖 + 頂點(diǎn)的出度+頂點(diǎn)的入度
println("=======================找出出度 == 入度的人員============================")
//創(chuàng)建一個新圖,頂點(diǎn)VD的數(shù)據(jù)類型為User,并從graph做類型轉(zhuǎn)換
val g: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0) }
//(2,User(Bob,27,0,0))
//(4,User(David,42,0,0))
//(6,User(Fran,50,0,0))
//(3,User(Charlie,65,0,0))
//(5,User(Ed,55,0,0))
//(1,User(Alice,28,0,0))
// value1.vertices.foreach(println)
// def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
// (mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null) : Graph[VD2, ED]
// val degrees: VertexRDD[Int] = graph.inDegrees
//使用新圖關(guān)聯(lián)入度的數(shù)據(jù)
val userGraph = g.outerJoinVertices(graph.inDegrees) {
//
case (id, info, inDe) => User(info.name, info.age, inDe.getOrElse(0), info.outDegrees)
}.outerJoinVertices(graph.outDegrees) {
case (id, info, outDe) => User(info.name, info.age, info.inDegrees, outDe.getOrElse(0))
}
val value: VertexRDD[User] = userGraph.vertices.filter { case (id, u) => u.inDegrees == u.outDegrees }
value.foreach(println(_))
//找出5到各頂點(diǎn)的最短距離
sc.stop()
}
case class User(name: String, age: Int, inDegrees: Int, outDegrees: Int)
}
Pregel API
圖本身是遞歸數(shù)據(jù)結(jié)構(gòu),頂點(diǎn)的屬性依賴于他們鄰居的屬性,這些鄰居屬性有依賴于自己的鄰居屬性。許多重要的圖算法都是迭代的重新計算每個頂點(diǎn)的屬性,直到滿足某個確定的條件。一系列的圖并發(fā)抽象被提出來用來表達(dá)這些迭代算法。
Graphx公開了一個類似Pregel的操作。

vprog:用戶定義的頂點(diǎn)運(yùn)行程序。它作用于每一個頂點(diǎn),負(fù)責(zé)接收進(jìn)來的信息,并計算新的頂點(diǎn)值
sendMsg:發(fā)送消息
mergeMsg:合并消息
案例二:連通圖算法

package com.hhb.spark.graphx
import org.apache.spark.graphx.{Graph, GraphLoader}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description:
* @date: 2020-11-25 18:07
**/
object GraphXExample2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init)
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("warn")
val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "data/graph.dat")
//(VertexId,info)
//(11,1)
//(4,1)
//(45067,1)
//(431250,1)
//(1,1)
//(6,1)
//(3,1)
//(10,1)
//(7,1)
//(2,1)
//(9,1)
//(9111,1)
//(5,1)
//把文件里面所有的數(shù)據(jù)都默認(rèn)設(shè)置為頂點(diǎn),屬性給個默認(rèn)值1(無用)
graph.vertices.foreach(println(_))
println("===" * 20)
//起點(diǎn),終點(diǎn),邊屬性,邊屬性為默認(rèn)值(無用)
//Edge(4,45067,1)
//Edge(1,431250,1)
//Edge(6,45067,1)
//Edge(7,45067,1)
//Edge(2,431250,1)
//Edge(9,9111,1)
//Edge(3,431250,1)
//Edge(10,9111,1)
//Edge(4,431250,1)
//Edge(11,9111,1)
//Edge(5,45067,1)
//Edge(5,431250,1)
graph.edges.foreach(println(_))
println("===" * 20)
//生成連通圖
//(頂點(diǎn)信息,出始點(diǎn))
(11, 9)
(45067, 1)
(1, 1)
(3, 1)
(7, 1)
(9, 9)
(9111, 9)
(4, 1)
(5, 1)
(431250, 1)
(6, 1)
(10, 9)
(2, 1)
graph.connectedComponents()
.vertices
//按照元組的第二個元素排序,第二個元素就是出始點(diǎn)
.sortBy(_._2)
.foreach(println(_))
sc.stop()
}
}
案例三:尋找相同的用戶合并信息
假設(shè):
假設(shè)有五個不同信息可以作為用戶標(biāo)識,分別為:1X、2X、3X、4X、5X;
每次可以選擇使用若干為字段作為標(biāo)識
部分標(biāo)識可能發(fā)生變化,如:12 => 13 或 24 => 25
根據(jù)以上規(guī)則,判斷以下標(biāo)識是否代表同一用戶:
- 11-21-32、12-22-33 (X)
- 11-21-32、11-21-52 (OK)
- 21-32、11-21-33 (OK)
- 11-21-32、32-48 (OK)
問題:在以下數(shù)據(jù)中,找到同一用戶,合并相同用戶的數(shù)據(jù)
- 對于用戶標(biāo)識(id):合并后去重
- 對于用戶的信息:key相同,合并權(quán)重

package com.hhb.spark.graphx
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-26 16:02
**/
object GraphXExample3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
val sc = new SparkContext(conf)
sc.setLogLevel("warn")
val list = List(
(List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)),
(List(21L, 32L, 41L), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龍觀" -> 1.0)),
(List(41L), List("kw$天津" -> 1.0, "area$中關(guān)村" -> 1.0)),
(List(12L, 22L, 33L), List("kw$大數(shù)據(jù)" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)),
(List(22L, 34L, 44L), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)),
(List(33L, 53L), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0))
)
val dataRDD = sc.makeRDD(list)
//將標(biāo)示信息中的每一個元素抽取出來,作為ID
//這里使用了flatMap將數(shù)據(jù)壓平
//丟掉了標(biāo)簽信息,因為這個RDD是用來構(gòu)造頂點(diǎn),邊,tags信息用不到
//頂點(diǎn)、邊的數(shù)據(jù)要求是long,這個程序修改后我們才能使用
val dotRDD: RDD[(Long, Long)] = dataRDD.flatMap { case (buffer, _) =>
buffer.map(id => (id, buffer.mkString.toLong))
}
// dotRDD.foreach(println(_))
//(41,41)
//(12,122233)
//(22,223444)
//(34,223444)
//(33,3353)
//(44,223444)
//(21,213241)
//(32,213241)
//(41,213241)
//(22,122233)
//(33,122233)
//(11,112131)
//(21,112131)
//(31,112131)
//(53,3353)
//定義訂單的RDD,每個id都是一個訂單
val vertexesRDD = dotRDD.map(x => (x._1, 0))
//定義邊信息
val edgeRDD = dotRDD.map(x => Edge(x._1, x._2, 0))
//定義圖信息
val graph = Graph(vertexesRDD, edgeRDD)
//輸出點(diǎn)所有的信息
// graph.vertices.foreach(println(_))
//(31,0)
//(3353,0)
//(22,0)
//(32,0)
//(21,0)
//(12,0)
//(34,0)
//(11,0)
//(112131,0)
//(44,0)
//(53,0)
//(41,0)
//(223444,0)
//(213241,0)
//(122233,0)
//(33,0)
//使用強(qiáng)連通圖,生成的數(shù)據(jù)可以看見看見,是被分為了兩類
// graph.connectedComponents().vertices.foreach(println(_))
// (22,12)
//(32,11)
//(34,12)
//(21,11)
//(12,12)
//(31,11)
//(11,11)
//(112131,11)
//(3353,12)
//(53,12)
//(41,11)
//(213241,11)
//(122233,12)
//(33,12)
//(44,12)
//(223444,12)
//計算出連通點(diǎn)后,其實就是為了數(shù)據(jù)(112131,11)、(213241,11)、(41,11)、(122233,12)(223444,12)
//這樣就可以上最上面的rdd(dataRDD)將第一個集合的數(shù)據(jù)壓縮成上面的key,就可以確認(rèn)出來哪些數(shù)據(jù)為一條數(shù)據(jù)
val vertices = graph.connectedComponents().vertices
//定義中心點(diǎn)的數(shù)據(jù),即(List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)) =》
// (112131,(List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)))
//使用這樣的數(shù)據(jù),與vertices 進(jìn)行join,如下
val allInfoRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] = dataRDD.map { case (ids, infos) =>
(ids.mkString.toLong, (ids, infos))
}
//(223444,(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0)))))
//那么就可以通過value的第一個元素進(jìn)行合并數(shù)據(jù)
val joinRDD: RDD[(VertexId, (VertexId, (List[VertexId], List[(String, Double)])))] = vertices.join(allInfoRDD)
// value.foreach(println(_)),可以通過12、12進(jìn)行reduceBy
//(223444,(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0)))))
//(3353,(12,(List(33, 53),List((kw$hive,1.0), (kw$spark,1.0), (area$西二旗,1.0)))))
//(112131,(11,(List(11, 21, 31),List((kw$北京,1.0), (kw$上海,1.0), (area$中關(guān)村,1.0)))))
//(41,(11,(List(41),List((kw$天津,1.0), (area$中關(guān)村,1.0)))))
//(213241,(11,(List(21, 32, 41),List((kw$上海,1.0), (kw$天津,1.0), (area$回龍觀,1.0)))))
//(122233,(12,(List(12, 22, 33),List((kw$大數(shù)據(jù),1.0), (kw$spark,1.0), (area$西二旗,1.0)))))
val mergeInfo = joinRDD.map { case (_, infos) => (infos._1, infos._2) }
//(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0))))
//下面兩個步驟可以合并到一起,shuffle階段直接對數(shù)據(jù)去重、合并
val resultRDD = mergeInfo.reduceByKey { case ((ids, infos), (id, info)) =>
val newIds = ids ++ id
val newInfos = infos ++ info
(newIds, newInfos)
}
val result = resultRDD.mapValues { case (newIds, newInfos) =>
val ids = newIds.distinct
val infos: Map[String, Double] = newInfos.groupBy(_._1).mapValues(list => list.map(_._2).sum)
(ids, infos)
}
result.foreach(println(_))
//(11,(List(41, 21, 32, 11, 31),Map(area$中關(guān)村 -> 2.0, kw$北京 -> 1.0, kw$天津 -> 2.0, kw$上海 -> 2.0, area$回龍觀 -> 1.0)))
//(12,(List(33, 53, 12, 22, 34, 44),Map(kw$大數(shù)據(jù) -> 1.0, kw$spark -> 3.0, area$五道口 -> 1.0, area$西二旗 -> 2.0, kw$hive -> 1.0)))
sc.stop()
}
}

