Flume 高級 —— source 自定義

前言

前面我們已經(jīng)說過了flume的簡單入門,這篇文章繼續(xù)深入,來熟悉下source,并通過自定義 source 來了解其工作原理,接下來的一系列文章都會以flume的各個小組件慢慢深入,歡迎和我一起學(xué)習(xí)

source 到 channel 過程

source-to-channel.png
  1. 上圖大致描述了 source 收集到數(shù)據(jù)推送到 channel 的基本過程,可以發(fā)現(xiàn)中間多出了一個 channel processor 的組件
  2. source 收集到的數(shù)據(jù)會經(jīng)過攔截器鏈進(jìn)行過濾,然后通過channel selector 發(fā)送到對應(yīng)的 channel,從中我們可以想到,如果你要對數(shù)據(jù)進(jìn)行一些特別的處理,可以自定義攔截器來進(jìn)行數(shù)據(jù)清洗,當(dāng)然不建議太過復(fù)雜的處理,否則你的 flume 將會和蝸牛一樣慢,你還可以通過 selector 來控制數(shù)據(jù)儲存的 channel。

source 是如何產(chǎn)生數(shù)據(jù)的

source 分為兩大類:PollableSource 和 EventDrivenSource,不過筆者倒是沒怎么弄清楚,這兩大類區(qū)分的目的何在?如果你有什么想法,歡迎留言指教。

  • PollableSource
public interface PollableSource extends Source {

  public Status process() throws EventDeliveryException;
  public static enum Status {READY, BACKOFF}
}
  1. 當(dāng)一個agent 啟動之后,就會不斷循環(huán)調(diào)用 process 以獲取數(shù)據(jù)
  2. 當(dāng) process 返回 READY,表示數(shù)據(jù)產(chǎn)生正常,如果是 BACKOFF 則表示異常,當(dāng)產(chǎn)生異常時候,agent 會等待一段時間再來調(diào)用 process,異常次數(shù)越多,間隔時間越長,最長不超過 5s。
  3. 自帶一個線程,工作都是在自己的獨(dú)立線程之內(nèi)的
  • EventDrivenSource
public interface EventDrivenSource extends Source
  1. 簡單的一個標(biāo)記接口,區(qū)分 PollableSource
  • 運(yùn)行流程
  1. 當(dāng)一個agent啟動時候,會開始執(zhí)行 application 的 main() 方法
  2. 進(jìn)程啟動之后,會通過 AbstractConfigurationProvider$getConfiguration解析配置文件中的各個組件和屬性
  3. 針對 source 會生成 sourceRunner 通過 supervisor 來運(yùn)行和管理其生命周期。
  4. source 的生命周期 start 方法正式開始執(zhí)行,這樣也就到了我們將要自定義代碼的實(shí)現(xiàn)執(zhí)行了。

這里只是大概說了一下流程,具體詳情還是需要自己看源碼的,我們的目的就是梳理一下整個流程,知道自己一個大概就好了,深究反而落的下乘,同時也是為了接下來自定義 source 打個基礎(chǔ),知道我們自己寫的東西是怎么運(yùn)行的。

自定義source

  1. 創(chuàng)建一個類,繼承自 AbstractSource 并實(shí)現(xiàn) Configurable 和( EventDrivenSource 或者PollableSource )
  2. 實(shí)現(xiàn)相關(guān)方法,以下是簡單的一個生成序列的source
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.inveno.flume;

import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.ImmutableMap;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SequenceSource extends AbstractSource implements Configurable ,EventDrivenSource {

    private static final Logger logger = LoggerFactory
            .getLogger(SequenceSource.class);

    private long seq;
    private int batchSize = 10;
    private List<Event> batchArrayList = new ArrayList<>();


    @Override
    public void configure(Context context) {
        //自定義配置屬性
        batchSize = context.getInteger("batchSize", 1);
        //打印自定義屬性
        ImmutableMap<String, String> map = context.getParameters();
        for (String s : map.keySet()) {
            logger.warn(s + "==============configure=============================" + map.get(s));
        }
    }

    private void process(){
        try {
            batchArrayList.add(EventBuilder.withBody(String.valueOf(seq++).getBytes()));
            if(batchArrayList.size()>=batchSize){
                getChannelProcessor().processEventBatch(batchArrayList);
                batchArrayList.clear();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    @Override
    public void start() {
        super.start();
        //開啟一個線程來生產(chǎn)數(shù)據(jù),當(dāng)然你也可以整個線程池
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()){
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        //這里有個java知識點(diǎn) ,InterruptedException捕獲后,
                        // 這個標(biāo)記點(diǎn)會被重置 ,需要再次 interrupt才能正確退出
                        Thread.currentThread().interrupt();
                    }
                    process();
                }
            }
        }).start();
        logger.debug("==========================start");
    }

    @Override
    public void stop() {
        super.stop();
        logger.info("==========================stop", getName());
    }

}
  1. 我們在 configure方法里面獲取配置屬性-batchsize。
  2. 我們上面說過,source 最后開始會被調(diào)用start 方法,我們在start 方法里面開啟一個線程,實(shí)現(xiàn)循環(huán)產(chǎn)生消息,并隔 batchsize 個消息就推送到 channel 里面。
  3. 這樣一個簡單的生產(chǎn) source 就完成了
  4. 如果想實(shí)現(xiàn) PollableSource 類型的 source ,只是不需要自己開啟線程罷了,其余都差不多,就是這么簡單。

上面我們自定義了一個 source,事件是交給 flume 自帶的 ChannelProcessor 自己處理的,下一節(jié),我們來說說 ChannelProcessor 相關(guān)細(xì)節(jié)

寫在忘記后

搞了半天忘記寫部署自定義代碼了。。。抱歉!?。?/p>

  1. 首先將代碼打 jar 包
  2. 放到 FLUME_HOME 目錄的 lib 文件夾下
  3. 以下是配置文件
  • example.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.inveno.flume.SequenceSource 
a1.sources.r1.batchSize = 5

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  1. 啟動腳本:bin/flume-ng agent --conf conf --conf-file ./my-conf/example.conf --name a1 -Dflume.root.logger=INFO,console

  2. 大功告成。。。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容