Flume基礎(chǔ)與進(jìn)階

1. 介紹

Flume作為一款實(shí)時(shí)日志收集系統(tǒng),受到了業(yè)界的廣泛認(rèn)可和應(yīng)用,其由Cloudera開(kāi)發(fā),初始發(fā)行版本為Flume OG(original generation),更新到0.9.4版本后,Cloudera對(duì)flume進(jìn)行了重構(gòu),發(fā)行了Flume NG(next generation),并將其交給apache托管,成為了apache的頂級(jí)項(xiàng)目,官方網(wǎng)站為

http://flume.apache.org/

2. 特性

Flume架構(gòu)圖

Flume是一個(gè)分布式、可靠、高可用的海量日志采集傳輸系統(tǒng),支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時(shí),F(xiàn)lume提供對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理,并寫(xiě)到各種數(shù)據(jù)接受方(比如文本、HDFS、Hbase等)的能力 。

3. 三大組件

Flume 運(yùn)行的核心是 Agent。Flume以agent為最小的獨(dú)立運(yùn)行單位。一個(gè)agent就是一個(gè)JVM。它是一個(gè)完整的數(shù)據(jù)收集工具,含有三個(gè)核心組件,分別是source、 channel、 sink。通過(guò)這些組件, Event 可以從一個(gè)地方流向另一個(gè)地方。

3.1 source

source是數(shù)據(jù)的收集端,負(fù)責(zé)將數(shù)據(jù)捕獲后進(jìn)行特殊的格式化,將數(shù)據(jù)封裝到事件(event) 里,然后將事件推入Channel中。有各種source,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source等等。

3.2 channel

Channel: Flume Channel主要提供一個(gè)隊(duì)列的功能,對(duì)source提供中的數(shù)據(jù)進(jìn)行簡(jiǎn)單的緩存。有Memory Channel、JDBC Chanel、File Channel等等。

3.3 sink

Flume Sink取出Channel中的數(shù)據(jù),進(jìn)行相應(yīng)的存儲(chǔ)文件系統(tǒng),數(shù)據(jù)庫(kù),或者提交到遠(yuǎn)程服務(wù)器。包括HDFS sink、 Logger sink、 Avro sink、 File Roll sink、 Null sink、 HBase sink等等。

4. 應(yīng)用場(chǎng)景

Flink主要用于同步各源端的數(shù)據(jù)到指定位置

(1)多代理流

多代理流

從第一臺(tái)機(jī)器的flume agent傳送到第二臺(tái)機(jī)器的flume agent。
例:
規(guī)劃
hadoop02:tail-avro.properties
???使用 exec “tail -F /home/hadoop/testlog/welog.log”獲取采集數(shù)據(jù)
???使用 avro sink 數(shù)據(jù)到下一個(gè) agent
hadoop03:avro-hdfs.properties
???使用 avro 接收采集數(shù)據(jù)
???使用 hdfs sink 數(shù)據(jù)到目的地
配置文件

#tail-avro.properties
a1.sources = r1 a1.sinks = k1
a1.channels = c1

#Describe/configure the source 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /home/hadoop/testlog/date.log 
a1.sources.r1.channels = c1 

#Describe the sink
a1.sinks.k1.type = avro 
a1.sinks.k1.channel = c1 
a1.sinks.k1.hostname = hadoop02 
a1.sinks.k1.port = 4141 
a1.sinks.k1.batch-size = 2

#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#avro-hdfs.properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

#Describe k1a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://myha01/testlog/flume-event/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = date_
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize= 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat =Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60 
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true

#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)多路復(fù)用采集

多路復(fù)用采集

在一份agent中有多個(gè)channel和多個(gè)sink,然后多個(gè)sink輸出到不同的文件或者文件系統(tǒng)中。
規(guī)劃:
Hadoop02:(tail-hdfsandlogger.properties)
???使用 exec “tail -F /home/hadoop/testlog/datalog.log”獲取采集數(shù)據(jù)
???使用 sink1 將數(shù)據(jù) 存儲(chǔ)hdfs
???使用 sink2 將數(shù)據(jù)都存儲(chǔ) 控制臺(tái)

