1. 前言
最近有需求要定時(shí)監(jiān)控文件,如果文件內(nèi)容發(fā)生變化,就要?jiǎng)討B(tài)地獲取新內(nèi)容,于是就準(zhǔn)備使用 env.readFile方法,
(1)當(dāng)你監(jiān)控一個(gè)文件時(shí),當(dāng)文件內(nèi)容發(fā)生變化,會(huì)將文件的整個(gè)內(nèi)容作為流輸出。
(2)當(dāng)你監(jiān)控一個(gè)目錄的時(shí)候,發(fā)現(xiàn)當(dāng)你復(fù)制一個(gè)文件,而沒(méi)有改變這個(gè)文件的內(nèi)容,并不會(huì)監(jiān)控到也不會(huì)輸出內(nèi)容,只有某文件內(nèi)容發(fā)生改變時(shí),才會(huì)將該文件的所有內(nèi)容輸出(也不是這個(gè)目錄下的所有文件內(nèi)容輸出)
因此,上述內(nèi)容底層到底是如何實(shí)現(xiàn)的呢?是根據(jù)什么判定該文件或者該目錄發(fā)生了變化呢?
2.源碼分析
(1) env.readFile方法
readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation<OUT> typeInformation)
具體實(shí)現(xiàn)中是調(diào)用方法
createFileInput(FileInputFormat<OUT> inputFormat, TypeInformation<OUT> typeInfo, String sourceName, FileProcessingMode monitoringMode, long interval)

(2)ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction(inputFormat, monitoringMode, this.getParallelism(), interval);
ContinuousFileMonitoringFunction類(lèi)的作用:
這是一個(gè)單個(gè)(非并行)監(jiān)視任務(wù),主要負(fù)責(zé):
a、監(jiān)視用戶提供的路徑
b、確定應(yīng)該進(jìn)一步讀取和處理哪些文件
c、創(chuàng)建與那些文件相對(duì)應(yīng)的文件輸入片
d、將它們分配給下游任務(wù)進(jìn)一步處理(分片分配到下游任務(wù),可以超過(guò)單個(gè)并行度)
注意:分片被轉(zhuǎn)發(fā)到下游時(shí),基于它們所屬文件的修改時(shí)間,以便按修改時(shí)間升序進(jìn)行讀取。
2.1 該類(lèi)的構(gòu)造器

其中,定義了分片后下游讀取的并行度readerParallelism(算子定義的并行度和1中取最大值);
還有默認(rèn)的全局修改時(shí)間globalModificationTime(初始是無(wú)窮小值)
另外,一次只能監(jiān)控一個(gè)路徑,不能同時(shí)監(jiān)控多個(gè)不同的路徑。
2.2 run方法
run方法是實(shí)現(xiàn)接口SourceFunction實(shí)現(xiàn)的方法,是用來(lái)向下游輸出元素的。
a、首先是根據(jù)文件路徑進(jìn)行初始化,獲取checkpoint鎖
這個(gè)鎖的作用是保證對(duì)state進(jìn)行checkpoint和update的操作,與元素的輸出不是同步完成的,需要在synchronized塊中。
@Override
public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
Path p = new Path(path);
FileSystem fileSystem = FileSystem.get(p.toUri());
if (!fileSystem.exists(p)) {
throw new FileNotFoundException("The provided file path " + path + " does not exist.");
}
b、根據(jù)watchType,決定如何向下游輸出元素。
PROCESS_CONTINUOUSLY類(lèi)型:
while循環(huán),在獲得checkpointLock時(shí),調(diào)用monitorDirAndForwardSplits方法,并且線程睡眠interval毫秒,再去再次調(diào)用monitorDirAndForwardSplits方法。PROCESS_ONCE類(lèi)型:
調(diào)用一次monitorDirAndForwardSplits方法,將isRunning置為false。
switch (watchType) {
case PROCESS_CONTINUOUSLY:
while (isRunning) {
synchronized (checkpointLock) {
monitorDirAndForwardSplits(fileSystem, context);
}
Thread.sleep(interval);
}
break;
case PROCESS_ONCE:
synchronized (checkpointLock) {
if (globalModificationTime == Long.MIN_VALUE) {
monitorDirAndForwardSplits(fileSystem, context);
globalModificationTime = Long.MAX_VALUE;
}
isRunning = false;
}
break;
default:
isRunning = false;
throw new RuntimeException("Unknown WatchType" + watchType);
2.3 monitorDirAndForwardSplits方法
Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(path));
Map<Long, List<TimestampedFileInputSplit>> splitsSortedByModTime = getInputSplitsSortedByModTime(eligibleFiles);
for (Map.Entry<Long, List<TimestampedFileInputSplit>> splits: splitsSortedByModTime.entrySet()) {
long modificationTime = splits.getKey();
for (TimestampedFileInputSplit split: splits.getValue()) {
LOG.info("Forwarding split: " + split);
context.collect(split);
}
// update the global modification time
globalModificationTime = Math.max(globalModificationTime, modificationTime);
}
a、首先調(diào)用listEligibleFiles方法
FileStatus[] statuses=fileSystem.listStatus(path)獲取路徑下所有文件夾/文件的狀態(tài);
文件類(lèi)型,獲取最新的修改時(shí)間,調(diào)用shouldIgnore方法(該方法里判定modificationTime <= globalModificationTime時(shí),返回true,應(yīng)該忽略)
就是說(shuō)某個(gè)文件的修改時(shí)間,是小于等于全局最近修改時(shí)間的時(shí)候,就會(huì)忽略了,認(rèn)為該文件并沒(méi)有再次被修改。
所以說(shuō),在路徑下新復(fù)制一個(gè)文件,取決于這個(gè)被復(fù)制的文件的上次修改時(shí)間,與全局修改時(shí)間的大小,如果更大,則不會(huì)被忽略,相反會(huì)被忽略。-
目錄類(lèi)型,遞歸調(diào)用listEligibleFiles方法
b、從a步驟中獲取內(nèi)容又發(fā)生變化的文件后,調(diào)用getInputSplitsSortedByModTime
- 其中有createInputSplits方法,該方法比較復(fù)雜,定義在對(duì)應(yīng)的FileInputFormat類(lèi)中,該方法的作用主要是給文件計(jì)算分片,返回的是FileInputSplit列表。
詳細(xì)內(nèi)容請(qǐng)參考另一篇文章 Flink在加載文件數(shù)據(jù)源時(shí),如何創(chuàng)建分片呢?

