【NiFi csv to mysql系列】三寫入數(shù)據(jù)庫(kù)

Abstract:

本文章將展示如何將一個(gè)包含多條數(shù)據(jù)的文本文件保存到數(shù)據(jù)庫(kù)中,每條數(shù)據(jù)對(duì)應(yīng)數(shù)據(jù)庫(kù)中的一條記錄

整體流程

下面將依次詳細(xì)說(shuō)明每個(gè)核心processor的配置以及完成的功能

每一行為一條記錄,包含三個(gè)部分,并用雙分號(hào)分割
  • ExecuteScript就是一個(gè)執(zhí)行特定腳本文件的處理器,腳本內(nèi)容在Script body中指定,同時(shí)可以指定腳本的語(yǔ)言。我選用的是groovy編寫的腳本,跟java非常類似,并且支持java的語(yǔ)法


因?yàn)檩斎胛募母袷讲环螩SV json等格式,因此我們需要對(duì)其進(jìn)行格式轉(zhuǎn)換,將每一行的內(nèi)容通過(guò)”::”分割,然后采用”;”進(jìn)行拼接(或者直接將”::”替換為”;”),也就是轉(zhuǎn)換為csv格式。同時(shí)對(duì)可以自定義csv表頭

import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*

def flowFile = session.get()
if (!flowFile) return
flowFile = session.write(flowFile, { inputStream, outputStream ->
    def stringBuilder = new StringBuilder()
//    添加csv表頭
    stringBuilder.append("id;director;type\n")
    def tellTaleHeart = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    def words = tellTaleHeart.split("::|\\n")
    def length=words.length
    def count=0
    for(int i=0;i<length;i++){
        String word= words[i]
        if(word!=null&&word.length()>0){
            stringBuilder.append(word)
            count=count+1
            if(count!=0&&count%3==0)
                stringBuilder.append("\n")
            else
                stringBuilder.append(";")
        }
    }
outputStream.write(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
flowFile = session.putAttribute(flowFile, 'filename', 'movies')
session.transfer(flowFile, REL_SUCCESS)
  • AddSchemaNameAttribute:對(duì)處理的flowfile添加一個(gè)屬性


直接點(diǎn)擊右上角的+號(hào),添加schema.name這個(gè)屬性,并把value設(shè)置為movies
  • convertRecord:將處理后的用”;”分割的數(shù)據(jù)轉(zhuǎn)化為json格式


Record Reader:CSVReader(根據(jù)所要讀入數(shù)據(jù)的格式進(jìn)行設(shè)定) 
點(diǎn)擊右側(cè)的箭頭,對(duì)CSVReader的屬性進(jìn)行設(shè)定 
主要涉及到一下兩個(gè)屬性: 
1.Schema Access Strategy: 在之前的一個(gè)處理器中我們給flowfile添加了schema屬性,因此選擇Use Schema Name Property 

1.  Schema Registry:選擇 AvroSchemaRegistry,并點(diǎn)擊右側(cè)的箭頭進(jìn)行配置: 
    這里property跟之前添加的schema.name相同,對(duì)于value需要根據(jù)輸入數(shù)據(jù)的屬性進(jìn)行設(shè)置 
    例如我們之前的輸入數(shù)據(jù),每一行包含三個(gè)部分,我們分別命名為id,director,filmType
{
    "type":"record",
    "name":"MovieRecord",
    "fields":[
        {"name":"id","type":"long"},
        {"name":"director","type":["null","string"]},
        {"name":"filmType","type":["null","string"]}
    ]
}
  1. Value Separator:這里選擇的是”;”
    Record Writer:JsonRecordSetWriter(根據(jù)處理后的數(shù)據(jù)格式設(shè)定)


設(shè)置方法跟CSVReader類似 

*   SplitJson:將json進(jìn)行拆分。現(xiàn)在得到的是一個(gè)大的Json記錄,現(xiàn)在需要將每一條記錄單獨(dú)封裝為一個(gè)flowfile,并insert到數(shù)據(jù)庫(kù)中 
   ![](https://upload-images.jianshu.io/upload_images/12325689-ed6b0a8bc3d11478.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


    JsonPath Expression: 這個(gè)需要根據(jù)當(dāng)前flowfile內(nèi)容中的格式進(jìn)行設(shè)定 
    我們當(dāng)前輸入數(shù)據(jù)的格式如下,因此我們的JsonPath Expression為$.*
[
{"id":115,"director":"Happiness Is in the Field (Bonheur est dans le pr茅, Le) (1995)","filmType":"Comedy"},
{"id":116,"director":"Anne Frank Remembered (1995)","filmType":"Documentary"},
{"id":117,"director":"Young Poisoner's Handbook, The (1995)","filmType":"Crime|Drama"},
{"id":118,"director":"If Lucy Fell (1996)","filmType":"Comedy|Romance"},
{"id":119,"director":"Steal Big, Steal Little (1995)","filmType":"Comedy"},{"id":120,"director":"Race the Sun (1996)","filmType":"Drama"},{"id":121,"director":"Boys of St. Vincent, The (1992)","filmType":"Drama"},
....
]
  • ConvertToCSV(抱歉,實(shí)際上這個(gè)是ConvertJsonToSQL 處理器)


這部分之前涉及過(guò),可以參考上一篇博客
  • PutSQL:執(zhí)行生成的sql語(yǔ)句,也就是執(zhí)行sql語(yǔ)句將數(shù)據(jù)插入到數(shù)據(jù)庫(kù)中
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容