簡(jiǎn)介
Flume是Cloudera提供的一個(gè)高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸?shù)南到y(tǒng),F(xiàn)lume支持在日志系統(tǒng)中定制各類(lèi)數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時(shí),F(xiàn)lume提供對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理,并寫(xiě)到各種數(shù)據(jù)接受方(可定制)的能力。

Flume 作為 cloudera 開(kāi)發(fā)的實(shí)時(shí)日志收集系統(tǒng),受到了業(yè)界的認(rèn)可與廣泛應(yīng)用。Flume 初始的發(fā)行版本目前被統(tǒng)稱(chēng)為 Flume OG(original generation),屬于 cloudera。但隨著 FLume 功能的擴(kuò)展,F(xiàn)lume OG 代碼工程臃腫、核心組件設(shè)計(jì)不合理、核心配置不標(biāo)準(zhǔn)等缺點(diǎn)暴露出來(lái),尤其是在 Flume OG 的最后一個(gè)發(fā)行版本 0.94.0 中,日志傳輸不穩(wěn)定的現(xiàn)象尤為嚴(yán)重,為了解決這些問(wèn)題,2011 年 10 月 22 號(hào),cloudera 完成了 Flume-728,對(duì) Flume 進(jìn)行了里程碑式的改動(dòng):重構(gòu)核心組件、核心配置以及代碼架構(gòu),重構(gòu)后的版本統(tǒng)稱(chēng)為 Flume NG(next generation);改動(dòng)的另一原因是將 Flume 納入 apache 旗下,cloudera Flume 改名為 Apache Flume。

Flume的核心概念
l Event:是Flume數(shù)據(jù)傳輸?shù)幕締卧?。flume以事件的形式將數(shù)據(jù)從源頭傳送到最終的目的。Event由可選的hearders和載有數(shù)據(jù)的一個(gè)byte array構(gòu)成。
l Clinet:是一個(gè)將原始數(shù)據(jù)包裝成events并且發(fā)送它們到一個(gè)或多個(gè)agent的實(shí)體
l Agent:一個(gè)Agent包含Sources, Channels, Sinks和其他組件,它利用這些組件將events從一個(gè)節(jié)點(diǎn)傳輸?shù)搅硪粋€(gè)節(jié)點(diǎn)或最終目的。
l Source:負(fù)責(zé)接收events或通過(guò)特殊機(jī)制產(chǎn)生events,并將events批量的放到一個(gè)或多個(gè)Channels。
l Channel:位于Source和Sink之間,用于緩存進(jìn)來(lái)的events,當(dāng)Sink成功的將events發(fā)送到下一跳的channel或最終目的,events從Channel移除。
l Sink:負(fù)責(zé)將events傳輸?shù)较乱惶蜃罱K目的,成功完成后將events從channel移除。
其中Event是Flume數(shù)據(jù)傳輸?shù)幕締卧?。flume以事件的形式將數(shù)據(jù)從源頭傳送到最終的目的。Event由可選的hearders和載有數(shù)據(jù)的一個(gè)byte array構(gòu)成。
載有的數(shù)據(jù)對(duì)flume是不透明的
Headers是容納了key-value字符串對(duì)的無(wú)序集合,key在集合內(nèi)是唯一的。
Headers可以在上下文路由中使用擴(kuò)展


Flume以agent為最小的獨(dú)立運(yùn)行單位。一個(gè)agent就是一個(gè)JVM。單agent由Source、Sink和Channel三大組件構(gòu)成

Flume的數(shù)據(jù)流由事件(Event)貫穿始終。事件是Flume的基本數(shù)據(jù)單位,它攜帶日志數(shù)據(jù)(字節(jié)數(shù)組形式)并且攜帶有頭信息,這些Event由Agent外部的Source,比如上圖中的Web Server生成。當(dāng)Source捕獲事件后會(huì)進(jìn)行特定的格式化,然后Source會(huì)把事件推入(單個(gè)或多個(gè))Channel中。你可以把Channel看作是一個(gè)緩沖區(qū),它將保存事件直到Sink處理完該事件。Sink負(fù)責(zé)持久化日志或者把事件推向另一個(gè)Source。

