flume監(jiān)控inode變化的文件

由于要實時讀取redis的AOF文件,但是flume的taildir source在監(jiān)控文件的時候,如果文件的inode變化了,那么會出現(xiàn)重復(fù)讀取數(shù)據(jù)的情況,這里可以通過修改flume taildir源碼解決,只針對讀一個文件的情況。

  1. 去flume官網(wǎng)下載flume源碼下載
  2. 解壓后在idea中打開如下

配置好maven,到flume-ng-source中找到ReliableTaildirEventReader

  1. 找到updateTailFiles方法
/**
     * Update tailFiles mapping if a new file is created or appends are detected
     * to the existing file.
     */
    public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
        pass
        ...
        ...
        for (TaildirMatcher taildir : taildirCache) {
            long inode = getInode(f);
            TailFile tf = tailFiles.get(inode);
            //判斷是否是新文件,inode或者文件名不同就認(rèn)為是新文件
            if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
                long startPos = skipToEnd ? f.length() : 0;
                System.out.println(tf);
                if (tf != null) {
                    inode = tf.getInode();
                }
                //找到這行,startPos是讀取文件的位置,當(dāng)有新文件時會從0開始讀
                //tf = openFile(f, headers, inode, startPos);
                //改成??,f.length()是此時讀到的位置
                tf = openFile(f, headers, inode, f.length());
            } else {

繼續(xù)找到TaildirSource類

private String toPosInfoJson() {
    @SuppressWarnings("rawtypes")
    List<Map> posInfos = Lists.newArrayList();
    for (Long inode : existingInodes) {
        TailFile tf = reader.getTailFiles().get(inode);
        //這里會將新的inode寫到位置文件中
        //posInfos.add(ImmutableMap.of("inode", inode, "pos", tf.getPos(), "file", tf.getPath()));
        //改成??,inode=0
        posInfos.add(ImmutableMap.of("inode", 0, "pos", tf.getPos(), "file", tf.getPath()));
    }
    return new Gson().toJson(posInfos);
}

右側(cè)找到package打包,或者進入到項目目錄用mvn package打包,復(fù)制target中的flume-taildir-source-1.8.0.jar到flume中的lib下即可,這樣即使文件的inode變化,也可以繼續(xù)讀
另外,我是把taildir這個包又復(fù)制了一份,這樣在flume的配置中

a1.sources.r1.type = org.apache.flume.source.taildir2.TaildirSource
直接指定修改后的類名,這樣不會影響原來的TAILDIR

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容