neo4j與spark 的結(jié)合

image.png

image.png

正常來說 neo4j是用來圖存儲(chǔ)的,neo4j企業(yè)版 的性能遠(yuǎn)遠(yuǎn)高于 社區(qū)版,畢竟是收費(fèi)的,不過 只要下載到就可以使用了,我已經(jīng)用上了,非常棒。
spark 是用來 做 圖計(jì)算的,Graphx,其實(shí) spark 和Neo4j 有交叉點(diǎn),在圖論算法上都可以用上,

我們?cè)谑褂?neo4j 和 spark 結(jié)合的時(shí)候
1.首先 如果你的neo4j 是需要賬號(hào)密碼登錄的話,你就應(yīng)該 在項(xiàng)目中配置一下,
兩三種方式

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.spark.Neo4j
import org.neo4j.spark._
import collection.JavaConversions._
 val spark=SparkSession.builder().appName("play")
.master("local[*]")
.config("spark.neo4j.bolt.url", "bolt://localhost:7687")
                        .config("spark.neo4j.bolt.user", "neo4j")
                        .config("spark.neo4j.bolt.password", "hortmt")
                .getOrCreate()
 import spark.implicits._
 val neo=Neo4j(spark.sparkContext)
//這個(gè)是使用sparkSession配置

下一個(gè)是使用 sparkConf配置

 val conf = new SparkConf().setAppName("neoej")
                .setMaster("local[*]")
                .set("spark.neo4j.bolt.url", "bolt://localhost:7687")
                .set("spark.neo4j.bolt.user", "neo4j")
                .set("spark.neo4j.bolt.password", "hortmt")
        val sc =new SparkContext(conf)

        val neo=Neo4j(sc)

3 另外還有一種是通過 Neo4jConfig 來做配置

val sparkSession = SparkSession.builder()
              .master("local[*]")
                .appName("LoadDataToNeo4j")
                    .getOrCreate();

  val sc = sparkSession.sparkContext

  val config = Neo4jConfig("localhost:","neo4j",Option("root"))
  Neo4j(sc).cypher("CREATE (c:Client {id:1230}) return c").loadRdd
  sparkSession.close()
  1. 我們最常見的就是 把 neo4j 的node 轉(zhuǎn)化為 spark 里的rdd 或者 dataframe 或者 graph ,其中 rdd 又分為 四種 ,普通rdd ,noderdd ,rowrdd,relrdd
    在整這個(gè)的時(shí)候首先 遇到兩個(gè)障礙 ,第一是轉(zhuǎn)化 ,第二是把 圖的屬性 提取出來變成 一個(gè)case class ,

比如我現(xiàn)在有一份 csv格式的 用戶通話記錄,header 里有五個(gè)屬性
{.user,.other,.direction,.duration,.timestamp}) ,大概有 一萬條記錄
首先我先把 這個(gè)加載存儲(chǔ)到 neo4j,注意 這個(gè)csv文件最好放在 neo 安裝根目錄下的import目錄下,否則加載會(huì)出錯(cuò),或者通過 http 下載 加載也可以

參考
https://raw.githubusercontent.com/data-commons/prep-buddy/b2e307f5f5261124ef2a97c4ff9225019804d864/data/calls.csv

https://raw.githubusercontent.com/data-commons/prep-buddy/b2e307f5f5261124ef2a97c4ff9225019804d864/data/calls_with_header.csv

LOAD CSV WITH HEADERS FROM "file:///calls_with_header.csv" AS line  
create (:person {user:line.user, other:line.other, direction:line.direction, duration:line.duration, timestamp:line.timestamp})

然后為這些節(jié)點(diǎn)創(chuàng)建關(guān)系 ,通過 撥打電話 呼叫 被叫 錯(cuò)過 來創(chuàng)建relationship

match (from:person),(to:person) where (from.other=to.user and from.direction="Outgoing")  
merge (from)-[r:rel{direction:from.direction,duration:from.duration,timestamp:from.timestamp}]->(to) 

match (from:person),(to:person) where (from.user=to.other and from.direction="Incoming")  
merge (from)-[r:rel{duration:from.direction,duration:from.duration,timestamp:from.timestamp}]->(to) 

單單創(chuàng)建關(guān)系 企業(yè)版比社區(qū)版至少要快五倍,
現(xiàn)在 Neo 里面有數(shù)據(jù)了 ,我們?nèi)?shù)據(jù)就可以。
大家可以看到,我們的節(jié)點(diǎn)其實(shí)就是一個(gè) person的實(shí)例,那我們?cè)谵D(zhuǎn)化為rdd的時(shí)候 ,我們希望 rdd 其實(shí)wrapper的是person class ,所以我們首先建了一個(gè)case class

case class Person(user: String,other:String,direction:String,duration:String,timestamp:String)

然后我們的 執(zhí)行Neo 的cypher語句

val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n{.user,.other,.direction,.duration,.timestamp}").loadNodeRdds

其實(shí) loadRowRdd 也是可以的

  val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n{.user,.other,.direction,.duration,.timestamp}").loadRowRdd

通過調(diào)試和出錯(cuò) 結(jié)果發(fā)現(xiàn) rdd里row 放的其實(shí)不是person對(duì)象,而是
java.util.Collections.UnmodifiableMap,可笑的是 這個(gè)類還是 一個(gè)私有類,不過通過發(fā)現(xiàn)它繼承了 java的 java.util.Map
我們需要把它強(qiáng)制轉(zhuǎn)化為java 的 map ,注意scala的map 是不可以直接轉(zhuǎn)化的


