1. 介紹
Flume作為一款實(shí)時(shí)日志收集系統(tǒng),受到了業(yè)界的廣泛認(rèn)可和應(yīng)用,其由Cloudera開(kāi)發(fā),初始發(fā)行版本為Flume OG(original generation),更新到0.9.4版本后,Cloudera對(duì)flume進(jìn)行了重構(gòu),發(fā)行了Flume NG(next generation),并將其交給apache托管,成為了apache的頂級(jí)項(xiàng)目,官方網(wǎng)站為
2. 特性

Flume是一個(gè)分布式、可靠、高可用的海量日志采集傳輸系統(tǒng),支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時(shí),F(xiàn)lume提供對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理,并寫(xiě)到各種數(shù)據(jù)接受方(比如文本、HDFS、Hbase等)的能力 。
3. 三大組件
Flume 運(yùn)行的核心是 Agent。Flume以agent為最小的獨(dú)立運(yùn)行單位。一個(gè)agent就是一個(gè)JVM。它是一個(gè)完整的數(shù)據(jù)收集工具,含有三個(gè)核心組件,分別是source、 channel、 sink。通過(guò)這些組件, Event 可以從一個(gè)地方流向另一個(gè)地方。
3.1 source
source是數(shù)據(jù)的收集端,負(fù)責(zé)將數(shù)據(jù)捕獲后進(jìn)行特殊的格式化,將數(shù)據(jù)封裝到事件(event) 里,然后將事件推入Channel中。有各種source,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source等等。
3.2 channel
Channel: Flume Channel主要提供一個(gè)隊(duì)列的功能,對(duì)source提供中的數(shù)據(jù)進(jìn)行簡(jiǎn)單的緩存。有Memory Channel、JDBC Chanel、File Channel等等。
3.3 sink
Flume Sink取出Channel中的數(shù)據(jù),進(jìn)行相應(yīng)的存儲(chǔ)文件系統(tǒng),數(shù)據(jù)庫(kù),或者提交到遠(yuǎn)程服務(wù)器。包括HDFS sink、 Logger sink、 Avro sink、 File Roll sink、 Null sink、 HBase sink等等。
4. 應(yīng)用場(chǎng)景
Flink主要用于同步各源端的數(shù)據(jù)到指定位置
(1)多代理流

從第一臺(tái)機(jī)器的flume agent傳送到第二臺(tái)機(jī)器的flume agent。
例:
規(guī)劃:
hadoop02:tail-avro.properties
???使用 exec “tail -F /home/hadoop/testlog/welog.log”獲取采集數(shù)據(jù)
???使用 avro sink 數(shù)據(jù)到下一個(gè) agent
hadoop03:avro-hdfs.properties
???使用 avro 接收采集數(shù)據(jù)
???使用 hdfs sink 數(shù)據(jù)到目的地
配置文件
#tail-avro.properties
a1.sources = r1 a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/testlog/date.log
a1.sources.r1.channels = c1
#Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 4141
a1.sinks.k1.batch-size = 2
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#avro-hdfs.properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
#Describe k1a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://myha01/testlog/flume-event/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = date_
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize= 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat =Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2)多路復(fù)用采集

在一份agent中有多個(gè)channel和多個(gè)sink,然后多個(gè)sink輸出到不同的文件或者文件系統(tǒng)中。
規(guī)劃:
Hadoop02:(tail-hdfsandlogger.properties)
???使用 exec “tail -F /home/hadoop/testlog/datalog.log”獲取采集數(shù)據(jù)
???使用 sink1 將數(shù)據(jù) 存儲(chǔ)hdfs
???使用 sink2 將數(shù)據(jù)都存儲(chǔ) 控制臺(tái)
配置文件
#tail-hdfsandlogger.properties
#2個(gè)channel和2個(gè)sink的配置文件
#Name the components on this agent
a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1 c2
#Describe/configure tail -F source1
a1.sources.s1.type = execa1.sources.s1.command = tail -F /home/hadoop/logs/catalina.out
#指定source進(jìn)行扇出到多個(gè)channnel的規(guī)則
a1.sources.s1.selector.type = replicatinga1.sources.s1.channels = c1 c2
#Use a channel which buffers events in memory
#指定channel c1
a1.channels.c1.type = memory
#指定channel c2
a1.channels.c2.type = memory
#Describe the sink
#指定k1的設(shè)置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://myha01/flume_log/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize= 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat =Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.channel = c1
#指定k2的
a1.sinks.k2.type = logger
a1.sinks.k2.channel = c2
(3)高可用部署采集

首先在三個(gè)web服務(wù)器中收集數(shù)據(jù),然后交給collect,此處的collect是高可用的,首先collect01是主,所有收集到的數(shù)據(jù)發(fā)送給他,collect02只是出于熱備狀態(tài)不接受數(shù)據(jù),當(dāng)collect01宕機(jī)的時(shí)候,collect02頂替,然后接受數(shù)據(jù),最終將數(shù)據(jù)發(fā)送給hdfs或者kafka。
agent和collecotr的部署

