Spark GraphX

Spark GraphX概述

GraphX是Spark的一個組件,專門用來表示圖以及進(jìn)行圖的并行計算。GraphX通過重新定義了圖的抽象概念來拓展了RDD:定向多圖,其屬性附加到每個頂點(diǎn)和邊。為了支持圖計算,GraphX公開了一系列基本運(yùn)算符(比如:mapVertices、mapEdges、subgraph)已經(jīng)優(yōu)化后的Pregel API變種。此外,還包含越來越多的圖計算和構(gòu)建器,以簡化圖形分析任務(wù)。GraphX在圖頂點(diǎn)信息和邊信息存儲上做了優(yōu)化,使得圖計算框架性能相對于原生RDD是想較大的提升,接近或達(dá)到GraphLab等專業(yè)圖計算平臺性能。GraphX最大的貢獻(xiàn)是,在Spark之上提供一站式數(shù)據(jù)解決方案,可以方便且高效地完成圖計算的一整套流水作業(yè)。


Spark GraphX概述.png

圖的相關(guān)術(shù)語

圖是一種較線性表和樹更為復(fù)雜的數(shù)據(jù)結(jié)構(gòu),圖表達(dá)的是多對多的關(guān)系。

如下圖所示,G1是一個簡單的圖,其中V1、V2、V3、V4被稱為頂點(diǎn)(Vertex),任意兩個頂點(diǎn)之間的通路被稱為邊(Edge),它可以由(V1,V2)有序?qū)肀硎荆@時稱G1位有向圖,意味著邊是有方向的,若以無序?qū)肀硎疽粭l表,則該圖為無向圖,如G2

圖的相關(guān)術(shù)語.png

在G1中,與頂點(diǎn)相關(guān)聯(lián)的邊的數(shù)量,被稱為頂點(diǎn)的度(Degree)。其中,以頂點(diǎn)為起點(diǎn)的邊的數(shù)量被稱為該訂單的出度(OutDegree),以頂點(diǎn)為終點(diǎn)的邊的數(shù)量被稱為該頂點(diǎn)的入度(InDegree)。

以G1的V1舉例,V1的度是3,啟動出度為2,入度為1。在無向圖G2中,如過任意兩個頂點(diǎn)之間是聯(lián)通的,則稱G2為連通圖(Connected Graph)。在有向圖G1中,如果任意兩個訂單Vm、Vn且m != n,從Vm到Vn以及Vn到Vm之間都存在通路,則稱G1為強(qiáng)連通圖(Strongly Conneted Graph)。如果任意兩個頂點(diǎn)之間若存在通路,則稱為路徑(Path),用一個頂點(diǎn)序列表示,若第一個頂點(diǎn)和最后一個頂點(diǎn)相同,則稱為回路或者環(huán)(Cycle)

圖數(shù)據(jù)庫與圖計算

Neo4j 是一個比較老牌的開源圖數(shù)據(jù)庫,目前在業(yè)界的使用也較為廣泛,它提供了一種簡單易學(xué)的查詢語言 Cypher。Neo4j 支持交互式查詢,查詢效率很高。能夠迅速從整網(wǎng)中找出符合特定模式的子網(wǎng),供隨后分析之用,適用于OLTP 場景。

Neo4j 是圖數(shù)據(jù)庫,偏向于存儲和查詢。能存儲關(guān)聯(lián)關(guān)系比較復(fù)雜,實體之間的連接豐富。比如社交網(wǎng)絡(luò)、知識圖譜、金融風(fēng)控等領(lǐng)域的數(shù)據(jù)。擅長從某個點(diǎn)或某些點(diǎn)出發(fā),根據(jù)特定條件在復(fù)雜的關(guān)聯(lián)關(guān)系中找到目標(biāo)點(diǎn)或邊。如在社交網(wǎng)絡(luò)中找到某個點(diǎn)三步以內(nèi)能認(rèn)識的人,這些人可以認(rèn)為是潛在朋友。數(shù)據(jù)量限定在一定范圍內(nèi),能短時完成的查詢就是所謂的OLTP操作。