Flume的數(shù)據(jù)流細(xì)化一下則是首先Source捕捉到外部進(jìn)來(lái)的Events,然后把Events提交給ChannelProcessor,ChannelProcessor先走一遍Interceptor(進(jìn)行一些過(guò)濾處理),然后通過(guò)ChannelSelector引用對(duì)象獲得Channel列表,使用事務(wù)方式把Events提交到Channel,因此Source的Events提交到Channel實(shí)際上是在ChannelProcessor中進(jìn)行的;而Sink則通過(guò)SinkProcessor去Channel中獲得Events并消費(fèi)Events,整個(gè)過(guò)程就是一個(gè)生產(chǎn)者消費(fèi)者模式。
高可靠傳輸實(shí)現(xiàn)原理
Flume使用事務(wù)的辦法來(lái)保證event的可靠傳遞。Source和Sink分別被封裝在事務(wù)中,這些事務(wù)由保存event的存儲(chǔ)提供或者由 Channel提供。這就保證了event在數(shù)據(jù)流的點(diǎn)對(duì)點(diǎn)傳輸中是可靠的。在多級(jí)數(shù)據(jù)流中,如下圖,上一級(jí)的Sink和下一級(jí)的Source都被包含在事務(wù)中,保證數(shù)據(jù)可靠地從一個(gè)Channel到另一個(gè)Channel轉(zhuǎn)移
其次,數(shù)據(jù)流中 Channel的持久性。Flume中MemoryChannel是可能丟失數(shù)據(jù)的(當(dāng)Agent死掉時(shí)),而FileChannel是持久性的,提供類(lèi)似mysql的日志機(jī)制,保證數(shù)據(jù)不丟失。

當(dāng)一個(gè)正常的Flow運(yùn)行時(shí),每個(gè)Agent中的Channel中的Events數(shù)量是均衡(消費(fèi)速度大于生產(chǎn)速度的情況),而一旦Agent直接出現(xiàn)故障,那么Channel就會(huì)暫時(shí)持有Events,直到故障恢復(fù)(MemoryChannel可能會(huì)丟失Events)。

Flume啟動(dòng)分析

Flume的Agent啟動(dòng)是從Application的main函數(shù)開(kāi)始的,首先把自己的實(shí)例注冊(cè)到了EventBus,然后通過(guò)LifecycleAware模式(類(lèi)似Tomcat的開(kāi)始結(jié)束模式),創(chuàng)建PollingPropertiesFileConfigurationProvider對(duì)象并執(zhí)行start()函數(shù),在start()函數(shù)中,通過(guò)線程池啟動(dòng)了一個(gè)FileWatcherRunnable任務(wù)去不斷的檢查啟動(dòng)文件是否修改,第一次或者發(fā)現(xiàn)文件修改了的時(shí)候就去讀取配置文件,并通過(guò)EvenBus的post()方法響應(yīng)讀取結(jié)果,而Application的主線程因?yàn)樽?cè)過(guò)EventBus,handleConfigurationEvent()函數(shù)獲得post()事件消息后就會(huì)執(zhí)行先stopAllComponents(),然后startAllComponents(conf),當(dāng)執(zhí)行startAllComponents函數(shù)的時(shí)候,就會(huì)啟動(dòng)channel、sink和source這三個(gè)核心組件,注意啟動(dòng)順序是先channel,后sink,最后source,這次才不會(huì)有消息丟失問(wèn)題發(fā)生。整個(gè)啟動(dòng)過(guò)程如圖所示。
簡(jiǎn)化來(lái)說(shuō):
第一步:Application主線程啟動(dòng),通過(guò)PollingPropertiesFileConfigurationProvider獲取source、sink、channel等配置信息
第二步:PollingPropertiesFileConfigurationProvider把配置信息通過(guò)事件總線廣播給Application主線程
第三步:Application主線程重新啟動(dòng)channel、sink和source

Source分析

Source的繼承關(guān)系類(lèi)圖如圖所示,所有source均實(shí)現(xiàn)自source接口,接口方法只有兩個(gè):setChannelProcessor()和getChannelProcessor(),所以所source具體的業(yè)務(wù)實(shí)現(xiàn)(比如把Events發(fā)送給Channel以及過(guò)程中的事務(wù)實(shí)現(xiàn)都是在ChannelProcessor中實(shí)現(xiàn)的)。

source又分為兩種類(lèi)型:EventDrivenSource和PollableSource,PollableSource主要用于接收外部驅(qū)動(dòng)程序的Events,比如來(lái)自Kafka的消息等,而其他source基本都是實(shí)現(xiàn)于EventDrivenSource,這種source不需要外部的驅(qū)動(dòng)程序pollEvents,而是有自己的事件監(jiān)控獲得Events,比如SpoolDirectorySource,它可以從磁盤(pán)中某個(gè)文件獲取文件更新數(shù)據(jù)。
以SpoolDirectorySource為例,其創(chuàng)建啟動(dòng)過(guò)程如下:
第一步:Application主線程啟動(dòng)的時(shí)候,通過(guò)AbstractConfigurationProvider(前面提到的PollingPropertiesFileConfigurationProvider的父類(lèi))獲取配置信息
第二步:當(dāng)判斷配置為SpoolDirectorySource時(shí),則通過(guò)SourceFactory實(shí)例化一個(gè)SpoolDirectorySource
第三步:AbstractConfigurationProvider調(diào)用實(shí)例化的SpoolDirectorySource對(duì)象的configure()進(jìn)行初始化配置
第四步:Application主線程通過(guò)SpoolDirectoryRunnable(SpoolDirectorySource的內(nèi)部類(lèi))啟動(dòng)source并且500毫秒執(zhí)行一次
如下圖所示:

注:Application.startAllComponents()啟動(dòng)source的時(shí)候其實(shí)最先啟動(dòng)的是SourceRunner,F(xiàn)lume有兩類(lèi)SourceRunner:EventDrivenSourceRunner和PollableSourceRunner,EventDrivenSourceRunner再啟動(dòng)具體類(lèi)型的source。
SpoolDirectorySource實(shí)例的每次執(zhí)行,則會(huì)讀取具體目錄下的文件,生成Event數(shù)據(jù),通過(guò)ChannelProcessor把Event放入Channel,同時(shí)對(duì)文件的讀取位置進(jìn)行標(biāo)記,下次則從標(biāo)記位置進(jìn)行讀取。如下圖所示。

Source提交Channel事務(wù)處理

之前也有說(shuō)明,source是通過(guò)channelProcessor來(lái)提交Events的,如下圖ChannelProcessor.class所示,channelprocessor先獲得一個(gè)事務(wù),然后開(kāi)啟事務(wù),之后才進(jìn)行event的提交操作,最后提交事務(wù),如果中間出現(xiàn)異常則進(jìn)行事務(wù)回滾。(finally中還有一個(gè)事務(wù)關(guān)閉)

ChannelProcessor.class

FileChannel內(nèi)部類(lèi)FileBackedTransaction處理put代碼參考
FileChannel處理Event的時(shí)序
Channel有兩種:MemoryChannel和FileChannel,這里以FileChannel為例,其調(diào)用時(shí)序如下圖所示:

BasicTransactionSemantics.put()->BasicTransactionSemantics.put()->FileBackedTransaction.doPut(),在FileBackedTransaction執(zhí)行doPut()操作的時(shí)候執(zhí)行了兩步操作:
1、 調(diào)用Log.put(),把Event寫(xiě)入實(shí)體文件
2、 調(diào)用FlumeEventQueue.addWithoutCommit(),把Event寫(xiě)入隊(duì)列以便sink獲取
Sink分析
Sink從Channel消費(fèi)Event,然后進(jìn)行轉(zhuǎn)移到收集/聚合層或存儲(chǔ)層,它的啟動(dòng)過(guò)程和source類(lèi)似是從Application的main主線程開(kāi)始的,通過(guò)AbstractConfigurationProvider獲取配置信息,然后通過(guò)SinkFactory實(shí)例化具體Sink,然后調(diào)用sink實(shí)例的configure進(jìn)行實(shí)例的初始化配置,最后通過(guò)SinkRunner啟動(dòng)Sink實(shí)例。
和Source不同的是SinkRunner不直接啟動(dòng)Sink實(shí)例,而是通過(guò)SinkProcessor異步啟動(dòng)的。

SinkProcessor主要有三種:
l DefaultSinkProcessor:默認(rèn)實(shí)現(xiàn),用于單個(gè)Sink的場(chǎng)景使用
l FailoverSinkProcessor:故障轉(zhuǎn)移實(shí)現(xiàn)
l LoadBalanceSinkProcessor:用于實(shí)現(xiàn)Sink的負(fù)載均衡
其類(lèi)的繼承關(guān)系如圖所示:

多個(gè)Sink可以構(gòu)成一個(gè)SinkGroup。一個(gè)Sink Processor負(fù)責(zé)從一個(gè)指定的Sink Group中激活一個(gè)Sink。Sink Processor可以通過(guò)組中所有Sink實(shí)現(xiàn)負(fù)載均衡;也可以在一個(gè)Sink失敗時(shí)轉(zhuǎn)移到另一個(gè)。
? Flume通過(guò)Sink Processor實(shí)現(xiàn)負(fù)載均衡(Load Balancing)和故障轉(zhuǎn)移(failover)
? 內(nèi)建的SinkProcessors:
? Load Balancing Sink Processor – 使用RANDOM, ROUND_ROBIN或定制的選擇算法
? Failover Sink Processor
? Default Sink Processor(單Sink)
? 所有的Sink都是采取輪詢(xún)(polling)的方式從Channel上獲取events。這個(gè)動(dòng)作是通過(guò)SinkRunner激活的
? Sink Processor充當(dāng)Sink的一個(gè)代理
Sink Group
Groups配置可以實(shí)現(xiàn)sink的負(fù)載均衡和失敗重試機(jī)制
? 負(fù)載均衡配置示例
a1.sinkgroups =g1
a1.sinkgroups.g1.sinks= k1 k2
a1.sinkgroups.g1.processor.type= load_balance
a1.sinkgroups.g1.processor.backoff= true
a1.sinkgroups.g1.processor.selector= random
? 失敗重試配置示例
a1.sinkgroups =g1
a1.sinkgroups.g1.sinks= k1 k2
a1.sinkgroups.g1.processor.type= failover
a1.sinkgroups.g1.processor.priority.k1= 5
a1.sinkgroups.g1.processor.priority.k2= 10
a1.sinkgroups.g1.processor.maxpenalty= 10000
Sink事務(wù)實(shí)現(xiàn)