image.png

我們先是 嘗試了多遍 ,尤其是看輸出

 rawGraphnode.foreach(xd=>{
            val ne=  xd(0).asInstanceOf[java.util.Map[String,String]]
            val pe=  new Person(ne("user"),ne("other"),ne("direction"),ne("duration"),ne("timestamp"))
            println(pe.user+"&&"+pe.other+"&&"+pe.direction+"&&"+pe.duration+"&&"+pe.timestamp)
        })

當(dāng)我們發(fā)現(xiàn) 轉(zhuǎn)化是正常的,然后再轉(zhuǎn)化為 RDD[Person]

 val personRDD:RDD[Person]=  rawGraphnode.map(row=>
        {
            val rawMap= row(0).asInstanceOf[java.util.Map[String,String]]
            val pe=  new Person(rawMap("user"),rawMap("other"),rawMap("direction"),rawMap("duration"),rawMap("timestamp"))
            pe
        }
)

這里面其實(shí)最重要的是 map 轉(zhuǎn)化為case class ,這里面其實(shí)有很多種優(yōu)雅的方式實(shí)現(xiàn) ,不過我用的是最笨的,不過我這個(gè)也不太容易出錯(cuò) 。
大家參考
https://stackoverflow.com/questions/20684572/scala-convert-map-to-case-class

不過 如果大家使用上 as 語法后,其實(shí)就滅有 那么困難了

 val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n.user as user,n.other as other,n.direction as direction,n.duration as duration,n.timestamp as  timestamp").loadRowRdd
 rawGraphnode.take(10).foreach(println(_))
image.png

下面的重點(diǎn)是 把 neo4j的節(jié)點(diǎn) 轉(zhuǎn)化為 Dataframe 和 graph 的嘗試 ,分兩種
第一種可以很好按照 schme 的要求保留具體屬性 來創(chuàng)建 Dataframe

  val schme:StructType=StructType(Seq(
            StructField("user",StringType,nullable = false),
            StructField("other",StringType,nullable = false),
            StructField("direction",StringType,nullable = true),
            StructField("duration",StringType,nullable = true),
            StructField("timestamp",StringType,nullable = true)
        )
        )
val rawGNDF= spark.createDataFrame(rawGraphnode,schme)
        rawGNDF.printSchema()
image.png

第二種等于是根據(jù) case class 的屬性來的,但是 nullable 會(huì)都為true

 val dfs= spark.createDataFrame(personRDD)
        dfs.printSchema()
image.png

關(guān)于 spark sql row 對(duì)象與case class 的轉(zhuǎn)化 ,大家可以看這里
https://stackoverflow.com/questions/28166555/how-to-convert-row-of-a-scala-dataframe-into-case-class-most-efficiently

image.png
image.png

想直接轉(zhuǎn)DataFrame 其實(shí)還是有阻礙的,單個(gè)屬性可以,多個(gè)屬性就報(bào)廢了,schema的語法是 (fieldName,fieldtype),其中type 支持 String double long boolean ,如果是object integer float 他們會(huì)自動(dòng)轉(zhuǎn)化

 val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n{.user,.other,.direction,.duration,.timestamp}")
                .loadDataFrame(schema = ("user","String"),("other","double"),("direction","string"),("duration","integer"),("timestamp","boolean"))

后來我發(fā)現(xiàn) 其實(shí)是可以的,就是 需要再 查詢語句中使用 as 語法,這樣就轉(zhuǎn)化成功了

 val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n.user as user,n.other as other,n.direction as direction,n.duration as duration,n.timestamp as  timestamp")
                .loadDataFrame(schema = ("user","object"),("other","object"),("direction","string"),("duration","String"),("timestamp","String"))


        rawGraphnode.printSchema()
        rawGraphnode.show(10)
image.png
image.png

使用上 as 以后 轉(zhuǎn)化為 rowRDD 也非常方便 省力,唯一的缺點(diǎn)就是 structType 都是 string

 val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n.user as user,n.other as other,n.direction as direction,n.duration as duration,n.timestamp as  timestamp").loadRowRdd
        val dddf=rawGraphnode.first()
        println(dddf.schema)
image.png

之后的轉(zhuǎn)化為case class 也比較順手

  rawGraphnode.foreach(xd=>
        {
            val pe=new Person(xd.getAs[String]("user"),xd.getAs[String]("other"),xd.getAs[String]("direction"),xd.getAs[String]("duration"),xd.getAs("timestamp"))
            println(pe.user+"&&"+pe.other+"&&"+pe.direction+"&&"+pe.duration+"&&"+pe.timestamp)
        })

轉(zhuǎn)化為 RDD版的case Class 也非常方便

 val perRDD:RDD[Person]=rawGraphnode.map(xd=>{
            val pe=new Person(xd.getAs[String]("user"),xd.getAs[String]("other"),xd.getAs[String]("direction"),xd.getAs[String]("duration"),xd.getAs("timestamp"))
            pe
        })
        perRDD.take(10).foreach(println(_))
image.png

然后我們 嘗試造圖

val neo4j: Neo4j = Neo4j(sc=spark.sparkContext).rels(relquery)
        val graph: Graph[Long, String] = neo4j.loadGraph[Long,String]
        println(graph.vertices.count())

打印 11015 ,正確,

推薦一個(gè) GitHub的項(xiàng)目
https://github.com/luhm2017/graphx-analysis

git clone  https://github.com/luhm2017/graphx-analysis.git
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容