配置文件

#tail-hdfsandlogger.properties
#2個(gè)channel和2個(gè)sink的配置文件
#Name the components on this agent
a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1 c2 

#Describe/configure tail -F source1
a1.sources.s1.type = execa1.sources.s1.command = tail -F /home/hadoop/logs/catalina.out
#指定source進(jìn)行扇出到多個(gè)channnel的規(guī)則
a1.sources.s1.selector.type = replicatinga1.sources.s1.channels = c1 c2 

#Use a channel which buffers events in memory
#指定channel c1
a1.channels.c1.type = memory
#指定channel c2
a1.channels.c2.type = memory 

#Describe the sink
#指定k1的設(shè)置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://myha01/flume_log/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize= 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat =Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.channel = c1
#指定k2的
a1.sinks.k2.type = logger
a1.sinks.k2.channel = c2

(3)高可用部署采集

高可用采集部署

首先在三個(gè)web服務(wù)器中收集數(shù)據(jù),然后交給collect,此處的collect是高可用的,首先collect01是主,所有收集到的數(shù)據(jù)發(fā)送給他,collect02只是出于熱備狀態(tài)不接受數(shù)據(jù),當(dāng)collect01宕機(jī)的時(shí)候,collect02頂替,然后接受數(shù)據(jù),最終將數(shù)據(jù)發(fā)送給hdfs或者kafka。
agent和collecotr的部署

flume3.png

Agent1、Agent2數(shù)據(jù)分別流入到Collector1和Collector2中,F(xiàn)lume NG 本 身提供了 Failover 機(jī)制,可以自動(dòng)切換和恢復(fù)。再由Collector1和Collector2將數(shù)據(jù)輸出到hdfs中。
示意圖

配置文件

#ha_agent.properties
#agent name: agent1
agent1.channels = c1
agent1.sources = r1 
agent1.sinks = k1 k2

#set gruop
agent1.sinkgroups = g1 

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100 
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /home/hadoop/testlog/testha.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp 

#set sink1agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop02
agent1.sinks.k1.port = 52020

#set sink2agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = hadoop03
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups.g1.sinks = k1 k2

#set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
#ha_collector.properties
#set agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1 

#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#other node,nna to nns
a1.sources.r1.type = avro
##當(dāng)前主機(jī)為什么,就修改成什么主機(jī)名
a1.sources.r1.bind = hadoop03
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
##當(dāng)前主機(jī)為什么,就修改成什么主機(jī)名
a1.sources.r1.interceptors.i1.value = hadoop03
a1.sources.r1.channels = c1 

#set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://myha01/flume_ha/loghdfs
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

最后啟動(dòng)

#先啟動(dòng) hadoop02 和 hadoop03 上的 collector 角色:
bin/flume-ng agent -c conf -f agentconf/ha_collector.properties -n a1 - Dflume.root.logger=INFO,console
#然后啟動(dòng) hadoop01,hadoop02 上的 agent 角色:
bin/flume-ng agent -c conf -f agentconf/ha_agent.properties -n agent1 - Dflume.root.logger=INFO,console

5. 安裝部署

Flume框架對(duì)hadoop和zookeeper的依賴只是在jar包上,并不要求flume啟動(dòng)時(shí)必須將hadoop和zookeeper服務(wù)也啟動(dòng)。

(1)將安裝包上傳到服務(wù)器并解壓

[hadoop@hadoop1 ~]$ tar -zxvf apache-flume-1.8.0-bin.tar.gz -C apps

(2)創(chuàng)建軟連接

[hadoop@hadoop1 ~]$ ln -s apache-flume-1.8.0-bin/ flume

(3)修改配置文件

