Flink中與流廣播配合使用的readFile解析

1. 前言

最近有需求要定時(shí)監(jiān)控文件,如果文件內(nèi)容發(fā)生變化,就要?jiǎng)討B(tài)地獲取新內(nèi)容,于是就準(zhǔn)備使用 env.readFile方法,
(1)當(dāng)你監(jiān)控一個(gè)文件時(shí),當(dāng)文件內(nèi)容發(fā)生變化,會(huì)將文件的整個(gè)內(nèi)容作為流輸出。
(2)當(dāng)你監(jiān)控一個(gè)目錄的時(shí)候,發(fā)現(xiàn)當(dāng)你復(fù)制一個(gè)文件,而沒(méi)有改變這個(gè)文件的內(nèi)容,并不會(huì)監(jiān)控到也不會(huì)輸出內(nèi)容,只有某文件內(nèi)容發(fā)生改變時(shí),才會(huì)將該文件的所有內(nèi)容輸出(也不是這個(gè)目錄下的所有文件內(nèi)容輸出)

因此,上述內(nèi)容底層到底是如何實(shí)現(xiàn)的呢?是根據(jù)什么判定該文件或者該目錄發(fā)生了變化呢?

2.源碼分析

(1) env.readFile方法
readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation<OUT> typeInformation)
具體實(shí)現(xiàn)中是調(diào)用方法
createFileInput(FileInputFormat<OUT> inputFormat, TypeInformation<OUT> typeInfo, String sourceName, FileProcessingMode monitoringMode, long interval)


(2)ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction(inputFormat, monitoringMode, this.getParallelism(), interval);
ContinuousFileMonitoringFunction類(lèi)的作用:
這是一個(gè)單個(gè)(非并行)監(jiān)視任務(wù),主要負(fù)責(zé):
a、監(jiān)視用戶提供的路徑
b、確定應(yīng)該進(jìn)一步讀取和處理哪些文件
c、創(chuàng)建與那些文件相對(duì)應(yīng)的文件輸入片
d、將它們分配給下游任務(wù)進(jìn)一步處理(分片分配到下游任務(wù),可以超過(guò)單個(gè)并行度)
注意:分片被轉(zhuǎn)發(fā)到下游時(shí),基于它們所屬文件的修改時(shí)間,以便按修改時(shí)間升序進(jìn)行讀取。

2.1 該類(lèi)的構(gòu)造器


其中,定義了分片后下游讀取的并行度readerParallelism(算子定義的并行度和1中取最大值);
還有默認(rèn)的全局修改時(shí)間globalModificationTime(初始是無(wú)窮小值)
另外,一次只能監(jiān)控一個(gè)路徑,不能同時(shí)監(jiān)控多個(gè)不同的路徑。

2.2 run方法
run方法是實(shí)現(xiàn)接口SourceFunction實(shí)現(xiàn)的方法,是用來(lái)向下游輸出元素的。

a、首先是根據(jù)文件路徑進(jìn)行初始化,獲取checkpoint鎖
這個(gè)鎖的作用是保證對(duì)state進(jìn)行checkpoint和update的操作,與元素的輸出不是同步完成的,需要在synchronized塊中。

@Override
    public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
        Path p = new Path(path);
        FileSystem fileSystem = FileSystem.get(p.toUri());
        if (!fileSystem.exists(p)) {
            throw new FileNotFoundException("The provided file path " + path + " does not exist.");
        }

b、根據(jù)watchType,決定如何向下游輸出元素。

  • PROCESS_CONTINUOUSLY類(lèi)型:
    while循環(huán),在獲得checkpointLock時(shí),調(diào)用monitorDirAndForwardSplits方法,并且線程睡眠interval毫秒,再去再次調(diào)用monitorDirAndForwardSplits方法。

  • PROCESS_ONCE類(lèi)型:
    調(diào)用一次monitorDirAndForwardSplits方法,將isRunning置為false。