Neo4j 查詢與插入速度較快,沒有分布式版本,容量有限,而且一旦圖變得非常大, 如數(shù)十億頂點(diǎn),數(shù)百億邊,查詢速度將變得緩慢。Neo4j 分為社區(qū)版和企業(yè)版,企業(yè)版有一些高級功能,需要授權(quán),價格昂貴。

比較復(fù)雜的分析和算法,如基于圖的聚類,PageRank 算法等,這類計算任務(wù)對于圖數(shù)據(jù)庫來說就很難勝任了,主要由一些圖挖掘技術(shù)來負(fù)責(zé)。

Pregel 是 Google 于 2010 年在 SIGMOD 會議上發(fā)表的《Pregel: A System for Large-Scale Graph Processing》論文中提到的海量并行圖挖掘的抽象框架,Pregel 與 Dremel 一樣,是 Google 新三駕馬車之一,它基于 BSP 模型(Bulk Synchronous Parallel,整體同步并行計算模型),將計算分為若干個超步(super step),在超步內(nèi),通過消息來傳播頂點(diǎn)之間的狀態(tài)。Pregel 可以看成是同步計 算,即等所有頂點(diǎn)完成處理后再進(jìn)行下一輪的超步,Spark 基于 Pregel 論文實現(xiàn)的 海量并行圖挖掘框架 GraphX。

圖計算模式

目前基于圖的并行計算框架已經(jīng)有很多,比如來自Google的Pregel、來自Apache開源的圖計算框架Giraph / HAMA以及最為著名的GraphLab,其中Pregel、HAMA和 Giraph都是非常類似的,都是基于BSP模式。

BSP即整體同步并行,它將計算分成一系列超步的迭代。從縱向上看,它是一個串行模式,而從橫向上看,它是一個并行的模式,每兩個超步之間設(shè)置一個柵欄 (barrier),即整體同步點(diǎn),確定所有并行的計算都完成后再啟動下一輪超步。

圖計算模式.png

每一個超步包含三部分內(nèi)容:

  • 計算 cumpute:每一個Processor利用上一個超步傳過來的消息和本地的數(shù)據(jù)進(jìn)行本地計算
  • 消息傳遞:每一個Processor計算完畢后,將消息傳遞給與之關(guān)聯(lián)的其他Processors
  • 整體同步點(diǎn):用整體同步,確定所有的計算和消息傳遞都進(jìn)行完畢后,進(jìn)入下一個超步

Spark GraphX 基礎(chǔ)

架構(gòu)
存儲模式
核心數(shù)據(jù)結(jié)構(gòu)

GraphX 與 Spark 其他組件相比相對獨(dú)立,擁有自己的核心數(shù)據(jù)結(jié)構(gòu)與算子。

GraphX架構(gòu)

GraphX架構(gòu).png

GraphX的整體架構(gòu)可以分為三個部分:

  • 算法層:基于Pregel接口實現(xiàn)了常用的圖算法,包括PageRank、SVDPlusPlus、TriangeleCount、ConnectedComponents、StonglyConnectedConponents等算法
  • 接口層:在底層RDD基礎(chǔ)之上實現(xiàn)了Pregel模型BSP模式的計算接口
  • 底層:圖計算的核心類,包含:VertexRDD、EdgeRDD、RDD[EdgeTriplet]

存儲模式

巨型圖的存儲總體上有邊分割和點(diǎn)分割兩種存儲方式。2013年,GraphLab2.0將其 存儲方式由邊分割變?yōu)辄c(diǎn)分割,在性能上取得重大提升,目前基本上被業(yè)界廣泛接受 并使用。

  • 邊分割(Edge-Cut):每個頂點(diǎn)存儲一次,但有的邊會被打斷分到兩臺機(jī)器上。這樣做的好處就是節(jié)省存儲空間;壞處是對圖進(jìn)行基于邊計算時,對于一條兩個頂點(diǎn)被分到不同的機(jī)器上的邊來說,需要跨機(jī)器傳輸數(shù)據(jù),內(nèi)網(wǎng)通信流量大
  • 點(diǎn)分割(Vertex-Cut):每條邊只存儲一次,都會出現(xiàn)一臺機(jī)器上,鄰居多的點(diǎn)會被復(fù)制到多臺機(jī)器上,增加了存儲開銷,同時會引發(fā)數(shù)據(jù)同步問題。好處是可以大幅減少內(nèi)網(wǎng)通信量。

