使用NiFi將數(shù)據(jù)從Mysql導(dǎo)入至HBase

1. 啟動并登錄NiFi

1.1 在本機啟動nifi

./bin/nifi.sh start

1.2 登錄nifi

打開瀏覽器,訪問xxx.xxx.xxx.xxx:8080(默認端口是8080)

2. 構(gòu)建Processor

2.1 ExecuteSQL

拖一個Processor到面板中,選擇ExecuteSQL類型

在processor上點擊右鍵-->選擇configure-->選擇properties

image.png

選擇Database Connection Pooling Service屬性值為:DBCPConnectionPool,然后再點擊右側(cè)的箭頭,進入項目的Configuration界面,配置數(shù)據(jù)庫連接的具體信息。

image.png

在項目的Configuration頁面中,選擇新生成的DBCPConnectionPool條目,點擊右側(cè)的齒輪按鈕,進行相關(guān)配置。

image.png

依次填寫:

  • Database Connection URL:

    數(shù)據(jù)庫連接字符串,如:
    jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true

  • Database Driver Class Name:

    數(shù)據(jù)庫連接驅(qū)動類,這里是:com.mysql.jdbc.Driver

  • Database Driver Location(s):

    數(shù)據(jù)庫連接驅(qū)動jar包的位置,如:/home/hadoop/nifi/mysql-connector-java-5.1.39-bin.jar

  • Database User:

    數(shù)據(jù)庫用戶名,如:root

  • Password:

    數(shù)據(jù)庫密碼,如:root

回到ExecuteSQL的properties,設(shè)置SQL select query為想要查詢的SQL語句,設(shè)置Max Wait Time為10 seconds。

image.png

2.2 ConvertAvroToJson

從ExecuteSQL里出來的是avro格式的數(shù)據(jù),要先將其轉(zhuǎn)化成json格式,再導(dǎo)入HBase。拖一個ConvertAvroToJson Processor到界面。然后,從ExecuteSQL連一條線到ConvertAvroToJson,關(guān)系為success。

image.png

2.3 SplitJson

從上一步輸出的數(shù)據(jù)是由多條記錄構(gòu)成的整體,需要將其分割成獨立的單條數(shù)據(jù),再依次導(dǎo)入HBase。

拖入一個SplitJson processor到界面中,然后從ConvertAvroToJson連一條線到SplitJson,關(guān)系為success。

配置SplitJson,在properties頁,將JsonPath Expression設(shè)置為*

image.png

2.4 PutHbaseJson

這一步將分割好的json格式的Mysql記錄添加到HBase中。

拖入一個PutHbaseJson processor到界面中,右鍵點擊,選擇configure,進行配置。

在properties頁,配置HBase Client Service為 HBase_1_1_2_ClientService

image.png

點擊右側(cè)的小箭頭,增加HbaseClientService,然后點擊齒輪按鈕,在properites中依次設(shè)置:

  • ZooKeeper Quorum :

    Zookeeper的地址,如:192.168.2.xxx

  • Zookeeper client port:

    Zookeeper的端口,一般是:2181

  • Zookeeper znode parent:

    Zookeeper中的znode,一般是:/hbase

image.png
image.png

配置完成后,點擊小閃電,讓配置生效

image.png

回到PutHbaseJson的properties,配置RowIdentifier。依次設(shè)置:

  • Table Name:

    數(shù)據(jù)導(dǎo)入到HBase的表名稱,需要是已存在的表

  • Row Identifier Field Name:

    作為HBase表行鍵的字段名稱,一般設(shè)置為mysql中的主鍵

  • Row Identified Encoding Strategy:

    行鍵編碼方式,根據(jù)行鍵類型選擇String或Binary

  • Column Famlily:

    數(shù)據(jù)導(dǎo)入到HBase表的列族名稱

image.png

從SplitJosn連一條線到PutHbaseJson,關(guān)系為split

2.5 LogAttribute

拖入一個LogAttribute到界面中,從其他所有processor連一條線到LogAttribute,關(guān)系全部選則為failure,其中splitJson的original關(guān)系也連接到LogAttribute。

image.png

接下來定義數(shù)據(jù)流的終止方式:

(1)點擊LogAttribute的confiure,在setting標(biāo)簽中,勾選右側(cè)Automatically Terminate Relationships的success選項。

image.png

(2)點擊PutHbaseJson的confiure,在setting標(biāo)簽中,勾選右側(cè)Automatically Terminate Relationships的success選項。

image.png

3. 檢查并開始導(dǎo)入

完成以上步驟后,整體結(jié)構(gòu)圖類似于下圖所示:

image.png

點擊空白處,再點擊左側(cè)Operate面板的齒輪按鈕,檢查數(shù)據(jù)庫連接

image.png

確保mysql與hbase的連接都處于enable狀態(tài),如果沒有,則可以查看左側(cè)的錯誤提示,根據(jù)提示修改錯誤。

image.png

最后,根據(jù)需要設(shè)置ExecuteSQL processor的scheduling選項,默認的執(zhí)行間隔是0秒,即不間斷的執(zhí)行SQL語句,會導(dǎo)致從Mysql中讀出大量重復(fù)數(shù)據(jù)。如果僅僅需要將一次SQL查詢的結(jié)果導(dǎo)入HBase,建議將該值設(shè)置大一些,等待執(zhí)行完畢后手動結(jié)束即可;如果需要定期執(zhí)行,則應(yīng)設(shè)置合適的執(zhí)行間隔時間。

image.png

以上檢查完成后,在Operate面板中點擊run按鈕,開始進行從mysql到hbase的轉(zhuǎn)換,轉(zhuǎn)換完成后,點擊stop按鈕停止轉(zhuǎn)換。也可單獨控制每一個Processor的啟動與停止?fàn)顟B(tài),以便調(diào)試。

image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容