Hello GraphX

This article uses a simple example to explain some basic concepts and common operations in Spark GraphX.

Example

First, add the GraphX dependency to the pom:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.10</artifactId>
    <version>1.6.3</version>
</dependency>

The complete example code:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object HelloGraphX {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Hello GraphX")
    val sc = new SparkContext(conf)

    // Create an RDD for the vertices
    val users: RDD[(VertexId, (String, String))] =
      sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
        (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
    // Create an RDD for edges
    val relationships: RDD[Edge[String]] =
      sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), Edge(5L, 0L, "colleague")))
    // Define a default user in case a relationship refers to a missing user
    val defaultUser = ("John Doe", "Missing")
    // Build the initial Graph
    val graph = Graph(users, relationships, defaultUser)
    // Notice that user 0 (for which we have no information) is connected to user 5 (franklin).
    graph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))
    // Remove missing vertices as well as the edges connected to them
    val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
    // The valid subgraph no longer connects user 5 to user 0, because user 0 has been removed
    validGraph.vertices.collect.foreach(println(_))
    validGraph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))

    println("Count all users which are prof:"+validGraph.vertices.filter { case (id, (name, pos)) => pos == "prof" }.count)

    println("Count all the edges where src < dst:"+graph.edges.filter(e => e.srcId < e.dstId).count)

    sc.stop()
  }
}

The results are written to the driver's log:

rxin is the collab of jgonzal
franklin is the advisor of rxin
istoica is the colleague of franklin
franklin is the colleague of John Doe
franklin is the pi of jgonzal

......

istoica is the colleague of franklin
rxin is the collab of jgonzal
franklin is the advisor of rxin
franklin is the pi of jgonzal

......

Count all users which are prof:2

......

Count all the edges where src < dst:3

Walkthrough

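  1. The first step in the example builds a graph in the simplest way available: calling the Graph(...) factory method with the vertex RDD and the edge RDD as arguments. The resulting graph contains the four user vertices 3, 7, 5 and 2, connected by the five edges defined in relationships.

The edge definitions include Edge(5L, 0L, "colleague"), but vertex 0 is never defined. When the graph is built, the third argument of Graph(...) is therefore used to give vertex 0 the default attribute ("John Doe", "Missing").

  2. Vertices whose attribute is "Missing" are filtered out to obtain a new graph, which is then traversed once with the following code:
    validGraph.vertices.collect.foreach(println(_))
    validGraph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))
  3. graph.vertices and graph.edges split a graph into a vertex view and an edge view; their return types are VertexRDD and EdgeRDD respectively. Because both are RDDs, the usual RDD methods can be used to filter vertices or edges further. Only users 5 and 2 have the attribute prof, hence "Count all users which are prof:2". Likewise, the edge table has 3 edges whose DstId is greater than SrcId (3 -> 7, 2 -> 5 and 5 -> 7).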
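The three steps above can be checked end to end with a small local run. The following is a minimal sketch, not part of the original sample: the object name HelloGraphXCheck and the local[2] master are assumptions chosen so that it runs without a cluster, and the assertions only restate the numbers discussed above.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only to verify the walkthrough locally
object HelloGraphXCheck {
  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads instead of a cluster master
    val conf = new SparkConf().setAppName("Hello GraphX check").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
        (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
        (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
      val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
        Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), Edge(5L, 0L, "colleague")))
      val defaultUser = ("John Doe", "Missing")
      val graph = Graph(users, relationships, defaultUser)

      // Step 1: vertex 0 appears only in an edge, so it receives the default attribute
      val vertexZero = graph.vertices.filter { case (id, _) => id == 0L }.collect().toList
      assert(vertexZero == List((0L, defaultUser)))

      // Step 2: removing "Missing" vertices also removes the edges that touch them
      val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
      assert(validGraph.vertices.count == 4)
      assert(validGraph.edges.count == 4)

      // Step 3: the vertex and edge views are RDDs and can be filtered directly
      val profCount = validGraph.vertices.filter { case (_, (_, pos)) => pos == "prof" }.count
      val forwardEdges = graph.edges.filter(e => e.srcId < e.dstId).count
      assert(profCount == 2)
      assert(forwardEdges == 3)
      println("All walkthrough checks passed")
    } finally {
      sc.stop()
    }
  }
}
```

When submitting to a real cluster, the setMaster call would normally be dropped so that the master is taken from spark-submit, as in the original sample.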

Summary

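  1. A simple example showed how to build a graph with the GraphX API and how to perform basic operations on it.
  2. The test environment for this article is HDP-2.6.0.3; the example is adapted from http://spark.apache.org/docs/1.6.3/graphx-programming-guide.html
  3. In production, the data for a graph is more likely to come from external files, which means the graph has to be loaded as shown below. The complete code for this case is in the appendix.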
    // Load my user data and parse into tuples of user id and attribute list
    val users = (sc.textFile("/tmp/users.txt")
      .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

    // Parse the edge data which is already in userId -> userId format
    val followerGraph = GraphLoader.edgeListFile(sc, "/tmp/followers.txt")
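To make the parsing step in item 3 concrete, here is a minimal plain-Scala sketch (no Spark required) that applies the same split logic to two lines taken from the users.txt sample in the appendix. The names ParseUsersSketch and parseUser are hypothetical helpers introduced only for illustration.

```scala
// Hypothetical helper object; it mirrors the map steps applied to the users RDD
object ParseUsersSketch {
  // Split a comma-separated line into (user id, remaining attributes)
  def parseUser(line: String): (Long, Array[String]) = {
    val parts = line.split(",")
    (parts.head.toLong, parts.tail)
  }

  def main(args: Array[String]): Unit = {
    // A user with both a username and a display name: two attributes
    val (id1, attrs1) = parseUser("1,BarackObama,Barack Obama")
    println(s"$id1 -> " + attrs1.mkString("[", ", ", "]")) // 1 -> [BarackObama, Barack Obama]

    // A user with only a username: one attribute
    val (id8, attrs8) = parseUser("8,anonsys")
    println(s"$id8 -> " + attrs8.mkString("[", ", ", "]")) // 8 -> [anonsys]
  }
}
```

The second line yields a single attribute, which is why the filter attr.size == 2 in the appendix code drops user 8. For the edge file, GraphLoader.edgeListFile expects one "sourceId targetId" pair per line, separated by whitespace, as in followers.txt.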

Appendix

import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object GraphXExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GraphX Example")
    val sc = new SparkContext(conf)

    // Load my user data and parse into tuples of user id and attribute list
    val users = (sc.textFile("/tmp/users.txt")
      .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

    // Parse the edge data which is already in userId -> userId format
    val followerGraph = GraphLoader.edgeListFile(sc, "/tmp/followers.txt")

    // Attach the user attributes
    val graph = followerGraph.outerJoinVertices(users) {
      case (uid, deg, Some(attrList)) => attrList
      // Some users may not have attributes so we set them as empty
      case (uid, deg, None) => Array.empty[String]
    }

    // Restrict the graph to users with usernames and names
    val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)

    // Compute the PageRank
    val pagerankGraph = subgraph.pageRank(0.001)

    // Get the attributes of the top pagerank users
    val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
      case (uid, attrList, Some(pr)) => (pr, attrList.toList)
      case (uid, attrList, None) => (0.0, attrList.toList)
    }

    println(s"The result is :${userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n")}")

    sc.stop()
  }
}

The example uses two data files:
users.txt

1,BarackObama,Barack Obama
2,ladygaga,Goddess of Love
3,jeresig,John Resig
4,justinbieber,Justin Bieber
6,matei_zaharia,Matei Zaharia
7,odersky,Martin Odersky
8,anonsys

followers.txt

2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7

Both data files must be placed under /tmp on HDFS. The result is written to the driver's log:

The result is :(1,(1.453834747463902,List(BarackObama, Barack Obama)))
(2,(1.3857595353443166,List(ladygaga, Goddess of Love)))
(7,(1.2892158818481694,List(odersky, Martin Odersky)))
(3,(0.9936187772892124,List(jeresig, John Resig)))
(6,(0.697916749785472,List(matei_zaharia, Matei Zaharia)))