Abstract:
本文章將展示如何將一個(gè)包含多條數(shù)據(jù)的文本文件保存到數(shù)據(jù)庫(kù)中,每條數(shù)據(jù)對(duì)應(yīng)數(shù)據(jù)庫(kù)中的一條記錄
整體流程

下面將依次詳細(xì)說(shuō)明每個(gè)核心processor的配置以及完成的功能
- GetFile:讀取文本文件中的內(nèi)容,這個(gè)文件是http://files.grouplens.org/datasets/movielens/ml-10m-README.html中movies.dat ,大體內(nèi)容長(zhǎng)著個(gè)樣子O(∩_∩)O
每一行為一條記錄,包含三個(gè)部分,并用雙分號(hào)分割
-
ExecuteScript就是一個(gè)執(zhí)行特定腳本文件的處理器,腳本內(nèi)容在Script body中指定,同時(shí)可以指定腳本的語(yǔ)言。我選用的是groovy編寫的腳本,跟java非常類似,并且支持java的語(yǔ)法
因?yàn)檩斎胛募母袷讲环螩SV json等格式,因此我們需要對(duì)其進(jìn)行格式轉(zhuǎn)換,將每一行的內(nèi)容通過(guò)”::”分割,然后采用”;”進(jìn)行拼接(或者直接將”::”替換為”;”),也就是轉(zhuǎn)換為csv格式。同時(shí)對(duì)可以自定義csv表頭
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*
def flowFile = session.get()
if (!flowFile) return
flowFile = session.write(flowFile, { inputStream, outputStream ->
def stringBuilder = new StringBuilder()
// 添加csv表頭
stringBuilder.append("id;director;type\n")
def tellTaleHeart = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def words = tellTaleHeart.split("::|\\n")
def length=words.length
def count=0
for(int i=0;i<length;i++){
String word= words[i]
if(word!=null&&word.length()>0){
stringBuilder.append(word)
count=count+1
if(count!=0&&count%3==0)
stringBuilder.append("\n")
else
stringBuilder.append(";")
}
}
outputStream.write(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
flowFile = session.putAttribute(flowFile, 'filename', 'movies')
session.transfer(flowFile, REL_SUCCESS)
-
AddSchemaNameAttribute:對(duì)處理的flowfile添加一個(gè)屬性
直接點(diǎn)擊右上角的+號(hào),添加schema.name這個(gè)屬性,并把value設(shè)置為movies
-
convertRecord:將處理后的用”;”分割的數(shù)據(jù)轉(zhuǎn)化為json格式
Record Reader:CSVReader(根據(jù)所要讀入數(shù)據(jù)的格式進(jìn)行設(shè)定)
點(diǎn)擊右側(cè)的箭頭,對(duì)CSVReader的屬性進(jìn)行設(shè)定

主要涉及到一下兩個(gè)屬性:
1.Schema Access Strategy: 在之前的一個(gè)處理器中我們給flowfile添加了schema屬性,因此選擇Use Schema Name Property
1. Schema Registry:選擇 AvroSchemaRegistry,并點(diǎn)擊右側(cè)的箭頭進(jìn)行配置:

這里property跟之前添加的schema.name相同,對(duì)于value需要根據(jù)輸入數(shù)據(jù)的屬性進(jìn)行設(shè)置
例如我們之前的輸入數(shù)據(jù),每一行包含三個(gè)部分,我們分別命名為id,director,filmType
{
"type":"record",
"name":"MovieRecord",
"fields":[
{"name":"id","type":"long"},
{"name":"director","type":["null","string"]},
{"name":"filmType","type":["null","string"]}
]
}
-
Value Separator:這里選擇的是”;”
Record Writer:JsonRecordSetWriter(根據(jù)處理后的數(shù)據(jù)格式設(shè)定)
設(shè)置方法跟CSVReader類似
* SplitJson:將json進(jìn)行拆分。現(xiàn)在得到的是一個(gè)大的Json記錄,現(xiàn)在需要將每一條記錄單獨(dú)封裝為一個(gè)flowfile,并insert到數(shù)據(jù)庫(kù)中

JsonPath Expression: 這個(gè)需要根據(jù)當(dāng)前flowfile內(nèi)容中的格式進(jìn)行設(shè)定
我們當(dāng)前輸入數(shù)據(jù)的格式如下,因此我們的JsonPath Expression為$.*
[
{"id":115,"director":"Happiness Is in the Field (Bonheur est dans le pr茅, Le) (1995)","filmType":"Comedy"},
{"id":116,"director":"Anne Frank Remembered (1995)","filmType":"Documentary"},
{"id":117,"director":"Young Poisoner's Handbook, The (1995)","filmType":"Crime|Drama"},
{"id":118,"director":"If Lucy Fell (1996)","filmType":"Comedy|Romance"},
{"id":119,"director":"Steal Big, Steal Little (1995)","filmType":"Comedy"},{"id":120,"director":"Race the Sun (1996)","filmType":"Drama"},{"id":121,"director":"Boys of St. Vincent, The (1992)","filmType":"Drama"},
....
]
-
ConvertToCSV(抱歉,實(shí)際上這個(gè)是ConvertJsonToSQL 處理器)
這部分之前涉及過(guò),可以參考上一篇博客
- PutSQL:執(zhí)行生成的sql語(yǔ)句,也就是執(zhí)行sql語(yǔ)句將數(shù)據(jù)插入到數(shù)據(jù)庫(kù)中