存儲模式.png

雖然兩種方法互有利弊,但現(xiàn)在是點(diǎn)分割占上風(fēng),各種分布式圖計算框架都將自己底層的存儲形式變成了點(diǎn)分割。主要原因有以下兩個:

  • 磁盤價格下降,存儲空間不再是問題,而內(nèi)網(wǎng)的通信資源沒有突破性進(jìn)展,集群計算時內(nèi)網(wǎng)帶寬是寶貴的,時間比磁盤更珍貴。這點(diǎn)就類似于常見的空間換時間的策略;
  • 在當(dāng)前的應(yīng)用場景中,絕大多數(shù)網(wǎng)絡(luò)都是“無尺度網(wǎng)絡(luò)”,遵循冪律分布,不同點(diǎn)的鄰居數(shù)量相差非常懸殊。而邊分割會使那些多鄰居的點(diǎn)所相連的邊大多數(shù)被分到不同的機(jī)器上,這樣的數(shù)據(jù)分布會使得內(nèi)網(wǎng)帶寬更加捉襟見肘,于是邊分割存儲方式被漸漸拋棄了;

核心數(shù)據(jù)結(jié)構(gòu)

核心數(shù)據(jù)結(jié)構(gòu)包括:graph、vertices、edges、triplets

GraphX API 的開發(fā)語言目前僅支持 Scala。GraphX 的核心數(shù)據(jù)結(jié)構(gòu) Graph 由 RDD 封裝而成。

Graph

GraphX用屬性圖的方法表示圖,頂點(diǎn)有屬性,邊有屬性。存儲結(jié)構(gòu)采用邊集數(shù)據(jù)的形式,即一個頂點(diǎn)表,一個邊表,如下圖所示:

Graph.png

頂點(diǎn)ID是非常重要的字段,他不光是頂點(diǎn)的唯一標(biāo)識符,也是描述邊的唯一手段,訂單表與邊表實際上就是RDD,他們分別是VertexRDD與EdgeRDD。在Spark的源碼中,Graph類如下:

Graph2.png
  • vertices為頂點(diǎn)表,VD為訂單屬性類型
  • edges為邊表,ED為邊屬性類型
  • 可以通過Graph的vertices與edges成員直接得到頂點(diǎn)RDD與邊RDD
  • 頂點(diǎn)RDD類型是VerticeRDD,繼承自RDD[(VertexId,VD)]
  • 邊RDD類型為EdgeRDD,繼承自RDD[Edge[ED]]
vertices

vertices對應(yīng)著名為 VertexRDD 的RDD。這個RDD由頂點(diǎn)id和頂點(diǎn)屬性兩個成員變量。

vertices.png

VertexRDD繼承自 RDD[(VertexId, VD)],這里VertexId表示頂點(diǎn)id,VD表示頂點(diǎn)所 帶的屬性的類別。

VertexId實際上是一個Long類型

vertices2.png
edges

edges對應(yīng)著EdgeRDD。這個RDD擁有三個成員變量,分別是源頂點(diǎn)id、目標(biāo)頂點(diǎn)id以及邊屬性。

edges.png

Edge代表邊,由 源頂點(diǎn)id、目標(biāo)頂點(diǎn)id、以及邊的屬性構(gòu)成。

edges2.png
triplets

triplets 表示邊點(diǎn)三元組,如下圖所示(其中圓柱形分別代表頂點(diǎn)屬性與邊屬性):

triplets.png

通過 triplets 成員,用戶可以直接獲取到起點(diǎn)頂點(diǎn)、起點(diǎn)頂點(diǎn)屬性、終點(diǎn)頂點(diǎn)、終點(diǎn)頂點(diǎn)屬性、邊與邊屬性信息。triplets 的生成可以由邊表與頂點(diǎn)表通過 ScrId 與 DstId 連接而成。

triplets對應(yīng)著EdgeTriplet。它是一個三元組視圖,這個視圖邏輯上將頂點(diǎn)和邊的屬 性保存為一個RDD[EdgeTriplet[VD, ED]]。

triplets2.png

Spark GraphX 計算

