一、配置詳解
| 序號 | 參數(shù)名 | 默認(rèn)值 | 描述 |
|---|---|---|---|
| 1 | type | Sink類型為hdfs | - |
| 2 | hdfs.path | - | HDFS存儲路徑,支持按照時間分區(qū)。集群的NameNode名字:單節(jié)點:hdfs://主機(jī)名(ip):9000/%Y/%m/%d/%H;HA集群:hdfs://nameservice(高可用NameNode服務(wù)名稱)/%Y/%m/%d/%H |
| 3 | hdfs.filePrefix | FlumeData | Event輸出到HDFS的文件名前綴 |
| 4 | hdfs.fileSuffix | - | Event輸出到HDFS的文件名后綴 |
| 5 | hdfs.inUsePrefix | - | 臨時文件的文件名前綴。Flume首先將Event輸出到HDFS指定目錄的臨時文件中,再根據(jù)相關(guān)規(guī)則重命名為目標(biāo)文件 |
| 6 | hdfs.inUseSuffix | .tmp | 臨時文件名后綴。 |
| 7 | hdfs.rollInterval | 30 | 間隔多久將臨時文件滾動成最終目標(biāo)文件,單位:秒。如果設(shè)置為0,則表示不根據(jù)時間滾動文件。注:滾動(roll)指的是,HDFS Sink將臨時文件重命名成最終目標(biāo)文件,并新打開一個臨時文件來寫數(shù)據(jù) |
| 8 | hdfs.rollSize | 1024 | 當(dāng)臨時文件達(dá)到該大小時,滾動成目標(biāo)文件,單位:byte。該值設(shè)置為0,則表示文件不根據(jù)文件大小滾動生成 |
| 9 | hdfs.rollCount | 10 | 當(dāng)Event數(shù)據(jù)達(dá)到該數(shù)量時,將臨時文件滾動生成目標(biāo)文件。該值設(shè)置為0,則表示文件不根據(jù)Event數(shù)滾動生成 |
| 10 | hdfs.idleTimeout | 0 | 當(dāng)目前被打開的臨時文件在該參數(shù)指定的時間內(nèi),沒有任何數(shù)據(jù)寫入,則將該臨時文件關(guān)閉并重命名成目標(biāo)文件,單位:秒。該值設(shè)置為0,則表示禁用此功能,不自動關(guān)閉臨時文件 |
| 11 | hdfs.round | false | 用于HDFS文件按照時間分區(qū),時間戳向下取整 |
| 12 | hdfs.roundValue | 1 | 當(dāng)round設(shè)置為true,配合roundUnit時間單位一起使用,例如roundUnit值為minute。該值設(shè)置為1則表示一分鐘之內(nèi)的數(shù)據(jù)寫到一個文件中,相當(dāng)于每一分鐘生成一個文件 |
| 13 | hdfs.roundUnit | second | 按時間分區(qū)使用的時間單位,可以選擇second(秒)、minute(分鐘)、hour(小時)三種粒度的時間單位。示例:a1.sinks.k1.hdfs.path = hdfs://nameservice/flume/events/%y/%m/%d/%H/%M;a1.sinks.k1.hdfs.round = true;a1.sinks.k1.hdfs.roundValue = 10;a1.sinks.k1.hdfs.roundUnit = minute;當(dāng)時間為2022-04-05 17:38:59時候,hdfs.path依然會被解析為:/flume/events/2022/04/05/17/30;因為設(shè)置的是舍棄10分鐘內(nèi)的時間,因此,該目錄每10分鐘新生成一個 |
| 14 | hdfs.batchSize | 100 | 每個批次刷寫到HDFS的Event數(shù)量 |
| 15 | hdfs.codeC | 不采用壓縮 | 文件壓縮格式,目前支持的壓縮格式有g(shù)zip、bzip2、lzo、lzop、snappy |
| 16 | hdfs.fileType | SequenceFile | 文件類型,包括:SequenceFile、DataStream、CompressedStream。該值設(shè)置為DataStream,則輸出的文件不會進(jìn)行壓縮,不需要設(shè)置hdfs.codeC指定壓縮格式。該值設(shè)置為CompressedStream,則對輸出的文件進(jìn)行壓縮,需要設(shè)置hdfs.codeC指定壓縮格式 |
| 17 | hdfs.maxOpenFiles | 5000 | 最大允許打開的HDFS文件數(shù),當(dāng)打開的文件數(shù)達(dá)到該值,則最早打開的文件將會被關(guān)閉 |
| 18 | hdfs.minBlockReplicas | HDFS副本數(shù) | 寫入HDFS文件塊的最小副本數(shù)。該參數(shù)會影響文件的滾動配置,一般將該參數(shù)配置成1,才可以按照配置正確滾動文件 |
| 19 | hdfs.writeFormat | Writable | 文件的格式,目前可以選擇Text或者Writable兩種格式 |
| 20 | hdfs.callTimeout | 10000 | 操作HDFS文件的超時時間,如果需要寫入HDFS文件的Event數(shù)比較大或者發(fā)生了打開、寫入、刷新、關(guān)閉文件超時的問題,可以根據(jù)實際情況適當(dāng)增大超時時間,單位:毫秒 |
| 21 | hdfs.threadsPoolSize | 10 | 每個HDFS Sink執(zhí)行HDFS IO操作打開的線程數(shù) |
| 22 | hdfs.rollTimerPoolSize | 1 | HDFS Sink根據(jù)時間滾動生成文件時啟動的線程數(shù) |
| 23 | hdfs.timeZone | Local Time本地時間 | 寫入HDFS文件使用的時區(qū) |
| 24 | hdfs.useLocalTimeStamp | false | 是否使用本地時間替換Event頭信息中的時間戳 |
| 25 | hdfs.closeTries | 0 | 在發(fā)起關(guān)閉嘗試后,嘗試重命名臨時文件的次數(shù)。如果設(shè)置為1,表示重命名一次失敗后不再繼續(xù)嘗試重命名操作,此時待處理的文件將處于打開狀態(tài),擴(kuò)展名為.tmp。如果設(shè)置為0,表示嘗試重命名操作次數(shù)不受限制,直到文件最終被重命名成功。如果close調(diào)用失敗,文件可能仍然會處于打開狀態(tài),但是文件中的數(shù)據(jù)將保持完整,文件會在Flume重啟后關(guān)閉 |
| 26 | hdfs.retryInterval | 180 秒 | 連續(xù)嘗試關(guān)閉文件的時間間隔。如果設(shè)置為0或小于0的數(shù),第一次嘗試關(guān)閉文件失敗后將不會繼續(xù)嘗試關(guān)閉文件,文件將保持打開狀態(tài)或者以“.tmp”擴(kuò)展名結(jié)尾的臨時文件。如果設(shè)置為0,表示不嘗試,相當(dāng)于于將hdfs.closeTries設(shè)置成1 |
| 27 | serializer | TEXT | 序列化方式,可選值有TEXT、avro_event或者實現(xiàn)EventSerializer.Builder接口的類 |
| 28 | kerberosPrincipal | - | HDFS安全認(rèn)證kerberos配置 |
| 29 | kerberosKeytab | - | HDFS安全認(rèn)證kerberos配置 |
| 30 | proxyUser | - | 代理用戶 |
round 與 rollInterval 理解有誤
round、roundValue、roundUnit是基于路徑path去滾動生成文件夾的,針對文件夾而言
rollInterval、rollSize、rollCount是基于文件的條件限制滾動生成文件的,基于文件而言的
二、簡單模板
agent_name.sources = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name
# source
agent_name.sources.source_name.type = avro
XXX
XXX
# channel
agent_name.channels.channel_name.type = file
XXX
XXX
# sink
agent_name.sinks.sink_name.type = hdfs
agent_name.sinks.sink_name.hdfs.path = hdfs://${HA_NameNode_Name}/flume_data/yr=%Y/mon=%m/day=%d/hr=%H
agent_name.sinks.sink_name.hdfs.writeFormat = Text
agent_name.sinks.sink_name.hdfs.fileSuffix = _${hdfsFileSuffix}.log
agent_name.sinks.sink_name.hdfs.fileType = DataStream
agent_name.sinks.sink_name.hdfs.filePrefix = %Y%m%d%H%M
agent_name.sinks.sink_name.hdfs.useLocalTimeStamp = true
agent_name.sinks.sink_name.hdfs.rollInterval = 0
agent_name.sinks.sink_name.hdfs.rollSize = 125829120
agent_name.sinks.sink_name.hdfs.rollCount = 0
agent_name.sinks.sink_name.hdfs.minBlockReplicas = 1
agent_name.sinks.sink_name.hdfs.round = true
agent_name.sinks.sink_name.hdfs.roundValue = 1
agent_name.sinks.sink_name.hdfs.roundUnit = hour
agent_name.sinks.sink_name.hdfs.idleTimeout = 600
# source | channel | sink 關(guān)聯(lián)
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name
三、注意事項及異常
-
idleTimeout 的設(shè)置
設(shè)置為0,如果flume程序突然宕機(jī),就會導(dǎo)致 hdfs上的 .tmp后綴的文件無法會更改為完成的文件,造成一種假象,以為該文件正在寫入。當(dāng)程序重啟時,就會有兩個 .tmp文件。
如果idle Timeout有設(shè)置值m,當(dāng)在m秒內(nèi)沒有數(shù)據(jù)寫入,就會把tmp文件改為已完成。后面再有數(shù)據(jù)過來的時候重新生成.tmp文件。
建議:最好設(shè)置一個比較大的值,防止小文件產(chǎn)生,若不設(shè)置,宕機(jī)的話會有tmp文件
為了能快速查看到數(shù)據(jù),可以設(shè)置該值較小,沒數(shù)據(jù)進(jìn)行就滾動,因為臨時文件是不能被Hive查詢到,但是這樣會產(chǎn)生小文件
-
round 與 rollInterval 理解有誤
round、roundValue、roundUnit是基于路徑path去滾動生成文件夾的,針對文件夾而言
rollInterval、rollSize、rollCount是基于文件的條件限制滾動生成文件的,基于文件而言的
- 異常:Error while trying to hflushOrSync
- 問題排查:通過查看不同F(xiàn)lume的Agent日志發(fā)現(xiàn),同名的文件被不同的Flume Agent打開,在文件第二次打開后,先前打開的Agent擁有的token就失效了,因此無法關(guān)閉它,就會不斷的報錯:Error while trying to hflushOrSync!
- 查看之前的flume配置文件發(fā)現(xiàn),每一個Flume-Agent配置的hdfsSink是完全一樣的,每個Flume-Agent讀取的source相同,有很大概率會出現(xiàn)多個Fume-Agent同時寫同名文件,導(dǎo)致部分Flume-Agent無法繼續(xù)。
- 解決方案:不同F(xiàn)lume設(shè)置不同的文件后綴名
其它重要點:
-
Flume部署的機(jī)器上沒有Hadoop環(huán)境依賴:
為了讓Flume能夠正常地和HDFS進(jìn)行交互,需要手動添加Hadoop相關(guān)的jar包到Flume的classpath中。常見的Hadoop相關(guān)的jar包如下:
- hadoop-common.jar
- hadoop-hdfs.jar
- hadoop-auth.jar
- hadoop-mapreduce-client-core.jar
- commons-configuration.jar
- commons-lang.jar
- commons-collections.jar
具體來說,如果你使用的是Hadoop2.x版本,那么在HADOOP_HOME目錄下應(yīng)該有以下目錄:
- $HADOOP_HOME/share/hadoop/common
- $HADOOP_HOME/share/hadoop/hdfs
- $HADOOP_HOME/share/hadoop/mapreduce
你可以將上述目錄中的對應(yīng)jar包復(fù)制到Flume的lib目錄下,即$FLUME_HOME/lib目錄下。
需要注意的是,這些jar包的版本要和你的Hadoop集群的版本保持一致,否則可能會出現(xiàn)不兼容等問題。建議在添加前先確認(rèn)好版本信息。
-
Hadoop的NameNode是高可用:
-
將Hadoop集群的core-site.xml和hdfs-site.xml文件復(fù)制到Flume服務(wù)器上的某個目錄(/path/to/hadoop/conf)中
bin/flume-ng agent --conf-file conf/flume.conf --name a1 -Dhadoop.conf.dir=/path/to/hadoop/conf
-
在啟動Flume時,也可以使用"-conf"參數(shù)指定core-site.xml和hdfs-site.xml文件的路徑,如下所示:
bin/flume-ng agent --conf-file conf/flume.conf --name a1 -conf /path/to/hadoop/conf/core-site.xml -conf /path/to/hadoop/conf/hdfs-site.xml
直接兩個兩個文件放到 Flume_HOME/conf 目錄下
-
沒有權(quán)限寫入HDFS目錄:
可以在啟動命令中設(shè)置"-Duser.name=kevin"參數(shù)
需要注意的是,kevin用戶需要在HDFS上擁有寫入目標(biāo)目錄的權(quán)限,否則會出現(xiàn)寫入失敗等問題。因此,在實際使用中,需要對該用戶進(jìn)行獨(dú)立管理,并按照實際需求進(jìn)行授權(quán)
./bin/flume-ng agent -n ${agentName} -c ${flumeClient}/conf/ -f ${jobLocation} -Duser.name=kevin