1. Flume簡(jiǎn)介
Apache Flume是一個(gè)分布式的、可靠的、可用的,從多種不同的源收集、聚集、移動(dòng)大量日志數(shù)據(jù)到集中數(shù)據(jù)存儲(chǔ)的系統(tǒng)。
目前,只能運(yùn)行在Unix服務(wù)器上。
Flume基于流式數(shù)據(jù)的、使用簡(jiǎn)單的(借助配置文件即可)、健壯的、容錯(cuò)的。
Flume的簡(jiǎn)單體現(xiàn)在:寫一個(gè)source、channel、sink之后,一條命令就能操作成功。
Flume、Kafka實(shí)時(shí)進(jìn)行數(shù)據(jù)收集,Storm、Spark實(shí)時(shí)數(shù)據(jù)處理,Impala實(shí)時(shí)查詢。
1.數(shù)據(jù)流架構(gòu)
Flume event定義為一個(gè)數(shù)據(jù)流(包含字節(jié)和一些屬性)的單元。

Flume source消費(fèi)外部數(shù)據(jù)源傳來的事件。外部數(shù)據(jù)源發(fā)送event給source,當(dāng)然,外部數(shù)據(jù)源也可以是另一個(gè)Agent的sink。source收到event后,將其存儲(chǔ)在一個(gè)或者多個(gè)channel中,channel是一個(gè)流式管道,這些event在channel中等待sink來消費(fèi)。sink可以將event挪到外部存儲(chǔ)(例如hdfs)或者傳給另一個(gè)agent的source。同一個(gè)agent中的source和sink異步處理channel中的event。
2.可恢復(fù)性:
channel可以選擇內(nèi)存、文件或者其他一些方式來處理。使用內(nèi)存作為channel,處理會(huì)比較快,但是并不安全。當(dāng)agent進(jìn)程意外退出時(shí)會(huì)丟失數(shù)據(jù),所以這種處理方式多用于測(cè)試。
可以使用文件作為channel,這樣數(shù)據(jù)可恢復(fù)。
3.Event:
event是flume數(shù)據(jù)傳輸?shù)幕締卧?。Flume以事件的形式將數(shù)據(jù)從源頭傳到目的地。Event由可選的header和載有數(shù)據(jù)的一個(gè)byte array構(gòu)成。(1)載有的數(shù)據(jù)對(duì)Flume是不透明的;(2)Header是容納了key-value字符串對(duì)的無序集合,key在集合里是唯一的。(3)Header可以在上下文路由中使用擴(kuò)展。
4.flume-cdh官方文檔:
[官方文檔][1]
[1]:http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6/FlumeUserGuide.html
2.Flume安裝
1.選擇和解壓安裝包:
為了和之前的軟件版本兼容,這里還是選擇cdh5.3.6的版本。
$ tar zxf flume-ng-1.5.0-cdh5.3.6.tar.gz -C /opt/modules/
2.修改配置文件:
修改conf/flume-env.sh文件,添加java環(huán)境變量。
export JAVA_HOME=/opt/modules/jdk1.7.0_67
3.flume運(yùn)行:
Flume使用安裝目錄下bin/flume-ng進(jìn)行執(zhí)行程序,查看flume-ng的使用方法:
$ bin/flume-ng
下面是一個(gè)使用案例:
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
參數(shù)說明:
(1)agent參數(shù)表示,啟動(dòng)一個(gè)agent。
(2)-n或者--name 指定agent的名字,在配置文件中定義的agent的名稱,例如下邊的樣例的名字是a1。
(3)-c或者--conf 指定配置文件所在的目錄。該目錄下要包含flume-env.sh文件。
(4)-f或者--conf-file 指定具體的配置文件。
-f指定的是一個(gè)Flume Agent的配置,存儲(chǔ)在本地配置文件,該配置文件包含對(duì)source、channel、sink的屬性配置,和其相關(guān)聯(lián)形成數(shù)據(jù)流的配置。
3.Flume官方實(shí)例:
下面是官方給出的一個(gè)樣例,F(xiàn)lume Agent實(shí)時(shí)監(jiān)控端口,收集數(shù)據(jù),將其以日志的形式打印在控制臺(tái)。
1.配置文件:
下面配置文件定義了一個(gè)agent,名為“a1”。a1有一個(gè)source,監(jiān)聽端口44444的數(shù)據(jù)。source、channel、sink的名稱分別是r1、c1、k1。a1.channels.c1.type = memory 定義使用內(nèi)存作為channel。
# example.conf: A single-node Flume configuration
#Name the components of the agent:
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Describe/Define the source:
a1.sources.r1.type=netcat
a1.sources.r1.bind=hadoop-senior01.pmpa.com
a1.sources.r1.port=44444
#Describe/Define the channel:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動(dòng)agent:
$ bin/flume-ng agent -n a1 -c conf -f conf/test-conf.properties -Dflume.root.logger=INFO,console
-Dflume.root.logger=INFO,console選項(xiàng)指定Flume記錄日志并打印到控制臺(tái)。
在啟動(dòng)了這個(gè)agent后,F(xiàn)lume會(huì)開啟對(duì)端口44444的監(jiān)聽。
2.測(cè)試環(huán)境準(zhǔn)備:
為了測(cè)試結(jié)果,需要使用telnet命令,如果沒有安裝telnet,使用下面方法安裝:
$ yum install telnet
$ yum install telnet-server
netcat是Linux中一個(gè)強(qiáng)大的網(wǎng)絡(luò)工具,nc命令是netcat命令的簡(jiǎn)稱。使用root用戶,安裝軟件包(使用rpm包方式安裝)。
rpm -ivh nc-1.84-22.el6.x86_64.rpm
使用nc命令,可以開啟對(duì)某個(gè)端口的監(jiān)聽:
$ nc -l 44444
查看對(duì)應(yīng)的端口是哪個(gè)進(jìn)程在監(jiān)聽的:
$ lsof: -i 44444
3.測(cè)試flume實(shí)驗(yàn)結(jié)果:
啟動(dòng)了a1 Agent之后,flume就開始對(duì)端口44444進(jìn)行監(jiān)聽。為了測(cè)試,我們?cè)诹硪粋€(gè)終端,telnet本機(jī)的44444端口,并發(fā)送一些消息,這時(shí)候flume就會(huì)在控制臺(tái)上打印反饋這些消息。
telnet的輸入數(shù)據(jù):

