08_Flume之HDFS Sink 的參數(shù)解析及異常處理

一、配置詳解

序號 參數(shù)名 默認(rèn)值 描述
1 type Sink類型為hdfs -
2 hdfs.path - HDFS存儲路徑,支持按照時間分區(qū)。集群的NameNode名字:單節(jié)點:hdfs://主機(jī)名(ip):9000/%Y/%m/%d/%H;HA集群:hdfs://nameservice(高可用NameNode服務(wù)名稱)/%Y/%m/%d/%H
3 hdfs.filePrefix FlumeData Event輸出到HDFS的文件名前綴
4 hdfs.fileSuffix - Event輸出到HDFS的文件名后綴
5 hdfs.inUsePrefix - 臨時文件的文件名前綴。Flume首先將Event輸出到HDFS指定目錄的臨時文件中,再根據(jù)相關(guān)規(guī)則重命名為目標(biāo)文件
6 hdfs.inUseSuffix .tmp 臨時文件名后綴。
7 hdfs.rollInterval 30 間隔多久將臨時文件滾動成最終目標(biāo)文件,單位:秒。如果設(shè)置為0,則表示不根據(jù)時間滾動文件。注:滾動(roll)指的是,HDFS Sink將臨時文件重命名成最終目標(biāo)文件,并新打開一個臨時文件來寫數(shù)據(jù)
8 hdfs.rollSize 1024 當(dāng)臨時文件達(dá)到該大小時,滾動成目標(biāo)文件,單位:byte。該值設(shè)置為0,則表示文件不根據(jù)文件大小滾動生成
9 hdfs.rollCount 10 當(dāng)Event數(shù)據(jù)達(dá)到該數(shù)量時,將臨時文件滾動生成目標(biāo)文件。該值設(shè)置為0,則表示文件不根據(jù)Event數(shù)滾動生成
10 hdfs.idleTimeout 0 當(dāng)目前被打開的臨時文件在該參數(shù)指定的時間內(nèi),沒有任何數(shù)據(jù)寫入,則將該臨時文件關(guān)閉并重命名成目標(biāo)文件,單位:秒。該值設(shè)置為0,則表示禁用此功能,不自動關(guān)閉臨時文件
11 hdfs.round false 用于HDFS文件按照時間分區(qū),時間戳向下取整
12 hdfs.roundValue 1 當(dāng)round設(shè)置為true,配合roundUnit時間單位一起使用,例如roundUnit值為minute。該值設(shè)置為1則表示一分鐘之內(nèi)的數(shù)據(jù)寫到一個文件中,相當(dāng)于每一分鐘生成一個文件
13 hdfs.roundUnit second 按時間分區(qū)使用的時間單位,可以選擇second(秒)、minute(分鐘)、hour(小時)三種粒度的時間單位。示例:a1.sinks.k1.hdfs.path = hdfs://nameservice/flume/events/%y/%m/%d/%H/%M;a1.sinks.k1.hdfs.round = true;a1.sinks.k1.hdfs.roundValue = 10;a1.sinks.k1.hdfs.roundUnit = minute;當(dāng)時間為2022-04-05 17:38:59時候,hdfs.path依然會被解析為:/flume/events/2022/04/05/17/30;因為設(shè)置的是舍棄10分鐘內(nèi)的時間,因此,該目錄每10分鐘新生成一個
14 hdfs.batchSize 100 每個批次刷寫到HDFS的Event數(shù)量
15 hdfs.codeC 不采用壓縮 文件壓縮格式,目前支持的壓縮格式有g(shù)zip、bzip2、lzo、lzop、snappy
16 hdfs.fileType SequenceFile 文件類型,包括:SequenceFile、DataStream、CompressedStream。該值設(shè)置為DataStream,則輸出的文件不會進(jìn)行壓縮,不需要設(shè)置hdfs.codeC指定壓縮格式。該值設(shè)置為CompressedStream,則對輸出的文件進(jìn)行壓縮,需要設(shè)置hdfs.codeC指定壓縮格式
17 hdfs.maxOpenFiles 5000 最大允許打開的HDFS文件數(shù),當(dāng)打開的文件數(shù)達(dá)到該值,則最早打開的文件將會被關(guān)閉
18 hdfs.minBlockReplicas HDFS副本數(shù) 寫入HDFS文件塊的最小副本數(shù)。該參數(shù)會影響文件的滾動配置,一般將該參數(shù)配置成1,才可以按照配置正確滾動文件
19 hdfs.writeFormat Writable 文件的格式,目前可以選擇Text或者Writable兩種格式
20 hdfs.callTimeout 10000 操作HDFS文件的超時時間,如果需要寫入HDFS文件的Event數(shù)比較大或者發(fā)生了打開、寫入、刷新、關(guān)閉文件超時的問題,可以根據(jù)實際情況適當(dāng)增大超時時間,單位:毫秒
21 hdfs.threadsPoolSize 10 每個HDFS Sink執(zhí)行HDFS IO操作打開的線程數(shù)
22 hdfs.rollTimerPoolSize 1 HDFS Sink根據(jù)時間滾動生成文件時啟動的線程數(shù)
23 hdfs.timeZone Local Time本地時間 寫入HDFS文件使用的時區(qū)
24 hdfs.useLocalTimeStamp false 是否使用本地時間替換Event頭信息中的時間戳
25 hdfs.closeTries 0 在發(fā)起關(guān)閉嘗試后,嘗試重命名臨時文件的次數(shù)。如果設(shè)置為1,表示重命名一次失敗后不再繼續(xù)嘗試重命名操作,此時待處理的文件將處于打開狀態(tài),擴(kuò)展名為.tmp。如果設(shè)置為0,表示嘗試重命名操作次數(shù)不受限制,直到文件最終被重命名成功。如果close調(diào)用失敗,文件可能仍然會處于打開狀態(tài),但是文件中的數(shù)據(jù)將保持完整,文件會在Flume重啟后關(guān)閉
26 hdfs.retryInterval 180 秒 連續(xù)嘗試關(guān)閉文件的時間間隔。如果設(shè)置為0或小于0的數(shù),第一次嘗試關(guān)閉文件失敗后將不會繼續(xù)嘗試關(guān)閉文件,文件將保持打開狀態(tài)或者以“.tmp”擴(kuò)展名結(jié)尾的臨時文件。如果設(shè)置為0,表示不嘗試,相當(dāng)于于將hdfs.closeTries設(shè)置成1
27 serializer TEXT 序列化方式,可選值有TEXT、avro_event或者實現(xiàn)EventSerializer.Builder接口的類
28 kerberosPrincipal - HDFS安全認(rèn)證kerberos配置
29 kerberosKeytab - HDFS安全認(rèn)證kerberos配置
30 proxyUser - 代理用戶

