摘要: 主要介紹如何通過官方 ETL 工具 Exchange 將業(yè)務(wù)線上數(shù)據(jù)從 Neo4j 直接導(dǎo)入到 Nebula Graph 以及在導(dǎo)入過程中遇到的問題和優(yōu)化方法。
本文首發(fā)于 Nebula 論壇:https://discuss.nebula-graph.com.cn/t/topic/2044

1 背景
隨著業(yè)務(wù)數(shù)據(jù)量不斷增長(zhǎng),業(yè)務(wù)對(duì)圖數(shù)據(jù)庫在線數(shù)據(jù)實(shí)時(shí)更新寫入和查詢的效率要求也不斷增加。Neo4j 存在明顯性能不足,Neo4j 社區(qū)開源版本只支持單機(jī)部署,擴(kuò)展能力存在比較大的問題,無法滿足讀寫性能的線性擴(kuò)展以及讀寫分離的業(yè)務(wù)需求,并且開源版本 Neo4j 對(duì)點(diǎn)和邊的總數(shù)據(jù)量也有限制;而 Neo4j 企業(yè)版因果集群也存在單機(jī)主節(jié)點(diǎn) Cypher 實(shí)時(shí)寫入的性能瓶頸。
相比于 Neo4j,Nebula Graph 最大的特色便是采用 shared-nothing 分布式的架構(gòu),無單主寫入瓶頸問題,讀寫支持線性擴(kuò)展,擅長(zhǎng)處理千億節(jié)點(diǎn)、萬億條邊的超大規(guī)模數(shù)據(jù)集。
本文主要介紹如何通過官方 ETL 工具 Exchange 將業(yè)務(wù)線上數(shù)據(jù)從 Neo4j 直接導(dǎo)入到 Nebula Graph 以及在導(dǎo)入過程中遇到的問題和優(yōu)化方法。其中絕大部分問題都已經(jīng)通過論壇發(fā)帖的方式得到社區(qū)的支持和解決,本文會(huì)結(jié)合問題進(jìn)行逐一列舉。
2 部署環(huán)境
系統(tǒng)環(huán)境:
- CPU name:Intel(R) Xeon(R) Silver 4114 CPU @ 2.20GHz
- CPU Cores:40
- Memory Size:376 GB
- Disk:HDD
- System:CentOS Linux release 7.4.1708 (Core)
軟件環(huán)境:
- Neo4j:3.4 版本,五節(jié)點(diǎn)因果集群
- Nebula Graph:
- 版本: nebula-graph v1.1.0 源碼編譯安裝,
- 部署:?jiǎn)闻_(tái)服務(wù)器部署三節(jié)點(diǎn) Nebula Graph 集群。
- Exchange:nebula-java v1.1.0 源碼編譯 jar 包
- 數(shù)倉環(huán)境:
- hadoop-2.7.4
- spark-2.3.1
注意:?jiǎn)闻_(tái)機(jī)器部署 Nebula 多節(jié)點(diǎn)的端口分配:每個(gè) storage 還會(huì)將用戶配置的端口號(hào) + 1的端口作為內(nèi)部使用。請(qǐng)參考論壇帖子 nebula從neo4j導(dǎo)入數(shù)據(jù)出現(xiàn)Get UUID Failed錯(cuò)誤
3 全量 & 增量數(shù)據(jù)導(dǎo)入
3.1 全量導(dǎo)入
根據(jù) Neo4j 點(diǎn)和邊的屬性信息創(chuàng)建 Nebula Graph 的 Tag 和 Edge 結(jié)構(gòu),這里需要注意一點(diǎn),業(yè)務(wù)可能會(huì)根據(jù)不同需求只在部分點(diǎn)和邊上增加 Neo4j 點(diǎn)和邊的屬性信息,其他點(diǎn)和邊對(duì)應(yīng)的屬性為 NULL,所以需要先跟業(yè)務(wù)明確一下點(diǎn)和邊的全部屬性信息,避免遺漏屬性。Nebula Graph 的 Schema 信息類似 MySQL,支持 Create 和 Alter 添加屬性,并且所有的 Tag 和 Edge 的元數(shù)據(jù)信息是一致的。
1、Nebula Graph 創(chuàng)建 Tag 和 Edge
# 示例
# 創(chuàng)建圖空間,10 個(gè)分區(qū),3 個(gè) storage 副本。
CREATE SPACE test(partition_num=10,replica_factor=3);
# 選擇圖空間 test
USE test;
# 創(chuàng)建標(biāo)簽 tagA
CREATE TAG tagA(vid string, field-a0 string, field-a1 bool, field-a2 double);
# 創(chuàng)建標(biāo)簽 tagB
CREATE TAG tagB(vid string, field-b0 string, field-b1 bool, field-b2 double);
# 創(chuàng)建邊類型 edgeAB
CREATE EDGE edgeAB(vid string, field-e0 string, field-e1 bool, field-e2 double);
2、Exchange 導(dǎo)入配置文件
- Exchange 配置目前不支持
bolt+routing的方式連接neo4j,如果是因果集群,可以選擇一個(gè)從節(jié)點(diǎn)進(jìn)行bolt方式直連讀取數(shù)據(jù),減少集群壓力。 - 我們業(yè)務(wù)的 Neo4j 數(shù)據(jù)點(diǎn)和邊的 vid 是 string 類型,Nebula v1.x 版本還不支持 string 直接當(dāng)做 vid(v2.0支持),考慮到官方文檔中的描述:“當(dāng)點(diǎn)數(shù)量到達(dá)十億級(jí)別時(shí),用 hash 函數(shù)生成 vid 有一定的沖突概率。因此 Nebula Graph 提供 UUID 函數(shù)來避免大量點(diǎn)時(shí)的 vid 沖突。” 選擇了uuid() 作為轉(zhuǎn)化函數(shù),但是導(dǎo)入效率要比 hash 低,而且 uuid() 在未來版本可能存在兼容問題。
- partition: 是指 Exchange 從 Neo4j 拉取數(shù)據(jù)的分頁個(gè)數(shù)。
- batch: 是指批量插入 Nebula 的 batch 大小。
{
# Spark relation config
spark: {
app: {
name: Spark Writer
}
driver: {
cores: 1
maxResultSize: 1G
}
cores {
max: 16
}
}
# Nebula Graph relation config
nebula: {
address:{
graph:["xxx.xxx.xxx.xx:3699"]
meta:["xxx.xxx.xxx.xx:45500"]
}
user: user
pswd: password
space: test
connection {
timeout: 3000
retry: 3
}
execution {
retry: 3
}
error: {
max: 32
output: /tmp/errors
}
rate: {
limit: 1024
timeout: 1000
}
}
# Processing tags
tags: [
# Loading tag from neo4j
{
name: tagA
type: {
source: neo4j
sink: client
}
server: "bolt://xxx.xxx.xxx.xxx:7687"
user: neo4j
password: neo4j
exec: "match (n:tagA) where id(n) < 300000000 return n.vid as vid, n.field-a0 as field-a0, n.field-a1 as field-a1, n.field-a2 as field-a2 order by id(n)"
fields: [vid, field-a0, field-a1, field-a2]
nebula.fields: [vid, field-a0, field-a1, field-a2]
vertex: {
field: vid
policy: "uuid"
}
partition: 10
batch: 1000
check_point_path: /tmp/test
}
# Loading tag from neo4j
{
name: tagB
type: {
source: neo4j
sink: client
}
server: "bolt://xxx.xxx.xxx.xxx:7687"
user: neo4j
password: neo4j
exec: "match (n:tagB) where id(n) < 300000000 return n.vid as vid, n.field-b0 as field-b0, n.field-b1 as field-b1, n.field-b2 as field-b2 order by id(n)"
fields: [vid, field-b0, field-b1, field-b2]
nebula.fields: [vid, field-b0, field-b1, field-b2]
vertex: {
field: vid
policy: "uuid"
}
partition: 10
batch: 1000
check_point_path: /tmp/test
}
]
# Processing edges
edges: [
# Loading edges from neo4j
{
name: edgeAB
type: {
source: neo4j
sink: client
}
server: "bolt://xxx.xxx.xxx.xxx:7687"
user: neo4j
password: neo4j
exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) where id(r) < 300000000 return n.vid as vid, n.field-e0 as field-e0, n.field-e1 as field-e1, n.field-e2 as field-e2 order by id(r)"
fields: [vid, field-e0, field-e1, field-e2]
nebula.fields: [vid, field-e0, field-e1, field-e2]
source: {
field: a.vid
policy: "uuid"
}
target: {
field: b.vid
policy: "uuid"
}
partition: 10
batch: 1000
check_point_path: /tmp/test
}
]
}
3、執(zhí)行導(dǎo)入命令
nohup spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local" exchange-1.1.0.jar -c test.conf > test.log &
4、查看導(dǎo)入 Nebula Graph 的數(shù)據(jù)量
./bin/db_dump --space=test --db_path=./data/storage/nebula/ --meta_server=127.0.0.1:45500 -limit 0 --mode=stat --tags=tagA,tagB --edges=edgeAB
注意:Nebula 1.x 版本目前還只能用 db_dump 統(tǒng)計(jì),2.0 會(huì)支持 nGQL 命令的方式統(tǒng)計(jì)數(shù)量。
3.2 增量導(dǎo)入
增量數(shù)據(jù)導(dǎo)入主要是通過 Neo4j 內(nèi)部點(diǎn)和邊的自增 id() 進(jìn)行切割,在導(dǎo)入配置文件 exec 項(xiàng)執(zhí)行 Neo4j Cypher 語句時(shí)增加 id() 范圍限制,但前提是需要業(yè)務(wù)停掉刪數(shù)據(jù)操作,因?yàn)樵隽繉?dǎo)入時(shí),如果之前的數(shù)據(jù)被刪除后 Neo4j 會(huì)復(fù)用 id(),這會(huì)導(dǎo)致復(fù)用 id() 的增量數(shù)據(jù)導(dǎo)入時(shí)查詢不到造成數(shù)據(jù)丟失。當(dāng)然業(yè)務(wù)如果有條件支持 Neo4j Nebula 雙寫的話,增量導(dǎo)入就不會(huì)出現(xiàn)這種問題。
exec: "match (n:user) where id(n) >= 300000000 and id(n) < 400000000 return xxx order by id(n)"
請(qǐng)參考論壇帖子 neo4j到nebula如何做增量導(dǎo)入
3.3 導(dǎo)入問題及解決
使用 Exchange 導(dǎo)入過程中遇到兩個(gè)問題,及時(shí)的得到官方 @nicole 的支持和解決,具體請(qǐng)參考下面兩個(gè)帖子:
- nebula從neo4j導(dǎo)入數(shù)據(jù),部分屬性帶回車,拼insert報(bào)錯(cuò),有什么好辦法解決嗎?
- 使用Exchange 從neo4j導(dǎo)入nebula,label中有些頂點(diǎn)的屬性值是null,導(dǎo)致導(dǎo)入失敗
問題 1:Exchange 不支持「換行回車」等特殊字符的轉(zhuǎn)義。如下 string 數(shù)據(jù)中帶有回車,在拼接 insert 語句插入時(shí)會(huì)因?yàn)閾Q行導(dǎo)致插入失敗。