在telnet發(fā)送了數(shù)據(jù)后,flume會(huì)立即接收,并在控制臺(tái)打?。?/p>

4.結(jié)論:
Flume的開發(fā)就是編寫配置文件,配置Agent中的source、channel、sink的類型和屬性。
官方文檔中,列出了很多Source、Channel、Sink的種類,數(shù)量眾多,可以重點(diǎn)關(guān)注以下一些常用的類型:
(1)Source:Exec Source、Spooling Directory Source、Kafka Source、Syslog Sources、HTTP Source。
(2)Channel:Kafka Channel、File Channel
(3)Sink: HDFS Sink、HBaseSinks、MorphlineSolrSink、ElasticSearchSink。
對(duì)于Spooling Directory Source,只能放不可變、名字唯一的文件到目錄中。
4.實(shí)時(shí)收集數(shù)據(jù)到HDFS:
實(shí)時(shí)監(jiān)控某個(gè)日志文件,將數(shù)據(jù)收集存儲(chǔ)到HDFS上。使用EXEC source。下面的例子實(shí)時(shí)監(jiān)控Hive日志文件,放到HDFS目錄中。
1.Hive日志文件:
首先需要Hive開啟日志記錄,conf下的log4j文件去掉template,并修改文件中的hive.log.dir的配置為自定義的路徑,再啟動(dòng)Hive:
$ cp -a hive-log4j.properties.template hive-log4j.properties
$ cp -a hive-exec-log4j.properties.template hive-exec-log4j.properties
hive.log.dir=/opt/modules/hive-0.13.1-cdh5.3.6/logs
在logs目錄下,我們會(huì)看到hive.log文件,我們就以監(jiān)控這個(gè)日志文件為例。
2.準(zhǔn)備HDFS目錄:
在HDFS上創(chuàng)建一個(gè)單獨(dú)的目錄,/flume:
$ bin/hdfs dfs -mkdir /flume/hive_log
3.準(zhǔn)備Hadoop jar包:
由于Flume將數(shù)據(jù)寫入到HDFS文件中,從某種意義講,Flume是HDFS的客戶端,所以需要將HDFS Client JAR包放到flume的安裝目錄的lib目錄下。具體包括下面4個(gè)jar包:
$ cp share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/
$ cp share/hadoop/tools/lib/commons-configuration-1.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/
$ cp share/hadoop/tools/lib/hadoop-auth-2.5.0-cdh5.3.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/
$ cp share/hadoop/hdfs/hadoop-hdfs-2.5.0-cdh5.3.6.jar /opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/lib/
4.編寫配置文件:
在conf/新建一個(gè)配置文件,flume-tail.properties 。為了區(qū)別于之前的Agent,需要對(duì)agent、source、channel、sink重新命名。
#Name the components of the agent:
a2.sources=r2
a2.channels=c2
a2.sinks=k2
#Describe/Define the source:
a2.sources.r2.type=exec
a2.sources.r2.command=tail -f /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log
a2.sources.r2.shell=/bin/bash -c
#Describe/Define the channel:
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path=hdfs://hadoop-senior01.pmpa.com:8020/flume/hive_log/%Y%m/%H
a2.sinks.k2.hdfs.fileType=DataStream
a2.sinks.k2.hdfs.writeFormat=Text
a2.sinks.k2.hdfs.batchSize=10
#如果不設(shè)置下邊兩項(xiàng)的話,flume會(huì)在hdfs目錄下生成很多小文件。
#設(shè)置二級(jí)目錄,按小時(shí)切割:
a2.sinks.k2.hdfs.round=true
a2.sinks.k2.hdfs.roundValue=1
a2.sinks.k2.hdfs.roundUnit=hour
#設(shè)置文件回滾條件。
a2.sinks.k2.hdfs.rollInterval=60
a2.sinks.k2.hdfs.rollSize=1024000000
a2.sinks.k2.hdfs.rollCount=0
#使用本地時(shí)間戳
a2.sinks.k2.hdfs.useLocalTimeStamp=true
a2.sinks.k2.hdfs.minBlockReplicas=1
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
對(duì)于HDFS sinks, roll的概念是:close current file and create a new one。
5.測(cè)試結(jié)果:
運(yùn)行Agent,可以看到flume開始在hdfs上創(chuàng)建文件的日志。
$ bin/flume-ng agent -n a2 -c conf -f conf/flume-tail.properties -Dflume.root.logger=INFO,console
這時(shí)候我們看到hdfs上已經(jīng)有文件了:
hive> dfs -text /flume/hive_log/201703/01/FlumeData.1490636605634.tmp;
tmp結(jié)尾的文件,表示正在寫入的文件,文件寫入完成后,后邊的.tmp后綴會(huì)去掉,并開啟一個(gè)新的.tmp文件。例如下面的例子,F(xiàn)lumeData.1490637118909文件已經(jīng)寫完,又開啟了一個(gè)新的文件FlumeData.1490637180881.tmp。
-rw-r--r-- 3 natty supergroup 6372 2017-03-28 01:53 /flume/hive_log/201703/01/FlumeData.1490637118909
-rw-r--r-- 3 natty supergroup 0 2017-03-28 01:53 /flume/hive_log/201703/01/FlumeData.1490637180881.tmp
5.數(shù)據(jù)倉庫架構(gòu):