round 與 rollInterval 理解有誤

round、roundValue、roundUnit是基于路徑path去滾動生成文件夾的,針對文件夾而言

rollInterval、rollSize、rollCount是基于文件的條件限制滾動生成文件的,基于文件而言的

二、簡單模板

agent_name.sources = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name

# source
agent_name.sources.source_name.type = avro
XXX
XXX

# channel
agent_name.channels.channel_name.type = file
XXX
XXX

# sink
agent_name.sinks.sink_name.type = hdfs
agent_name.sinks.sink_name.hdfs.path = hdfs://${HA_NameNode_Name}/flume_data/yr=%Y/mon=%m/day=%d/hr=%H
agent_name.sinks.sink_name.hdfs.writeFormat = Text
agent_name.sinks.sink_name.hdfs.fileSuffix = _${hdfsFileSuffix}.log
agent_name.sinks.sink_name.hdfs.fileType = DataStream
agent_name.sinks.sink_name.hdfs.filePrefix = %Y%m%d%H%M
agent_name.sinks.sink_name.hdfs.useLocalTimeStamp = true
agent_name.sinks.sink_name.hdfs.rollInterval = 0
agent_name.sinks.sink_name.hdfs.rollSize = 125829120
agent_name.sinks.sink_name.hdfs.rollCount = 0
agent_name.sinks.sink_name.hdfs.minBlockReplicas = 1
agent_name.sinks.sink_name.hdfs.round = true
agent_name.sinks.sink_name.hdfs.roundValue = 1
agent_name.sinks.sink_name.hdfs.roundUnit = hour
agent_name.sinks.sink_name.hdfs.idleTimeout = 600

# source | channel | sink 關(guān)聯(lián)
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name