圖的定義
屬性操作
轉(zhuǎn)換操作
結(jié)構(gòu)操作
關(guān)聯(lián)操作
聚合操作
Pregel API

引入依賴:

<!-- graphx -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-graphx_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

案例一:圖的基本操作

Spark GraphX 計算.png

找到 出度=入度 的人員\找出5到各頂點(diǎn)的最短距離 即鏈接操作與聚合操作需要重新聽課

package com.hhb.spark.graphx

import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.reflect.ClassTag

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-25 14:25
 **/
object GraphXExample1 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)

    sc.setLogLevel("warn")
    //定義頂點(diǎn)的數(shù)據(jù)(訂單,信息)
    val vertexArr: Array[(VertexId, (String, Int))] = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50)))


    //定義邊的數(shù)據(jù)(起點(diǎn),終點(diǎn),邊屬性)
    val edgeArray: Array[Edge[Int]] = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 6),
      Edge(4L, 1L, 1),
      Edge(5L, 2L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3)
    )

    //根據(jù)頂點(diǎn)數(shù)據(jù)生成RDD
    val vertexRDD = sc.makeRDD(vertexArr)
    //根據(jù)邊數(shù)據(jù)生成RDD
    val edgeRDD = sc.makeRDD(edgeArray)

    //生成圖信息
    val graph: Graph[(String, Int), Int] = Graph.apply(vertexRDD, edgeRDD)
    println("=======================找出圖中所有年年齡大于30的頂點(diǎn)============================")
    //屬性操作示例,
    //找出圖中所有年年齡大于30的頂點(diǎn)
    //所有頂點(diǎn)的屬性信息
    // val vertices: VertexRDD[(String, Int)] = graph.vertices
    // filter(pred: Tuple2[VertexId, VD] => Boolean)
    graph.vertices
      .filter { case (_, (_, point)) => point > 30 }
      .foreach(println)
    println("=======================找出圖中屬性大于5的邊============================")
    //找出圖中屬性大于5的邊
    //    val edges: EdgeRDD[Int] = graph.edges
    graph.edges
      .filter(_.attr > 5)
      .foreach(println)
    println("=======================列出邊屬性>5的tripltes============================")
    //列出邊屬性>5的tripltes
    //    val triplets: RDD[EdgeTriplet[(String, Int), Int]] = graph.triplets
    graph.triplets
      .filter(_.attr > 5)
      .foreach(println)
    //屬性操作,degress操作
    //找出圖中最大的出度、入度、度數(shù)
    println("=======================找出圖中最大的出度============================")
    //所有的
    //    val degrees: VertexRDD[Int] = graph.outDegrees
    val outMaxDegrees: (VertexId, Int) = graph.outDegrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(outMaxDegrees)
    println("=======================找出圖中最大的入度============================")
    val inMaxDegrees = graph.inDegrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(inMaxDegrees)
    println("=======================找出圖中最大的度數(shù)============================")
    val maxDegrees: (VertexId, Int) = graph.degrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(maxDegrees)
    // 轉(zhuǎn)換操作
    println("=======================頂點(diǎn)的轉(zhuǎn)換操作。所有人的年齡加 10============================")
    // 頂點(diǎn)的轉(zhuǎn)換操作。所有人的年齡加 10
    graph.vertices
      .map { case (id, (name, age)) => (id, (name, age + 10)) }
      .foreach(println)
    println("=======================頂點(diǎn)的轉(zhuǎn)換操作。所有人的年齡加 10============================")
    graph.mapVertices { case (id, (name, age)) => (id, (name, age + 10)) }.vertices.foreach(println)

    // 邊的轉(zhuǎn)換操作。邊的屬性*2
    println("=======================邊的轉(zhuǎn)換操作。邊的屬性*2============================")
    graph.edges
      .map(x => x.attr * 2)
      .foreach(println(_))
    // 結(jié)構(gòu)操作
    // 頂點(diǎn)年齡 > 30 的子圖
    //    def subgraph(
    //                  epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
    //                  vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    //    : Graph[VD, ED]
    println("=======================頂點(diǎn)年齡 > 30 的子圖============================")
    graph.subgraph(vpred = (_, VD) => VD._2 > 30).triplets.foreach(println(_))


    //找出出度 == 入度的人員,鏈接操作,思路:圖 + 頂點(diǎn)的出度+頂點(diǎn)的入度
    println("=======================找出出度 == 入度的人員============================")
    //創(chuàng)建一個新圖,頂點(diǎn)VD的數(shù)據(jù)類型為User,并從graph做類型轉(zhuǎn)換
    val g: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0) }
    //(2,User(Bob,27,0,0))
    //(4,User(David,42,0,0))
    //(6,User(Fran,50,0,0))
    //(3,User(Charlie,65,0,0))
    //(5,User(Ed,55,0,0))
    //(1,User(Alice,28,0,0))
    //    value1.vertices.foreach(println)


    // def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
    //  (mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null) : Graph[VD2, ED]
    //    val degrees: VertexRDD[Int] = graph.inDegrees
    //使用新圖關(guān)聯(lián)入度的數(shù)據(jù)
    val userGraph = g.outerJoinVertices(graph.inDegrees) {
      //
      case (id, info, inDe) => User(info.name, info.age, inDe.getOrElse(0), info.outDegrees)
    }.outerJoinVertices(graph.outDegrees) {
      case (id, info, outDe) => User(info.name, info.age, info.inDegrees, outDe.getOrElse(0))
    }
    val value: VertexRDD[User] = userGraph.vertices.filter { case (id, u) => u.inDegrees == u.outDegrees }
    value.foreach(println(_))


    //找出5到各頂點(diǎn)的最短距離


    sc.stop()
  }
  case class User(name: String, age: Int, inDegrees: Int, outDegrees: Int)
}
Pregel API