注意這里使用的數(shù)據(jù)結(jié)果為T(mén)reeMap結(jié)構(gòu),key就是修改時(shí)間的時(shí)間戳,value是文件輸入分片 TimestampedFileInputSplit實(shí)例的列表。
因?yàn)門(mén)reeMap存儲(chǔ),所以會(huì)將最新時(shí)間戳放在最前面。
c、獲取到TreeMap之后,進(jìn)行遍歷,將分片分配給下游,更新全局更新時(shí)間。

(3)根據(jù)文件格式創(chuàng)建ContinuousFileReaderOperator實(shí)例
讀取從前面ContinuousFileMonitoringFunction返回的的修改的文件分片(TimestampedFileInputSplit實(shí)例);
與ContinuousFileMonitoringFunction相反并行度為1,此運(yùn)算符的并行度>1;
一旦收到文件splits描述符,它就會(huì)被放入隊(duì)列中,并具有另一個(gè)線程讀取拆分的實(shí)際數(shù)據(jù)。 這種架構(gòu)允許將從發(fā)出檢查點(diǎn)障礙的那一側(cè)讀取線程,從而消除任何潛在的背壓。
(4)最后返回?cái)?shù)據(jù)源

3.總結(jié)
flink定時(shí)監(jiān)控文件在某些場(chǎng)景十分有用,比如說(shuō)動(dòng)態(tài)修改某配置文件,在風(fēng)險(xiǎn)監(jiān)控場(chǎng)景動(dòng)態(tài)修改告警規(guī)則等,再與流廣播配合使用那就更香了,達(dá)到不需要停止實(shí)時(shí)任務(wù)而能夠改變某些配置。
至于問(wèn)題所說(shuō),作者在進(jìn)行實(shí)踐中,復(fù)制某個(gè)文件之后,目錄下明明多了一個(gè)文件,卻并沒(méi)有將該文件的內(nèi)容讀取輸出,相信大家看了源碼分析的內(nèi)容應(yīng)該有了答案:
因?yàn)閺?fù)制的文件,該文件的最新修改時(shí)間是與源文件保持一致的,并不是你復(fù)制的時(shí)間,所以這個(gè)修改時(shí)間可能就小于全局更新時(shí)間(globalModificationTime),而不被認(rèn)為是新文件,所以在listEligibleFiles那一步就被忽略了。
如果你是
(1)新創(chuàng)建的文件,
(2)或者說(shuō)復(fù)制的源文件的修改時(shí)間是大于globalModificationTime,
(3)再或者你復(fù)制文件后,編輯下文件,更新下文件修改時(shí)間
最后肯定會(huì)輸出該文件的內(nèi)容。
