引子:筆者有一段時(shí)間學(xué)習(xí)使用 spark 圖算法實(shí)現(xiàn) One ID 的工作,看到一篇文章打算翻譯,今天得空可以還債了。下面便是翻譯正文。原文地址https://mapr.com/blog/how-get-started-using-apache-spark-graphx-scala/
這篇博客將幫助你在?MapR 沙箱環(huán)境開始學(xué)習(xí)Scala語(yǔ)言實(shí)現(xiàn)的Apache Spark GraphX的使用方法。GraphX 是圖并行計(jì)算的 Apache Spark 組件,基于圖理論的數(shù)學(xué)分支構(gòu)建。它是在 Spark 核心上的分布式圖計(jì)算處理框架。
圖計(jì)算的概念簡(jiǎn)介
圖是用于表示對(duì)象之間模型關(guān)系的數(shù)學(xué)結(jié)構(gòu)。圖由頂點(diǎn)和連接頂點(diǎn)的邊構(gòu)成。頂點(diǎn)是對(duì)象,而邊是對(duì)象之間的關(guān)系。

有向圖是頂點(diǎn)之間的邊是有方向的。有向圖的例子如 Twitter 上的關(guān)注者。用戶 Bob 關(guān)注了用戶 Carol ,而 Carol 并沒有關(guān)注 Bob。

正則圖是每個(gè)頂點(diǎn)都有相同數(shù)量的邊。正則圖的例子就是 Facebook 的朋友關(guān)系。如果 Bob 是 Carol 的朋友,那么 Carol 也是 Bob 的朋友。
GraphX 屬性圖
GraphX 通過彈性分布式屬性圖擴(kuò)展了 Sprak RDD。
這種屬性圖是一種有向多重圖,它有多條平行的邊。每個(gè)邊和頂點(diǎn)都有用戶定義的屬性。平行的邊允許相同頂點(diǎn)有多種關(guān)系。


軟件
本教程將運(yùn)行在 MapR 沙箱中,它會(huì)包含 Spark 。
你可以下載代碼和數(shù)據(jù)來(lái)運(yùn)行這些例子。鏈接:https://github.com/caroljmcdonald/sparkgraphxexample
啟動(dòng) spark-shell 命令后,這篇博客的例子都會(huì)運(yùn)行在 spark shell 下
你也可以運(yùn)行這些代碼作為一個(gè)獨(dú)立應(yīng)用,更多介紹在Getting Started with Spark on MapR Sandbox
啟動(dòng) Spark 交互式殼程序
登錄到 MapR 沙箱,如Getting Started with Spark on MapR Sandbox介紹,使用用戶 user01 ,密碼是 mapr 。啟動(dòng) spark shell 使用如下命令
$ spark-shell
定義頂點(diǎn)
首先我們將引入 GraphX 包
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
我們定義機(jī)場(chǎng)為頂點(diǎn)。頂點(diǎn)有 id 和 相關(guān)屬性。每個(gè)頂點(diǎn)的構(gòu)成如下
頂點(diǎn) id -> Id [Long]
頂點(diǎn)屬性 -> name[String]
機(jī)場(chǎng)頂點(diǎn)表格

我們使用上面的屬性定義一個(gè) RDD 來(lái)表示頂點(diǎn)
val vertices=Array((1L, ("SFO")),(2L, ("ORD")),(3L,("DFW")))
val vRDD= sc.parallelize(vertices)
vRDD.take(1)
val nowhere = “nowhere"
定義邊
邊是機(jī)場(chǎng)之間的路線。每條邊必須有一個(gè)起點(diǎn),一個(gè)目的地,并且可以有屬性。在我們的例子里,邊的構(gòu)成如下
邊起點(diǎn) id -> src [Long]
邊終點(diǎn) id -> dest [Long]
邊屬性距離 -> distance [Long]
路線的邊表