圖本身是遞歸數(shù)據(jù)結(jié)構(gòu),頂點(diǎn)的屬性依賴于他們鄰居的屬性,這些鄰居屬性有依賴于自己的鄰居屬性。許多重要的圖算法都是迭代的重新計算每個頂點(diǎn)的屬性,直到滿足某個確定的條件。一系列的圖并發(fā)抽象被提出來用來表達(dá)這些迭代算法。

Graphx公開了一個類似Pregel的操作。

Pregel API.png
  • vprog:用戶定義的頂點(diǎn)運(yùn)行程序。它作用于每一個頂點(diǎn),負(fù)責(zé)接收進(jìn)來的信息,并計算新的頂點(diǎn)值

  • sendMsg:發(fā)送消息

  • mergeMsg:合并消息

案例二:連通圖算法

連通圖算法.png
package com.hhb.spark.graphx

import org.apache.spark.graphx.{Graph, GraphLoader}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @date: 2020-11-25 18:07
 **/
object GraphXExample2 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "data/graph.dat")
    //(VertexId,info)
    //(11,1)
    //(4,1)
    //(45067,1)
    //(431250,1)
    //(1,1)
    //(6,1)
    //(3,1)
    //(10,1)
    //(7,1)
    //(2,1)
    //(9,1)
    //(9111,1)
    //(5,1)
    //把文件里面所有的數(shù)據(jù)都默認(rèn)設(shè)置為頂點(diǎn),屬性給個默認(rèn)值1(無用)
    graph.vertices.foreach(println(_))
    println("===" * 20)
    //起點(diǎn),終點(diǎn),邊屬性,邊屬性為默認(rèn)值(無用)
    //Edge(4,45067,1)
    //Edge(1,431250,1)
    //Edge(6,45067,1)
    //Edge(7,45067,1)
    //Edge(2,431250,1)
    //Edge(9,9111,1)
    //Edge(3,431250,1)
    //Edge(10,9111,1)
    //Edge(4,431250,1)
    //Edge(11,9111,1)
    //Edge(5,45067,1)
    //Edge(5,431250,1)
    graph.edges.foreach(println(_))
    println("===" * 20)
    //生成連通圖
    //(頂點(diǎn)信息,出始點(diǎn))
    (11, 9)
    (45067, 1)
    (1, 1)
    (3, 1)
    (7, 1)
    (9, 9)
    (9111, 9)
    (4, 1)
    (5, 1)
    (431250, 1)
    (6, 1)
    (10, 9)
    (2, 1)
    graph.connectedComponents()
      .vertices
      //按照元組的第二個元素排序,第二個元素就是出始點(diǎn)
      .sortBy(_._2)
      .foreach(println(_))
    sc.stop()
  }
}

