1. 啟動并登錄NiFi
1.1 在本機啟動nifi
./bin/nifi.sh start
1.2 登錄nifi
打開瀏覽器,訪問xxx.xxx.xxx.xxx:8080(默認端口是8080)
2. 構(gòu)建Processor
2.1 ExecuteSQL
拖一個Processor到面板中,選擇ExecuteSQL類型
在processor上點擊右鍵-->選擇configure-->選擇properties

選擇Database Connection Pooling Service屬性值為:DBCPConnectionPool,然后再點擊右側(cè)的箭頭,進入項目的Configuration界面,配置數(shù)據(jù)庫連接的具體信息。

在項目的Configuration頁面中,選擇新生成的DBCPConnectionPool條目,點擊右側(cè)的齒輪按鈕,進行相關(guān)配置。

依次填寫:
-
Database Connection URL:
數(shù)據(jù)庫連接字符串,如:
jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true -
Database Driver Class Name:
數(shù)據(jù)庫連接驅(qū)動類,這里是:com.mysql.jdbc.Driver
-
Database Driver Location(s):
數(shù)據(jù)庫連接驅(qū)動jar包的位置,如:/home/hadoop/nifi/mysql-connector-java-5.1.39-bin.jar
-
Database User:
數(shù)據(jù)庫用戶名,如:root
-
Password:
數(shù)據(jù)庫密碼,如:root
回到ExecuteSQL的properties,設(shè)置SQL select query為想要查詢的SQL語句,設(shè)置Max Wait Time為10 seconds。

2.2 ConvertAvroToJson
從ExecuteSQL里出來的是avro格式的數(shù)據(jù),要先將其轉(zhuǎn)化成json格式,再導(dǎo)入HBase。拖一個ConvertAvroToJson Processor到界面。然后,從ExecuteSQL連一條線到ConvertAvroToJson,關(guān)系為success。

2.3 SplitJson
從上一步輸出的數(shù)據(jù)是由多條記錄構(gòu)成的整體,需要將其分割成獨立的單條數(shù)據(jù),再依次導(dǎo)入HBase。
拖入一個SplitJson processor到界面中,然后從ConvertAvroToJson連一條線到SplitJson,關(guān)系為success。
配置SplitJson,在properties頁,將JsonPath Expression設(shè)置為*

2.4 PutHbaseJson
這一步將分割好的json格式的Mysql記錄添加到HBase中。
拖入一個PutHbaseJson processor到界面中,右鍵點擊,選擇configure,進行配置。
在properties頁,配置HBase Client Service為 HBase_1_1_2_ClientService

點擊右側(cè)的小箭頭,增加HbaseClientService,然后點擊齒輪按鈕,在properites中依次設(shè)置:
-
ZooKeeper Quorum :
Zookeeper的地址,如:192.168.2.xxx
-
Zookeeper client port:
Zookeeper的端口,一般是:2181
-
Zookeeper znode parent:
Zookeeper中的znode,一般是:/hbase


配置完成后,點擊小閃電,讓配置生效

回到PutHbaseJson的properties,配置RowIdentifier。依次設(shè)置:
-
Table Name:
數(shù)據(jù)導(dǎo)入到HBase的表名稱,需要是已存在的表
-
Row Identifier Field Name:
作為HBase表行鍵的字段名稱,一般設(shè)置為mysql中的主鍵
-
Row Identified Encoding Strategy:
行鍵編碼方式,根據(jù)行鍵類型選擇String或Binary
-
Column Famlily:
數(shù)據(jù)導(dǎo)入到HBase表的列族名稱

從SplitJosn連一條線到PutHbaseJson,關(guān)系為split
2.5 LogAttribute
拖入一個LogAttribute到界面中,從其他所有processor連一條線到LogAttribute,關(guān)系全部選則為failure,其中splitJson的original關(guān)系也連接到LogAttribute。

接下來定義數(shù)據(jù)流的終止方式:
(1)點擊LogAttribute的confiure,在setting標(biāo)簽中,勾選右側(cè)Automatically Terminate Relationships的success選項。

(2)點擊PutHbaseJson的confiure,在setting標(biāo)簽中,勾選右側(cè)Automatically Terminate Relationships的success選項。

3. 檢查并開始導(dǎo)入
完成以上步驟后,整體結(jié)構(gòu)圖類似于下圖所示:

點擊空白處,再點擊左側(cè)Operate面板的齒輪按鈕,檢查數(shù)據(jù)庫連接

確保mysql與hbase的連接都處于enable狀態(tài),如果沒有,則可以查看左側(cè)的錯誤提示,根據(jù)提示修改錯誤。

最后,根據(jù)需要設(shè)置ExecuteSQL processor的scheduling選項,默認的執(zhí)行間隔是0秒,即不間斷的執(zhí)行SQL語句,會導(dǎo)致從Mysql中讀出大量重復(fù)數(shù)據(jù)。如果僅僅需要將一次SQL查詢的結(jié)果導(dǎo)入HBase,建議將該值設(shè)置大一些,等待執(zhí)行完畢后手動結(jié)束即可;如果需要定期執(zhí)行,則應(yīng)設(shè)置合適的執(zhí)行間隔時間。

以上檢查完成后,在Operate面板中點擊run按鈕,開始進行從mysql到hbase的轉(zhuǎn)換,轉(zhuǎn)換完成后,點擊stop按鈕停止轉(zhuǎn)換。也可單獨控制每一個Processor的啟動與停止?fàn)顟B(tài),以便調(diào)試。