我們使用上面用于描述邊的屬性定義一個(gè)RDD。邊的RDD數(shù)據(jù)形式如 [src id, dest id, distance]。
val edges = Array(Edge(1L,2L,1800),Edge(2L,3L,800),Edge(3L,1L,1400))
val eRDD= sc.parallelize(edges)?
eRDD.take(2)
創(chuàng)建屬性圖
想要?jiǎng)?chuàng)建一個(gè)圖,你需要有 Vertex RDD, Edge RDD 和 一個(gè)默認(rèn)頂點(diǎn)。
創(chuàng)建屬性圖名為 graph?
val graph = Graph(vRDD,eRDD, nowhere)
graph.vertices.collect.foreach(println)
graph.edges.collect.foreach(println)
1. 這有多少個(gè)飛機(jī)場(chǎng)?
val numairports = graph.numVertices
2. 這有多少路線?
val numroutes = graph.numEdges
3. 哪些線路大于 1000 英里?
graph.edges.filter { case Edge(src,?dst, prop) => prop > 1000?}.collect.foreach(println)
4. 邊三元組類繼承自 Edge 類通過增加 srcAttr 和 dstAttr 成員,各自包含了源和目的屬性。
graph.triplets.take(3).foreach(println)
5. 排序并打印最長(zhǎng)距離路線
graph.triplets.sortBy(_.attr, ascending = false).map(triplet => “Dsitance?“ + triplet.attr.toString + “ from?“ + triplet.srcAttr + “ to?“ + triplet.dstAttr + “.”).collect.foreach(println)
使用 GraphX?分析真正的航班數(shù)據(jù)
場(chǎng)景
我們的數(shù)據(jù)來(lái)自http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time。我們使用 2015 年 1 月的航班信息。對(duì)于每一趟航班,我們有如下的信息。