如何使用Flume? 通常在每一臺(tái)應(yīng)用服務(wù)器(例如Nginx、Apache服務(wù)器等)上安裝Flume,這些服務(wù)器會(huì)產(chǎn)生很多日志,F(xiàn)lume將其抽取到HDFS上。
同時(shí),也可以通過FTP將多臺(tái)日志服務(wù)器上的日志放到統(tǒng)一的一臺(tái)服務(wù)器上,然后在這個(gè)服務(wù)器上安裝Flume來抽取到HDFS。


6.Flume使用案例,監(jiān)控某一個(gè)目錄:
與前邊的案例類似,我們只需要開發(fā)配置文件即可。按照之前的配置文件復(fù)制一個(gè)新的文件flume-directory.properties:
$ cp flume-tail.properties flume-directory.properties
很多應(yīng)用服務(wù)器在產(chǎn)生日志時(shí),都存在“回滾”的情況,例如在一個(gè)日志目錄中,文件abc.tmp表示正在寫入的文件,在寫入完成后,生成一個(gè)文件abc.tmp.1000。我們的這個(gè)案例就是為了解決這個(gè)實(shí)際情況,F(xiàn)lume實(shí)時(shí)掃描一個(gè)文件夾,按照正則表達(dá)式篩選目標(biāo)文件(這里要剔除類似abc.tmp這樣的文件)。
這里source選用Spooling Directory Source;channel選用File Channel;sink選用HDFS sink。
下面是開發(fā)的配置文件:
#Name the components of the agent:
a3.sources=r3
a3.channels=c3
a3.sinks=k3
#Describe/Define the source:
a3.sources.r3.type=spooldir
a3.sources.r3.spoolDir=/opt/data/flume/spooldir
a3.sources.r3.ignorePattern=.*\.tmp$
#Describe/Define the channel:
a3.channels.c3.type=file
a3.channels.c3.checkpointDir=/opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/data/checkpointdir
a3.channels.c3.dataDirs=/opt/modules/apache-flume-1.5.0-cdh5.3.6-bin/data/datadir
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path=hdfs://hadoop-senior01.pmpa.com:8020/flume/hive_log/%Y%m/%H
a3.sinks.k3.hdfs.fileType=DataStream
a3.sinks.k3.hdfs.writeFormat=Text
a3.sinks.k3.hdfs.batchSize=10
#如果不設(shè)置下邊兩項(xiàng)的話,flume會(huì)在hdfs目錄下生成很多小文件。
#設(shè)置二級(jí)目錄,按小時(shí)切割:
a3.sinks.k3.hdfs.round=true
a3.sinks.k3.hdfs.roundValue=1
a3.sinks.k3.hdfs.roundUnit=hour
#設(shè)置文件回滾條件。
a3.sinks.k3.hdfs.rollInterval=60
a3.sinks.k3.hdfs.rollSize=1024000000
a3.sinks.k3.hdfs.rollCount=0
#使用本地時(shí)間戳
a3.sinks.k3.hdfs.useLocalTimeStamp=true
a3.sinks.k3.hdfs.minBlockReplicas=1
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
由于source的fileSuffix配置項(xiàng)采用默認(rèn)值,所有Flume處理完的文件,都加上了.COMPLETE后綴。
測(cè)試運(yùn)行結(jié)果:
$ bin/flume-ng agent -n a3 -c conf -f conf/flume-directory.properties -Dflume.root.logger=INFO,console