hugegraph 整合 spark+graphX

當(dāng)數(shù)據(jù)量太大的時(shí)候,對(duì)hugegraph 進(jìn)行一些統(tǒng)計(jì)查詢(xún)或者算法遍歷的時(shí)候,經(jīng)常會(huì)超時(shí),或者時(shí)間很久。
這個(gè)時(shí)候需要借助與大數(shù)據(jù)相關(guān)的技術(shù)。hugegraph 之前提供了一個(gè)hugegraph-spark的組件,
但是變態(tài)的是,目前該組件已經(jīng)商業(yè)化,還好其團(tuán)隊(duì)在git上大概介紹了實(shí)現(xiàn)的思路。

  • 使用hugegraph 的切片查詢(xún)功能。 因?yàn)榈讓邮褂胏assandra,可以先查出整個(gè)集群有多少個(gè)切片。
  • 然后使用并行框架并發(fā)的去遍歷每個(gè)切片,這樣就可以快速的把整個(gè)集群的節(jié)點(diǎn)和邊獲取出來(lái)。
  • 把所有的節(jié)點(diǎn)和邊導(dǎo)入到HDFS中。
  • 編寫(xiě)代碼把導(dǎo)入的文件轉(zhuǎn)化為graphx 支持的RDD格式,并生成graph對(duì)象,然后就可以調(diào)用graph對(duì)象的方法。
  • 把整個(gè)程序打包成jar包提交給spark平臺(tái),就可以分布式的調(diào)用graphx的相關(guān)方法,比如統(tǒng)計(jì)節(jié)點(diǎn)。

我們構(gòu)建了一個(gè)3000萬(wàn)個(gè)節(jié)點(diǎn),4000萬(wàn)邊的數(shù)據(jù)集。在10個(gè)節(jié)點(diǎn)的spark集群上,大概需要1分鐘。

大概的實(shí)現(xiàn):

  • 使用java+scala的混合方式。
  • java線(xiàn)程池讀遍歷hugegraph分片,并導(dǎo)入數(shù)據(jù)到hdfs
edgeShards.forEach(shard->
            {
                try
                {
                    producerSemaphore.acquire();
                } catch (InterruptedException e)
                {
                    throw new RuntimeException("can't acquire consumer semaphore");
                }

                HugeEdgeHandler edgeHandler = new HugeEdgeHandler(shard,graphQueryDao,graphSerializer,
                        hadoopProperties,hdfsDir);
                CompletableFuture.runAsync(
                        edgeHandler,
                        producerExecutor
                ).whenComplete((r,e)->{ producerSemaphore.release();});
            }
        );
        
    @Override
    public void run()
    {
        //TODO add Retry
        List<Edge> edges = graphQueryDao.getEdgesByShard(this.currentShard);
        log.info("add {} edges to queue using shard {}", edges.size(), this.currentShard);
        Collection<String> lines = edges.stream().map(e-> graphxLine(graphSerializer.writeEdge(e))).collect(Collectors.toList());
        HadoopFileUtils.writeFromLocalToHdfs(hadoopProperties, lines.iterator(), getHdfsFileName());
        log.info("write {} lines into  hdfs file {}",lines.size(), getHdfsFileName());
    }
  • 構(gòu)建GraphX的RDD對(duì)象,生成Graph并調(diào)用算法,這部分由scala實(shí)現(xiàn)。
object GraphXTraversal {
  def run(context : SparkContext, vertexFileName: String, edgeFileName: String, traversalFileName: String): Unit = {
    val vertexRdd : RDD[(VertexId, String)] = context.textFile(vertexFileName)
      .map(line => {
        val parts = line.split("\t")
        (parts(1).toLong, parts(0))
      })
    val edgeRdd : RDD[Edge[String]] = context.textFile(edgeFileName)
      .map(line => {
        val parts = line.split("\t")
        new Edge(parts(1).toLong, parts(2).toLong, parts(0))
      })

    val graph : Graph[String, String]  =  Graph(vertexRdd, edgeRdd)

    val vConunt = graph.vertices.count();
    val eCount = graph.edges.count();
    val output=context.makeRDD(List(vConunt,eCount));
    output.saveAsTextFile(traversalFileName);
  }


}
  • java+scala混合打包,pom.xml 的配置是關(guān)鍵,需要加入scala插件,并且顯示指定main方法,否則 spark-submit的時(shí)候會(huì)報(bào) class not found 的問(wèn)題。
<plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4.1</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.datayes.kgraph.job.GraphTraveralJob</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <skipAssembly>false</skipAssembly>
            </configuration>
            <executions>
                <execution>
                    <id>package</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

  • spark 集群自己調(diào)用hugegraph并生成rdd
    使用如下的方式 讓spark 的集群并發(fā)獲取shard的信息直接構(gòu)建,RDD,這樣是最快的。
 JavaRDD<Edge<String>> edgeRDD = rddShards.flatMap((shard)->
        {
            HugeClient  Xcv =new HugeClient("http://10.20.205.167:8080", "hugegraph",1200);
            List<com.baidu.hugegraph.structure.graph.Edge> baiduEdges =  Xcv.traverser().edges(shard);
            List<Edge<String>> sparkEdge = Lists.newArrayList();
            baiduEdges.stream().forEach(e->
            {
                sparkEdge.add(new Edge<String>(Long.parseLong(String.valueOf(e.source()))
                        ,Long.parseLong(String.valueOf(e.target())),e.label()));
            });
            return sparkEdge.iterator();
        }
        );

但是打包運(yùn)行的時(shí)候遇到了各種問(wèn)題

  • 其中一個(gè)是 Hugeclient checkVersion失敗。

原因 hugegraph的組件啟動(dòng)的時(shí)候會(huì)去 自己的包里面 manifest.mf 里面找一個(gè) implemnt-version的變量。
看和程序的版本是否一致。 hugegraph-client, hugegraph-common 的包里都有這個(gè)文件。

但是當(dāng)使用 maven assembly 和maven shade的時(shí)候, 依賴(lài)的包都會(huì)被解壓, manifest 都沒(méi)有copy過(guò)來(lái),或者說(shuō)被覆蓋了。

造成 hugegraph啟動(dòng)的時(shí)候一直就是 version 無(wú)法match。

  • 解決的辦法, 不能unpack 依賴(lài)的jar 包,否則manifest信息會(huì)丟失。
    在不unpack 的情況下,只能將jar 拷貝到目錄,然后指定 classpath。
 <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                         <addClasspath>true</addClasspath>
                         <classpathPrefix>lib/</classpathPrefix>
                         <mainClass>com.datayes.kgraph.job.GraphTraveralJob</mainClass>
                     </manifest>
                   </archive>
                </configuration>
            </plugin>
        <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-dependency-plugin</artifactId>
             <version>2.10</version>
             <executions>
                 <execution>
                     <id>copy-dependencies</id>
                     <phase>package</phase>
                     <goals>
                         <goal>copy-dependencies</goal>
                     </goals>
                     <configuration>
                         <outputDirectory>${project.build.directory}/lib</outputDirectory>
                     </configuration>
                 </execution>
             </executions>
         </plugin>

在spak-submit的時(shí)候,指定 --jar a.jar,b.jar myapplication.jar

--jar 參數(shù)一定要在 myapplication.jar 之前。 為啥還有這種限制,真的是無(wú)語(yǔ)了??泳薅?。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容