flume是我2015年為前公司主導(dǎo)開發(fā)【統(tǒng)一日志平臺】時采用的技術(shù)(主要技術(shù)棧:flume+ES+Redis+mongoBD+Kafka+Hadoop+Netty ),期間也積累了不少經(jīng)驗(yàn)(挖坑、踩坑、填坑)。
在我離開前,我們的日志平臺數(shù)據(jù)量為8億/天,高峰為8500萬/小時、800萬/5分鐘。 flume agent單機(jī)壓測15000/s數(shù)據(jù)量,未出現(xiàn)程序異常、資源占用過高與日志明顯丟失情況。
離開前東家后,便沒有再從事該類型的工作,因此當(dāng)時的一些關(guān)于日志平臺的想法也不再有機(jī)會去實(shí)踐,暫且認(rèn)為這是0.1版本吧。
本文將主要介紹我們在flume上做的一些定制開發(fā)與壓測,另外時間已經(jīng)過去了一年多,有一些細(xì)節(jié)難免有點(diǎn)忘卻。
1.Flume介紹
1.1 架構(gòu)介紹

agent本身是一個Java進(jìn)程,運(yùn)行在日志收集節(jié)點(diǎn)—所謂日志收集節(jié)點(diǎn)就是服務(wù)器節(jié)點(diǎn)。
agent里面包含3個核心的組件:source—->channel—–>sink,類似生產(chǎn)者、倉庫、消費(fèi)者的架構(gòu)。
source:source組件是專門用來收集數(shù)據(jù)的,可以處理各種類型、各種格式的日志數(shù)據(jù),包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。
channel:source組件把數(shù)據(jù)收集來以后,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數(shù)據(jù)的——對采集到的數(shù)據(jù)進(jìn)行簡單的緩存,可以存放在memory、jdbc、file等等。
sink:sink組件是用于把數(shù)據(jù)發(fā)送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定義。
event:將傳輸?shù)臄?shù)據(jù)進(jìn)行封裝,是flume傳輸數(shù)據(jù)的基本單位,如果是文本文件,通常是一行記錄,event也是事務(wù)的基本單位。event從source,流向channel,再到sink,本身為一個字節(jié)數(shù)組,并可攜帶headers(頭信息)信息。event代表著一個數(shù)據(jù)的最小完整單元,從外部數(shù)據(jù)源來,向外部的目的地去。
2.背景說明
我們的需求是將Java 應(yīng)用的log信息進(jìn)行收集,達(dá)到日志采集的目的,agent目前主要有flume、Logstash,技術(shù)選型詳情在此就不表了,最終選擇的flume。
由于當(dāng)時公司內(nèi)部推行技術(shù)組件一直有難度,且也無法借助行政手段,因此我們在設(shè)計(jì)時很多時候考慮都是盡量對應(yīng)用透明,比如我們的flume source使用的是基于log文件的,而未使用應(yīng)用與flume agent采用長連接的方式(該方式需要修改log4j配置,并且引入我們的jar),比如我們agent進(jìn)行日志等級判斷時 需要兼容各種日志格式,因?yàn)槲覀冸y以推動各個應(yīng)用方統(tǒng)一日志格式……
sre方面,當(dāng)時并沒有為agent預(yù)留內(nèi)存等資源,所以一旦我們的agent出現(xiàn)資源占用過多,都會比較敏感。
整個日志平臺1.0版本的架構(gòu)圖如下:

可以看到我們使用kafka將log信息做轉(zhuǎn)儲,消息消費(fèi)者主要有HDFS、ES、Queue等。
3.定制開發(fā)
從我們的實(shí)際情況出發(fā),我們做定制開發(fā)部分主要是source和sink部分。同時我們也開發(fā)了一套agent自動部署工具。
3.1 source定制
3.1.1 dirSource
基于文件的dirSource在flume高版本里已經(jīng)去除了,原生的dirSource也存在很多性能和功能上的問題,為了在我們使用的flume1.6版本里繼續(xù)使用dirSource,我們就基于1.6實(shí)現(xiàn)了一版dirSource。
dirSource特性
- 基于NIO的WatchEvent進(jìn)行l(wèi)og文件內(nèi)容的寫操作監(jiān)聽,同時有能動態(tài)的監(jiān)聽文件的創(chuàng)建和刪除。我們豐富了這部分的匹配模式,可以實(shí)現(xiàn)靈活的文件監(jiān)聽。
- 文件的讀取基于RandomAccessFile,按行讀取
- 將獲取內(nèi)容進(jìn)行處理封裝Event,存入Channel。
存在的問題
- 無論是WatchEvent還是RandomAccessFile在log瘋狂輸出時,CPU占用會居高不下。
3.1.2 execSource
execSource為flume新版本推出的用來替代dirSource的一種實(shí)現(xiàn)方式,主要是通過Java執(zhí)行shell命令,并且獲取shell命令的輸出信息,如tail、cat等。
我們在原生的execSource基礎(chǔ)上,實(shí)現(xiàn)了文件的自動監(jiān)聽,實(shí)現(xiàn)了多命令模式,并且會自動回收長時間無內(nèi)容產(chǎn)出的命令,優(yōu)化了原有的線程關(guān)閉的操作及進(jìn)程鉤子等。
execSource特性
- 基于NIO的WatchEvent進(jìn)行l(wèi)og文件內(nèi)容的寫操作監(jiān)聽,同時有能動態(tài)的監(jiān)聽文件的創(chuàng)建和刪除。我們豐富了這部分的匹配模式,可以實(shí)現(xiàn)靈活的文件監(jiān)聽
- 多命令模式
- 自動回收長時間無內(nèi)容產(chǎn)出的命令
- 重啟時自動清理無用的shell命令
存在的問題
- flume agent進(jìn)程被kill -9 時,對導(dǎo)致執(zhí)行的shell命令無法退出,進(jìn)而導(dǎo)致句柄得不到釋放,積累下來對服務(wù)器造成影響。
3.2 sink定制
我們采用的是kafka sink,flume原生的kafka sink使用的是老版本kafka producer client,發(fā)送消息時需要手動實(shí)現(xiàn)批量與異步,并且是消息發(fā)送的實(shí)現(xiàn)上存在一些不足,在大數(shù)據(jù)量時存在明顯的性能瓶頸,并且會由于集合中消息數(shù)量太多而報(bào)異常,進(jìn)而丟失消息。
我們定制的版本使用的new kafka producer client ,并且對消息發(fā)送做了優(yōu)化,同時對Channel參數(shù)做了大量的壓測,最終確定了最優(yōu)配置。
kafkaSink特性
- 使用new kafka producer client ,默認(rèn)異步批量發(fā)送
- 優(yōu)化了消息體序列化方式
4.壓測
下文描述的壓測都是在建設(shè)日志平臺過程中對flume的相關(guān)測試。
測試環(huán)境都是mac book pro ,這里只關(guān)注各個測試項(xiàng)的對比信息。
測試一
| 類型 | 日志總數(shù) | 生產(chǎn)頻率 | cpu | cpu平均 | mem | 數(shù)據(jù)丟失 | 用時 |
|---|---|---|---|---|---|---|---|
| tailDirSource+New kafka sink | 50萬 | 2000/s | 16-27% | 20% | 230M | 幾百條以內(nèi) | 280s |
| tailDirSource+Old kafka sink | 50萬 | 2000/s | 16-27% | 19% | 230M | 較上一種丟失少 | 280s |
| tailDirSource+New kafka sink | 50萬 | 4000/s | 34-60% | 40% | 230M | <2000 | 145s |
| tailDirSource+Old kafka sink | 50萬 | 4000/s | 34-57% | 41% | 230M | <200 | 145s |
| execSource+Old kafka sink | 50萬 | 4000/s | <8% | 7.5% | 230M | <200 | 145s |
| execSource+Old kafka sink+Spillable Memory Channel | 50萬 | 4000/s | 8-10% | 9.5% | 230M | <200 | 145s |
| execSource+Old kafka sink+File Channel | 50萬 | 4000/s | 40-55% | 45% | 230M | <200 | 145s |
說明:
- 類型New kafka sink為:原生sink,使用kafka舊client,只定制了從head中獲取配置參數(shù),拼接字符串
- 類型Old kafka sink為:深度定制版,使用kafka新client
結(jié)論:
- flume 資源占用從kafka發(fā)送部分目前沒有太好的優(yōu)化方案,且舊kafka client數(shù)據(jù)丟失更加嚴(yán)重。
- 因此flume kafka sink 維持不變,后續(xù)可從flume source入手優(yōu)化
測試二
| 類型 | 日志總數(shù) | 生產(chǎn)頻率 | cpu占用 | cpu平均 | 內(nèi)存占用 | 數(shù)據(jù)丟失 | 用時 | JVM配置 |
|---|---|---|---|---|---|---|---|---|
| tailDirSource+ kafka api sink | 50萬 | 3100/s | 34-60% | 40% | 230M | <200 | 163s | 512M |
| tailDirSource+ kafka sink | 50萬 | 3100/s | 34-57% | 41% | 230M | <200 | 163s | 512M |
| execSource+ kafka sink | 50萬 | 3100/s | <8% | 7.5% | 230M | <200 | 163s | 512M |
| execSource+ kafka sink+Spillable Memory Channel | 50萬 | 3100/s | 8-10% | 9.5% | 230M | <200 | 163s | 512M |
| execSource+ kafka sink+File Channel | 50萬 | 3100/s | 40-55% | 45% | 230M | <200 | 163s | 512M |
| execSource+ kafka sink+MemoryChannel | 500萬 | 31074/s | 30-100% | 40% | 1G | <200 | 163s | 1G |
| execSource+ kafka sink+MemoryChannel | 250萬 | 15337/s | 15-20% | 18% | 450M | <200 | 163s | 1G |
| execSource+ kafka sink+MemoryChannel+FastJSON | 250萬 | 15337/s | 18-22% | 20% | 420M | <200 | 163s | 1G |
| custom execSource+ kafka sink+MemoryChannel+FastJSON | 250萬 | 15432/s | 18-25% | 21% | 420M | <200 | 162s | 1G |
| custom execSource+ kafka sink+MemoryChannel+FastJSON | 125萬+125萬 | 7661/s+7668/s | 20-26% | 24% | 440M | <500 | 163s | 1G |
測試三
配置說明一
a1.sinks.k1.batch.num.messages = 5000
a1.sinks.k1.block.on.buffer.full = true
a1.sinks.k1.buffer.memory = 167108864
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000
flume -Xmx256M -Xms256M
測試結(jié)果一
| 日志寫數(shù)量 | 用時 | 線程數(shù) | QPS | 日志文件量 | 成功發(fā)送到kafka數(shù)量 | topic個數(shù) | CPU | 內(nèi)存 | 序列化方式 | 其他 |
|---|---|---|---|---|---|---|---|---|---|---|
| 500萬 | 74s | 50 | 70000/s | 600m | 280萬(單個topic) | 2 | 未統(tǒng)計(jì) | 300M | fastjson | agent異常 |
配置說明二
a1.sinks.k1.batch.num.messages = 5000
a1.sinks.k1.block.on.buffer.full = true
a1.sinks.k1.buffer.memory = 167108864
a1.channels.c1.type = memory
a1.channels.c1.capacity = 500000
a1.channels.c1.transactionCapacity = 500
a1.channels.c1.byteCapacity = 536870912
flume -Xmx256M -Xms256M
測試結(jié)果二
| 日志寫數(shù)量 | 用時 | 線程數(shù) | QPS | 日志文件量 | 成功發(fā)送到kafka數(shù)量 | topic個數(shù) | CPU | 內(nèi)存 | 序列化方式 | 其他 |
|---|---|---|---|---|---|---|---|---|---|---|
| 500萬 | 68s | 50 | 74000/s | 600m | 500萬(單個topic) | 2 | 200%以上 | 320M | fastjson | 無異常 |
| 500萬 | 68s | 50 | 74000/s | 600m | 500萬(單個topic) | 1 | 100%-200% | 320M | fastjson | 無異常 |
| 500萬 | 68s | 50 | 74000/s | 600m | 500萬(單個topic) | 1 | 小于100% | 280M | StringBuild拼接 | 無異常 |
總結(jié)
數(shù)據(jù)量過大時,sink中kafka client 緩存被存滿,kafka會報(bào)異常,設(shè)置block=true后,存入緩存會被阻塞,kafka不報(bào)異常,但是由于sink從channel中消費(fèi)的速度遠(yuǎn)低于source存入channel的速度,channel會報(bào)Unable to put event on required channel,flume停止提供服務(wù)。繼續(xù)寫入日志,會重復(fù)發(fā)送錯誤。
該異常可通過增大channel的byteCapacity參數(shù)或者調(diào)大JVM的參數(shù)值(byteCapacity默認(rèn)為JVM的80%)來提高報(bào)錯的閥值,且減小transactionCapacity 的值來減緩傳輸?shù)絪ink的數(shù)據(jù)量。
JVM內(nèi)存參數(shù)在7萬每秒的壓力下,設(shè)置為256M較為合適,byteCapacity設(shè)置為512M較為合適,當(dāng)增加channel個數(shù)或者增大channel向sink傳輸?shù)臄?shù)據(jù)量時,都會導(dǎo)致sink消費(fèi)過慢報(bào)異常(總結(jié)1中異常),單個channel內(nèi)存消耗在300M左右。
對于數(shù)據(jù)量較大的應(yīng)用,建議只發(fā)送單個topic。
個人介紹:
高廣超 :多年一線互聯(lián)網(wǎng)研發(fā)與架構(gòu)設(shè)計(jì)經(jīng)驗(yàn),擅長設(shè)計(jì)與落地高可用、高性能互聯(lián)網(wǎng)架構(gòu)。目前就職于美團(tuán)網(wǎng),負(fù)責(zé)核心業(yè)務(wù)研發(fā)工作。