在這個(gè)場(chǎng)景,我們將會(huì)以航班作為頂點(diǎn),路線作為邊。我們興趣點(diǎn)在可視化航班和路線,并且想要看到將要起飛和到達(dá)的數(shù)量數(shù)據(jù)。
你可以從如下鏈接下載代碼和數(shù)據(jù)去運(yùn)行這些例子。
https://github.com/caroljmcdonald/sparkgraphxexample
登錄 MapR 沙箱,可參考?Getting Started with Spark on MapR Sandbox,使用用戶id user01,密碼 mapr。使用 scp 拷貝樣例數(shù)據(jù)文件 rita2014jan.csv 到你的沙箱 home 文件夾 /user/user01?
啟動(dòng) Spark shell?
$ spark-shell
定義頂點(diǎn)
首先我們將引入 GraphX 軟件包
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.IntParam
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
下面我們使用 Scala case class 定義對(duì)應(yīng) csv 數(shù)據(jù)文件的航班模式
case class Flight(dofM:String, dofW:String, carrier:String, tailnum:String, flnum:Int, org_id:Long, origin:String, dest_id:Long, dest:String, crsdeptime:Double, deptime:Double, depdelaymins:Double, crsarrtime:Double, arrtime:Double, arrdelay:Double,crselapsedtime:Double,dist:Int)
下面的函數(shù)從數(shù)據(jù)文件中解釋每一行數(shù)據(jù)到飛行類中。
def parseFlight(str: String): Flight = {
val line = str.split(",")
Flight(line(0), line(1), line(2), line(3), line(4).toInt, line(5).toLong, line(6), line(7).toLong, line(8), line(9).toDouble, line(10).toDouble, line(11).toDouble, line(12).toDouble, line(13).toDouble, line(14).toDouble, line(15).toDouble, line(16).toInt)
}
下面我們將載入 csv 文件到彈性分布式數(shù)據(jù)集(RDD)。RDD 有 transformations 和 actions 兩種操作, first() 操作會(huì)返回 RDD 的第一個(gè)元素
val textRDD = sc.textFile("/user/user01/data/rita2014jan.csv”)
val flightsRDD = textRDD.map(parseFlight).cache()
我們定義飛機(jī)場(chǎng)為頂點(diǎn)。頂點(diǎn)具有屬性,每個(gè)頂點(diǎn)的屬性如下:
Airport name?[String]
飛機(jī)場(chǎng)頂點(diǎn)表

我們使用上面的屬性定義一個(gè) RDD 用來(lái)表示頂點(diǎn)
val airports = flightsRDD.map(flight => (flight.org_id, flight.origin)).distinct
airports.take(1)
val nowhere = “nowwhere”
val airportMap = airports.map { case ((org_id),name) => (org_id -> name)}.collect.toList.toMap
定義邊
邊是機(jī)場(chǎng)之間的路線。每條邊必須有一個(gè)起點(diǎn),一個(gè)目的地,并且可以有屬性。在我們的例子里,邊的構(gòu)成如下
邊起點(diǎn) id -> src [Long]
邊終點(diǎn) id -> dest [Long]
邊屬性距離 -> distance [Long]
路線的邊表

我們使用上面用于描述邊的屬性定義一個(gè)RDD。邊的RDD數(shù)據(jù)形式如 [src id, dest id, distance]。
val routes = flightsRDD.map(flight => ((flight.org_id, flight.dest_id), flight.dist)).distinctdistinct
routes.take(2)
val edges = routes.map { case((org_id, dest_id),distance) => Edge(org_id.toLong, dest_id.toLong, distance)}
edges.take(1)
創(chuàng)建屬性圖
想要?jiǎng)?chuàng)建一個(gè)圖,你需要有 Vertex RDD, Edge RDD 和 一個(gè)默認(rèn)頂點(diǎn)。
創(chuàng)建屬性圖名為 graph
val graph = Graph(airports, edges, nowhere)?
graph.vertices.take(2)
graph.edges.take(2)
6. 有多少個(gè)飛機(jī)場(chǎng)?
val numairports = graph.numVertices
7. 有多少路線?
val numroutes = graph.numEdges
8. 有多少路線距離大于 1000 英里
graph.edges.filter { case ( Edge(org_id, dest_id, distance)?) => distance > 1000?}.take(3)
9. 邊三元組類繼承自 Edge 類通過增加 srcAttr 和 dstAttr 成員,各自包含了源和目的屬性。
graph.triplets.take(3).foreach(println)
10. 排序并打印最長(zhǎng)距離路線
graph.triplets.sortBy(_.attr, ascending = false).map(triplet => “Dsitance?“ + triplet.attr.toString + “ from?“ + triplet.srcAttr + “ to?“ + triplet.dstAttr + “.”).take(10).foreach(println)
11. 計(jì)算最高度的頂點(diǎn)
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { if (a._2 > b._2) a else b}
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)airportMap(10397)
12. 哪個(gè)航班收入最高?
val maxIncoming = graph.inDegrees.collect.sortWith(_._2 > _._2).map(x => (airportMap(x._1), x._2)).take(3)
maxIncoming.foreach(println)
val maxout = graph.outDegrees.join(airports).sortBy(_._2._1, ascending = false).take(3)
maxout.foreach(println)
PageRank
另一個(gè) GraphX 運(yùn)算符是PageRank,它基于谷歌的 PageRank 算法。
PageRank 衡量每個(gè)頂點(diǎn)在圖中的重要性,它是通過判斷哪個(gè)頂點(diǎn)有最多的邊。在我們的例子里,我們使用 PageRank 去決定哪個(gè)航班是最重要的,衡量的方式是計(jì)算哪個(gè)航班與其他航班有最多的連接。
我們需要指定一個(gè)容忍誤差,用來(lái)衡量收斂
13. 使用 PageRank 判斷哪個(gè)是最重要的航班?
val ranks = graph.pageRank(0.1).vertices
val temp = ranks.join(airports)
temp.take(1)
val temp2 = temp.sortBy(_._2._1, false)
temp2.take(2)
val impAirports = temp2.map(_._2._2)
impAirports.take(4)
Pregel
很多重要的圖形算法是迭代算法,因?yàn)轫旤c(diǎn)的屬性依賴于它鄰居們的屬性,而鄰居的屬性又依賴它們鄰居的屬性。 Pregel 是一個(gè)迭代圖處理模型,由谷歌開發(fā),它使用頂點(diǎn)之間傳遞的消息進(jìn)行一系列的迭代。GraphX 實(shí)現(xiàn)了類似 Pregel 塊同步消息傳遞 API。
使用 GraphX 實(shí)現(xiàn)的 Pregel ,頂點(diǎn)只能發(fā)送消息給相鄰的頂點(diǎn)。
Pregel 運(yùn)算符會(huì)執(zhí)行一些列的超級(jí)步驟。在每一個(gè)超級(jí)步驟:
· 頂點(diǎn)接收前面一個(gè)超級(jí)步驟的入站消息和
· 計(jì)算每個(gè)頂點(diǎn)屬性的新值
· 發(fā)送消息給下一個(gè)超級(jí)步驟的相鄰頂點(diǎn)
當(dāng)沒有信息保留時(shí),Pregel 操作符會(huì)結(jié)束迭代,并返回最終的圖。

下面的代碼使用 Pregel 用下列公式計(jì)算最便宜的機(jī)票。
50 + distance / 20

想要學(xué)習(xí)更多?
MapR announces Free Complete Apache Spark Training and Developer Certification
Get Certified on Spark with MapR Spark Certification
MapR Certified Spark Developer Study Guide
Programming Guide -Apache Spark Developer Cheat Sheet