當(dāng)數(shù)據(jù)量太大的時(shí)候,對(duì)hugegraph 進(jìn)行一些統(tǒng)計(jì)查詢(xún)或者算法遍歷的時(shí)候,經(jīng)常會(huì)超時(shí),或者時(shí)間很久。
這個(gè)時(shí)候需要借助與大數(shù)據(jù)相關(guān)的技術(shù)。hugegraph 之前提供了一個(gè)hugegraph-spark的組件,
但是變態(tài)的是,目前該組件已經(jīng)商業(yè)化,還好其團(tuán)隊(duì)在git上大概介紹了實(shí)現(xiàn)的思路。
- 使用hugegraph 的切片查詢(xún)功能。 因?yàn)榈讓邮褂胏assandra,可以先查出整個(gè)集群有多少個(gè)切片。
- 然后使用并行框架并發(fā)的去遍歷每個(gè)切片,這樣就可以快速的把整個(gè)集群的節(jié)點(diǎn)和邊獲取出來(lái)。
- 把所有的節(jié)點(diǎn)和邊導(dǎo)入到HDFS中。
- 編寫(xiě)代碼把導(dǎo)入的文件轉(zhuǎn)化為graphx 支持的RDD格式,并生成graph對(duì)象,然后就可以調(diào)用graph對(duì)象的方法。
- 把整個(gè)程序打包成jar包提交給spark平臺(tái),就可以分布式的調(diào)用graphx的相關(guān)方法,比如統(tǒng)計(jì)節(jié)點(diǎn)。
我們構(gòu)建了一個(gè)3000萬(wàn)個(gè)節(jié)點(diǎn),4000萬(wàn)邊的數(shù)據(jù)集。在10個(gè)節(jié)點(diǎn)的spark集群上,大概需要1分鐘。
大概的實(shí)現(xiàn):
- 使用java+scala的混合方式。
- java線(xiàn)程池讀遍歷hugegraph分片,并導(dǎo)入數(shù)據(jù)到hdfs
edgeShards.forEach(shard->
{
try
{
producerSemaphore.acquire();
} catch (InterruptedException e)
{
throw new RuntimeException("can't acquire consumer semaphore");
}
HugeEdgeHandler edgeHandler = new HugeEdgeHandler(shard,graphQueryDao,graphSerializer,
hadoopProperties,hdfsDir);
CompletableFuture.runAsync(
edgeHandler,
producerExecutor
).whenComplete((r,e)->{ producerSemaphore.release();});
}
);
@Override
public void run()
{
//TODO add Retry
List<Edge> edges = graphQueryDao.getEdgesByShard(this.currentShard);
log.info("add {} edges to queue using shard {}", edges.size(), this.currentShard);
Collection<String> lines = edges.stream().map(e-> graphxLine(graphSerializer.writeEdge(e))).collect(Collectors.toList());
HadoopFileUtils.writeFromLocalToHdfs(hadoopProperties, lines.iterator(), getHdfsFileName());
log.info("write {} lines into hdfs file {}",lines.size(), getHdfsFileName());
}
- 構(gòu)建GraphX的RDD對(duì)象,生成Graph并調(diào)用算法,這部分由scala實(shí)現(xiàn)。
object GraphXTraversal {
def run(context : SparkContext, vertexFileName: String, edgeFileName: String, traversalFileName: String): Unit = {
val vertexRdd : RDD[(VertexId, String)] = context.textFile(vertexFileName)
.map(line => {
val parts = line.split("\t")
(parts(1).toLong, parts(0))
})
val edgeRdd : RDD[Edge[String]] = context.textFile(edgeFileName)
.map(line => {
val parts = line.split("\t")
new Edge(parts(1).toLong, parts(2).toLong, parts(0))
})
val graph : Graph[String, String] = Graph(vertexRdd, edgeRdd)
val vConunt = graph.vertices.count();
val eCount = graph.edges.count();
val output=context.makeRDD(List(vConunt,eCount));
output.saveAsTextFile(traversalFileName);
}
}
- java+scala混合打包,pom.xml 的配置是關(guān)鍵,需要加入scala插件,并且顯示指定main方法,否則 spark-submit的時(shí)候會(huì)報(bào) class not found 的問(wèn)題。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<archive>
<manifest>
<mainClass>com.datayes.kgraph.job.GraphTraveralJob</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<skipAssembly>false</skipAssembly>
</configuration>
<executions>
<execution>
<id>package</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
- spark 集群自己調(diào)用hugegraph并生成rdd
使用如下的方式 讓spark 的集群并發(fā)獲取shard的信息直接構(gòu)建,RDD,這樣是最快的。
JavaRDD<Edge<String>> edgeRDD = rddShards.flatMap((shard)->
{
HugeClient Xcv =new HugeClient("http://10.20.205.167:8080", "hugegraph",1200);
List<com.baidu.hugegraph.structure.graph.Edge> baiduEdges = Xcv.traverser().edges(shard);
List<Edge<String>> sparkEdge = Lists.newArrayList();
baiduEdges.stream().forEach(e->
{
sparkEdge.add(new Edge<String>(Long.parseLong(String.valueOf(e.source()))
,Long.parseLong(String.valueOf(e.target())),e.label()));
});
return sparkEdge.iterator();
}
);
但是打包運(yùn)行的時(shí)候遇到了各種問(wèn)題
- 其中一個(gè)是 Hugeclient checkVersion失敗。
原因 hugegraph的組件啟動(dòng)的時(shí)候會(huì)去 自己的包里面 manifest.mf 里面找一個(gè) implemnt-version的變量。
看和程序的版本是否一致。 hugegraph-client, hugegraph-common 的包里都有這個(gè)文件。
但是當(dāng)使用 maven assembly 和maven shade的時(shí)候, 依賴(lài)的包都會(huì)被解壓, manifest 都沒(méi)有copy過(guò)來(lái),或者說(shuō)被覆蓋了。
造成 hugegraph啟動(dòng)的時(shí)候一直就是 version 無(wú)法match。
- 解決的辦法, 不能unpack 依賴(lài)的jar 包,否則manifest信息會(huì)丟失。
在不unpack 的情況下,只能將jar 拷貝到目錄,然后指定 classpath。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.datayes.kgraph.job.GraphTraveralJob</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
在spak-submit的時(shí)候,指定 --jar a.jar,b.jar myapplication.jar
--jar 參數(shù)一定要在 myapplication.jar 之前。 為啥還有這種限制,真的是無(wú)語(yǔ)了??泳薅?。