通過 Scala 使用 Spark GraphX 入門

引子:筆者有一段時(shí)間學(xué)習(xí)使用 spark 圖算法實(shí)現(xiàn) One ID 的工作,看到一篇文章打算翻譯,今天得空可以還債了。下面便是翻譯正文。原文地址https://mapr.com/blog/how-get-started-using-apache-spark-graphx-scala/

這篇博客將幫助你在?MapR 沙箱環(huán)境開始學(xué)習(xí)Scala語(yǔ)言實(shí)現(xiàn)的Apache Spark GraphX的使用方法。GraphX 是圖并行計(jì)算的 Apache Spark 組件,基于圖理論的數(shù)學(xué)分支構(gòu)建。它是在 Spark 核心上的分布式圖計(jì)算處理框架。

圖計(jì)算的概念簡(jiǎn)介

圖是用于表示對(duì)象之間模型關(guān)系的數(shù)學(xué)結(jié)構(gòu)。圖由頂點(diǎn)和連接頂點(diǎn)的邊構(gòu)成。頂點(diǎn)是對(duì)象,而邊是對(duì)象之間的關(guān)系。

有向圖是頂點(diǎn)之間的邊是有方向的。有向圖的例子如 Twitter 上的關(guān)注者。用戶 Bob 關(guān)注了用戶 Carol ,而 Carol 并沒有關(guān)注 Bob。

正則圖是每個(gè)頂點(diǎn)都有相同數(shù)量的邊。正則圖的例子就是 Facebook 的朋友關(guān)系。如果 Bob 是 Carol 的朋友,那么 Carol 也是 Bob 的朋友。

GraphX 屬性圖

GraphX 通過彈性分布式屬性圖擴(kuò)展了 Sprak RDD。

這種屬性圖是一種有向多重圖,它有多條平行的邊。每個(gè)邊和頂點(diǎn)都有用戶定義的屬性。平行的邊允許相同頂點(diǎn)有多種關(guān)系。



軟件

本教程將運(yùn)行在 MapR 沙箱中,它會(huì)包含 Spark 。

你可以下載代碼和數(shù)據(jù)來(lái)運(yùn)行這些例子。鏈接:https://github.com/caroljmcdonald/sparkgraphxexample

啟動(dòng) spark-shell 命令后,這篇博客的例子都會(huì)運(yùn)行在 spark shell 下

你也可以運(yùn)行這些代碼作為一個(gè)獨(dú)立應(yīng)用,更多介紹在Getting Started with Spark on MapR Sandbox

啟動(dòng) Spark 交互式殼程序

登錄到 MapR 沙箱,如Getting Started with Spark on MapR Sandbox介紹,使用用戶 user01 ,密碼是 mapr 。啟動(dòng) spark shell 使用如下命令

$ spark-shell

定義頂點(diǎn)

首先我們將引入 GraphX 包

import org.apache.spark._

import org.apache.spark.rdd.RDD

import org.apache.spark.graphx._

我們定義機(jī)場(chǎng)為頂點(diǎn)。頂點(diǎn)有 id 和 相關(guān)屬性。每個(gè)頂點(diǎn)的構(gòu)成如下

頂點(diǎn) id -> Id [Long]

頂點(diǎn)屬性 -> name[String]

機(jī)場(chǎng)頂點(diǎn)表格

我們使用上面的屬性定義一個(gè) RDD 來(lái)表示頂點(diǎn)

val vertices=Array((1L, ("SFO")),(2L, ("ORD")),(3L,("DFW")))

val vRDD= sc.parallelize(vertices)

vRDD.take(1)

val nowhere = “nowhere"

定義邊

邊是機(jī)場(chǎng)之間的路線。每條邊必須有一個(gè)起點(diǎn),一個(gè)目的地,并且可以有屬性。在我們的例子里,邊的構(gòu)成如下

邊起點(diǎn) id -> src [Long]

邊終點(diǎn) id -> dest [Long]

邊屬性距離 -> distance [Long]

路線的邊表

我們使用上面用于描述邊的屬性定義一個(gè)RDD。邊的RDD數(shù)據(jù)形式如 [src id, dest id, distance]。

val edges = Array(Edge(1L,2L,1800),Edge(2L,3L,800),Edge(3L,1L,1400))

val eRDD= sc.parallelize(edges)?

eRDD.take(2)

創(chuàng)建屬性圖

想要?jiǎng)?chuàng)建一個(gè)圖,你需要有 Vertex RDD, Edge RDD 和 一個(gè)默認(rèn)頂點(diǎn)。

創(chuàng)建屬性圖名為 graph?