PR:https://github.com/vesoft-inc/nebula-java/pull/203 已經(jīng)合入 exchange v1.0 分支
問題 2:Exchange 不支持屬性為 NULL 的數(shù)據(jù)導(dǎo)入。前文 3.1 中提到,業(yè)務(wù)可能會(huì)根據(jù)不同需求為某些點(diǎn)和邊增加屬性,這時(shí)其他點(diǎn)和邊屬性則是 NULL,這樣在使用 Exchange 導(dǎo)入時(shí)會(huì)報(bào)錯(cuò)。

參考帖子 2 給出的修改建議解決:修改 com.vesoft.nebula.tools.importer.processor.Processor#extraValue,增加 NULL 類型的轉(zhuǎn)化值。
case NullType => {
fieldTypeMap(field) match {
case StringType => ""
case IntegerType => 0
case LongType => 0L
case DoubleType => 0.0
case BooleanType => false
}
}
4 導(dǎo)入效率優(yōu)化
關(guān)于導(dǎo)入效率的優(yōu)化,請(qǐng)參考下面兩個(gè)帖子:
- 關(guān)于使用Exchange從neo4j導(dǎo)入nebula的性能問題
- 使用exchange并發(fā) spark-submit –master “l(fā)ocal\[16\]” 報(bào)錯(cuò)
優(yōu)化 1:通過適當(dāng)增加導(dǎo)入配置中的 partition 和 batch 值,提升導(dǎo)入效率。
優(yōu)化 2:如果是 string 類型做 vid 的話,1.x 版本盡量使用 hash() 函數(shù)轉(zhuǎn)化,2.0 版本會(huì)支持 string id 類型;如果是int類型做vid的話,可以直接使用,不用轉(zhuǎn)化效率更高。
優(yōu)化 3:官方建議 spark-submit 提交命令 master 配置改為 yarn-cluster, 若不使用 yarn,可配置成 spark://ip:port;我們是通過 spark-submit --master "local[16]"的方式增加 spark 并發(fā),導(dǎo)入效率比使用 "local" 提升 4 倍+,測(cè)試環(huán)境單機(jī)三節(jié)點(diǎn) HDD 盤 IO 峰值能到 200-300 MB/s。但在指定 --master "local[16]" 并發(fā)導(dǎo)入時(shí)遇到 hadoop 緩存問題,采用增加 hdfs 配置 fs.hdfs.impl.disable.cache=true 后重啟 hadoop 解決。具體請(qǐng)參考第二個(gè)帖子。
5 總結(jié)
使用 Exchange 從 Neo4j 導(dǎo)入 Nebula Graph 過程中遇到一些問題,通過積極與社區(qū)進(jìn)行溝通得到了官方 @nicole 及其他小伙伴的快速響應(yīng)和大力支持,這一點(diǎn)在 Neo4j 導(dǎo)入 Nebula Graph 的實(shí)踐過程中起到了十分關(guān)鍵的作用,感謝社區(qū)的大力支持。期待支持 openCypher 的 Nebula Graph 2.0。
6 參考鏈接
- https://nebula-graph.com.cn/posts/how-to-import-data-from-neo4j-to-nebula-graph/
- https://github.com/vesoft-inc/nebula-java/tree/v1.0
- https://docs.nebula-graph.com.cn/manual-CN/2.query-language/2.functions-and-operators/uuid/
- http://arganzheng.life/hadoop-filesystem-closed-exception.html