Agent1、Agent2數(shù)據(jù)分別流入到Collector1和Collector2中,F(xiàn)lume NG 本 身提供了 Failover 機(jī)制,可以自動(dòng)切換和恢復(fù)。再由Collector1和Collector2將數(shù)據(jù)輸出到hdfs中。
示意圖

配置文件:
#ha_agent.properties
#agent name: agent1
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#set gruop
agent1.sinkgroups = g1
#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /home/hadoop/testlog/testha.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp
#set sink1agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop02
agent1.sinks.k1.port = 52020
#set sink2agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = hadoop03
agent1.sinks.k2.port = 52020
#set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
#ha_collector.properties
#set agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#other node,nna to nns
a1.sources.r1.type = avro
##當(dāng)前主機(jī)為什么,就修改成什么主機(jī)名
a1.sources.r1.bind = hadoop03
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
##當(dāng)前主機(jī)為什么,就修改成什么主機(jī)名
a1.sources.r1.interceptors.i1.value = hadoop03
a1.sources.r1.channels = c1
#set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://myha01/flume_ha/loghdfs
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
最后啟動(dòng):
#先啟動(dòng) hadoop02 和 hadoop03 上的 collector 角色:
bin/flume-ng agent -c conf -f agentconf/ha_collector.properties -n a1 - Dflume.root.logger=INFO,console
#然后啟動(dòng) hadoop01,hadoop02 上的 agent 角色:
bin/flume-ng agent -c conf -f agentconf/ha_agent.properties -n agent1 - Dflume.root.logger=INFO,console
5. 安裝部署
Flume框架對(duì)hadoop和zookeeper的依賴只是在jar包上,并不要求flume啟動(dòng)時(shí)必須將hadoop和zookeeper服務(wù)也啟動(dòng)。
(1)將安裝包上傳到服務(wù)器并解壓
[hadoop@hadoop1 ~]$ tar -zxvf apache-flume-1.8.0-bin.tar.gz -C apps
(2)創(chuàng)建軟連接
[hadoop@hadoop1 ~]$ ln -s apache-flume-1.8.0-bin/ flume
(3)修改配置文件
/home/hadoop/apps/apache-flume-1.8.0-bin/conf
[hadoop@hadoop1 conf]$ cp flume-env.sh.template flume-env.sh
(4)配置環(huán)境變量
[hadoop@hadoop1 conf]$ vi ~/.bashrc
export FLUME_HOME=/home/hadoop/apps/flume
export PATH=$PATH:$FLUME_HOME/bin
保存使其立即生效
[hadoop@hadoop1 conf]$ source ~/.bashrc
(5)查看版本
[hadoop@hadoop1 ~]$ flume-ng version
6. 使用
flume的用法很簡(jiǎn)單—-書(shū)寫(xiě)一個(gè)配置文件,在配置文件當(dāng)中描述source、channel與sink的具體實(shí)現(xiàn),而后運(yùn)行一個(gè)agent實(shí)例,在運(yùn)行agent實(shí)例的過(guò)程中會(huì)讀取配置文件的內(nèi)容,這樣flume就會(huì)采集到數(shù)據(jù)。
配置文件的編寫(xiě)原則:
1>從整體上描述代理agent中sources、sinks、channels所涉及到的組件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
2>詳細(xì)描述agent中每一個(gè)source、sink與channel的具體實(shí)現(xiàn):即在描述source的時(shí)候,需要
指定source到底是什么類型的,即這個(gè)source是接受文件的、還是接受http的、還是接受thrift
的;對(duì)于sink也是同理,需要指定結(jié)果是輸出到HDFS中,還是Hbase中啊等等;對(duì)于channel
需要指定是內(nèi)存啊,還是數(shù)據(jù)庫(kù)啊,還是文件啊等等。
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
3>通過(guò)channel將source與sink連接起來(lái)
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動(dòng)agent的shell操作:
flume-ng agent -n a1 -c ../conf -f ../conf/example.file
-Dflume.root.logger=DEBUG,console
參數(shù)說(shuō)明: -n 指定agent名稱(與配置文件中代理的名字相同)
-c 指定flume中配置文件的目錄
-f 指定配置文件
-Dflume.root.logger=DEBUG,console 設(shè)置日志等級(jí)
7. 高級(jí)組件
除了三大組件外,F(xiàn)lume還允許用戶設(shè)置其他高級(jí)組件更靈活地控制數(shù)據(jù)流,包括 Interceptor,Channel Selector 和 Sink Processor等
7.1 Interceptor
Flume-ng 1.7 中目前提供了以下攔截器:
- Timestamp Interceptor:該 Interceptor 在每個(gè) Event 頭部插入時(shí)間戳,其中key是timestamp,value為當(dāng)前時(shí)刻。
- Host Interceptor:該 Interceptor 在每個(gè) Event 頭部插入當(dāng)前 Agent 所在機(jī)器的host或ip,其中key是host(也可自定義)。
- Static Interceptor:靜態(tài)攔截器,用于在events header中加入一組靜態(tài)的key和value。
- UUID Interceptor:該 Interceptor 在每個(gè) Event 頭部插入一個(gè)128位的全局唯一標(biāo)示,例如 b5755073-77a9-43c1-8fad-b7a586fc1b97。
- Regex Extractor Interceptor:該 Interceptor 可根據(jù)正則表達(dá)式取出對(duì)應(yīng)的值,并插入到頭部
- Regex Filtering Interceptor:該 Interceptor 可根據(jù)正則表達(dá)式過(guò)濾或者保留符合要求的 Event。
7.2 Channel Selector
Channel Selector 允許 Flume Source 選擇一個(gè)或多個(gè)目標(biāo) Channel,并將當(dāng)前 Event 寫(xiě)入這些 Channel。Flume 提供了兩種 Channel Selector 實(shí)現(xiàn):
- Replicating Channel Selector:將每個(gè) Event 指定多個(gè) Channel,通過(guò)該 Selector,F(xiàn)lume 可將相同數(shù)據(jù)導(dǎo)入到多套系統(tǒng)中,一遍進(jìn)行不同地處理。這是Flume 默認(rèn)采用的 Channel Selector。
- Multiplexing Channel Selector:根據(jù) Event 頭部的屬性值,將 Event寫(xiě)入對(duì)應(yīng)的 Channel
7.3 Sink Processo
Flume 允許將多個(gè) Sink 組裝在一起形成一個(gè)邏輯實(shí)體,成為 Sink Group。而 Sink Processor 則在 Sink Group 基礎(chǔ)上提供負(fù)載均衡以及容錯(cuò)功能。當(dāng)一個(gè) Sink 掛掉了,可由另一個(gè) Sink 接替。Flume 提供了多種 Sink Processor 實(shí)現(xiàn):
- Default Sink Processor:默認(rèn)的 Sink Processor,僅僅接受一個(gè) Sink,實(shí)現(xiàn)了最簡(jiǎn)單的 source - channel - sink,每個(gè)組件只有一個(gè) - Failover Sink Processor:故障轉(zhuǎn)移接收器,Sink Group 中每個(gè) Sink 均被賦予一個(gè)優(yōu)先級(jí),Event 優(yōu)先由高優(yōu)先級(jí)的 Sink 發(fā)送,如果高優(yōu)先級(jí)的 Sink 掛了,則次高優(yōu)先級(jí)的 Sink 接替
- Load balancing Sink Processor:負(fù)載均衡接收處理器,Channel 中的 Event 通過(guò)某種負(fù)載均衡機(jī)制,交給 Sink Group 中的所有 Sink 發(fā)送,目前 Flume支持兩種負(fù)載均衡機(jī)制,分別是:round_robin(輪訓(xùn)),random(隨機(jī))。
8. 進(jìn)程監(jiān)控
8.1 Http監(jiān)控
使用這類監(jiān)控方式,只需要在啟動(dòng)flume的時(shí)候在啟動(dòng)參數(shù)上面加上監(jiān)控配置,例如這樣:
bin/flume-ng agent --conf conf --conf-file conf/flume_conf.properties --name collect -Dflume.monitoring.type=http -Dflume.monitoring.port=1234
其中-Dflume.monitoring.type=http表示使用http方式來(lái)監(jiān)控,后面的-Dflume.monitoring.port=1234表示我們需要啟動(dòng)的監(jiān)控服務(wù)的端口號(hào)為1234,這個(gè)端口號(hào)可以自己隨便配置。然后啟動(dòng)flume以后,通過(guò)http://ip:1234/metrics就能夠得到flume的1個(gè)json格式的監(jiān)控?cái)?shù)據(jù)。
8.2ganglia監(jiān)控
這類監(jiān)控方式需要先安裝ganglia然后啟動(dòng)ganglia,然后再啟動(dòng)flume的時(shí)候加上監(jiān)控配置,例如:
bin/flume-ng agent --conf conf --conf-file conf/producer.properties --name collect -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=ip:port
其中-Dflume.monitoring.type=ganglia表示使用ganglia的方式來(lái)監(jiān)控,而-Dflume.monitoring.hosts=ip:port表示ganglia安裝的ip和啟動(dòng)的端口號(hào)。
flume監(jiān)控還可使用zabbix,但是這類方式需要在flume源碼中添加監(jiān)控模塊,相對(duì)照較麻煩,由于不是flume自帶的監(jiān)控方式,這里不討論這類方式。
因此,flume自帶的監(jiān)控方式其實(shí)就是http、ganglia兩種,http監(jiān)控只能通過(guò)1個(gè)http地址訪問(wèn)得到1個(gè)json格式的監(jiān)控?cái)?shù)據(jù),而ganglia監(jiān)控是拿到這個(gè)數(shù)據(jù)后用界面的方式展現(xiàn)出來(lái)了,相對(duì)照較直觀。