val graph = Graph(vRDD,eRDD, nowhere)

graph.vertices.collect.foreach(println)

graph.edges.collect.foreach(println)

1. 這有多少個(gè)飛機(jī)場(chǎng)?

val numairports = graph.numVertices

2. 這有多少路線?

val numroutes = graph.numEdges

3. 哪些線路大于 1000 英里?

graph.edges.filter { case Edge(src,?dst, prop) => prop > 1000?}.collect.foreach(println)

4. 邊三元組類繼承自 Edge 類通過增加 srcAttr 和 dstAttr 成員,各自包含了源和目的屬性。

graph.triplets.take(3).foreach(println)

5. 排序并打印最長(zhǎng)距離路線

graph.triplets.sortBy(_.attr, ascending = false).map(triplet => “Dsitance?“ + triplet.attr.toString + “ from?“ + triplet.srcAttr + “ to?“ + triplet.dstAttr + “.”).collect.foreach(println)

使用 GraphX?分析真正的航班數(shù)據(jù)

場(chǎng)景

我們的數(shù)據(jù)來(lái)自http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time。我們使用 2015 年 1 月的航班信息。對(duì)于每一趟航班,我們有如下的信息。

在這個(gè)場(chǎng)景,我們將會(huì)以航班作為頂點(diǎn),路線作為邊。我們興趣點(diǎn)在可視化航班和路線,并且想要看到將要起飛和到達(dá)的數(shù)量數(shù)據(jù)。

你可以從如下鏈接下載代碼和數(shù)據(jù)去運(yùn)行這些例子。

https://github.com/caroljmcdonald/sparkgraphxexample

登錄 MapR 沙箱,可參考?Getting Started with Spark on MapR Sandbox,使用用戶id user01,密碼 mapr。使用 scp 拷貝樣例數(shù)據(jù)文件 rita2014jan.csv 到你的沙箱 home 文件夾 /user/user01?

啟動(dòng) Spark shell?

$ spark-shell

定義頂點(diǎn)

首先我們將引入 GraphX 軟件包

import org.apache.spark._

import org.apache.spark.rdd.RDD

import org.apache.spark.util.IntParam

import org.apache.spark.graphx._

import org.apache.spark.graphx.util.GraphGenerators

下面我們使用 Scala case class 定義對(duì)應(yīng) csv 數(shù)據(jù)文件的航班模式

case class Flight(dofM:String, dofW:String, carrier:String, tailnum:String, flnum:Int, org_id:Long, origin:String, dest_id:Long, dest:String, crsdeptime:Double, deptime:Double, depdelaymins:Double, crsarrtime:Double, arrtime:Double, arrdelay:Double,crselapsedtime:Double,dist:Int)

下面的函數(shù)從數(shù)據(jù)文件中解釋每一行數(shù)據(jù)到飛行類中。

def parseFlight(str: String): Flight = {

val line = str.split(",")

Flight(line(0), line(1), line(2), line(3), line(4).toInt, line(5).toLong, line(6), line(7).toLong, line(8), line(9).toDouble, line(10).toDouble, line(11).toDouble, line(12).toDouble, line(13).toDouble, line(14).toDouble, line(15).toDouble, line(16).toInt)

}

下面我們將載入 csv 文件到彈性分布式數(shù)據(jù)集(RDD)。RDD 有 transformations 和 actions 兩種操作, first() 操作會(huì)返回 RDD 的第一個(gè)元素

val textRDD = sc.textFile("/user/user01/data/rita2014jan.csv”)

val flightsRDD = textRDD.map(parseFlight).cache()

我們定義飛機(jī)場(chǎng)為頂點(diǎn)。頂點(diǎn)具有屬性,每個(gè)頂點(diǎn)的屬性如下:

Airport name?[String]

飛機(jī)場(chǎng)頂點(diǎn)表

我們使用上面的屬性定義一個(gè) RDD 用來(lái)表示頂點(diǎn)

val airports = flightsRDD.map(flight => (flight.org_id, flight.origin)).distinct

airports.take(1)

val nowhere = “nowwhere”

val airportMap = airports.map { case ((org_id),name) => (org_id -> name)}.collect.toList.toMap

定義邊

邊是機(jī)場(chǎng)之間的路線。每條邊必須有一個(gè)起點(diǎn),一個(gè)目的地,并且可以有屬性。在我們的例子里,邊的構(gòu)成如下

邊起點(diǎn) id -> src [Long]

邊終點(diǎn) id -> dest [Long]

邊屬性距離 -> distance [Long]

路線的邊表

我們使用上面用于描述邊的屬性定義一個(gè)RDD。邊的RDD數(shù)據(jù)形式如 [src id, dest id, distance]。

val routes = flightsRDD.map(flight => ((flight.org_id, flight.dest_id), flight.dist)).distinctdistinct

routes.take(2)

val edges = routes.map { case((org_id, dest_id),distance) => Edge(org_id.toLong, dest_id.toLong, distance)}

edges.take(1)

創(chuàng)建屬性圖

想要?jiǎng)?chuàng)建一個(gè)圖,你需要有 Vertex RDD, Edge RDD 和 一個(gè)默認(rèn)頂點(diǎn)。