switch (watchType) {
            case PROCESS_CONTINUOUSLY:
                while (isRunning) {
                    synchronized (checkpointLock) {
                        monitorDirAndForwardSplits(fileSystem, context);
                    }
                    Thread.sleep(interval);
                }

                break;
            case PROCESS_ONCE:
                synchronized (checkpointLock) {
                    if (globalModificationTime == Long.MIN_VALUE) {
                        monitorDirAndForwardSplits(fileSystem, context);
                        globalModificationTime = Long.MAX_VALUE;
                    }
                    isRunning = false;
                }
                break;
            default:
                isRunning = false;
                throw new RuntimeException("Unknown WatchType" + watchType);

2.3 monitorDirAndForwardSplits方法

        Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(path));
        Map<Long, List<TimestampedFileInputSplit>> splitsSortedByModTime = getInputSplitsSortedByModTime(eligibleFiles);

        for (Map.Entry<Long, List<TimestampedFileInputSplit>> splits: splitsSortedByModTime.entrySet()) {
            long modificationTime = splits.getKey();
            for (TimestampedFileInputSplit split: splits.getValue()) {
                LOG.info("Forwarding split: " + split);
                context.collect(split);
            }
            // update the global modification time
            globalModificationTime = Math.max(globalModificationTime, modificationTime);
        }

a、首先調(diào)用listEligibleFiles方法
FileStatus[] statuses=fileSystem.listStatus(path)獲取路徑下所有文件夾/文件的狀態(tài);

  • 文件類(lèi)型,獲取最新的修改時(shí)間,調(diào)用shouldIgnore方法(該方法里判定modificationTime <= globalModificationTime時(shí),返回true,應(yīng)該忽略)
    就是說(shuō)某個(gè)文件的修改時(shí)間,是小于等于全局最近修改時(shí)間的時(shí)候,就會(huì)忽略了,認(rèn)為該文件并沒(méi)有再次被修改。
    所以說(shuō),在路徑下新復(fù)制一個(gè)文件,取決于這個(gè)被復(fù)制的文件的上次修改時(shí)間,與全局修改時(shí)間的大小,如果更大,則不會(huì)被忽略,相反會(huì)被忽略。

  • 目錄類(lèi)型,遞歸調(diào)用listEligibleFiles方法


b、從a步驟中獲取內(nèi)容又發(fā)生變化的文件后,調(diào)用getInputSplitsSortedByModTime


注意這里使用的數(shù)據(jù)結(jié)果為T(mén)reeMap結(jié)構(gòu),key就是修改時(shí)間的時(shí)間戳,value是文件輸入分片 TimestampedFileInputSplit實(shí)例的列表。
因?yàn)門(mén)reeMap存儲(chǔ),所以會(huì)將最新時(shí)間戳放在最前面。

c、獲取到TreeMap之后,進(jìn)行遍歷,將分片分配給下游,更新全局更新時(shí)間。



(3)根據(jù)文件格式創(chuàng)建ContinuousFileReaderOperator實(shí)例
讀取從前面ContinuousFileMonitoringFunction返回的的修改的文件分片(TimestampedFileInputSplit實(shí)例);
與ContinuousFileMonitoringFunction相反并行度為1,此運(yùn)算符的并行度>1;
一旦收到文件splits描述符,它就會(huì)被放入隊(duì)列中,并具有另一個(gè)線程讀取拆分的實(shí)際數(shù)據(jù)。 這種架構(gòu)允許將從發(fā)出檢查點(diǎn)障礙的那一側(cè)讀取線程,從而消除任何潛在的背壓。

(4)最后返回?cái)?shù)據(jù)源


3.總結(jié)

flink定時(shí)監(jiān)控文件在某些場(chǎng)景十分有用,比如說(shuō)動(dòng)態(tài)修改某配置文件,在風(fēng)險(xiǎn)監(jiān)控場(chǎng)景動(dòng)態(tài)修改告警規(guī)則等,再與流廣播配合使用那就更香了,達(dá)到不需要停止實(shí)時(shí)任務(wù)而能夠改變某些配置。
至于問(wèn)題所說(shuō),作者在進(jìn)行實(shí)踐中,復(fù)制某個(gè)文件之后,目錄下明明多了一個(gè)文件,卻并沒(méi)有將該文件的內(nèi)容讀取輸出,相信大家看了源碼分析的內(nèi)容應(yīng)該有了答案:
因?yàn)閺?fù)制的文件,該文件的最新修改時(shí)間是與源文件保持一致的,并不是你復(fù)制的時(shí)間,所以這個(gè)修改時(shí)間可能就小于全局更新時(shí)間(globalModificationTime),而不被認(rèn)為是新文件,所以在listEligibleFiles那一步就被忽略了。
如果你是
(1)新創(chuàng)建的文件,
(2)或者說(shuō)復(fù)制的源文件的修改時(shí)間是大于globalModificationTime,
(3)再或者你復(fù)制文件后,編輯下文件,更新下文件修改時(shí)間
最后肯定會(huì)輸出該文件的內(nèi)容。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容