案例三:尋找相同的用戶合并信息

假設(shè):

  • 假設(shè)有五個不同信息可以作為用戶標(biāo)識,分別為:1X、2X、3X、4X、5X;

  • 每次可以選擇使用若干為字段作為標(biāo)識

  • 部分標(biāo)識可能發(fā)生變化,如:12 => 13 或 24 => 25

根據(jù)以上規(guī)則,判斷以下標(biāo)識是否代表同一用戶:

  • 11-21-32、12-22-33 (X)
  • 11-21-32、11-21-52 (OK)
  • 21-32、11-21-33 (OK)
  • 11-21-32、32-48 (OK)

問題:在以下數(shù)據(jù)中,找到同一用戶,合并相同用戶的數(shù)據(jù)

  • 對于用戶標(biāo)識(id):合并后去重
  • 對于用戶的信息:key相同,合并權(quán)重
統(tǒng)一用戶識別.png
package com.hhb.spark.graphx

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-26 16:02
 **/
object GraphXExample3 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    val list = List(
      (List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)),
      (List(21L, 32L, 41L), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龍觀" -> 1.0)),
      (List(41L), List("kw$天津" -> 1.0, "area$中關(guān)村" -> 1.0)),
      (List(12L, 22L, 33L), List("kw$大數(shù)據(jù)" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)),
      (List(22L, 34L, 44L), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)),
      (List(33L, 53L), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0))
    )

    val dataRDD = sc.makeRDD(list)

    //將標(biāo)示信息中的每一個元素抽取出來,作為ID
    //這里使用了flatMap將數(shù)據(jù)壓平
    //丟掉了標(biāo)簽信息,因為這個RDD是用來構(gòu)造頂點(diǎn),邊,tags信息用不到
    //頂點(diǎn)、邊的數(shù)據(jù)要求是long,這個程序修改后我們才能使用
    val dotRDD: RDD[(Long, Long)] = dataRDD.flatMap { case (buffer, _) =>
      buffer.map(id => (id, buffer.mkString.toLong))
    }
    //    dotRDD.foreach(println(_))
    //(41,41)
    //(12,122233)
    //(22,223444)
    //(34,223444)
    //(33,3353)
    //(44,223444)
    //(21,213241)
    //(32,213241)
    //(41,213241)
    //(22,122233)
    //(33,122233)
    //(11,112131)
    //(21,112131)
    //(31,112131)
    //(53,3353)

    //定義訂單的RDD,每個id都是一個訂單
    val vertexesRDD = dotRDD.map(x => (x._1, 0))
    //定義邊信息
    val edgeRDD = dotRDD.map(x => Edge(x._1, x._2, 0))

    //定義圖信息
    val graph = Graph(vertexesRDD, edgeRDD)
    //輸出點(diǎn)所有的信息
    // graph.vertices.foreach(println(_))
    //(31,0)
    //(3353,0)
    //(22,0)
    //(32,0)
    //(21,0)
    //(12,0)
    //(34,0)
    //(11,0)
    //(112131,0)
    //(44,0)
    //(53,0)
    //(41,0)
    //(223444,0)
    //(213241,0)
    //(122233,0)
    //(33,0)
    //使用強(qiáng)連通圖,生成的數(shù)據(jù)可以看見看見,是被分為了兩類
    //    graph.connectedComponents().vertices.foreach(println(_))
    // (22,12)
    //(32,11)
    //(34,12)
    //(21,11)
    //(12,12)
    //(31,11)
    //(11,11)
    //(112131,11)
    //(3353,12)
    //(53,12)
    //(41,11)
    //(213241,11)
    //(122233,12)
    //(33,12)
    //(44,12)
    //(223444,12)
    //計算出連通點(diǎn)后,其實就是為了數(shù)據(jù)(112131,11)、(213241,11)、(41,11)、(122233,12)(223444,12)
    //這樣就可以上最上面的rdd(dataRDD)將第一個集合的數(shù)據(jù)壓縮成上面的key,就可以確認(rèn)出來哪些數(shù)據(jù)為一條數(shù)據(jù)
    val vertices = graph.connectedComponents().vertices

    //定義中心點(diǎn)的數(shù)據(jù),即(List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)) =》
    // (112131,(List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)))
    //使用這樣的數(shù)據(jù),與vertices 進(jìn)行join,如下
    val allInfoRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] = dataRDD.map { case (ids, infos) =>
      (ids.mkString.toLong, (ids, infos))
    }
    //(223444,(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0)))))
    //那么就可以通過value的第一個元素進(jìn)行合并數(shù)據(jù)
    val joinRDD: RDD[(VertexId, (VertexId, (List[VertexId], List[(String, Double)])))] = vertices.join(allInfoRDD)
    //  value.foreach(println(_)),可以通過12、12進(jìn)行reduceBy
    //(223444,(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0)))))
    //(3353,(12,(List(33, 53),List((kw$hive,1.0), (kw$spark,1.0), (area$西二旗,1.0)))))
    //(112131,(11,(List(11, 21, 31),List((kw$北京,1.0), (kw$上海,1.0), (area$中關(guān)村,1.0)))))
    //(41,(11,(List(41),List((kw$天津,1.0), (area$中關(guān)村,1.0)))))
    //(213241,(11,(List(21, 32, 41),List((kw$上海,1.0), (kw$天津,1.0), (area$回龍觀,1.0)))))
    //(122233,(12,(List(12, 22, 33),List((kw$大數(shù)據(jù),1.0), (kw$spark,1.0), (area$西二旗,1.0)))))


    val mergeInfo = joinRDD.map { case (_, infos) => (infos._1, infos._2) }
    //(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0))))


    //下面兩個步驟可以合并到一起,shuffle階段直接對數(shù)據(jù)去重、合并
    val resultRDD = mergeInfo.reduceByKey { case ((ids, infos), (id, info)) =>
      val newIds = ids ++ id
      val newInfos = infos ++ info
      (newIds, newInfos)
    }
    val result = resultRDD.mapValues { case (newIds, newInfos) =>
      val ids = newIds.distinct
      val infos: Map[String, Double] = newInfos.groupBy(_._1).mapValues(list => list.map(_._2).sum)
      (ids, infos)
    }

    result.foreach(println(_))
    //(11,(List(41, 21, 32, 11, 31),Map(area$中關(guān)村 -> 2.0, kw$北京 -> 1.0, kw$天津 -> 2.0, kw$上海 -> 2.0, area$回龍觀 -> 1.0)))
    //(12,(List(33, 53, 12, 22, 34, 44),Map(kw$大數(shù)據(jù) -> 1.0, kw$spark -> 3.0, area$五道口 -> 1.0, area$西二旗 -> 2.0, kw$hive -> 1.0)))

    sc.stop()

  }

}
spark下任務(wù)三錯題1.png
spark下任務(wù)三錯題2.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 寫在前面 態(tài)度決定高度!讓優(yōu)秀成為一種習(xí)慣! 世界上沒有什么事兒是加一次班解決不了的,如果有,就加兩次?。? - ...
    夜盡天明時閱讀 36,424評論 10 33
  • 【轉(zhuǎn)載】原文地址:原文地址 概述 ??GraphX是Spark中用于圖和圖計算的組件,GraphX通過擴(kuò)展Spar...
    木亦汐閱讀 4,679評論 0 1
  • 網(wǎng)上graphx實現(xiàn)最短路徑的代碼比較多,但是都是scala版本,java版本的實現(xiàn)很少。1.創(chuàng)建圖數(shù)據(jù)使用的方法...
    寒江老翁閱讀 978評論 0 1
  • 引子:筆者有一段時間學(xué)習(xí)使用 spark 圖算法實現(xiàn) One ID 的工作,看到一篇文章打算翻譯,今天得空可以還債...
    _糖sir_閱讀 4,272評論 0 2
  • Spark GraphX GraphX簡介 主要特點(diǎn) 演化過程 應(yīng)用場景 分布式圖計算處理技術(shù)介紹 下面分別從圖數(shù)...
    raincoffee閱讀 1,408評論 0 0

友情鏈接更多精彩內(nèi)容