/home/hadoop/apps/apache-flume-1.8.0-bin/conf

[hadoop@hadoop1 conf]$ cp flume-env.sh.template flume-env.sh

(4)配置環(huán)境變量

[hadoop@hadoop1 conf]$ vi ~/.bashrc
export FLUME_HOME=/home/hadoop/apps/flume
export PATH=$PATH:$FLUME_HOME/bin

保存使其立即生效

[hadoop@hadoop1 conf]$ source ~/.bashrc

(5)查看版本

[hadoop@hadoop1 ~]$  flume-ng version 

6. 使用

flume的用法很簡(jiǎn)單—-書(shū)寫(xiě)一個(gè)配置文件,在配置文件當(dāng)中描述source、channel與sink的具體實(shí)現(xiàn),而后運(yùn)行一個(gè)agent實(shí)例,在運(yùn)行agent實(shí)例的過(guò)程中會(huì)讀取配置文件的內(nèi)容,這樣flume就會(huì)采集到數(shù)據(jù)。
配置文件的編寫(xiě)原則:
1>從整體上描述代理agent中sources、sinks、channels所涉及到的組件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

2>詳細(xì)描述agent中每一個(gè)source、sink與channel的具體實(shí)現(xiàn):即在描述source的時(shí)候,需要
指定source到底是什么類型的,即這個(gè)source是接受文件的、還是接受http的、還是接受thrift
的;對(duì)于sink也是同理,需要指定結(jié)果是輸出到HDFS中,還是Hbase中啊等等;對(duì)于channel
需要指定是內(nèi)存啊,還是數(shù)據(jù)庫(kù)啊,還是文件啊等等。

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

3>通過(guò)channel將source與sink連接起來(lái)

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

啟動(dòng)agent的shell操作:

flume-ng  agent -n a1  -c  ../conf   -f  ../conf/example.file  
-Dflume.root.logger=DEBUG,console  

參數(shù)說(shuō)明: -n 指定agent名稱(與配置文件中代理的名字相同)
-c 指定flume中配置文件的目錄
-f 指定配置文件
-Dflume.root.logger=DEBUG,console 設(shè)置日志等級(jí)

7. 高級(jí)組件

除了三大組件外,F(xiàn)lume還允許用戶設(shè)置其他高級(jí)組件更靈活地控制數(shù)據(jù)流,包括 Interceptor,Channel Selector 和 Sink Processor等

7.1 Interceptor

Flume-ng 1.7 中目前提供了以下攔截器:

  • Timestamp Interceptor:該 Interceptor 在每個(gè) Event 頭部插入時(shí)間戳,其中key是timestamp,value為當(dāng)前時(shí)刻。
  • Host Interceptor:該 Interceptor 在每個(gè) Event 頭部插入當(dāng)前 Agent 所在機(jī)器的host或ip,其中key是host(也可自定義)。
  • Static Interceptor:靜態(tài)攔截器,用于在events header中加入一組靜態(tài)的key和value。
  • UUID Interceptor:該 Interceptor 在每個(gè) Event 頭部插入一個(gè)128位的全局唯一標(biāo)示,例如 b5755073-77a9-43c1-8fad-b7a586fc1b97。
  • Regex Extractor Interceptor:該 Interceptor 可根據(jù)正則表達(dá)式取出對(duì)應(yīng)的值,并插入到頭部
  • Regex Filtering Interceptor:該 Interceptor 可根據(jù)正則表達(dá)式過(guò)濾或者保留符合要求的 Event。

7.2 Channel Selector

Channel Selector 允許 Flume Source 選擇一個(gè)或多個(gè)目標(biāo) Channel,并將當(dāng)前 Event 寫(xiě)入這些 Channel。Flume 提供了兩種 Channel Selector 實(shí)現(xiàn):

  • Replicating Channel Selector:將每個(gè) Event 指定多個(gè) Channel,通過(guò)該 Selector,F(xiàn)lume 可將相同數(shù)據(jù)導(dǎo)入到多套系統(tǒng)中,一遍進(jìn)行不同地處理。這是Flume 默認(rèn)采用的 Channel Selector。
  • Multiplexing Channel Selector:根據(jù) Event 頭部的屬性值,將 Event寫(xiě)入對(duì)應(yīng)的 Channel

