Flume Taildir Source Code Modification: Monitoring Directories Recursively

1. Requirements

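Out of the box, the Flume Taildir source only monitors files located directly in the configured directory; files in its subdirectories are ignored. The goal is to make it monitor all matching files in the subdirectories as well.
Flume version: 1.9.0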

2. Approach

The relevant code is getMatchingFilesNoCache() in org.apache.flume.source.taildir.TaildirMatcher:

private List<File> getMatchingFilesNoCache() {
  List<File> result = Lists.newArrayList();
  try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) {
    for (Path entry : stream) {
      result.add(entry.toFile());
    }
  } catch (IOException e) {
    logger.error("I/O exception occurred while listing parent directory. " +
        "Files already matched will be returned. " + parentDir.toPath(), e);
  }
  return result;
}

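This method collects the files that pass fileFilter and adds them to result. However, Files.newDirectoryStream only lists the entries directly inside the given directory; it does not descend into subdirectories.
So we need to walk the directory tree recursively to pick up the files in subdirectories as well.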
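The difference is easy to check outside Flume. The standalone sketch below (the class ListingDemo and its methods are hypothetical, not Flume code) builds the same layout used in the test later on, a.log plus sub/b.log, and lists it twice: once with Files.newDirectoryStream and once with Files.walkFileTree.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ListingDemo {

  // One level only: the direct children of dir (files and subdirectories alike)
  static List<String> listOneLevel(Path dir) throws IOException {
    List<String> names = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
      for (Path entry : stream) {
        names.add(dir.relativize(entry).toString());
      }
    }
    Collections.sort(names);
    return names;
  }

  // Recursive: every non-directory entry anywhere below dir
  static List<String> listRecursively(Path dir) throws IOException {
    List<String> names = new ArrayList<>();
    Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
      @Override
      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
        names.add(dir.relativize(file).toString());
        return FileVisitResult.CONTINUE;
      }
    });
    Collections.sort(names);
    return names;
  }

  public static void main(String[] args) throws IOException {
    // Same layout as in the test section: a.log and sub/b.log
    Path root = Files.createTempDirectory("taildir-demo");
    Files.createFile(root.resolve("a.log"));
    Files.createDirectory(root.resolve("sub"));
    Files.createFile(root.resolve("sub").resolve("b.log"));

    System.out.println(listOneLevel(root));
    System.out.println(listRecursively(root));
  }
}
```

On Linux, main prints [a.log, sub] for the one-level listing (sub appears as an entry but is not descended into) and [a.log, sub/b.log] for the recursive walk; on Windows the separator is a backslash.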

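3. Implementation

Modify getMatchingFilesNoCache() so that it walks the directory tree with Files.walkFileTree and applies the same fileFilter inside every directory it visits: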

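// Imports used by the new code (add them to TaildirMatcher if not already present)
import java.nio.file.FileVisitResult;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

private List<File> getMatchingFilesNoCache() {
  List<File> result = Lists.newArrayList();
  try {
    // Visit parentDir and, recursively, every directory below it
    Files.walkFileTree(parentDir.toPath(), new SimpleFileVisitor<Path>() {
      @Override
      public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
          throws IOException {
        // List only the matching files directly inside dir; try-with-resources closes the stream
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, fileFilter)) {
          for (Path entry : stream) {
            result.add(entry.toFile());
          }
        }
        return FileVisitResult.CONTINUE;
      }
    });
  } catch (IOException e) {
    logger.error("I/O exception occurred while listing parent directory. " +
        "Files already matched will be returned. " + parentDir.toPath(), e);
  }
  return result;
}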
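To try the modified logic without rebuilding Flume, the sketch below mirrors it in a standalone class. RecursiveMatcher and its method names are hypothetical. Following what TaildirMatcher's constructor does, it splits the configured pattern into a parent directory and a file-name regex, builds a filter that accepts only non-directories whose name matches, and then applies that filter in every directory visited by Files.walkFileTree. Errors go to stderr here instead of Flume's logger.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

public class RecursiveMatcher {
  private final File parentDir;
  private final DirectoryStream.Filter<Path> fileFilter;

  RecursiveMatcher(String filePattern) {
    // "/root/data/.*log.*" -> parentDir "/root/data", regex ".*log.*"
    File f = new File(filePattern);
    this.parentDir = f.getParentFile();
    PathMatcher matcher = FileSystems.getDefault().getPathMatcher("regex:" + f.getName());
    // Accept non-directories whose file name (not the full path) matches the regex
    this.fileFilter = entry -> matcher.matches(entry.getFileName()) && !Files.isDirectory(entry);
  }

  List<File> getMatchingFiles() {
    List<File> result = new ArrayList<>();
    try {
      Files.walkFileTree(parentDir.toPath(), new SimpleFileVisitor<Path>() {
        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
            throws IOException {
          // Called once per directory: parentDir first, then each subdirectory
          try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, fileFilter)) {
            for (Path entry : stream) {
              result.add(entry.toFile());
            }
          }
          return FileVisitResult.CONTINUE;
        }
      });
    } catch (IOException e) {
      // Same policy as the Flume code: keep whatever was matched before the error
      System.err.println("I/O error while listing " + parentDir + ": " + e);
    }
    return result;
  }

  public static void main(String[] args) {
    String pattern = args.length > 0 ? args[0] : "/root/data/.*log.*";
    for (File file : new RecursiveMatcher(pattern).getMatchingFiles()) {
      System.out.println(file.getPath());
    }
  }
}
```

Passing a pattern such as /root/data/.*log.* prints every matching file under /root/data, including those in subdirectories. A directory whose name happens to match the regex is still walked into, but never returned itself, because the filter rejects directories.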

4. Testing

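  • Build
cd apache-flume-1.9.0-src\flume-ng-sources\flume-taildir-source
mvn clean package
Then copy the newly built jar from target/ over the flume-taildir-source jar in the lib directory of the Flume installation.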
  • Test
Agent configuration (taildir-memory-logger.conf):
#define agent
taildir-hdfs-agent.sources=taildir-source
taildir-hdfs-agent.channels=taildir-memory-channel
taildir-hdfs-agent.sinks=hdfs-sink

#define source
taildir-hdfs-agent.sources.taildir-source.type=TAILDIR
taildir-hdfs-agent.sources.taildir-source.filegroups=f1
taildir-hdfs-agent.sources.taildir-source.filegroups.f1=/root/data/.*log.*
taildir-hdfs-agent.sources.taildir-source.positionFile=/root/position/taildir_position.json

#define channel
taildir-hdfs-agent.channels.taildir-memory-channel.type=memory

#define sink 
taildir-hdfs-agent.sinks.hdfs-sink.type=logger


#bind source and sink to channel
taildir-hdfs-agent.sources.taildir-source.channels=taildir-memory-channel
taildir-hdfs-agent.sinks.hdfs-sink.channel=taildir-memory-channel
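In this configuration, filegroups.f1 is split the way TaildirMatcher splits any filegroup pattern: everything before the last separator is the directory to monitor, and only the last component is a regular expression, matched against each file's name. With the recursive change, every subdirectory is descended into regardless of its name. The short sketch below (PatternSplitDemo and its helpers are hypothetical) shows that split for /root/data/.*log.* and which names from the test layout would match; it assumes a Unix-style path.

```java
import java.io.File;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class PatternSplitDemo {
  // Directory part of a filegroup pattern: the directory that gets walked
  static String parentOf(String filePattern) {
    return new File(filePattern).getParent();
  }

  // The last path component is a regex, tested against a file's name only
  static boolean nameMatches(String filePattern, String fileName) {
    String regex = new File(filePattern).getName();
    PathMatcher matcher = FileSystems.getDefault().getPathMatcher("regex:" + regex);
    return matcher.matches(Paths.get(fileName));
  }

  public static void main(String[] args) {
    String pattern = "/root/data/.*log.*"; // filegroups.f1 from the configuration
    System.out.println(parentOf(pattern));
    System.out.println(new File(pattern).getName());
    for (String name : new String[] {"a.log", "b.log", "sub", "app.txt"}) {
      System.out.println(name + " -> " + nameMatches(pattern, name));
    }
  }
}
```

On Linux this prints /root/data and .*log.*, then true for a.log and b.log and false for sub and app.txt.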
Start the agent:
./bin/flume-ng agent -n taildir-hdfs-agent -f /root/apache-flume-1.9.0-bin/bin/taildir-memory-logger.conf -c /root/apache-flume-1.9.0-bin/bin/conf -Dflume.root.logger=INFO,console
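Test directory layout (/root/data):
.
├── a.log
└── sub
    └── b.log
From inside sub/, append a line to b.log:
echo "2" >> b.log
The logger sink prints the new event, which confirms that the file in the subdirectory is being tailed:
2019-10-31 15:08:17,071 INFO sink.LoggerSink: Event: { headers:{} body: 32                                              2 }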
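Note that the test appends to b.log, which already existed when the agent started. As far as the Flume 1.9.0 source goes, TaildirMatcher.getMatchingFiles() reuses its cached file list (when cachePatternMatching is true, the default) until parentDir.lastModified() changes, and creating a file inside sub/ updates the mtime of sub/, not of /root/data. A brand-new file in an existing subdirectory may therefore go unnoticed. The simplest option is to set cachePatternMatching=false on the source. Another option, sketched below under the assumption that getMatchingFiles() is modified too, is to key the cache on the newest mtime of any directory in the tree. TreeMTime and newestDirMTime are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.util.concurrent.atomic.AtomicLong;

public class TreeMTime {
  // Hypothetical helper: newest last-modified time (ms) of root and of every directory below it.
  // Creating, deleting or renaming an entry updates the mtime of the directory containing it,
  // so this value typically moves forward when the file set changes anywhere in the tree.
  static long newestDirMTime(Path root) throws IOException {
    AtomicLong newest = new AtomicLong(Long.MIN_VALUE);
    Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
      @Override
      public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
        newest.accumulateAndGet(attrs.lastModifiedTime().toMillis(), Math::max);
        return FileVisitResult.CONTINUE;
      }
    });
    return newest.get();
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("mtime-demo");
    Path sub = Files.createDirectory(root.resolve("sub"));
    // Pin the mtimes so the output does not depend on the current clock
    Files.setLastModifiedTime(root, FileTime.fromMillis(1_000_000L));
    Files.setLastModifiedTime(sub, FileTime.fromMillis(2_000_000L));
    System.out.println(newestDirMTime(root)); // 2000000: the newer mtime of sub/ wins
  }
}
```

In getMatchingFiles(), currentParentDirMTime would then come from this helper instead of parentDir.lastModified(). It still walks the whole tree on every poll, so whether it beats simply disabling the cache is worth measuring.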