flume-ng添加自定義攔截器

業(yè)務(wù)場景:收集nginx日志中個(gè)別信息進(jìn)入kafka,為了避免kafka壓力過大,這里優(yōu)化了兩點(diǎn)
  • 刷選掉不需要分析的數(shù)據(jù)進(jìn)入kafka
  • 盡量把消息均勻分布在不同的broker上
刷選數(shù)據(jù)
  • 過濾掉不需要的數(shù)據(jù)
  • 自定義Interceptor
<!-- 這里是maven配置 -->
<!-- 我們用的是1.6.0版本 -->
 <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
       <version>1.6.0</version>
    </dependency>
//只保留兩個(gè)接口的數(shù)據(jù)
package deng.yb.flume_ng_Interceptor;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.codec.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class MyInterceptor implements Interceptor {
    /**
     * epp接口-request
     */
    private final String EPP_REQUEST = "POST /api/sky_server_data_app/track/user_time HTTP/1.1";

    /**
     * app接口-request
     */
    private final String APP_REQUEST = "POST /api/sky_server_data_app/code/app_code HTTP/1.1";

    public void close() {
    }

    public void initialize() {
    }

    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charsets.UTF_8);
        if (body.indexOf(EPP_REQUEST) > -1 || body.indexOf(APP_REQUEST) > -1) {
            event.setBody(body.toString().getBytes());
            return event;
        }
        return null;
    }

    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = new ArrayList<>(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }

    public static class Builder implements Interceptor.Builder {

        public void configure(Context arg0) {
            // TODO Auto-generated method stub
        }

        public Interceptor build() {
            return new MyInterceptor();
        }

    }
}

  • cdh flume配置修改,agent添加以下配置
epplog.sources.r1.interceptors=i1
epplog.sources.r1.interceptors.i1.type= deng.yb.flume_ng_Interceptor.MyInterceptor$Builder
  • 把自定義程序打好jar包放進(jìn)$FLUME_HOME/lib文件夾下
  • 重啟
  • 這樣flume到kafka的數(shù)據(jù)就是帥選的信息后的,避免了大量沒用信息到kafka導(dǎo)致IO問題
kafka均衡負(fù)載
  • 需要把消息均勻分布在不同brokers上,避免單臺broker節(jié)點(diǎn)壓力過大
  • 官方文檔這樣說
Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will used by Kafka to partition the data between the topic partitions. Events with same key will be sent to the same partition. If the key is null, events will be sent to random partitions.
  • 大概意思是 - kafka-sink是從header里的key參數(shù)來確定將數(shù)據(jù)發(fā)到kafka的哪個(gè)分區(qū)中。如果為null,那么就會隨機(jī)發(fā)布至分區(qū)中。但我測試的結(jié)果是flume發(fā)布的數(shù)據(jù)會發(fā)布到一個(gè)分區(qū)中的
  • 向flume添加攔截器,會為每個(gè)event的head添加一個(gè)隨機(jī)唯一的key,我們需要向header中寫上隨機(jī)的key,然后數(shù)據(jù)才會真正的向kafka分區(qū)進(jìn)行隨機(jī)發(fā)布
  • flume的agent添加和修改以下配置
epplog.sources.r1.interceptors=i1 i2
epplog.sources.r1.interceptors.i1.type= deng.yb.flume_ng_Interceptor.MyInterceptor$Builder

epplog.sources.r1.interceptors.i2.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
epplog.sources.r1.interceptors.i2.headerName=key
epplog.sources.r1.interceptors.i2.preserveExisting=false
  • 創(chuàng)建topic
#分區(qū)數(shù)需要根據(jù)brokers的數(shù)量決定,最好是brokers的整數(shù)倍
kafka-topics --create  --zookeeper bi-slave1:2181,bi-slave2:2181,bi-master:2181 --replication-factor 1 --partitions 3 --topic epplog1
  • 修改flume的sink的topic,重啟flume

  • 看到消息


    測試結(jié)果.png
  • 可以看到,消息自動(dòng)uuid和帥選后的信息

  • 查看不同brokers該topic的分區(qū)
    1分區(qū)


    1分區(qū).png

    2分區(qū)


    2分區(qū).png

    3分區(qū)
    3分區(qū).png
  • 分區(qū)名格式為 topic-分區(qū)索引,索引從0開始算

  • 能看到,消息已經(jīng)相對均勻分布在3個(gè)分區(qū),也就是三臺機(jī)器上面,從而達(dá)到kafka負(fù)載均衡

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,899評論 13 425
  • kafka的定義:是一個(gè)分布式消息系統(tǒng),由LinkedIn使用Scala編寫,用作LinkedIn的活動(dòng)流(Act...
    時(shí)待吾閱讀 5,539評論 1 15
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評論 19 139
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,985評論 4 54
  • 2018年3月29日 星期四 天氣晴 (366) 一年一度的春季運(yùn)動(dòng)會在今天拉開了帷幕!為了這次運(yùn)動(dòng)會,我們家...
    倩軒兒閱讀 153評論 0 2

友情鏈接更多精彩內(nèi)容