三、注意事項及異常

  1. idleTimeout 的設(shè)置

    • 設(shè)置為0,如果flume程序突然宕機(jī),就會導(dǎo)致 hdfs上的 .tmp后綴的文件無法會更改為完成的文件,造成一種假象,以為該文件正在寫入。當(dāng)程序重啟時,就會有兩個 .tmp文件。

    • 如果idle Timeout有設(shè)置值m,當(dāng)在m秒內(nèi)沒有數(shù)據(jù)寫入,就會把tmp文件改為已完成。后面再有數(shù)據(jù)過來的時候重新生成.tmp文件。

    • 建議:最好設(shè)置一個比較大的值,防止小文件產(chǎn)生,若不設(shè)置,宕機(jī)的話會有tmp文件

    • 為了能快速查看到數(shù)據(jù),可以設(shè)置該值較小,沒數(shù)據(jù)進(jìn)行就滾動,因為臨時文件是不能被Hive查詢到,但是這樣會產(chǎn)生小文件

  1. round 與 rollInterval 理解有誤

    • round、roundValue、roundUnit是基于路徑path去滾動生成文件夾的,針對文件夾而言

    • rollInterval、rollSize、rollCount是基于文件的條件限制滾動生成文件的,基于文件而言的

  1. 異常:Error while trying to hflushOrSync
    • 問題排查:通過查看不同F(xiàn)lume的Agent日志發(fā)現(xiàn),同名的文件被不同的Flume Agent打開,在文件第二次打開后,先前打開的Agent擁有的token就失效了,因此無法關(guān)閉它,就會不斷的報錯:Error while trying to hflushOrSync!
    • 查看之前的flume配置文件發(fā)現(xiàn),每一個Flume-Agent配置的hdfsSink是完全一樣的,每個Flume-Agent讀取的source相同,有很大概率會出現(xiàn)多個Fume-Agent同時寫同名文件,導(dǎo)致部分Flume-Agent無法繼續(xù)。
    • 解決方案:不同F(xiàn)lume設(shè)置不同的文件后綴名

其它重要點:

  1. Flume部署的機(jī)器上沒有Hadoop環(huán)境依賴:

    為了讓Flume能夠正常地和HDFS進(jìn)行交互,需要手動添加Hadoop相關(guān)的jar包到Flume的classpath中。常見的Hadoop相關(guān)的jar包如下:

    • hadoop-common.jar
    • hadoop-hdfs.jar
    • hadoop-auth.jar
    • hadoop-mapreduce-client-core.jar
    • commons-configuration.jar
    • commons-lang.jar
    • commons-collections.jar

    具體來說,如果你使用的是Hadoop2.x版本,那么在HADOOP_HOME目錄下應(yīng)該有以下目錄:

    • $HADOOP_HOME/share/hadoop/common
    • $HADOOP_HOME/share/hadoop/hdfs
    • $HADOOP_HOME/share/hadoop/mapreduce

    你可以將上述目錄中的對應(yīng)jar包復(fù)制到Flume的lib目錄下,即$FLUME_HOME/lib目錄下。

    需要注意的是,這些jar包的版本要和你的Hadoop集群的版本保持一致,否則可能會出現(xiàn)不兼容等問題。建議在添加前先確認(rèn)好版本信息。

  1. Hadoop的NameNode是高可用:
  • 將Hadoop集群的core-site.xml和hdfs-site.xml文件復(fù)制到Flume服務(wù)器上的某個目錄(/path/to/hadoop/conf)中

    • bin/flume-ng agent --conf-file conf/flume.conf --name a1 -Dhadoop.conf.dir=/path/to/hadoop/conf
      
  • 在啟動Flume時,也可以使用"-conf"參數(shù)指定core-site.xml和hdfs-site.xml文件的路徑,如下所示:

    • bin/flume-ng agent --conf-file conf/flume.conf --name a1 -conf /path/to/hadoop/conf/core-site.xml -conf /path/to/hadoop/conf/hdfs-site.xml
      
  • 直接兩個兩個文件放到 Flume_HOME/conf 目錄下

  1. 沒有權(quán)限寫入HDFS目錄:
  • 可以在啟動命令中設(shè)置"-Duser.name=kevin"參數(shù)

  • 需要注意的是,kevin用戶需要在HDFS上擁有寫入目標(biāo)目錄的權(quán)限,否則會出現(xiàn)寫入失敗等問題。因此,在實際使用中,需要對該用戶進(jìn)行獨(dú)立管理,并按照實際需求進(jìn)行授權(quán)

./bin/flume-ng agent -n ${agentName} -c ${flumeClient}/conf/ -f ${jobLocation} -Duser.name=kevin
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容