創(chuàng)建屬性圖名為 graph

val graph = Graph(airports, edges, nowhere)?

graph.vertices.take(2)

graph.edges.take(2)

6. 有多少個(gè)飛機(jī)場(chǎng)?

val numairports = graph.numVertices

7. 有多少路線?

val numroutes = graph.numEdges

8. 有多少路線距離大于 1000 英里

graph.edges.filter { case ( Edge(org_id, dest_id, distance)?) => distance > 1000?}.take(3)

9. 邊三元組類繼承自 Edge 類通過增加 srcAttr 和 dstAttr 成員,各自包含了源和目的屬性。

graph.triplets.take(3).foreach(println)

10. 排序并打印最長(zhǎng)距離路線

graph.triplets.sortBy(_.attr, ascending = false).map(triplet => “Dsitance?“ + triplet.attr.toString + “ from?“ + triplet.srcAttr + “ to?“ + triplet.dstAttr + “.”).take(10).foreach(println)

11. 計(jì)算最高度的頂點(diǎn)

def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { if (a._2 > b._2) a else b}

val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)

val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)

val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)airportMap(10397)

12. 哪個(gè)航班收入最高?

val maxIncoming = graph.inDegrees.collect.sortWith(_._2 > _._2).map(x => (airportMap(x._1), x._2)).take(3)

maxIncoming.foreach(println)

val maxout = graph.outDegrees.join(airports).sortBy(_._2._1, ascending = false).take(3)

maxout.foreach(println)

PageRank

另一個(gè) GraphX 運(yùn)算符是PageRank,它基于谷歌的 PageRank 算法。

PageRank 衡量每個(gè)頂點(diǎn)在圖中的重要性,它是通過判斷哪個(gè)頂點(diǎn)有最多的邊。在我們的例子里,我們使用 PageRank 去決定哪個(gè)航班是最重要的,衡量的方式是計(jì)算哪個(gè)航班與其他航班有最多的連接。

我們需要指定一個(gè)容忍誤差,用來(lái)衡量收斂

13. 使用 PageRank 判斷哪個(gè)是最重要的航班?

val ranks = graph.pageRank(0.1).vertices

val temp = ranks.join(airports)

temp.take(1)

val temp2 = temp.sortBy(_._2._1, false)

temp2.take(2)

val impAirports = temp2.map(_._2._2)

impAirports.take(4)

Pregel

很多重要的圖形算法是迭代算法,因?yàn)轫旤c(diǎn)的屬性依賴于它鄰居們的屬性,而鄰居的屬性又依賴它們鄰居的屬性。 Pregel 是一個(gè)迭代圖處理模型,由谷歌開發(fā),它使用頂點(diǎn)之間傳遞的消息進(jìn)行一系列的迭代。GraphX 實(shí)現(xiàn)了類似 Pregel 塊同步消息傳遞 API。

使用 GraphX 實(shí)現(xiàn)的 Pregel ,頂點(diǎn)只能發(fā)送消息給相鄰的頂點(diǎn)。

Pregel 運(yùn)算符會(huì)執(zhí)行一些列的超級(jí)步驟。在每一個(gè)超級(jí)步驟:

· 頂點(diǎn)接收前面一個(gè)超級(jí)步驟的入站消息和

· 計(jì)算每個(gè)頂點(diǎn)屬性的新值

· 發(fā)送消息給下一個(gè)超級(jí)步驟的相鄰頂點(diǎn)

當(dāng)沒有信息保留時(shí),Pregel 操作符會(huì)結(jié)束迭代,并返回最終的圖。

下面的代碼使用 Pregel 用下列公式計(jì)算最便宜的機(jī)票。

50 + distance / 20

想要學(xué)習(xí)更多?

GraphX Programming Guide

MapR announces Free Complete Apache Spark Training and Developer Certification

Free Spark On Demand Training

Get Certified on Spark with MapR Spark Certification

MapR Certified Spark Developer Study Guide

Programming Guide -Apache Spark Developer Cheat Sheet

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容