以HBaseSink.class為例,首先從channel獲取一個(gè)事務(wù),然后事務(wù)開(kāi)啟后進(jìn)行take操作,即從channel獲取Event,然后對(duì)Event進(jìn)行消費(fèi)處理(putEventsAndCommit即提交事務(wù)),處理完成后關(guān)閉事務(wù)。

引用美團(tuán)的使用場(chǎng)景
參考自:http://www.aboutyun.com/thread-8317-1-1.html

a. 整個(gè)系統(tǒng)分為三層:Agent層,Collector層和Store層。其中Agent層每個(gè)機(jī)器部署一個(gè)進(jìn)程,負(fù)責(zé)對(duì)單機(jī)的日志收集工作;Collector層部署在中心服務(wù)器上,負(fù)責(zé)接收Agent層發(fā)送的日志,并且將日志根據(jù)路由規(guī)則寫(xiě)到相應(yīng)的Store層中;Store層負(fù)責(zé)提供永久或者臨時(shí)的日志存儲(chǔ)服務(wù),或者將日志流導(dǎo)向其它服務(wù)器。
b. Agent到Collector使用LoadBalance策略,將所有的日志均衡地發(fā)到所有的Collector上,達(dá)到負(fù)載均衡的目標(biāo),同時(shí)并處理單個(gè)Collector失效的問(wèn)題。
c. Collector層的目標(biāo)主要有三個(gè):SinkHdfs,SinkKafka和SinkBypass。分別提供離線的數(shù)據(jù)到Hdfs,和提供實(shí)時(shí)的日志流到Kafka和Bypass。其中SinkHdfs又根據(jù)日志量的大小分為SinkHdfs_b,SinkHdfs_m和SinkHdfs_s三個(gè)Sink,以提高寫(xiě)入到Hdfs的性能,具體見(jiàn)后面介紹。
d. 對(duì)于Store來(lái)說(shuō),Hdfs負(fù)責(zé)永久地存儲(chǔ)所有日志;Kafka存儲(chǔ)最新的7天日志,并給Storm系統(tǒng)提供實(shí)時(shí)日志流;Bypass負(fù)責(zé)給其它服務(wù)器和應(yīng)用提供實(shí)時(shí)日志流。

a. 模塊命名規(guī)則:所有的Source以src開(kāi)頭,所有的Channel以ch開(kāi)頭,所有的Sink以sink開(kāi)頭;
b. Channel統(tǒng)一使用美團(tuán)開(kāi)發(fā)的DualChannel,具體原因后面詳述;對(duì)于過(guò)濾掉的日志使用NullChannel,具體原因后面詳述;
c. 模塊之間內(nèi)部通信統(tǒng)一使用Avro接口;
(DualChannel:基于 MemoryChannel和 FileChannel開(kāi)發(fā)。當(dāng)堆積在Channel中的events數(shù)小于閾值時(shí),所有的events被保存在MemoryChannel中,Sink從MemoryChannel中讀取數(shù)據(jù);當(dāng)堆積在Channel中的events數(shù)大于閾值時(shí),所有的events被自動(dòng)存放在FileChannel中,Sink從FileChannel中讀取數(shù)據(jù)。這樣當(dāng)系統(tǒng)正常運(yùn)行時(shí),我們可以使用MemoryChannel的高吞吐特性;當(dāng)系統(tǒng)有異常時(shí),我們可以利用FileChannel的大緩存的特性。)
參考:(基于Flume的美團(tuán)日志收集系統(tǒng)(一)架構(gòu)和設(shè)計(jì))http://www.aboutyun.com/thread-8317-1-1.html
參考:(基于Flume的美團(tuán)日志收集系統(tǒng)(二)改進(jìn)和優(yōu)化)http://www.aboutyun.com/thread-8318-1-1.html