7.3 Sink Processo

Flume 允許將多個(gè) Sink 組裝在一起形成一個(gè)邏輯實(shí)體,成為 Sink Group。而 Sink Processor 則在 Sink Group 基礎(chǔ)上提供負(fù)載均衡以及容錯(cuò)功能。當(dāng)一個(gè) Sink 掛掉了,可由另一個(gè) Sink 接替。Flume 提供了多種 Sink Processor 實(shí)現(xiàn):

  • Default Sink Processor:默認(rèn)的 Sink Processor,僅僅接受一個(gè) Sink,實(shí)現(xiàn)了最簡(jiǎn)單的 source - channel - sink,每個(gè)組件只有一個(gè) - Failover Sink Processor:故障轉(zhuǎn)移接收器,Sink Group 中每個(gè) Sink 均被賦予一個(gè)優(yōu)先級(jí),Event 優(yōu)先由高優(yōu)先級(jí)的 Sink 發(fā)送,如果高優(yōu)先級(jí)的 Sink 掛了,則次高優(yōu)先級(jí)的 Sink 接替
  • Load balancing Sink Processor:負(fù)載均衡接收處理器,Channel 中的 Event 通過(guò)某種負(fù)載均衡機(jī)制,交給 Sink Group 中的所有 Sink 發(fā)送,目前 Flume支持兩種負(fù)載均衡機(jī)制,分別是:round_robin(輪訓(xùn)),random(隨機(jī))。

8. 進(jìn)程監(jiān)控

8.1 Http監(jiān)控

使用這類監(jiān)控方式,只需要在啟動(dòng)flume的時(shí)候在啟動(dòng)參數(shù)上面加上監(jiān)控配置,例如這樣:
bin/flume-ng agent --conf conf --conf-file conf/flume_conf.properties --name collect -Dflume.monitoring.type=http -Dflume.monitoring.port=1234
其中-Dflume.monitoring.type=http表示使用http方式來(lái)監(jiān)控,后面的-Dflume.monitoring.port=1234表示我們需要啟動(dòng)的監(jiān)控服務(wù)的端口號(hào)為1234,這個(gè)端口號(hào)可以自己隨便配置。然后啟動(dòng)flume以后,通過(guò)http://ip:1234/metrics就能夠得到flume的1個(gè)json格式的監(jiān)控?cái)?shù)據(jù)。

8.2ganglia監(jiān)控

這類監(jiān)控方式需要先安裝ganglia然后啟動(dòng)ganglia,然后再啟動(dòng)flume的時(shí)候加上監(jiān)控配置,例如:
bin/flume-ng agent --conf conf --conf-file conf/producer.properties --name collect -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=ip:port
其中-Dflume.monitoring.type=ganglia表示使用ganglia的方式來(lái)監(jiān)控,而-Dflume.monitoring.hosts=ip:port表示ganglia安裝的ip和啟動(dòng)的端口號(hào)。
flume監(jiān)控還可使用zabbix,但是這類方式需要在flume源碼中添加監(jiān)控模塊,相對(duì)照較麻煩,由于不是flume自帶的監(jiān)控方式,這里不討論這類方式。
因此,flume自帶的監(jiān)控方式其實(shí)就是http、ganglia兩種,http監(jiān)控只能通過(guò)1個(gè)http地址訪問(wèn)得到1個(gè)json格式的監(jiān)控?cái)?shù)據(jù),而ganglia監(jiān)控是拿到這個(gè)數(shù)據(jù)后用界面的方式展現(xiàn)出來(lái)了,相對(duì)照較直觀。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容