備注:本文簡(jiǎn)單實(shí)現(xiàn)了一個(gè)計(jì)數(shù)功能的攔截器,針對(duì)每個(gè)event用線程安全的AtomicLong類進(jìn)行計(jì)數(shù),并將計(jì)數(shù)count寫入到輸出的header中。
flume版本:1.6
JDK:1.7
1編碼
編寫攔截器,只需要寫一個(gè)實(shí)現(xiàn)Interceptor接口類,在該類中還要實(shí)現(xiàn)一個(gè)Builder的靜態(tài)類,builder類用來(lái)實(shí)例化interceptor,并將Context實(shí)例配置給攔截器。
在idea中新建項(xiàng)目,依賴包導(dǎo)入:

Jdk要設(shè)置成1.7版本的,否則運(yùn)行的時(shí)候會(huì)報(bào)錯(cuò),因?yàn)閒lume1.6是jdk1.7編譯的。
代碼如下,實(shí)現(xiàn)Interceptor,然后實(shí)現(xiàn)靜態(tài)類Builder:
package com.open01.flume.interceptors;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
/**
* Created by caolch on 2017/3/9.
*/
public class TestInterceptor implements Interceptor{
private final String headerKey;
private static final String CONF_HEADER_KEY = "header";
private static final String DEFAULT_HEADER = "count";
private final AtomicLong currentCount;
private TestInterceptor(Context ctx) {
headerKey = ctx.getString(CONF_HEADER_KEY,DEFAULT_HEADER);
currentCount = new AtomicLong(0);
}
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
long count = currentCount.incrementAndGet();
event.getHeaders().put(headerKey,String.valueOf(count));
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
for (Event e:events) {
intercept(e);
}
return events;
}
@Override
public void close() {
}
public static class CounterInterceptorBuilder implements Builder {
private Context ctx;
@Override
public Interceptor build() {
return new TestInterceptor(ctx);
}
@Override
public void configure(Context context) {
this.ctx = context;
}
}
}
方法intercept(Event event)是具體執(zhí)行解析的方法,將count自增1,然后寫入到該條event的headers中。
2配置
編譯項(xiàng)目生成jar包,將jar包放入到flume的lib目錄下。
配置conf文件如下:

為sources指定自定義的攔截器,配置的時(shí)候一定要寫類的全路徑,并且用”$“符號(hào)分割加上自定義Builder的類名。
執(zhí)行flume的agent,在telnet端輸入數(shù)據(jù),可以看到每條event的header中都會(huì)添加count。
