前言
前面我們已經(jīng)說過了flume的簡單入門,這篇文章繼續(xù)深入,來熟悉下source,并通過自定義 source 來了解其工作原理,接下來的一系列文章都會以flume的各個小組件慢慢深入,歡迎和我一起學(xué)習(xí)
source 到 channel 過程

source-to-channel.png
- 上圖大致描述了 source 收集到數(shù)據(jù)推送到 channel 的基本過程,可以發(fā)現(xiàn)中間多出了一個 channel processor 的組件
- source 收集到的數(shù)據(jù)會經(jīng)過攔截器鏈進(jìn)行過濾,然后通過channel selector 發(fā)送到對應(yīng)的 channel,從中我們可以想到,如果你要對數(shù)據(jù)進(jìn)行一些特別的處理,可以自定義攔截器來進(jìn)行數(shù)據(jù)清洗,當(dāng)然不建議太過復(fù)雜的處理,否則你的 flume 將會和蝸牛一樣慢,你還可以通過 selector 來控制數(shù)據(jù)儲存的 channel。
source 是如何產(chǎn)生數(shù)據(jù)的
source 分為兩大類:PollableSource 和 EventDrivenSource,不過筆者倒是沒怎么弄清楚,這兩大類區(qū)分的目的何在?如果你有什么想法,歡迎留言指教。
- PollableSource
public interface PollableSource extends Source {
public Status process() throws EventDeliveryException;
public static enum Status {READY, BACKOFF}
}
- 當(dāng)一個agent 啟動之后,就會不斷循環(huán)調(diào)用 process 以獲取數(shù)據(jù)
- 當(dāng)
process返回 READY,表示數(shù)據(jù)產(chǎn)生正常,如果是 BACKOFF 則表示異常,當(dāng)產(chǎn)生異常時候,agent 會等待一段時間再來調(diào)用process,異常次數(shù)越多,間隔時間越長,最長不超過 5s。 - 自帶一個線程,工作都是在自己的獨(dú)立線程之內(nèi)的
- EventDrivenSource
public interface EventDrivenSource extends Source
- 簡單的一個標(biāo)記接口,區(qū)分 PollableSource
- 運(yùn)行流程
- 當(dāng)一個agent啟動時候,會開始執(zhí)行 application 的
main()方法 - 進(jìn)程啟動之后,會通過
AbstractConfigurationProvider$getConfiguration解析配置文件中的各個組件和屬性 - 針對 source 會生成 sourceRunner 通過
supervisor來運(yùn)行和管理其生命周期。 - source 的生命周期
start方法正式開始執(zhí)行,這樣也就到了我們將要自定義代碼的實(shí)現(xiàn)執(zhí)行了。
這里只是大概說了一下流程,具體詳情還是需要自己看源碼的,我們的目的就是梳理一下整個流程,知道自己一個大概就好了,深究反而落的下乘,同時也是為了接下來自定義 source 打個基礎(chǔ),知道我們自己寫的東西是怎么運(yùn)行的。
自定義source
- 創(chuàng)建一個類,繼承自
AbstractSource并實(shí)現(xiàn)Configurable和(EventDrivenSource或者PollableSource) - 實(shí)現(xiàn)相關(guān)方法,以下是簡單的一個生成序列的source
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.inveno.flume;
import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.ImmutableMap;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SequenceSource extends AbstractSource implements Configurable ,EventDrivenSource {
private static final Logger logger = LoggerFactory
.getLogger(SequenceSource.class);
private long seq;
private int batchSize = 10;
private List<Event> batchArrayList = new ArrayList<>();
@Override
public void configure(Context context) {
//自定義配置屬性
batchSize = context.getInteger("batchSize", 1);
//打印自定義屬性
ImmutableMap<String, String> map = context.getParameters();
for (String s : map.keySet()) {
logger.warn(s + "==============configure=============================" + map.get(s));
}
}
private void process(){
try {
batchArrayList.add(EventBuilder.withBody(String.valueOf(seq++).getBytes()));
if(batchArrayList.size()>=batchSize){
getChannelProcessor().processEventBatch(batchArrayList);
batchArrayList.clear();
}
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void start() {
super.start();
//開啟一個線程來生產(chǎn)數(shù)據(jù),當(dāng)然你也可以整個線程池
new Thread(new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
//這里有個java知識點(diǎn) ,InterruptedException捕獲后,
// 這個標(biāo)記點(diǎn)會被重置 ,需要再次 interrupt才能正確退出
Thread.currentThread().interrupt();
}
process();
}
}
}).start();
logger.debug("==========================start");
}
@Override
public void stop() {
super.stop();
logger.info("==========================stop", getName());
}
}
- 我們在
configure方法里面獲取配置屬性-batchsize。 - 我們上面說過,source 最后開始會被調(diào)用start 方法,我們在start 方法里面開啟一個線程,實(shí)現(xiàn)循環(huán)產(chǎn)生消息,并隔 batchsize 個消息就推送到 channel 里面。
- 這樣一個簡單的生產(chǎn) source 就完成了
- 如果想實(shí)現(xiàn)
PollableSource類型的 source ,只是不需要自己開啟線程罷了,其余都差不多,就是這么簡單。
上面我們自定義了一個 source,事件是交給 flume 自帶的 ChannelProcessor 自己處理的,下一節(jié),我們來說說 ChannelProcessor 相關(guān)細(xì)節(jié)
寫在忘記后
搞了半天忘記寫部署自定義代碼了。。。抱歉!?。?/p>
- 首先將代碼打 jar 包
- 放到 FLUME_HOME 目錄的 lib 文件夾下
- 以下是配置文件
- example.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.inveno.flume.SequenceSource
a1.sources.r1.batchSize = 5
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動腳本:bin/flume-ng agent --conf conf --conf-file ./my-conf/example.conf --name a1 -Dflume.root.logger=INFO,console
大功告成。。。