flume自定義攔截器學(xué)習(xí)

備注:本文簡(jiǎn)單實(shí)現(xiàn)了一個(gè)計(jì)數(shù)功能的攔截器,針對(duì)每個(gè)event用線程安全的AtomicLong類進(jìn)行計(jì)數(shù),并將計(jì)數(shù)count寫入到輸出的header中。
flume版本:1.6
JDK:1.7

1編碼

編寫攔截器,只需要寫一個(gè)實(shí)現(xiàn)Interceptor接口類,在該類中還要實(shí)現(xiàn)一個(gè)Builder的靜態(tài)類,builder類用來(lái)實(shí)例化interceptor,并將Context實(shí)例配置給攔截器。
在idea中新建項(xiàng)目,依賴包導(dǎo)入:

Jdk要設(shè)置成1.7版本的,否則運(yùn)行的時(shí)候會(huì)報(bào)錯(cuò),因?yàn)閒lume1.6是jdk1.7編譯的。
代碼如下,實(shí)現(xiàn)Interceptor,然后實(shí)現(xiàn)靜態(tài)類Builder:

package com.open01.flume.interceptors;

import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
/**
 * Created by caolch on 2017/3/9.
 */
public class TestInterceptor implements Interceptor{
    private final String headerKey;
    private static final String CONF_HEADER_KEY = "header";
    private static final String DEFAULT_HEADER = "count";
    private final AtomicLong currentCount;

    private TestInterceptor(Context ctx) {
        headerKey = ctx.getString(CONF_HEADER_KEY,DEFAULT_HEADER);
        currentCount = new AtomicLong(0);
    }

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        long count = currentCount.incrementAndGet();
        event.getHeaders().put(headerKey,String.valueOf(count));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event e:events) {
            intercept(e);
        }
        return events;
    }

    @Override
    public void close() {
    }
    public static class CounterInterceptorBuilder implements Builder {
        private Context ctx;

        @Override
        public Interceptor build() {
            return new TestInterceptor(ctx);
        }

        @Override
        public void configure(Context context) {
            this.ctx = context;
        }
    }
}

方法intercept(Event event)是具體執(zhí)行解析的方法,將count自增1,然后寫入到該條event的headers中。

2配置

編譯項(xiàng)目生成jar包,將jar包放入到flume的lib目錄下。
配置conf文件如下:

為sources指定自定義的攔截器,配置的時(shí)候一定要寫類的全路徑,并且用”$“符號(hào)分割加上自定義Builder的類名。
執(zhí)行flume的agent,在telnet端輸入數(shù)據(jù),可以看到每條event的header中都會(huì)添加count。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,502評(píng)論 19 139
  • 攔截器是Struts2框架的核心,它主要完成解析請(qǐng)求參數(shù)、將請(qǐng)求參數(shù)賦值給Action屬性、執(zhí)行數(shù)據(jù)校驗(yàn)、文件上傳...
    重山楊閱讀 4,074評(píng)論 2 13
  • iOS網(wǎng)絡(luò)架構(gòu)討論梳理整理中。。。 其實(shí)如果沒(méi)有APIManager這一層是沒(méi)法使用delegate的,畢竟多個(gè)單...
    yhtang閱讀 5,466評(píng)論 1 23
  • Struts2的核心在于它復(fù)雜的攔截器,幾乎70%的工作都是由攔截器完成的。比如我們之前用于將上傳的文件對(duì)應(yīng)于ac...
    Single_YAM閱讀 636評(píng)論 0 6
  • 徹夜難眠,輾轉(zhuǎn)反側(cè),沉浸在傷感回憶中難以自拔。去年今日,我和他分手告別,彼此沒(méi)有回頭,沒(méi)有挽留,就這樣松開(kāi)了緊...
    Aliyahfang閱讀 336評(píng)論 0 0

友情鏈接更多精彩內(nèi)容